saorsa_core/messaging/
service.rs

1// High-level messaging service API
2use super::transport::{DeliveryReceipt, DeliveryStatus, ReceivedMessage};
3use super::types::*;
4use super::{DhtClient, KeyExchange, MessageStore, MessageTransport};
5use crate::identity::FourWordAddress;
6use crate::messaging::user_handle::UserHandle;
7use anyhow::{Context, Result};
8use chrono::{Duration, Utc};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12use tracing::{debug, info, warn};
13
14/// High-level messaging service that coordinates all messaging components
15pub struct MessagingService {
16    /// Local user identity
17    identity: FourWordAddress,
18    /// Message store for persistence
19    store: MessageStore,
20    /// Transport layer for network communication
21    transport: Arc<MessageTransport>,
22    /// Key exchange for E2E encryption
23    key_exchange: Arc<KeyExchange>,
24    /// DHT client for distributed storage
25    _dht_client: DhtClient,
26    /// Message event broadcaster
27    event_tx: broadcast::Sender<ReceivedMessage>,
28    /// Online users tracking
29    online_users: Arc<RwLock<HashMap<FourWordAddress, chrono::DateTime<Utc>>>>,
30}
31
32/// Options for sending messages
33#[derive(Debug, Clone, Default)]
34pub struct SendOptions {
35    pub ephemeral: bool,
36    pub expiry_seconds: Option<u64>,
37    pub reply_to: Option<MessageId>,
38    pub thread_id: Option<ThreadId>,
39    pub attachments: Vec<Attachment>,
40}
41
42impl MessagingService {
43    /// Create a new messaging service
44    pub async fn new(identity: FourWordAddress, dht_client: DhtClient) -> Result<Self> {
45        // Initialize components
46        let store = MessageStore::new(dht_client.clone(), None).await?;
47
48        // Create mock network for testing
49        #[cfg(test)]
50        let network = Arc::new(crate::network::P2PNode::new_mock());
51
52        #[cfg(not(test))]
53        let network = Arc::new(crate::network::P2PNode::new_mock()); // Mock for now
54        let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?);
55        let key_exchange = Arc::new(KeyExchange::new(identity.clone())?);
56
57        let (event_tx, _) = broadcast::channel(1000);
58
59        Ok(Self {
60            identity,
61            store,
62            transport,
63            key_exchange,
64            _dht_client: dht_client,
65            event_tx,
66            online_users: Arc::new(RwLock::new(HashMap::new())),
67        })
68    }
69
70    /// Send a message to recipients
71    pub async fn send_message(
72        &self,
73        recipients: Vec<FourWordAddress>,
74        content: MessageContent,
75        channel_id: ChannelId,
76        options: SendOptions,
77    ) -> Result<(MessageId, DeliveryReceipt)> {
78        // Create rich message
79        let mut message = RichMessage::new(UserHandle::from(self.identity.to_string()), channel_id, content);
80
81        // Apply options
82        message.ephemeral = options.ephemeral;
83        if let Some(seconds) = options.expiry_seconds {
84            message.expires_at = Some(Utc::now() + Duration::seconds(seconds as i64));
85        }
86        message.reply_to = options.reply_to;
87        message.thread_id = options.thread_id;
88        message.attachments = options.attachments;
89
90        // Store locally first
91        self.store.store_message(&message).await?;
92
93        // Encrypt for each recipient
94        let mut delivery_results = Vec::new();
95
96        for recipient in &recipients {
97            // Get or establish encryption key
98            let encryption_key = match self.key_exchange.get_session_key(recipient).await {
99                Ok(key) => key,
100                Err(_) => {
101                    // Initiate key exchange if no session exists
102                    let _kex_msg = self
103                        .key_exchange
104                        .initiate_exchange(recipient.clone())
105                        .await?;
106                    // In production, send kex_msg via transport
107                    debug!("Initiated key exchange with {}", recipient);
108
109                    // For now, use a placeholder key
110                    vec![0u8; 32]
111                }
112            };
113
114            // Encrypt message
115            let encrypted = self
116                .encrypt_message_with_key(&message, &encryption_key)
117                .await?;
118
119            // Send via transport
120            match self
121                .transport
122                .send_message(&encrypted, vec![recipient.clone()])
123                .await
124            {
125                Ok(_receipt) => {
126                    delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
127                }
128                Err(e) => {
129                    warn!("Failed to send to {}: {}", recipient, e);
130                    delivery_results
131                        .push((recipient.clone(), DeliveryStatus::Failed(e.to_string())));
132                }
133            }
134        }
135
136        // Create delivery receipt
137        let receipt = DeliveryReceipt {
138            message_id: message.id,
139            timestamp: Utc::now(),
140            delivery_status: delivery_results,
141        };
142
143        info!(
144            "Sent message {} to {} recipients",
145            message.id,
146            recipients.len()
147        );
148
149        Ok((message.id, receipt))
150    }
151
152    /// Subscribe to incoming messages
153    pub async fn subscribe_messages(
154        &self,
155        channel_filter: Option<ChannelId>,
156    ) -> broadcast::Receiver<ReceivedMessage> {
157        let rx = self.event_tx.subscribe();
158
159        // Start message receiver if not already running
160        let transport = self.transport.clone();
161        let event_tx = self.event_tx.clone();
162        let key_exchange = self.key_exchange.clone();
163        let store = self.store.clone();
164
165        tokio::spawn(async move {
166            let mut receiver = transport.receive_messages().await;
167
168            while let Ok(received) = receiver.recv().await {
169                // Decrypt message
170                if let Ok(decrypted) =
171                    Self::decrypt_received_message(&received.message, &key_exchange).await
172                {
173                    // Store in database
174                    let _ = store.store_message(&decrypted).await;
175
176                    // Apply channel filter if specified
177                    if let Some(filter) = channel_filter
178                        && decrypted.channel_id != filter
179                    {
180                        continue;
181                    }
182
183                    // Broadcast to subscribers
184                    let _ = event_tx.send(ReceivedMessage {
185                        message: received.message,
186                        received_at: received.received_at,
187                    });
188                }
189            }
190        });
191
192        rx
193    }
194
195    /// Get message delivery status
196    pub async fn get_message_status(&self, message_id: MessageId) -> Result<DeliveryStatus> {
197        // Check local confirmations first
198        // In production, this would query the transport layer's confirmation tracking
199
200        // For now, check if message exists in store
201        if let Ok(_msg) = self.store.get_message(message_id).await {
202            // Check if delivered (simplified logic)
203            let online = self.online_users.read().await;
204            if !online.is_empty() {
205                Ok(DeliveryStatus::Delivered(Utc::now()))
206            } else {
207                Ok(DeliveryStatus::Queued)
208            }
209        } else {
210            Ok(DeliveryStatus::Failed("Message not found".to_string()))
211        }
212    }
213
214    /// Retrieve a message by ID
215    pub async fn get_message(&self, message_id: MessageId) -> Result<RichMessage> {
216        self.store.get_message(message_id).await
217    }
218
219    /// Mark a user as online
220    pub async fn mark_user_online(&self, user: FourWordAddress) -> Result<()> {
221        let mut online = self.online_users.write().await;
222        online.insert(user, Utc::now());
223        Ok(())
224    }
225
226    /// Mark message as delivered
227    pub async fn mark_delivered(
228        &self,
229        message_id: MessageId,
230        recipient: FourWordAddress,
231    ) -> Result<()> {
232        // Update delivery status in store
233        if let Ok(mut msg) = self.store.get_message(message_id).await {
234            msg.delivered_to.insert(
235                crate::messaging::user_resolver::resolve_handle(&recipient),
236                Utc::now(),
237            );
238            self.store.update_message(&msg).await?;
239        }
240        Ok(())
241    }
242
243    /// Process queued messages
244    pub async fn process_message_queue(&self) -> Result<()> {
245        // Trigger transport layer queue processing
246        self.transport.process_message_queue().await;
247        Ok(())
248    }
249
250    /// Encrypt a message for a recipient
251    pub async fn encrypt_message(
252        &self,
253        recipient: FourWordAddress,
254        channel_id: ChannelId,
255        content: MessageContent,
256    ) -> Result<EncryptedMessage> {
257        let message = RichMessage::new(UserHandle::from(self.identity.to_string()), channel_id, content);
258
259        // Get encryption key
260        let key = self
261            .key_exchange
262            .get_session_key(&recipient)
263            .await
264            .unwrap_or_else(|_| vec![0u8; 32]); // Placeholder
265
266        self.encrypt_message_with_key(&message, &key).await
267    }
268
269    /// Decrypt a message
270    pub async fn decrypt_message(&self, encrypted: EncryptedMessage) -> Result<RichMessage> {
271        Self::decrypt_received_message(&encrypted, &self.key_exchange).await
272    }
273
274    // Helper: Encrypt message with key
275    async fn encrypt_message_with_key(
276        &self,
277        message: &RichMessage,
278        key: &[u8],
279    ) -> Result<EncryptedMessage> {
280        use chacha20poly1305::{
281            ChaCha20Poly1305, Nonce,
282            aead::{Aead, KeyInit, OsRng},
283        };
284        use rand::RngCore;
285
286        // Serialize message
287        let plaintext = serde_json::to_vec(message)?;
288
289        // Generate nonce
290        let mut nonce_bytes = [0u8; 12];
291        OsRng.fill_bytes(&mut nonce_bytes);
292        let nonce = Nonce::from_slice(&nonce_bytes);
293
294        // Encrypt
295        let cipher = ChaCha20Poly1305::new_from_slice(key).context("Invalid key length")?;
296        let ciphertext = cipher
297            .encrypt(nonce, plaintext.as_ref())
298            .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
299
300        Ok(EncryptedMessage {
301            id: message.id,
302            channel_id: message.channel_id,
303            sender: self.identity.clone(),
304            ciphertext,
305            nonce: nonce_bytes.to_vec(),
306            key_id: format!("key_{}", self.identity),
307        })
308    }
309
310    // Helper: Decrypt received message
311    async fn decrypt_received_message(
312        encrypted: &EncryptedMessage,
313        key_exchange: &Arc<KeyExchange>,
314    ) -> Result<RichMessage> {
315        use chacha20poly1305::{
316            ChaCha20Poly1305, Nonce,
317            aead::{Aead, KeyInit},
318        };
319
320        // Get decryption key
321        let key = key_exchange
322            .get_session_key(&encrypted.sender)
323            .await
324            .unwrap_or_else(|_| vec![0u8; 32]); // Placeholder
325
326        // Decrypt
327        let cipher = ChaCha20Poly1305::new_from_slice(&key).context("Invalid key length")?;
328        let nonce = Nonce::from_slice(&encrypted.nonce);
329
330        let plaintext = cipher
331            .decrypt(nonce, encrypted.ciphertext.as_ref())
332            .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
333
334        // Deserialize
335        let message: RichMessage = serde_json::from_slice(&plaintext)?;
336
337        Ok(message)
338    }
339
340    // Test helpers
341    #[cfg(test)]
342    pub fn create_test_message(
343        &self,
344        sender: UserHandle,
345        channel_id: ChannelId,
346        content: MessageContent,
347    ) -> RichMessage {
348        RichMessage::new(sender, channel_id, content)
349    }
350
351    #[cfg(test)]
352    pub async fn inject_test_message(&self, message: RichMessage) -> Result<()> {
353        self.store.store_message(&message).await?;
354
355        // Create encrypted version for event
356        let encrypted = EncryptedMessage {
357            id: message.id,
358            channel_id: message.channel_id,
359            sender: self.identity.clone(),
360            ciphertext: vec![],
361            nonce: vec![],
362            key_id: "test".to_string(),
363        };
364
365        let _ = self.event_tx.send(ReceivedMessage {
366            message: encrypted,
367            received_at: Utc::now(),
368        });
369
370        Ok(())
371    }
372}
373
374// Use mock implementations from mocks module
375// These are now properly implemented in mocks.rs