1use liserk_ope::simplified_version::encrypt_ope;
2use liserk_shared::{
3 message::{
4 ClientAuthentication, ClientSetupSecureConnection, Delete, Insertion,
5 InsertionOpe, Message, Update,
6 },
7 message_type::{MessageType, MessageTypeError},
8 query::Query,
9};
10use rand::Rng;
11use tokio::{
12 io::{AsyncReadExt, AsyncWriteExt},
13 net::{
14 tcp::{OwnedReadHalf, OwnedWriteHalf},
15 TcpStream,
16 },
17};
18use tracing::{debug, info, trace};
19
20use crate::{basic_decrypt, basic_encrypt, error::Error};
21
22#[derive(Debug)]
23pub enum QueryResult {
24 EmptyResult,
25 SingleValue(Vec<u8>),
26 MultipleValues(Vec<Vec<u8>>),
27}
28
29#[derive(Debug, Default)]
31pub struct UnconnectedClient;
32
33#[derive(Debug)]
35pub struct ConnectedClient {
36 pub stream: TcpStream,
38}
39
40#[derive(Debug)]
42pub struct AuthenticatedClient {
43 pub read: OwnedReadHalf,
45
46 pub write: OwnedWriteHalf,
48
49 pub key: [u8; 32],
50}
51
52impl UnconnectedClient {
53 pub async fn connect(self, url: &str) -> Result<ConnectedClient, Error> {
59 let mut rng = rand::thread_rng();
60 let kyber_key = pqc_kyber::keypair(&mut rng);
61 let mut stream = TcpStream::connect(url).await?;
62 let setup_security = Message::ClientSetup(ClientSetupSecureConnection::new(
63 kyber_key.public.to_vec(),
64 ));
65 let message = setup_security.setup_for_network()?;
66
67 stream.write_all(&message).await?;
68 Ok(ConnectedClient { stream })
69 }
70}
71
72impl ConnectedClient {
73 pub async fn authenticate(
95 mut self,
96 username: String,
97 password: String,
98 key: [u8; 32],
99 ) -> Result<AuthenticatedClient, Error> {
100 let client_authentication = ClientAuthentication { username, password };
101 let message = Message::ClientAuthentification(client_authentication);
102 let message = message.setup_for_network()?;
103 self.stream.write_all(&message).await?;
105
106 let (read, write) = self.stream.into_split();
107 let auth_client = AuthenticatedClient { read, write, key };
108 Ok(auth_client)
109 }
110}
111
112impl AuthenticatedClient {
113 pub fn is_alive(&self) -> bool {
119 true
120 }
121
122 pub async fn terminate_connection(&mut self) -> Result<(), Error> {
124 let message = Message::EndOfCommunication;
125 let message = message.setup_for_network()?;
126 self.write.write_all(&message).await?;
128 Ok(())
129 }
130
131 pub async fn insert(
141 &mut self,
142 collection: String,
143 data: Vec<u8>,
144 associated_data: Vec<u8>,
145 acl: Vec<String>,
146 usecases: Vec<String>,
147 ) -> Result<String, Error> {
148 let mut nonce = [0u8; 12];
149 rand::thread_rng().fill(&mut nonce);
150 let encrypt_data = basic_encrypt(&self.key, &nonce, &data, &associated_data)?;
151 let message = Message::Insert(Insertion {
152 acl,
153 collection,
154 data: encrypt_data,
155 usecases,
156 nonce: nonce.to_vec(),
157 });
158 let message = message.setup_for_network()?;
159 self.write.write_all(&message).await?;
160 let message = parse_message_from_tcp_stream(&mut self.read).await?;
161 info!("message: {:?}", message);
162 match message {
163 Message::InsertResponse { inserted_id } => Ok(inserted_id),
164 _ => Err(Error::MessageTypeError(MessageTypeError::default())),
165 }
166 }
167
168 pub async fn insert_ope(
177 &mut self,
178 number_to_encrypt: f64,
179 acl: Vec<String>,
180 usecases: Vec<String>,
181 collection: String,
182 ) -> Result<String, Error> {
183 let encrypted_number = encrypt_ope(number_to_encrypt);
184 let data = encrypted_number.to_string().as_bytes().to_vec();
185
186 let message =
187 Message::InsertOpe(InsertionOpe { acl, collection, data, usecases });
188 let message = message.setup_for_network()?;
189 self.write.write_all(&message).await?;
190 let message = parse_message_from_tcp_stream(&mut self.read).await?;
191 info!("message: {:?}", message);
192 match message {
193 Message::InsertResponse { inserted_id } => Ok(inserted_id),
194 _ => Err(Error::MessageTypeError(MessageTypeError::default())),
195 }
196 }
197
198 pub async fn query(&mut self, query: Query) -> Result<QueryResult, Error> {
204 let message = Message::Query(query);
205 let message = message.setup_for_network()?;
206 self.write.write_all(&message).await?;
207 let message = parse_message_from_tcp_stream(&mut self.read).await?;
208 info!("message: {:?}", message);
209 match message {
210 Message::QueryResponse((data, nonces)) => {
211 let mut values = Vec::with_capacity(data.len());
212 for (cipher, nonce) in data.iter().zip(nonces.unwrap().iter()) {
213 let value = basic_decrypt(
214 &self.key,
215 convert_to_array12(&nonce).expect("12 elements"),
216 &cipher,
217 &[],
218 )?;
219 values.push(value);
220 }
221 Ok(QueryResult::MultipleValues(values))
222 }
223 Message::SingleValueResponse { data, nonce } => {
224 if data.is_none() || nonce.is_none() {
225 return Ok(QueryResult::EmptyResult);
226 }
227 let value = basic_decrypt(
228 &self.key,
229 convert_to_array12(&nonce.expect("Not ope")).expect("12 elements"),
230 &data.expect("if is none reutrn empty result"),
231 &[],
232 )?;
233 Ok(QueryResult::SingleValue(value))
234 }
235 _ => Err(Error::MessageTypeError(MessageTypeError::default())),
236 }
237 }
238
239 pub async fn modify(
247 &mut self,
248 id: String,
249 collection: String,
250 new_value: Vec<u8>,
251 ) -> Result<Message, Error> {
252 let update = Update { collection, id, new_value };
253 let message = Message::Update(update);
254 let message = message.setup_for_network()?;
255 self.write.write_all(&message).await?;
256 let message = parse_message_from_tcp_stream(&mut self.read).await?;
257
258 info!("message: {:?}", message);
259 match message {
260 Message::UpdateResponse { .. } => Ok(message),
261 _ => Err(Error::MessageTypeError(MessageTypeError::default())),
262 }
263 }
264
265 pub async fn delete(
272 &mut self,
273 id: String,
274 collection: String,
275 ) -> Result<Message, Error> {
276 let delete = Delete { collection, id };
277 let message = Message::Delete(delete);
278 let message = message.setup_for_network()?;
279 self.write.write_all(&message).await?;
280 let message = parse_message_from_tcp_stream(&mut self.read).await?;
281
282 info!("message: {:?}", message);
283 match message {
284 Message::DeleteResult(_) => Ok(message),
285 _ => Err(Error::MessageTypeError(MessageTypeError::default())),
286 }
287 }
288}
289
290pub async fn parse_message_from_tcp_stream(
300 stream: &mut OwnedReadHalf,
301) -> Result<Message, Error> {
302 let mut buffer = [0; 1];
303 let _ = stream.read(&mut buffer).await;
304 let message_type = MessageType::try_from(buffer[0]);
305 info!("messageType: {:?}", message_type);
306
307 let mut message_size = [0; 4];
308 let _size_error = stream.read(&mut message_size).await;
309 let decimal_size = u32::from_be_bytes(message_size);
310 trace!("message size: {}", decimal_size);
311
312 let mut slice = vec![0; decimal_size as usize];
313 let _size_read = stream.read_exact(&mut slice).await;
314 trace!("slice: {:?}", slice);
315 let message: Message = serde_cbor::from_slice(&slice)?;
316 debug!("parsed message: {:#?}", message);
317 Ok(message)
318}
319
320fn convert_to_array12(slice: &Vec<u8>) -> Option<&[u8; 12]> {
321 if slice.len() == 12 {
322 let array_ref: &[u8; 12] = slice.as_slice().try_into().unwrap();
323 Some(array_ref)
324 } else {
325 None
326 }
327}