saorsa_core/messaging/
service.rs

1// High-level messaging service API
2use super::transport::{DeliveryReceipt, DeliveryStatus, ReceivedMessage};
3use super::types::*;
4use super::{DhtClient, KeyExchange, MessageStore, MessageTransport};
5use crate::identity::FourWordAddress;
6use crate::messaging::user_handle::UserHandle;
7use anyhow::Result;
8use chrono::{Duration, Utc};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12use tracing::{info, warn};
13
14/// Resolve channel members to their FourWordAddress recipients
15/// This maps channel member user_ids to their FourWordAddress representation
16pub async fn channel_recipients(_channel_id: &ChannelId) -> Result<Vec<FourWordAddress>> {
17    // Load channel metadata from storage/database
18    // Note: This is a placeholder implementation that should be connected to
19    // the actual channel storage system (e.g., ChatManager)
20
21    // For now, we'll return an empty list. In production, this would:
22    // 1. Load channel from storage
23    // 2. Get member list
24    // 3. Map each member's user_id to their FourWordAddress
25    // 4. Return the list of addresses
26
27    // Example production implementation:
28    // let channel = chat_manager.get_channel(channel_id).await?;
29    // let mut recipients = Vec::new();
30    // for member_id in channel.members {
31    //     if let Ok(addr) = FourWordAddress::from_user_id(&member_id) {
32    //         recipients.push(addr);
33    //     }
34    // }
35    // Ok(recipients)
36
37    // TODO: Integrate with actual channel storage
38    Ok(Vec::new())
39}
40
41/// High-level messaging service that coordinates all messaging components
42pub struct MessagingService {
43    /// Local user identity
44    identity: FourWordAddress,
45    /// Message store for persistence
46    store: MessageStore,
47    /// Transport layer for network communication
48    transport: Arc<MessageTransport>,
49    /// Key exchange for E2E encryption
50    key_exchange: Arc<KeyExchange>,
51    /// DHT client for distributed storage
52    _dht_client: DhtClient,
53    /// Message event broadcaster
54    event_tx: broadcast::Sender<ReceivedMessage>,
55    /// Online users tracking
56    online_users: Arc<RwLock<HashMap<FourWordAddress, chrono::DateTime<Utc>>>>,
57}
58
59/// Options for sending messages
60#[derive(Debug, Clone, Default)]
61pub struct SendOptions {
62    pub ephemeral: bool,
63    pub expiry_seconds: Option<u64>,
64    pub reply_to: Option<MessageId>,
65    pub thread_id: Option<ThreadId>,
66    pub attachments: Vec<Attachment>,
67}
68
69impl MessagingService {
70    /// Create a new messaging service
71    pub async fn new(identity: FourWordAddress, dht_client: DhtClient) -> Result<Self> {
72        // Initialize components
73        let store = MessageStore::new(dht_client.clone(), None).await?;
74
75        // Create mock network for testing
76        #[cfg(test)]
77        let network = Arc::new(crate::network::P2PNode::new_for_tests()?);
78
79        #[cfg(not(test))]
80        let network = {
81            // Use a real P2P node with defaults for production wiring
82            let config = crate::network::NodeConfig::new()?;
83            let node = crate::network::P2PNode::new(config).await?;
84            Arc::new(node)
85        };
86        let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?);
87        let key_exchange = Arc::new(KeyExchange::new(identity.clone(), dht_client.clone()).await?);
88
89        let (event_tx, _) = broadcast::channel(1000);
90
91        Ok(Self {
92            identity,
93            store,
94            transport,
95            key_exchange,
96            _dht_client: dht_client,
97            event_tx,
98            online_users: Arc::new(RwLock::new(HashMap::new())),
99        })
100    }
101
102    /// Send a message to recipients
103    pub async fn send_message(
104        &self,
105        recipients: Vec<FourWordAddress>,
106        content: MessageContent,
107        channel_id: ChannelId,
108        options: SendOptions,
109    ) -> Result<(MessageId, DeliveryReceipt)> {
110        // Create rich message
111        let mut message = RichMessage::new(
112            UserHandle::from(self.identity.to_string()),
113            channel_id,
114            content,
115        );
116
117        // Apply options
118        message.ephemeral = options.ephemeral;
119        if let Some(seconds) = options.expiry_seconds {
120            message.expires_at = Some(Utc::now() + Duration::seconds(seconds as i64));
121        }
122        message.reply_to = options.reply_to;
123        message.thread_id = options.thread_id;
124        message.attachments = options.attachments;
125
126        // Store locally first
127        self.store.store_message(&message).await?;
128
129        // Encrypt for each recipient
130        let mut delivery_results = Vec::new();
131
132        for recipient in &recipients {
133            // Get or establish encryption key
134            let encryption_key = match self.key_exchange.get_session_key(recipient).await {
135                Ok(key) => key,
136                Err(e) => {
137                    // Attempt to initiate PQC key exchange; if unavailable, error out
138                    let _ = self.key_exchange.initiate_exchange(recipient.clone()).await;
139                    return Err(anyhow::anyhow!(
140                        "No session key established for {}: {}",
141                        recipient,
142                        e
143                    ));
144                }
145            };
146
147            // Encrypt message
148            let encrypted = self
149                .encrypt_message_with_key(&message, &encryption_key)
150                .await?;
151
152            // Send via transport
153            match self
154                .transport
155                .send_message(&encrypted, vec![recipient.clone()])
156                .await
157            {
158                Ok(_receipt) => {
159                    delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
160                }
161                Err(e) => {
162                    warn!("Failed to send to {}: {}", recipient, e);
163                    delivery_results
164                        .push((recipient.clone(), DeliveryStatus::Failed(e.to_string())));
165                }
166            }
167        }
168
169        // Create delivery receipt
170        let receipt = DeliveryReceipt {
171            message_id: message.id,
172            timestamp: Utc::now(),
173            delivery_status: delivery_results,
174        };
175
176        info!(
177            "Sent message {} to {} recipients",
178            message.id,
179            recipients.len()
180        );
181
182        Ok((message.id, receipt))
183    }
184
185    /// Send a message to a channel
186    pub async fn send_message_to_channel(
187        &self,
188        channel_id: ChannelId,
189        content: MessageContent,
190        options: SendOptions,
191    ) -> Result<(MessageId, DeliveryReceipt)> {
192        // Resolve recipients from channel members
193        let recipients = channel_recipients(&channel_id).await?;
194
195        if recipients.is_empty() {
196            return Err(anyhow::anyhow!(
197                "No recipients found for channel {}",
198                channel_id
199            ));
200        }
201
202        // Call existing send_message with resolved recipients
203        self.send_message(recipients, content, channel_id, options)
204            .await
205    }
206
207    /// Subscribe to incoming messages
208    pub async fn subscribe_messages(
209        &self,
210        channel_filter: Option<ChannelId>,
211    ) -> broadcast::Receiver<ReceivedMessage> {
212        let rx = self.event_tx.subscribe();
213
214        // Start message receiver if not already running
215        let transport = self.transport.clone();
216        let event_tx = self.event_tx.clone();
217        let key_exchange = self.key_exchange.clone();
218        let store = self.store.clone();
219
220        tokio::spawn(async move {
221            let mut receiver = transport.receive_messages().await;
222
223            while let Ok(received) = receiver.recv().await {
224                // Decrypt message
225                if let Ok(decrypted) =
226                    Self::decrypt_received_message(&received.message, &key_exchange).await
227                {
228                    // Store in database
229                    let _ = store.store_message(&decrypted).await;
230
231                    // Apply channel filter if specified
232                    if let Some(filter) = channel_filter
233                        && decrypted.channel_id != filter
234                    {
235                        continue;
236                    }
237
238                    // Broadcast to subscribers
239                    let _ = event_tx.send(ReceivedMessage {
240                        message: received.message,
241                        received_at: received.received_at,
242                    });
243                }
244            }
245        });
246
247        rx
248    }
249
250    /// Get message delivery status
251    pub async fn get_message_status(&self, message_id: MessageId) -> Result<DeliveryStatus> {
252        // Check local confirmations first
253        // In production, this would query the transport layer's confirmation tracking
254
255        // For now, check if message exists in store
256        if let Ok(_msg) = self.store.get_message(message_id).await {
257            // Check if delivered (simplified logic)
258            let online = self.online_users.read().await;
259            if !online.is_empty() {
260                Ok(DeliveryStatus::Delivered(Utc::now()))
261            } else {
262                Ok(DeliveryStatus::Queued)
263            }
264        } else {
265            Ok(DeliveryStatus::Failed("Message not found".to_string()))
266        }
267    }
268
269    /// Retrieve a message by ID
270    pub async fn get_message(&self, message_id: MessageId) -> Result<RichMessage> {
271        self.store.get_message(message_id).await
272    }
273
274    /// Mark a user as online
275    pub async fn mark_user_online(&self, user: FourWordAddress) -> Result<()> {
276        let mut online = self.online_users.write().await;
277        online.insert(user, Utc::now());
278        Ok(())
279    }
280
281    /// Mark message as delivered
282    pub async fn mark_delivered(
283        &self,
284        message_id: MessageId,
285        recipient: FourWordAddress,
286    ) -> Result<()> {
287        // Update delivery status in store
288        if let Ok(mut msg) = self.store.get_message(message_id).await {
289            msg.delivered_to.insert(
290                crate::messaging::user_resolver::resolve_handle(&recipient),
291                Utc::now(),
292            );
293            self.store.update_message(&msg).await?;
294        }
295        Ok(())
296    }
297
298    /// Process queued messages
299    pub async fn process_message_queue(&self) -> Result<()> {
300        // Trigger transport layer queue processing
301        self.transport.process_message_queue().await;
302        Ok(())
303    }
304
305    /// Encrypt a message for a recipient
306    pub async fn encrypt_message(
307        &self,
308        recipient: FourWordAddress,
309        channel_id: ChannelId,
310        content: MessageContent,
311    ) -> Result<EncryptedMessage> {
312        let message = RichMessage::new(
313            UserHandle::from(self.identity.to_string()),
314            channel_id,
315            content,
316        );
317
318        // Get encryption key
319        let key = self
320            .key_exchange
321            .get_session_key(&recipient)
322            .await
323            .unwrap_or_else(|_| vec![0u8; 32]); // Placeholder
324
325        self.encrypt_message_with_key(&message, &key).await
326    }
327
328    /// Decrypt a message
329    pub async fn decrypt_message(&self, encrypted: EncryptedMessage) -> Result<RichMessage> {
330        Self::decrypt_received_message(&encrypted, &self.key_exchange).await
331    }
332
333    // Helper: Encrypt message with key
334    async fn encrypt_message_with_key(
335        &self,
336        message: &RichMessage,
337        key: &[u8],
338    ) -> Result<EncryptedMessage> {
339        use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
340
341        let plaintext = serde_json::to_vec(message)?;
342        let mut k = [0u8; 32];
343        if key.len() != 32 {
344            return Err(anyhow::anyhow!("Invalid session key length"));
345        }
346        k.copy_from_slice(&key[..32]);
347        let sk = SymmetricKey::from_bytes(k);
348        let cipher = ChaCha20Poly1305Cipher::new(&sk);
349        let (ciphertext, nonce) = cipher
350            .encrypt(&plaintext, None)
351            .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
352
353        Ok(EncryptedMessage {
354            id: message.id,
355            channel_id: message.channel_id,
356            sender: self.identity.clone(),
357            ciphertext,
358            nonce: nonce.to_vec(),
359            key_id: format!("key_{}", self.identity),
360        })
361    }
362
363    // Helper: Decrypt received message
364    async fn decrypt_received_message(
365        encrypted: &EncryptedMessage,
366        key_exchange: &Arc<KeyExchange>,
367    ) -> Result<RichMessage> {
368        use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
369
370        // Get decryption key
371        let key = key_exchange
372            .get_session_key(&encrypted.sender)
373            .await
374            .map_err(|e| anyhow::anyhow!("No session key for {}: {}", encrypted.sender, e))?;
375        if key.len() != 32 {
376            return Err(anyhow::anyhow!("Invalid session key length"));
377        }
378        let mut k = [0u8; 32];
379        k.copy_from_slice(&key[..32]);
380        let sk = SymmetricKey::from_bytes(k);
381        let cipher = ChaCha20Poly1305Cipher::new(&sk);
382        // Convert Vec<u8> nonce back to [u8; 12] array
383        if encrypted.nonce.len() != 12 {
384            return Err(anyhow::anyhow!(
385                "Invalid nonce length: expected 12, got {}",
386                encrypted.nonce.len()
387            ));
388        }
389        let mut nonce_array = [0u8; 12];
390        nonce_array.copy_from_slice(&encrypted.nonce);
391
392        let plaintext = cipher
393            .decrypt(&encrypted.ciphertext, &nonce_array, None)
394            .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
395
396        // Deserialize
397        let message: RichMessage = serde_json::from_slice(&plaintext)?;
398
399        Ok(message)
400    }
401
402    // Test helpers
403    #[cfg(test)]
404    pub fn create_test_message(
405        &self,
406        sender: UserHandle,
407        channel_id: ChannelId,
408        content: MessageContent,
409    ) -> RichMessage {
410        RichMessage::new(sender, channel_id, content)
411    }
412
413    #[cfg(test)]
414    pub async fn inject_test_message(&self, message: RichMessage) -> Result<()> {
415        self.store.store_message(&message).await?;
416
417        // Create encrypted version for event
418        let encrypted = EncryptedMessage {
419            id: message.id,
420            channel_id: message.channel_id,
421            sender: self.identity.clone(),
422            ciphertext: vec![],
423            nonce: vec![],
424            key_id: "test".to_string(),
425        };
426
427        let _ = self.event_tx.send(ReceivedMessage {
428            message: encrypted,
429            received_at: Utc::now(),
430        });
431
432        Ok(())
433    }
434}
435
436// Use mock implementations from mocks module
437// These are now properly implemented in mocks.rs