enigma_protocol/
client.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use bytes::Bytes;
5use enigma_identity::LocalIdentity;
6use enigma_packet::{Message, MessageMeta, MessageType};
7use std::time::{SystemTime, UNIX_EPOCH};
8use uuid::Uuid;
9
10use crate::attachment::{build_attachment_chunks, build_attachment_end, build_attachment_init};
11use crate::error::{EnigmaProtocolError, Result};
12use crate::session::{AttachmentUpdate, Session};
13use crate::transport::Transport;
14use crate::types::{AttachmentKind, ClientEvent, SessionBootstrap};
15
16pub struct MessengerClient {
17    local: LocalIdentity,
18    transport: Arc<dyn Transport>,
19    sessions: HashMap<String, Session>,
20    events: VecDeque<ClientEvent>,
21}
22
23impl MessengerClient {
24    pub fn new(local: LocalIdentity, transport: Arc<dyn Transport>) -> Self {
25        Self {
26            local,
27            transport,
28            sessions: HashMap::new(),
29            events: VecDeque::new(),
30        }
31    }
32
33    pub fn ensure_session_with(&mut self, remote: &str, bootstrap: SessionBootstrap) -> Result<()> {
34        if self.sessions.contains_key(remote) {
35            return Ok(());
36        }
37        let local_user = self.local.user().username.clone();
38        let session = Session::new(&local_user, remote, &bootstrap)?;
39        self.sessions.insert(remote.to_string(), session);
40        Ok(())
41    }
42
43    pub async fn send_text(&mut self, remote: &str, text_utf8: &str) -> Result<Uuid> {
44        let session = self
45            .sessions
46            .get_mut(remote)
47            .ok_or(EnigmaProtocolError::UnknownSession)?;
48        let message = Message {
49            id: Uuid::new_v4(),
50            sender: session.local_id().to_string(),
51            receiver: session.remote_id().to_string(),
52            timestamp_ms: now_ms(),
53            msg_type: MessageType::Text,
54            payload: text_utf8.as_bytes().to_vec(),
55            meta: MessageMeta::Basic {
56                content_type: Some("text/plain".to_string()),
57            },
58        };
59        let ciphertext = session.encrypt_message(&message)?;
60        self.transport.send(Bytes::from(ciphertext)).await?;
61        Ok(message.id)
62    }
63
64    pub async fn send_attachment_bytes(
65        &mut self,
66        remote: &str,
67        kind: AttachmentKind,
68        filename: Option<&str>,
69        content_type: Option<&str>,
70        bytes: &[u8],
71        chunk_size: usize,
72    ) -> Result<Uuid> {
73        if chunk_size == 0 {
74            return Err(EnigmaProtocolError::Attachment);
75        }
76        let transport = Arc::clone(&self.transport);
77        let session = self
78            .sessions
79            .get_mut(remote)
80            .ok_or(EnigmaProtocolError::UnknownSession)?;
81        let chunk_size_u32 =
82            u32::try_from(chunk_size).map_err(|_| EnigmaProtocolError::Attachment)?;
83        let total_size =
84            u64::try_from(bytes.len()).map_err(|_| EnigmaProtocolError::SizeLimitExceeded)?;
85        let chunk_count = ((bytes.len() + chunk_size - 1) / chunk_size) as u32;
86        let attachment_id = Uuid::new_v4();
87        let timestamp = now_ms();
88        let init = build_attachment_init(
89            session.local_id(),
90            session.remote_id(),
91            attachment_id,
92            kind,
93            filename,
94            content_type,
95            total_size,
96            chunk_size_u32,
97            chunk_count,
98            timestamp,
99        );
100        let mut ciphertext = session.encrypt_message(&init)?;
101        transport.send(Bytes::from(ciphertext)).await?;
102        let chunks = build_attachment_chunks(
103            session.local_id(),
104            session.remote_id(),
105            attachment_id,
106            bytes,
107            chunk_size,
108            timestamp,
109        );
110        for chunk in chunks {
111            ciphertext = session.encrypt_message(&chunk)?;
112            transport.send(Bytes::from(ciphertext)).await?;
113        }
114        let end_ts = timestamp.saturating_add(chunk_count as u64 + 1);
115        let end = build_attachment_end(
116            session.local_id(),
117            session.remote_id(),
118            attachment_id,
119            total_size,
120            chunk_count,
121            end_ts,
122        );
123        ciphertext = session.encrypt_message(&end)?;
124        transport.send(Bytes::from(ciphertext)).await?;
125        Ok(attachment_id)
126    }
127
128    pub async fn poll_once(&mut self) -> Result<Option<ClientEvent>> {
129        if let Some(event) = self.events.pop_front() {
130            return Ok(Some(event));
131        }
132        let bytes = self.transport.recv().await?;
133        let session_key = self.sole_session_key()?;
134        let (message, update, remote_label) = {
135            let session = self
136                .sessions
137                .get_mut(&session_key)
138                .ok_or(EnigmaProtocolError::InvalidState)?;
139            let msg = session.decrypt_packet(&bytes)?;
140            let attachment_update = session.handle_attachment_message(&msg)?;
141            let label = session.remote_id().to_string();
142            (msg, attachment_update, label)
143        };
144        self.events.push_back(ClientEvent::MessageReceived {
145            from: remote_label.clone(),
146            message: message.clone(),
147        });
148        if let Some(update) = update {
149            match update {
150                AttachmentUpdate::Init {
151                    attachment_id,
152                    total_chunks,
153                } => self.events.push_back(ClientEvent::AttachmentProgress {
154                    from: remote_label.clone(),
155                    attachment_id,
156                    received_chunks: 0,
157                    total_chunks,
158                }),
159                AttachmentUpdate::Chunk {
160                    attachment_id,
161                    received_chunks,
162                    total_chunks,
163                } => self.events.push_back(ClientEvent::AttachmentProgress {
164                    from: remote_label.clone(),
165                    attachment_id,
166                    received_chunks,
167                    total_chunks,
168                }),
169                AttachmentUpdate::End {
170                    attachment_id,
171                    total_size,
172                } => self.events.push_back(ClientEvent::AttachmentCompleted {
173                    from: remote_label.clone(),
174                    attachment_id,
175                    total_size,
176                }),
177            }
178        }
179        Ok(self.events.pop_front())
180    }
181
182    fn sole_session_key(&self) -> Result<String> {
183        if self.sessions.len() != 1 {
184            return Err(EnigmaProtocolError::InvalidState);
185        }
186        self.sessions
187            .keys()
188            .next()
189            .cloned()
190            .ok_or(EnigmaProtocolError::InvalidState)
191    }
192
193    pub async fn close(&self) -> Result<()> {
194        self.transport.close().await
195    }
196}
197
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}