1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use bytes::Bytes;
5use enigma_identity::LocalIdentity;
6use enigma_packet::{Message, MessageMeta, MessageType};
7use std::time::{SystemTime, UNIX_EPOCH};
8use uuid::Uuid;
9
10use crate::attachment::{build_attachment_chunks, build_attachment_end, build_attachment_init};
11use crate::error::{EnigmaProtocolError, Result};
12use crate::session::{AttachmentUpdate, Session};
13use crate::transport::Transport;
14use crate::types::{AttachmentKind, ClientEvent, SessionBootstrap};
15
16pub struct MessengerClient {
17 local: LocalIdentity,
18 transport: Arc<dyn Transport>,
19 sessions: HashMap<String, Session>,
20 events: VecDeque<ClientEvent>,
21}
22
23impl MessengerClient {
24 pub fn new(local: LocalIdentity, transport: Arc<dyn Transport>) -> Self {
25 Self {
26 local,
27 transport,
28 sessions: HashMap::new(),
29 events: VecDeque::new(),
30 }
31 }
32
33 pub fn ensure_session_with(&mut self, remote: &str, bootstrap: SessionBootstrap) -> Result<()> {
34 if self.sessions.contains_key(remote) {
35 return Ok(());
36 }
37 let local_user = self.local.user().username.clone();
38 let session = Session::new(&local_user, remote, &bootstrap)?;
39 self.sessions.insert(remote.to_string(), session);
40 Ok(())
41 }
42
43 pub async fn send_text(&mut self, remote: &str, text_utf8: &str) -> Result<Uuid> {
44 let session = self
45 .sessions
46 .get_mut(remote)
47 .ok_or(EnigmaProtocolError::UnknownSession)?;
48 let message = Message {
49 id: Uuid::new_v4(),
50 sender: session.local_id().to_string(),
51 receiver: session.remote_id().to_string(),
52 timestamp_ms: now_ms(),
53 msg_type: MessageType::Text,
54 payload: text_utf8.as_bytes().to_vec(),
55 meta: MessageMeta::Basic {
56 content_type: Some("text/plain".to_string()),
57 },
58 };
59 let ciphertext = session.encrypt_message(&message)?;
60 self.transport.send(Bytes::from(ciphertext)).await?;
61 Ok(message.id)
62 }
63
64 pub async fn send_attachment_bytes(
65 &mut self,
66 remote: &str,
67 kind: AttachmentKind,
68 filename: Option<&str>,
69 content_type: Option<&str>,
70 bytes: &[u8],
71 chunk_size: usize,
72 ) -> Result<Uuid> {
73 if chunk_size == 0 {
74 return Err(EnigmaProtocolError::Attachment);
75 }
76 let transport = Arc::clone(&self.transport);
77 let session = self
78 .sessions
79 .get_mut(remote)
80 .ok_or(EnigmaProtocolError::UnknownSession)?;
81 let chunk_size_u32 =
82 u32::try_from(chunk_size).map_err(|_| EnigmaProtocolError::Attachment)?;
83 let total_size =
84 u64::try_from(bytes.len()).map_err(|_| EnigmaProtocolError::SizeLimitExceeded)?;
85 let chunk_count = ((bytes.len() + chunk_size - 1) / chunk_size) as u32;
86 let attachment_id = Uuid::new_v4();
87 let timestamp = now_ms();
88 let init = build_attachment_init(
89 session.local_id(),
90 session.remote_id(),
91 attachment_id,
92 kind,
93 filename,
94 content_type,
95 total_size,
96 chunk_size_u32,
97 chunk_count,
98 timestamp,
99 );
100 let mut ciphertext = session.encrypt_message(&init)?;
101 transport.send(Bytes::from(ciphertext)).await?;
102 let chunks = build_attachment_chunks(
103 session.local_id(),
104 session.remote_id(),
105 attachment_id,
106 bytes,
107 chunk_size,
108 timestamp,
109 );
110 for chunk in chunks {
111 ciphertext = session.encrypt_message(&chunk)?;
112 transport.send(Bytes::from(ciphertext)).await?;
113 }
114 let end_ts = timestamp.saturating_add(chunk_count as u64 + 1);
115 let end = build_attachment_end(
116 session.local_id(),
117 session.remote_id(),
118 attachment_id,
119 total_size,
120 chunk_count,
121 end_ts,
122 );
123 ciphertext = session.encrypt_message(&end)?;
124 transport.send(Bytes::from(ciphertext)).await?;
125 Ok(attachment_id)
126 }
127
128 pub async fn poll_once(&mut self) -> Result<Option<ClientEvent>> {
129 if let Some(event) = self.events.pop_front() {
130 return Ok(Some(event));
131 }
132 let bytes = self.transport.recv().await?;
133 let session_key = self.sole_session_key()?;
134 let (message, update, remote_label) = {
135 let session = self
136 .sessions
137 .get_mut(&session_key)
138 .ok_or(EnigmaProtocolError::InvalidState)?;
139 let msg = session.decrypt_packet(&bytes)?;
140 let attachment_update = session.handle_attachment_message(&msg)?;
141 let label = session.remote_id().to_string();
142 (msg, attachment_update, label)
143 };
144 self.events.push_back(ClientEvent::MessageReceived {
145 from: remote_label.clone(),
146 message: message.clone(),
147 });
148 if let Some(update) = update {
149 match update {
150 AttachmentUpdate::Init {
151 attachment_id,
152 total_chunks,
153 } => self.events.push_back(ClientEvent::AttachmentProgress {
154 from: remote_label.clone(),
155 attachment_id,
156 received_chunks: 0,
157 total_chunks,
158 }),
159 AttachmentUpdate::Chunk {
160 attachment_id,
161 received_chunks,
162 total_chunks,
163 } => self.events.push_back(ClientEvent::AttachmentProgress {
164 from: remote_label.clone(),
165 attachment_id,
166 received_chunks,
167 total_chunks,
168 }),
169 AttachmentUpdate::End {
170 attachment_id,
171 total_size,
172 } => self.events.push_back(ClientEvent::AttachmentCompleted {
173 from: remote_label.clone(),
174 attachment_id,
175 total_size,
176 }),
177 }
178 }
179 Ok(self.events.pop_front())
180 }
181
182 fn sole_session_key(&self) -> Result<String> {
183 if self.sessions.len() != 1 {
184 return Err(EnigmaProtocolError::InvalidState);
185 }
186 self.sessions
187 .keys()
188 .next()
189 .cloned()
190 .ok_or(EnigmaProtocolError::InvalidState)
191 }
192
193 pub async fn close(&self) -> Result<()> {
194 self.transport.close().await
195 }
196}
197
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}