liserk_client/
stream.rs

1use liserk_ope::simplified_version::encrypt_ope;
2use liserk_shared::{
3    message::{
4        ClientAuthentication, ClientSetupSecureConnection, Delete, Insertion,
5        InsertionOpe, Message, Update,
6    },
7    message_type::{MessageType, MessageTypeError},
8    query::Query,
9};
10use rand::Rng;
11use tokio::{
12    io::{AsyncReadExt, AsyncWriteExt},
13    net::{
14        tcp::{OwnedReadHalf, OwnedWriteHalf},
15        TcpStream,
16    },
17};
18use tracing::{debug, info, trace};
19
20use crate::{basic_decrypt, basic_encrypt, error::Error};
21
/// Outcome of [`AuthenticatedClient::query`], holding already-decrypted values.
///
/// `Clone`, `PartialEq` and `Eq` are derived so that callers can keep a copy of
/// a result and compare results directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryResult {
    /// The server answered, but there was no value to return.
    EmptyResult,
    /// Exactly one decrypted value.
    SingleValue(Vec<u8>),
    /// Several decrypted values, in the order the server sent them.
    MultipleValues(Vec<Vec<u8>>),
}
28
/// Starting state of the client: no TCP connection has been opened yet.
///
/// Call [`UnconnectedClient::connect`] to open one and obtain a
/// [`ConnectedClient`].
#[derive(Default, Debug)]
pub struct UnconnectedClient;
32
33/// Represents a client that has established a connection to the server but is not yet authenticated.
34#[derive(Debug)]
35pub struct ConnectedClient {
36    /// The TCP stream representing the connection to the server.
37    pub stream: TcpStream,
38}
39
40/// Represents a client that has been authenticated.
41#[derive(Debug)]
42pub struct AuthenticatedClient {
43    /// The read half of the TCP stream.
44    pub read: OwnedReadHalf,
45
46    /// The write half of the TCP stream.
47    pub write: OwnedWriteHalf,
48
49    pub key: [u8; 32],
50}
51
52impl UnconnectedClient {
53    /// Connects to the server at the given URL and returns a `ConnectedClient`.
54    ///
55    /// # Arguments
56    ///
57    /// * `url` - The URL of the server to connect to.
58    pub async fn connect(self, url: &str) -> Result<ConnectedClient, Error> {
59        let mut rng = rand::thread_rng();
60        let kyber_key = pqc_kyber::keypair(&mut rng);
61        let mut stream = TcpStream::connect(url).await?;
62        let setup_security = Message::ClientSetup(ClientSetupSecureConnection::new(
63            kyber_key.public.to_vec(),
64        ));
65        let message = setup_security.setup_for_network()?;
66
67        stream.write_all(&message).await?;
68        Ok(ConnectedClient { stream })
69    }
70}
71
72impl ConnectedClient {
73    /// Authenticates the connected client with a username and password.
74    ///
75    /// # Arguments
76    ///
77    /// * `username` - The username as a String.
78    /// * `password` - The password as a String.
79    ///
80    /// # Returns
81    ///
82    /// * `Result<AuthenticatedClient, Error>` - If successful, returns an instance of AuthenticatedClient.
83    ///                                          Otherwise, returns an Error indicating what went wrong.
84    ///
85    /// # Example
86    ///
87    /// ```
88    /// # async fn run_example() -> Result<(), Error> {
89    /// let unconnected_client = UnconnectedClient;
90    /// let connected_client = unconnected_client.connect("127.0.0.1:12345").await?;
91    /// let authenticated_client = connected_client.authenticate("username".to_string(), "password".to_string()).await?;
92    /// # Ok(()) }
93    /// ```
94    pub async fn authenticate(
95        mut self,
96        username: String,
97        password: String,
98        key: [u8; 32],
99    ) -> Result<AuthenticatedClient, Error> {
100        let client_authentication = ClientAuthentication { username, password };
101        let message = Message::ClientAuthentification(client_authentication);
102        let message = message.setup_for_network()?;
103        // debug!("message {:?}", message);
104        self.stream.write_all(&message).await?;
105
106        let (read, write) = self.stream.into_split();
107        let auth_client = AuthenticatedClient { read, write, key };
108        Ok(auth_client)
109    }
110}
111
112impl AuthenticatedClient {
113    /// Checks if the client connection is alive.
114    ///
115    /// # Returns
116    ///
117    /// * `bool` - `true` if the connection is alive, `false` otherwise.
118    pub fn is_alive(&self) -> bool {
119        true
120    }
121
122    /// Terminates the connection of the client.
123    pub async fn terminate_connection(&mut self) -> Result<(), Error> {
124        let message = Message::EndOfCommunication;
125        let message = message.setup_for_network()?;
126        // debug!("terminate Connection {:?}", message);
127        self.write.write_all(&message).await?;
128        Ok(())
129    }
130
131    /// Inserts data into a specified collection.
132    ///
133    /// # Arguments
134    ///
135    /// * `collection` - The name of the collection to insert the data into.
136    /// * `data` - The data to be inserted.
137    /// * `associated_data` - The associated data to be verified.
138    /// * `acl` - The access control list.
139    /// * `usecases` - The use cases associated with the data.
140    pub async fn insert(
141        &mut self,
142        collection: String,
143        data: Vec<u8>,
144        associated_data: Vec<u8>,
145        acl: Vec<String>,
146        usecases: Vec<String>,
147    ) -> Result<String, Error> {
148        let mut nonce = [0u8; 12];
149        rand::thread_rng().fill(&mut nonce);
150        let encrypt_data = basic_encrypt(&self.key, &nonce, &data, &associated_data)?;
151        let message = Message::Insert(Insertion {
152            acl,
153            collection,
154            data: encrypt_data,
155            usecases,
156            nonce: nonce.to_vec(),
157        });
158        let message = message.setup_for_network()?;
159        self.write.write_all(&message).await?;
160        let message = parse_message_from_tcp_stream(&mut self.read).await?;
161        info!("message: {:?}", message);
162        match message {
163            Message::InsertResponse { inserted_id } => Ok(inserted_id),
164            _ => Err(Error::MessageTypeError(MessageTypeError::default())),
165        }
166    }
167
168    /// Inserts a number into the database with Order Preserving Encryption (OPE).
169    ///
170    /// # Arguments
171    ///
172    /// * `number_to_encrypt` - The number to be encrypted and inserted.
173    /// * `acl` - The access control list.
174    /// * `usecases` - The use cases associated with the data.
175    /// * `collection` - The name of the collection to insert the data into.
176    pub async fn insert_ope(
177        &mut self,
178        number_to_encrypt: f64,
179        acl: Vec<String>,
180        usecases: Vec<String>,
181        collection: String,
182    ) -> Result<String, Error> {
183        let encrypted_number = encrypt_ope(number_to_encrypt);
184        let data = encrypted_number.to_string().as_bytes().to_vec();
185
186        let message =
187            Message::InsertOpe(InsertionOpe { acl, collection, data, usecases });
188        let message = message.setup_for_network()?;
189        self.write.write_all(&message).await?;
190        let message = parse_message_from_tcp_stream(&mut self.read).await?;
191        info!("message: {:?}", message);
192        match message {
193            Message::InsertResponse { inserted_id } => Ok(inserted_id),
194            _ => Err(Error::MessageTypeError(MessageTypeError::default())),
195        }
196    }
197
198    /// Queries the database and returns the results.
199    ///
200    /// # Arguments
201    ///
202    /// * `query` - The query object representing the database query.
203    pub async fn query(&mut self, query: Query) -> Result<QueryResult, Error> {
204        let message = Message::Query(query);
205        let message = message.setup_for_network()?;
206        self.write.write_all(&message).await?;
207        let message = parse_message_from_tcp_stream(&mut self.read).await?;
208        info!("message: {:?}", message);
209        match message {
210            Message::QueryResponse((data, nonces)) => {
211                let mut values = Vec::with_capacity(data.len());
212                for (cipher, nonce) in data.iter().zip(nonces.unwrap().iter()) {
213                    let value = basic_decrypt(
214                        &self.key,
215                        convert_to_array12(&nonce).expect("12 elements"),
216                        &cipher,
217                        &[],
218                    )?;
219                    values.push(value);
220                }
221                Ok(QueryResult::MultipleValues(values))
222            }
223            Message::SingleValueResponse { data, nonce } => {
224                if data.is_none() || nonce.is_none() {
225                    return Ok(QueryResult::EmptyResult);
226                }
227                let value = basic_decrypt(
228                    &self.key,
229                    convert_to_array12(&nonce.expect("Not ope")).expect("12 elements"),
230                    &data.expect("if is none reutrn empty result"),
231                    &[],
232                )?;
233                Ok(QueryResult::SingleValue(value))
234            }
235            _ => Err(Error::MessageTypeError(MessageTypeError::default())),
236        }
237    }
238
239    /// Modifies an existing document in the database.
240    ///
241    /// # Arguments
242    ///
243    /// * `id` - The identifier of the document to be modified.
244    /// * `collection` - The name of the collection containing the document.
245    /// * `new_value` - The new value to be set in the document.
246    pub async fn modify(
247        &mut self,
248        id: String,
249        collection: String,
250        new_value: Vec<u8>,
251    ) -> Result<Message, Error> {
252        let update = Update { collection, id, new_value };
253        let message = Message::Update(update);
254        let message = message.setup_for_network()?;
255        self.write.write_all(&message).await?;
256        let message = parse_message_from_tcp_stream(&mut self.read).await?;
257
258        info!("message: {:?}", message);
259        match message {
260            Message::UpdateResponse { .. } => Ok(message),
261            _ => Err(Error::MessageTypeError(MessageTypeError::default())),
262        }
263    }
264
265    /// Deletes a document from the database.
266    ///
267    /// # Arguments
268    ///
269    /// * `id` - The identifier of the document to be deleted.
270    /// * `collection` - The name of the collection containing the document.
271    pub async fn delete(
272        &mut self,
273        id: String,
274        collection: String,
275    ) -> Result<Message, Error> {
276        let delete = Delete { collection, id };
277        let message = Message::Delete(delete);
278        let message = message.setup_for_network()?;
279        self.write.write_all(&message).await?;
280        let message = parse_message_from_tcp_stream(&mut self.read).await?;
281
282        info!("message: {:?}", message);
283        match message {
284            Message::DeleteResult(_) => Ok(message),
285            _ => Err(Error::MessageTypeError(MessageTypeError::default())),
286        }
287    }
288}
289
290/// Parses a message from a TCP stream.
291///
292/// # Arguments
293///
294/// * `stream` - A mutable reference to the read half of a TCP stream.
295///
296/// # Returns
297///
298/// * `Result<Message, Error>` - The parsed message, or an error if parsing fails.
299pub async fn parse_message_from_tcp_stream(
300    stream: &mut OwnedReadHalf,
301) -> Result<Message, Error> {
302    let mut buffer = [0; 1];
303    let _ = stream.read(&mut buffer).await;
304    let message_type = MessageType::try_from(buffer[0]);
305    info!("messageType: {:?}", message_type);
306
307    let mut message_size = [0; 4];
308    let _size_error = stream.read(&mut message_size).await;
309    let decimal_size = u32::from_be_bytes(message_size);
310    trace!("message size: {}", decimal_size);
311
312    let mut slice = vec![0; decimal_size as usize];
313    let _size_read = stream.read_exact(&mut slice).await;
314    trace!("slice: {:?}", slice);
315    let message: Message = serde_cbor::from_slice(&slice)?;
316    debug!("parsed message: {:#?}", message);
317    Ok(message)
318}
319
/// Views `slice` as a reference to a 12-byte array.
///
/// Returns `Some` only when `slice` is exactly 12 bytes long, and `None` for
/// any other length. Nothing is copied: the returned reference borrows from
/// `slice`. Taking `&[u8]` keeps existing `&Vec<u8>` call sites working through
/// deref coercion while also accepting arrays and sub-slices.
fn convert_to_array12(slice: &[u8]) -> Option<&[u8; 12]> {
    slice.try_into().ok()
}