saorsa_core/messaging/
mod.rs

1// Rich Messaging Module for P2P Foundation
2// Implements WhatsApp/Slack-style messaging with full decentralization
3
4pub mod composer;
5pub mod database;
6pub mod encryption;
7pub mod key_exchange;
8pub mod media;
9#[cfg(any(test, feature = "mocks"))]
10pub mod mocks;
11pub mod quic_media_streams;
12pub mod reactions;
13pub mod search;
14pub mod service;
15pub mod sync;
16pub mod threads;
17pub mod transport;
18pub mod types;
19pub mod user_handle;
20pub mod user_resolver;
21pub mod webrtc;
22pub mod webrtc_quic_bridge;
23
24use user_handle::UserHandle;
25// Removed unused imports
26// use anyhow::Result;
27use serde::{Deserialize, Serialize};
28// use std::sync::Arc;
29// use chrono::{DateTime, Utc};
30// Removed unused imports: use tracing::{debug, warn};
31
32pub use composer::MessageComposer;
33pub use database::MessageStore;
34pub use encryption::SecureMessaging;
35pub use key_exchange::{KeyExchange, KeyExchangeMessage};
36pub use media::MediaProcessor;
37pub use quic_media_streams::{QosParameters, QuicMediaStreamManager, StreamStats};
38pub use reactions::ReactionManager;
39pub use search::MessageSearch;
40pub use service::{MessagingService, SendOptions};
41pub use sync::RealtimeSync;
42pub use threads::ThreadManager;
43pub use transport::{DeliveryReceipt, DeliveryStatus, MessageTransport, ReceivedMessage};
44pub use types::*;
45pub use webrtc::{CallEvent, CallManager, WebRtcEvent, WebRtcService};
46pub use webrtc_quic_bridge::{RtpPacket, StreamConfig, StreamType, WebRtcQuicBridge};
47
48// Import the real DHT client
49pub use crate::dht::client::DhtClient;
50
51/// Request to send a message
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SendMessageRequest {
54    pub channel_id: ChannelId,
55    pub content: MessageContent,
56    pub attachments: Vec<Vec<u8>>,
57    pub thread_id: Option<ThreadId>,
58    pub reply_to: Option<MessageId>,
59    pub mentions: Vec<UserHandle>,
60    pub ephemeral: bool,
61}
62
63// MessagingService is now defined in service.rs
64
// Legacy implementation disabled - kept below only as a commented-out reference; see service.rs for the new implementation
66
67/*
68impl MessagingService {
69    /// Create a new messaging service with a real DHT client
70    pub async fn new(identity: FourWordAddress) -> Result<Self> {
71        // Create DHT client based on the user's identity
72        // Convert four-word address to a node ID
73        let node_id_bytes = blake3::hash(identity.to_string().as_bytes());
74        let node_id = crate::dht::core_engine::NodeId::from_key(
75            crate::dht::core_engine::DhtKey::from_bytes(*node_id_bytes.as_bytes())
76        );
77
78        // Create DHT client with the user's node ID
79        let dht_client = DhtClient::with_node_id(node_id)?;
80
81        // Initialize all components
82        let store = MessageStore::new(dht_client.clone()).await?;
83        let threads = ThreadManager::new(store.clone());
84        let reactions = ReactionManager::new(store.clone());
85        let media = MediaProcessor::new()?;
86        let search = MessageSearch::new(store.clone()).await?;
87        let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
88        let sync = RealtimeSync::new(dht_client.clone()).await?;
89
90        Ok(Self {
91            store,
92            threads,
93            reactions,
94            media,
95            search,
96            encryption,
97            sync,
98            transport: None, // Will be initialized when network is available
99            webrtc: None,    // Will be initialized when needed
100            identity,
101        })
102    }
103
104    /// Create a new messaging service with an existing DHT client
105    pub async fn with_dht_client(
106        identity: FourWordAddress,
107        dht_client: DhtClient,
108    ) -> Result<Self> {
109        let store = MessageStore::new(dht_client.clone()).await?;
110        let threads = ThreadManager::new(store.clone());
111        let reactions = ReactionManager::new(store.clone());
112        let media = MediaProcessor::new()?;
113        let search = MessageSearch::new(store.clone()).await?;
114        let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
115        let sync = RealtimeSync::new(dht_client).await?;
116
117        Ok(Self {
118            store,
119            threads,
120            reactions,
121            media,
122            search,
123            encryption,
124            sync,
125            transport: None, // Will be initialized when network is available
126            webrtc: None,    // Will be initialized when needed
127            identity,
128        })
129    }
130
131    /// Connect to network transport
132    pub async fn connect_transport(&mut self, network: Arc<crate::network::P2PNode>) -> Result<()> {
133        let transport = MessageTransport::new(network, self.store.dht_client.clone()).await?;
134
135        // Start background tasks
136        transport.monitor_network_quality().await;
137        transport.process_message_queue().await;
138
139        self.transport = Some(transport);
140        Ok(())
141    }
142
143    /// Initialize WebRTC service
144    pub async fn initialize_webrtc(&mut self) -> Result<()> {
145        // Create WebRTC service using the DHT client
146        let dht_engine = self.store.dht_client.core_engine();
147        let webrtc = WebRtcService::new(
148            self.identity.clone(),
149            dht_engine,
150        ).await?;
151
152        // Start the WebRTC service
153        webrtc.start().await?;
154
155        self.webrtc = Some(webrtc);
156        Ok(())
157    }
158
159    /// Initiate a voice/video call
160    pub async fn initiate_call(
161        &self,
162        callee: FourWordAddress,
163        constraints: webrtc::MediaConstraints,
164    ) -> Result<webrtc::CallId> {
165        if let Some(ref webrtc) = self.webrtc {
166            webrtc.initiate_call(callee, constraints).await
167        } else {
168            Err(anyhow::anyhow!("WebRTC service not initialized"))
169        }
170    }
171
172    /// Accept an incoming call
173    pub async fn accept_call(
174        &self,
175        call_id: webrtc::CallId,
176        constraints: webrtc::MediaConstraints,
177    ) -> Result<()> {
178        if let Some(ref webrtc) = self.webrtc {
179            webrtc.accept_call(call_id, constraints).await
180        } else {
181            Err(anyhow::anyhow!("WebRTC service not initialized"))
182        }
183    }
184
185    /// Reject an incoming call
186    pub async fn reject_call(&self, call_id: webrtc::CallId) -> Result<()> {
187        if let Some(ref webrtc) = self.webrtc {
188            webrtc.reject_call(call_id).await
189        } else {
190            Err(anyhow::anyhow!("WebRTC service not initialized"))
191        }
192    }
193
194    /// End an active call
195    pub async fn end_call(&self, call_id: webrtc::CallId) -> Result<()> {
196        if let Some(ref webrtc) = self.webrtc {
197            webrtc.end_call(call_id).await
198        } else {
199            Err(anyhow::anyhow!("WebRTC service not initialized"))
200        }
201    }
202
203    /// Get call state
204    pub async fn get_call_state(&self, call_id: webrtc::CallId) -> Option<webrtc::CallState> {
205        if let Some(ref webrtc) = self.webrtc {
206            webrtc.get_call_state(call_id).await
207        } else {
208            None
209        }
210    }
211
212    /// Subscribe to WebRTC events
213    pub fn subscribe_webrtc_events(&self) -> Option<tokio::sync::broadcast::Receiver<WebRtcEvent>> {
214        self.webrtc.as_ref().map(|w| w.subscribe_events())
215    }
216
217    /// Get WebRTC service reference
218    pub fn webrtc(&self) -> Option<&WebRtcService> {
219        self.webrtc.as_ref()
220    }
221
222    /// Send a new message
223    pub async fn send_message(&mut self, request: SendMessageRequest) -> Result<RichMessage> {
224        // Create message
225        let mut message = RichMessage::new(
226            self.identity.clone(),
227            request.channel_id,
228            request.content,
229        );
230
231        // Add attachments if any
232        for attachment in request.attachments {
233            let processed = self.media.process_attachment(attachment).await?;
234            message.attachments.push(processed);
235        }
236
237        // Handle threading
238        if let Some(thread_id) = request.thread_id {
239            message.thread_id = Some(thread_id);
240            self.threads.add_to_thread(thread_id, &message).await?;
241        }
242
243        // Handle reply
244        if let Some(reply_to) = request.reply_to {
245            message.reply_to = Some(reply_to);
246        }
247
248        // Encrypt message
249        let encrypted = self.encryption.encrypt_message(&message).await?;
250
251        // Store message (we store the original, not encrypted version locally)
252        self.store.store_message(&message).await?;
253
254        // Send via transport if available, otherwise use sync
255        if let Some(ref transport) = self.transport {
256            // Extract recipients from channel members
257            let recipients = self.get_channel_members(request.channel_id).await?;
258            let receipt = transport.send_message(&encrypted, recipients).await?;
259
260            // Log delivery status
261            for (recipient, status) in receipt.delivery_status {
262                match status {
263                    DeliveryStatus::Delivered(_) => {
264                        debug!("Message delivered to {}", recipient);
265                    }
266                    DeliveryStatus::Queued => {
267                        debug!("Message queued for {}", recipient);
268                    }
269                    DeliveryStatus::Failed(e) => {
270                        warn!("Message delivery failed for {}: {}", recipient, e);
271                    }
272                    _ => {}
273                }
274            }
275        } else {
276            // Fallback to broadcast sync
277            self.sync.broadcast_message(&encrypted).await?;
278        }
279
280        Ok(message)
281    }
282
283    /// Receive and process an incoming message
284    pub async fn receive_message(&mut self, encrypted: EncryptedMessage) -> Result<RichMessage> {
285        // Decrypt message
286        let message = self.encryption.decrypt_message(encrypted).await?;
287
288        // Verify signature
289        if !self.encryption.verify_message(&message) {
290            return Err(anyhow::anyhow!("Invalid message signature"));
291        }
292
293        // Store message
294        self.store.store_message(&message).await?;
295
296        // Update thread if applicable
297        if let Some(thread_id) = &message.thread_id {
298            self.threads.update_thread(*thread_id, &message).await?;
299        }
300
301        // Process mentions
302        if message.mentions.contains(&self.identity) {
303            self.handle_mention(&message).await?;
304        }
305
306        Ok(message)
307    }
308
309    /// Add a reaction to a message
310    pub async fn add_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
311        self.reactions.add_reaction(
312            message_id,
313            emoji.clone(),
314            crate::messaging::user_resolver::resolve_handle(&self.identity),
315        ).await?;
316
317        // Sync reaction
318        self.sync.broadcast_reaction(message_id, emoji, true).await?;
319
320        Ok(())
321    }
322
323    /// Remove a reaction from a message
324    pub async fn remove_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
325        self.reactions.remove_reaction(
326            message_id,
327            emoji.clone(),
328            crate::messaging::user_resolver::resolve_handle(&self.identity),
329        ).await?;
330
331        // Sync reaction removal
332        self.sync.broadcast_reaction(message_id, emoji, false).await?;
333
334        Ok(())
335    }
336
337    /// Edit a message
338    pub async fn edit_message(
339        &mut self,
340        message_id: MessageId,
341        new_content: MessageContent,
342    ) -> Result<()> {
343        // Get original message
344        let mut message = self.store.get_message(message_id).await?;
345
346        // Verify sender
347        if message.sender != self.identity {
348            return Err(anyhow::anyhow!("Cannot edit message from another user"));
349        }
350
351        // Update content
352        message.content = new_content.clone();
353        message.edited_at = Some(Utc::now());
354
355        // Re-encrypt and store
356        let _encrypted = self.encryption.encrypt_message(&message).await?;
357        self.store.update_message(&message).await?;
358
359        // Sync edit
360        self.sync.broadcast_edit(message_id, new_content).await?;
361
362        Ok(())
363    }
364
365    /// Delete a message
366    pub async fn delete_message(&mut self, message_id: MessageId) -> Result<()> {
367        // Get message
368        let mut message = self.store.get_message(message_id).await?;
369
370        // Verify sender
371        if message.sender != self.identity {
372            return Err(anyhow::anyhow!("Cannot delete message from another user"));
373        }
374
375        // Soft delete
376        message.deleted_at = Some(Utc::now());
377
378        // Update storage
379        self.store.update_message(&message).await?;
380
381        // Sync deletion
382        self.sync.broadcast_deletion(message_id).await?;
383
384        Ok(())
385    }
386
387    /// Search messages
388    pub async fn search_messages(&self, query: SearchQuery) -> Result<Vec<RichMessage>> {
389        self.search.search(query).await
390    }
391
392    /// Get message history for a channel
393    pub async fn get_channel_messages(
394        &self,
395        channel_id: ChannelId,
396        limit: usize,
397        before: Option<DateTime<Utc>>,
398    ) -> Result<Vec<RichMessage>> {
399        self.store.get_channel_messages(channel_id, limit, before).await
400    }
401
402    /// Get thread messages
403    pub async fn get_thread_messages(
404        &self,
405        thread_id: ThreadId,
406    ) -> Result<ThreadView> {
407        self.threads.get_thread(thread_id).await
408    }
409
410    /// Mark messages as read
411    pub async fn mark_as_read(&mut self, message_ids: Vec<MessageId>) -> Result<()> {
412        for message_id in message_ids {
413            self.store.mark_as_read(
414                message_id,
415                crate::messaging::user_resolver::resolve_handle(&self.identity),
416            ).await?;
417            self.sync.broadcast_read_receipt(message_id).await?;
418        }
419        Ok(())
420    }
421
422    /// Start typing indicator
423    pub async fn start_typing(&mut self, channel_id: ChannelId) -> Result<()> {
424        self.sync
425            .broadcast_typing(
426                channel_id,
427                crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
428                true,
429            )
430            .await
431    }
432
433    /// Stop typing indicator
434    pub async fn stop_typing(&mut self, channel_id: ChannelId) -> Result<()> {
435        self.sync
436            .broadcast_typing(
437                channel_id,
438                crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
439                false,
440            )
441            .await
442    }
443
444    /// Initiate key exchange with a peer
445    pub async fn initiate_key_exchange(&self, peer: FourWordAddress) -> Result<KeyExchangeMessage> {
446        self.encryption.key_exchange.initiate_exchange(peer).await
447    }
448
449    /// Handle incoming key exchange message
450    pub async fn handle_key_exchange(&self, message: KeyExchangeMessage) -> Result<Option<KeyExchangeMessage>> {
451        use key_exchange::KeyExchangeType;
452
453        match message.message_type {
454            KeyExchangeType::Initiation => {
455                // Respond to initiation
456                let response = self.encryption.key_exchange.respond_to_exchange(message).await?;
457                Ok(Some(response))
458            }
459            KeyExchangeType::Response => {
460                // Complete the exchange
461                self.encryption.key_exchange.complete_exchange(message).await?;
462                Ok(None)
463            }
464            KeyExchangeType::PrekeyBundle => {
465                // Handle prekey bundle
466                Ok(None)
467            }
468        }
469    }
470
471    /// Get our prekey bundle for others
472    pub async fn get_prekey_bundle(&self) -> key_exchange::PrekeyBundle {
473        self.encryption.key_exchange.get_prekey_bundle().await
474    }
475
476    /// Rotate encryption keys
477    pub async fn rotate_keys(&self) -> Result<()> {
478        self.encryption.key_exchange.rotate_prekeys().await?;
479        self.encryption.key_exchange.cleanup_expired().await?;
480        Ok(())
481    }
482
483    /// Handle mention notification
484    async fn handle_mention(&self, message: &RichMessage) -> Result<()> {
485        // Create notification
486        tracing::info!("Mentioned in message: {:?}", message.id);
487        // TODO: Trigger system notification
488        Ok(())
489    }
490
491    /// Get channel members
492    async fn get_channel_members(&self, _channel_id: ChannelId) -> Result<Vec<FourWordAddress>> {
493        // TODO: Implement channel membership lookup
494        // For now, return empty list which will fallback to broadcast
495        Ok(Vec::new())
496    }
497}
498*/
499
500// MessageStore is now a type alias in database.rs
501
502/*
503/// Message store for persistence
504#[derive(Clone)]
505pub struct MessageStore {
506    inner: Arc<database::DatabaseMessageStore>,
507    dht_client: DhtClient,
508}
509
510impl MessageStore {
511    pub async fn new(dht_client: DhtClient) -> Result<Self> {
512        let inner = Arc::new(
513            database::DatabaseMessageStore::new(dht_client.clone(), None).await?
514        );
515
516        Ok(Self {
517            inner,
518            dht_client,
519        })
520    }
521
522    pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
523        self.inner.store_message(message).await
524    }
525
526    pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
527        self.inner.get_message(id).await
528    }
529
530    pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
531        self.inner.update_message(message).await
532    }
533
534    pub async fn get_channel_messages(
535        &self,
536        channel_id: ChannelId,
537        limit: usize,
538        before: Option<DateTime<Utc>>,
539    ) -> Result<Vec<RichMessage>> {
540        self.inner.get_channel_messages(channel_id, limit, before).await
541    }
542
543    pub async fn mark_as_read(
544        &self,
545        message_id: MessageId,
546        user: FourWordAddress,
547    ) -> Result<()> {
548        self.inner.mark_as_read(message_id, user).await
549    }
550
551    /// Search messages
552    pub async fn search_messages(&self, query: &str, channel_id: Option<ChannelId>) -> Result<Vec<RichMessage>> {
553        self.inner.search_messages(query, channel_id, 50).await
554    }
555
556    /// Get thread messages
557    pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
558        self.inner.get_thread_messages(thread_id).await
559    }
560
561    /// Add reaction
562    pub async fn add_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
563        self.inner.add_reaction(message_id, emoji, user).await
564    }
565
566    /// Remove reaction
567    pub async fn remove_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
568        self.inner.remove_reaction(message_id, emoji, user).await
569    }
570
571    /// Get database statistics
572    pub async fn get_stats(&self) -> Result<database::DatabaseStats> {
573        self.inner.get_stats().await
574    }
575
576    /// Clean up ephemeral messages
577    pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
578        self.inner.cleanup_ephemeral(ttl_seconds).await
579    }
580}
581*/
582
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::FourWordAddress;

    /// A freshly constructed `RichMessage` reports the sender and channel it
    /// was built with and carries text content.
    ///
    /// Nothing here awaits, so this is a plain synchronous test rather than a
    /// `#[tokio::test]` that would start a runtime for no reason.
    #[test]
    fn test_message_creation() {
        let identity = crate::messaging::user_handle::UserHandle::from("ocean-forest-moon-star");
        let channel = ChannelId::new();
        let content = MessageContent::Text("Hello, world!".to_string());

        // `identity` is cloned because it is compared against below; `content`
        // is moved because it is not needed afterwards.
        let message = RichMessage::new(identity.clone(), channel, content);

        assert_eq!(message.sender, identity);
        assert_eq!(message.channel_id, channel);
        assert!(matches!(message.content, MessageContent::Text(_)));
    }

    /// Checks only that a four-word identity can be built and renders to a
    /// non-empty string.
    ///
    /// Despite its name, this test does not touch a DHT or construct a
    /// messaging service: that scenario needs a real network and must be run
    /// separately with proper setup. It is kept synchronous so it cannot
    /// contribute to the nested-runtime problems that motivated the split.
    #[test]
    fn test_messaging_service_with_real_dht() {
        let identity = FourWordAddress::from("ocean-forest-moon-star");
        assert!(!identity.to_string().is_empty());
    }
}