1use super::transport::{DeliveryReceipt, DeliveryStatus, ReceivedMessage};
3use super::types::*;
4use super::{DhtClient, KeyExchange, MessageStore, MessageTransport};
5use crate::identity::FourWordAddress;
6use crate::messaging::user_handle::UserHandle;
7use anyhow::{Context, Result};
8use chrono::{Duration, Utc};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12use tracing::{debug, info, warn};
13
14pub struct MessagingService {
16 identity: FourWordAddress,
18 store: MessageStore,
20 transport: Arc<MessageTransport>,
22 key_exchange: Arc<KeyExchange>,
24 _dht_client: DhtClient,
26 event_tx: broadcast::Sender<ReceivedMessage>,
28 online_users: Arc<RwLock<HashMap<FourWordAddress, chrono::DateTime<Utc>>>>,
30}
31
32#[derive(Debug, Clone, Default)]
34pub struct SendOptions {
35 pub ephemeral: bool,
36 pub expiry_seconds: Option<u64>,
37 pub reply_to: Option<MessageId>,
38 pub thread_id: Option<ThreadId>,
39 pub attachments: Vec<Attachment>,
40}
41
42impl MessagingService {
43 pub async fn new(identity: FourWordAddress, dht_client: DhtClient) -> Result<Self> {
45 let store = MessageStore::new(dht_client.clone(), None).await?;
47
48 #[cfg(test)]
50 let network = Arc::new(crate::network::P2PNode::new_mock());
51
52 #[cfg(not(test))]
53 let network = Arc::new(crate::network::P2PNode::new_mock()); let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?);
55 let key_exchange = Arc::new(KeyExchange::new(identity.clone())?);
56
57 let (event_tx, _) = broadcast::channel(1000);
58
59 Ok(Self {
60 identity,
61 store,
62 transport,
63 key_exchange,
64 _dht_client: dht_client,
65 event_tx,
66 online_users: Arc::new(RwLock::new(HashMap::new())),
67 })
68 }
69
70 pub async fn send_message(
72 &self,
73 recipients: Vec<FourWordAddress>,
74 content: MessageContent,
75 channel_id: ChannelId,
76 options: SendOptions,
77 ) -> Result<(MessageId, DeliveryReceipt)> {
78 let mut message = RichMessage::new(UserHandle::from(self.identity.to_string()), channel_id, content);
80
81 message.ephemeral = options.ephemeral;
83 if let Some(seconds) = options.expiry_seconds {
84 message.expires_at = Some(Utc::now() + Duration::seconds(seconds as i64));
85 }
86 message.reply_to = options.reply_to;
87 message.thread_id = options.thread_id;
88 message.attachments = options.attachments;
89
90 self.store.store_message(&message).await?;
92
93 let mut delivery_results = Vec::new();
95
96 for recipient in &recipients {
97 let encryption_key = match self.key_exchange.get_session_key(recipient).await {
99 Ok(key) => key,
100 Err(_) => {
101 let _kex_msg = self
103 .key_exchange
104 .initiate_exchange(recipient.clone())
105 .await?;
106 debug!("Initiated key exchange with {}", recipient);
108
109 vec![0u8; 32]
111 }
112 };
113
114 let encrypted = self
116 .encrypt_message_with_key(&message, &encryption_key)
117 .await?;
118
119 match self
121 .transport
122 .send_message(&encrypted, vec![recipient.clone()])
123 .await
124 {
125 Ok(_receipt) => {
126 delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
127 }
128 Err(e) => {
129 warn!("Failed to send to {}: {}", recipient, e);
130 delivery_results
131 .push((recipient.clone(), DeliveryStatus::Failed(e.to_string())));
132 }
133 }
134 }
135
136 let receipt = DeliveryReceipt {
138 message_id: message.id,
139 timestamp: Utc::now(),
140 delivery_status: delivery_results,
141 };
142
143 info!(
144 "Sent message {} to {} recipients",
145 message.id,
146 recipients.len()
147 );
148
149 Ok((message.id, receipt))
150 }
151
152 pub async fn subscribe_messages(
154 &self,
155 channel_filter: Option<ChannelId>,
156 ) -> broadcast::Receiver<ReceivedMessage> {
157 let rx = self.event_tx.subscribe();
158
159 let transport = self.transport.clone();
161 let event_tx = self.event_tx.clone();
162 let key_exchange = self.key_exchange.clone();
163 let store = self.store.clone();
164
165 tokio::spawn(async move {
166 let mut receiver = transport.receive_messages().await;
167
168 while let Ok(received) = receiver.recv().await {
169 if let Ok(decrypted) =
171 Self::decrypt_received_message(&received.message, &key_exchange).await
172 {
173 let _ = store.store_message(&decrypted).await;
175
176 if let Some(filter) = channel_filter
178 && decrypted.channel_id != filter
179 {
180 continue;
181 }
182
183 let _ = event_tx.send(ReceivedMessage {
185 message: received.message,
186 received_at: received.received_at,
187 });
188 }
189 }
190 });
191
192 rx
193 }
194
195 pub async fn get_message_status(&self, message_id: MessageId) -> Result<DeliveryStatus> {
197 if let Ok(_msg) = self.store.get_message(message_id).await {
202 let online = self.online_users.read().await;
204 if !online.is_empty() {
205 Ok(DeliveryStatus::Delivered(Utc::now()))
206 } else {
207 Ok(DeliveryStatus::Queued)
208 }
209 } else {
210 Ok(DeliveryStatus::Failed("Message not found".to_string()))
211 }
212 }
213
214 pub async fn get_message(&self, message_id: MessageId) -> Result<RichMessage> {
216 self.store.get_message(message_id).await
217 }
218
219 pub async fn mark_user_online(&self, user: FourWordAddress) -> Result<()> {
221 let mut online = self.online_users.write().await;
222 online.insert(user, Utc::now());
223 Ok(())
224 }
225
226 pub async fn mark_delivered(
228 &self,
229 message_id: MessageId,
230 recipient: FourWordAddress,
231 ) -> Result<()> {
232 if let Ok(mut msg) = self.store.get_message(message_id).await {
234 msg.delivered_to.insert(
235 crate::messaging::user_resolver::resolve_handle(&recipient),
236 Utc::now(),
237 );
238 self.store.update_message(&msg).await?;
239 }
240 Ok(())
241 }
242
243 pub async fn process_message_queue(&self) -> Result<()> {
245 self.transport.process_message_queue().await;
247 Ok(())
248 }
249
250 pub async fn encrypt_message(
252 &self,
253 recipient: FourWordAddress,
254 channel_id: ChannelId,
255 content: MessageContent,
256 ) -> Result<EncryptedMessage> {
257 let message = RichMessage::new(UserHandle::from(self.identity.to_string()), channel_id, content);
258
259 let key = self
261 .key_exchange
262 .get_session_key(&recipient)
263 .await
264 .unwrap_or_else(|_| vec![0u8; 32]); self.encrypt_message_with_key(&message, &key).await
267 }
268
269 pub async fn decrypt_message(&self, encrypted: EncryptedMessage) -> Result<RichMessage> {
271 Self::decrypt_received_message(&encrypted, &self.key_exchange).await
272 }
273
274 async fn encrypt_message_with_key(
276 &self,
277 message: &RichMessage,
278 key: &[u8],
279 ) -> Result<EncryptedMessage> {
280 use chacha20poly1305::{
281 ChaCha20Poly1305, Nonce,
282 aead::{Aead, KeyInit, OsRng},
283 };
284 use rand::RngCore;
285
286 let plaintext = serde_json::to_vec(message)?;
288
289 let mut nonce_bytes = [0u8; 12];
291 OsRng.fill_bytes(&mut nonce_bytes);
292 let nonce = Nonce::from_slice(&nonce_bytes);
293
294 let cipher = ChaCha20Poly1305::new_from_slice(key).context("Invalid key length")?;
296 let ciphertext = cipher
297 .encrypt(nonce, plaintext.as_ref())
298 .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
299
300 Ok(EncryptedMessage {
301 id: message.id,
302 channel_id: message.channel_id,
303 sender: self.identity.clone(),
304 ciphertext,
305 nonce: nonce_bytes.to_vec(),
306 key_id: format!("key_{}", self.identity),
307 })
308 }
309
310 async fn decrypt_received_message(
312 encrypted: &EncryptedMessage,
313 key_exchange: &Arc<KeyExchange>,
314 ) -> Result<RichMessage> {
315 use chacha20poly1305::{
316 ChaCha20Poly1305, Nonce,
317 aead::{Aead, KeyInit},
318 };
319
320 let key = key_exchange
322 .get_session_key(&encrypted.sender)
323 .await
324 .unwrap_or_else(|_| vec![0u8; 32]); let cipher = ChaCha20Poly1305::new_from_slice(&key).context("Invalid key length")?;
328 let nonce = Nonce::from_slice(&encrypted.nonce);
329
330 let plaintext = cipher
331 .decrypt(nonce, encrypted.ciphertext.as_ref())
332 .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
333
334 let message: RichMessage = serde_json::from_slice(&plaintext)?;
336
337 Ok(message)
338 }
339
340 #[cfg(test)]
342 pub fn create_test_message(
343 &self,
344 sender: UserHandle,
345 channel_id: ChannelId,
346 content: MessageContent,
347 ) -> RichMessage {
348 RichMessage::new(sender, channel_id, content)
349 }
350
351 #[cfg(test)]
352 pub async fn inject_test_message(&self, message: RichMessage) -> Result<()> {
353 self.store.store_message(&message).await?;
354
355 let encrypted = EncryptedMessage {
357 id: message.id,
358 channel_id: message.channel_id,
359 sender: self.identity.clone(),
360 ciphertext: vec![],
361 nonce: vec![],
362 key_id: "test".to_string(),
363 };
364
365 let _ = self.event_tx.send(ReceivedMessage {
366 message: encrypted,
367 received_at: Utc::now(),
368 });
369
370 Ok(())
371 }
372}
373
374