1use super::transport::{DeliveryReceipt, DeliveryStatus, ReceivedMessage};
3use super::types::*;
4use super::{DhtClient, KeyExchange, MessageStore, MessageTransport};
5use crate::identity::FourWordAddress;
6use crate::messaging::user_handle::UserHandle;
7use anyhow::Result;
8use chrono::{Duration, Utc};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12use tracing::{info, warn};
13
14pub async fn channel_recipients(_channel_id: &ChannelId) -> Result<Vec<FourWordAddress>> {
17 Ok(Vec::new())
39}
40
/// Messaging service that ties together message storage, transport, and
/// per-peer session keys for a single local identity.
pub struct MessagingService {
    /// Address of the local user; used as the sender of outgoing messages.
    identity: FourWordAddress,
    /// Store used to persist sent and received messages.
    store: MessageStore,
    /// Transport used to send encrypted messages and receive incoming ones.
    transport: Arc<MessageTransport>,
    /// Source of per-peer session keys used for encryption and decryption.
    key_exchange: Arc<KeyExchange>,
    /// DHT client handle kept by the service; not read by the methods visible here.
    _dht_client: DhtClient,
    /// Broadcast sender on which received messages are published to subscribers.
    event_tx: broadcast::Sender<ReceivedMessage>,
    /// Users marked online, mapped to the time they were last marked.
    online_users: Arc<RwLock<HashMap<FourWordAddress, chrono::DateTime<Utc>>>>,
}
58
/// Optional settings applied to an outgoing message by `send_message`.
///
/// `Default` yields a non-ephemeral message with no expiry, no reply or
/// thread reference, and no attachments.
#[derive(Debug, Clone, Default)]
pub struct SendOptions {
    /// Copied to the message's `ephemeral` flag.
    pub ephemeral: bool,
    /// When set, the message's expiry time is this many seconds after sending.
    pub expiry_seconds: Option<u64>,
    /// Identifier of the message this one replies to, if any.
    pub reply_to: Option<MessageId>,
    /// Identifier of the thread this message belongs to, if any.
    pub thread_id: Option<ThreadId>,
    /// Attachments copied onto the outgoing message.
    pub attachments: Vec<Attachment>,
}
68
69impl MessagingService {
70 pub async fn new(identity: FourWordAddress, dht_client: DhtClient) -> Result<Self> {
72 let store = MessageStore::new(dht_client.clone(), None).await?;
74
75 #[cfg(test)]
77 let network = Arc::new(crate::network::P2PNode::new_for_tests()?);
78
79 #[cfg(not(test))]
80 let network = {
81 let config = crate::network::NodeConfig::new()?;
83 let node = crate::network::P2PNode::new(config).await?;
84 Arc::new(node)
85 };
86 let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?);
87 let key_exchange = Arc::new(KeyExchange::new(identity.clone(), dht_client.clone()).await?);
88
89 let (event_tx, _) = broadcast::channel(1000);
90
91 Ok(Self {
92 identity,
93 store,
94 transport,
95 key_exchange,
96 _dht_client: dht_client,
97 event_tx,
98 online_users: Arc::new(RwLock::new(HashMap::new())),
99 })
100 }
101
102 pub async fn send_message(
104 &self,
105 recipients: Vec<FourWordAddress>,
106 content: MessageContent,
107 channel_id: ChannelId,
108 options: SendOptions,
109 ) -> Result<(MessageId, DeliveryReceipt)> {
110 let mut message = RichMessage::new(
112 UserHandle::from(self.identity.to_string()),
113 channel_id,
114 content,
115 );
116
117 message.ephemeral = options.ephemeral;
119 if let Some(seconds) = options.expiry_seconds {
120 message.expires_at = Some(Utc::now() + Duration::seconds(seconds as i64));
121 }
122 message.reply_to = options.reply_to;
123 message.thread_id = options.thread_id;
124 message.attachments = options.attachments;
125
126 self.store.store_message(&message).await?;
128
129 let mut delivery_results = Vec::new();
131
132 for recipient in &recipients {
133 let encryption_key = match self.key_exchange.get_session_key(recipient).await {
135 Ok(key) => key,
136 Err(e) => {
137 let _ = self.key_exchange.initiate_exchange(recipient.clone()).await;
139 return Err(anyhow::anyhow!(
140 "No session key established for {}: {}",
141 recipient,
142 e
143 ));
144 }
145 };
146
147 let encrypted = self
149 .encrypt_message_with_key(&message, &encryption_key)
150 .await?;
151
152 match self
154 .transport
155 .send_message(&encrypted, vec![recipient.clone()])
156 .await
157 {
158 Ok(_receipt) => {
159 delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
160 }
161 Err(e) => {
162 warn!("Failed to send to {}: {}", recipient, e);
163 delivery_results
164 .push((recipient.clone(), DeliveryStatus::Failed(e.to_string())));
165 }
166 }
167 }
168
169 let receipt = DeliveryReceipt {
171 message_id: message.id,
172 timestamp: Utc::now(),
173 delivery_status: delivery_results,
174 };
175
176 info!(
177 "Sent message {} to {} recipients",
178 message.id,
179 recipients.len()
180 );
181
182 Ok((message.id, receipt))
183 }
184
185 pub async fn send_message_to_channel(
187 &self,
188 channel_id: ChannelId,
189 content: MessageContent,
190 options: SendOptions,
191 ) -> Result<(MessageId, DeliveryReceipt)> {
192 let recipients = channel_recipients(&channel_id).await?;
194
195 if recipients.is_empty() {
196 return Err(anyhow::anyhow!(
197 "No recipients found for channel {}",
198 channel_id
199 ));
200 }
201
202 self.send_message(recipients, content, channel_id, options)
204 .await
205 }
206
207 pub async fn subscribe_messages(
209 &self,
210 channel_filter: Option<ChannelId>,
211 ) -> broadcast::Receiver<ReceivedMessage> {
212 let rx = self.event_tx.subscribe();
213
214 let transport = self.transport.clone();
216 let event_tx = self.event_tx.clone();
217 let key_exchange = self.key_exchange.clone();
218 let store = self.store.clone();
219
220 tokio::spawn(async move {
221 let mut receiver = transport.receive_messages().await;
222
223 while let Ok(received) = receiver.recv().await {
224 if let Ok(decrypted) =
226 Self::decrypt_received_message(&received.message, &key_exchange).await
227 {
228 let _ = store.store_message(&decrypted).await;
230
231 if let Some(filter) = channel_filter
233 && decrypted.channel_id != filter
234 {
235 continue;
236 }
237
238 let _ = event_tx.send(ReceivedMessage {
240 message: received.message,
241 received_at: received.received_at,
242 });
243 }
244 }
245 });
246
247 rx
248 }
249
250 pub async fn get_message_status(&self, message_id: MessageId) -> Result<DeliveryStatus> {
252 if let Ok(_msg) = self.store.get_message(message_id).await {
257 let online = self.online_users.read().await;
259 if !online.is_empty() {
260 Ok(DeliveryStatus::Delivered(Utc::now()))
261 } else {
262 Ok(DeliveryStatus::Queued)
263 }
264 } else {
265 Ok(DeliveryStatus::Failed("Message not found".to_string()))
266 }
267 }
268
269 pub async fn get_message(&self, message_id: MessageId) -> Result<RichMessage> {
271 self.store.get_message(message_id).await
272 }
273
274 pub async fn mark_user_online(&self, user: FourWordAddress) -> Result<()> {
276 let mut online = self.online_users.write().await;
277 online.insert(user, Utc::now());
278 Ok(())
279 }
280
281 pub async fn mark_delivered(
283 &self,
284 message_id: MessageId,
285 recipient: FourWordAddress,
286 ) -> Result<()> {
287 if let Ok(mut msg) = self.store.get_message(message_id).await {
289 msg.delivered_to.insert(
290 crate::messaging::user_resolver::resolve_handle(&recipient),
291 Utc::now(),
292 );
293 self.store.update_message(&msg).await?;
294 }
295 Ok(())
296 }
297
298 pub async fn process_message_queue(&self) -> Result<()> {
300 self.transport.process_message_queue().await;
302 Ok(())
303 }
304
305 pub async fn encrypt_message(
307 &self,
308 recipient: FourWordAddress,
309 channel_id: ChannelId,
310 content: MessageContent,
311 ) -> Result<EncryptedMessage> {
312 let message = RichMessage::new(
313 UserHandle::from(self.identity.to_string()),
314 channel_id,
315 content,
316 );
317
318 let key = self
320 .key_exchange
321 .get_session_key(&recipient)
322 .await
323 .unwrap_or_else(|_| vec![0u8; 32]); self.encrypt_message_with_key(&message, &key).await
326 }
327
328 pub async fn decrypt_message(&self, encrypted: EncryptedMessage) -> Result<RichMessage> {
330 Self::decrypt_received_message(&encrypted, &self.key_exchange).await
331 }
332
333 async fn encrypt_message_with_key(
335 &self,
336 message: &RichMessage,
337 key: &[u8],
338 ) -> Result<EncryptedMessage> {
339 use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
340
341 let plaintext = serde_json::to_vec(message)?;
342 let mut k = [0u8; 32];
343 if key.len() != 32 {
344 return Err(anyhow::anyhow!("Invalid session key length"));
345 }
346 k.copy_from_slice(&key[..32]);
347 let sk = SymmetricKey::from_bytes(k);
348 let cipher = ChaCha20Poly1305Cipher::new(&sk);
349 let (ciphertext, nonce) = cipher
350 .encrypt(&plaintext, None)
351 .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
352
353 Ok(EncryptedMessage {
354 id: message.id,
355 channel_id: message.channel_id,
356 sender: self.identity.clone(),
357 ciphertext,
358 nonce: nonce.to_vec(),
359 key_id: format!("key_{}", self.identity),
360 })
361 }
362
363 async fn decrypt_received_message(
365 encrypted: &EncryptedMessage,
366 key_exchange: &Arc<KeyExchange>,
367 ) -> Result<RichMessage> {
368 use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
369
370 let key = key_exchange
372 .get_session_key(&encrypted.sender)
373 .await
374 .map_err(|e| anyhow::anyhow!("No session key for {}: {}", encrypted.sender, e))?;
375 if key.len() != 32 {
376 return Err(anyhow::anyhow!("Invalid session key length"));
377 }
378 let mut k = [0u8; 32];
379 k.copy_from_slice(&key[..32]);
380 let sk = SymmetricKey::from_bytes(k);
381 let cipher = ChaCha20Poly1305Cipher::new(&sk);
382 if encrypted.nonce.len() != 12 {
384 return Err(anyhow::anyhow!(
385 "Invalid nonce length: expected 12, got {}",
386 encrypted.nonce.len()
387 ));
388 }
389 let mut nonce_array = [0u8; 12];
390 nonce_array.copy_from_slice(&encrypted.nonce);
391
392 let plaintext = cipher
393 .decrypt(&encrypted.ciphertext, &nonce_array, None)
394 .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
395
396 let message: RichMessage = serde_json::from_slice(&plaintext)?;
398
399 Ok(message)
400 }
401
    /// Test-only helper that builds a `RichMessage` from the given sender,
    /// channel and content via `RichMessage::new`.
    ///
    /// The service itself is not consulted; `self` is unused.
    #[cfg(test)]
    pub fn create_test_message(
        &self,
        sender: UserHandle,
        channel_id: ChannelId,
        content: MessageContent,
    ) -> RichMessage {
        RichMessage::new(sender, channel_id, content)
    }
412
413 #[cfg(test)]
414 pub async fn inject_test_message(&self, message: RichMessage) -> Result<()> {
415 self.store.store_message(&message).await?;
416
417 let encrypted = EncryptedMessage {
419 id: message.id,
420 channel_id: message.channel_id,
421 sender: self.identity.clone(),
422 ciphertext: vec![],
423 nonce: vec![],
424 key_id: "test".to_string(),
425 };
426
427 let _ = self.event_tx.send(ReceivedMessage {
428 message: encrypted,
429 received_at: Utc::now(),
430 });
431
432 Ok(())
433 }
434}
435
436