saorsa_core/messaging/
mod.rs

//! Rich messaging module for the P2P Foundation.
//!
//! Implements WhatsApp/Slack-style messaging with full decentralization.
3
4pub mod composer;
5pub mod database;
6pub mod encryption;
7pub mod key_exchange;
8pub mod media;
9#[cfg(any(test, feature = "mocks"))]
10pub mod mocks;
11pub mod network_config;
12pub mod quic_media_streams;
13pub mod reactions;
14pub mod search;
15pub mod service;
16pub mod sync;
17pub mod threads;
18pub mod transport;
19pub mod types;
20pub mod user_handle;
21pub mod user_resolver;
22
23use user_handle::UserHandle;
24// Removed unused imports
25// use anyhow::Result;
26use serde::{Deserialize, Serialize};
27// use std::sync::Arc;
28// use chrono::{DateTime, Utc};
29// Removed unused imports: use tracing::{debug, warn};
30
31pub use composer::MessageComposer;
32pub use database::MessageStore;
33pub use encryption::SecureMessaging;
34pub use key_exchange::{KeyExchange, KeyExchangeMessage};
35pub use media::MediaProcessor;
36pub use network_config::{
37    IpMode, NatTraversalMode, NetworkConfig, NetworkConfigError, PortConfig, RetryBehavior,
38};
39pub use quic_media_streams::{QosParameters, QuicMediaStreamManager, StreamStats};
40pub use reactions::ReactionManager;
41pub use search::MessageSearch;
42pub use service::{MessagingService, SendOptions};
43pub use sync::RealtimeSync;
44pub use threads::ThreadManager;
45pub use transport::{DeliveryReceipt, DeliveryStatus, MessageTransport, ReceivedMessage};
46pub use types::*;
47
48// Re-export WebRTC types from saorsa-webrtc crate
49pub use saorsa_webrtc::{
50    CallEvent, CallId, CallManager, CallState, MediaConstraints, MediaEvent, MediaStreamManager,
51    PeerIdentity, SignalingHandler, SignalingTransport, WebRtcEvent, WebRtcQuicBridge,
52    WebRtcService,
53};
54
// Re-export the real DHT client
56pub use crate::dht::client::DhtClient;
57
/// Request to send a message.
///
/// Plain data carrier: it holds everything a caller supplies for one outgoing
/// message and derives `Serialize`/`Deserialize`, so it can be passed across a
/// serialization boundary as-is.
///
/// NOTE(review): the field notes below are based on the field types and on the
/// legacy `send_message` kept in comments in this file; confirm against the
/// current implementation in `service.rs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendMessageRequest {
    /// Identifier of the channel the message is sent to.
    pub channel_id: ChannelId,
    /// Body of the message.
    pub content: MessageContent,
    /// Attachments, one byte buffer per attachment.
    pub attachments: Vec<Vec<u8>>,
    /// Thread to attach the message to; `None` for no thread.
    pub thread_id: Option<ThreadId>,
    /// Message this one replies to; `None` if it is not a reply.
    pub reply_to: Option<MessageId>,
    /// Handles of users mentioned in the message (presumably; not read by any
    /// code visible in this file).
    pub mentions: Vec<UserHandle>,
    /// Marks the message as ephemeral. Retention semantics are not visible in
    /// this file; TODO confirm.
    pub ephemeral: bool,
}
69
// MessagingService is now defined in service.rs.

// The legacy implementation below is kept commented out for reference only and is
// not compiled; see service.rs for the current implementation.
73
74/*
75impl MessagingService {
76    /// Create a new messaging service with a real DHT client
77    pub async fn new(identity: FourWordAddress) -> Result<Self> {
78        // Create DHT client based on the user's identity
79        // Convert four-word address to a node ID
80        let node_id_bytes = blake3::hash(identity.to_string().as_bytes());
81        let node_id = crate::dht::core_engine::NodeId::from_key(
82            crate::dht::core_engine::DhtKey::from_bytes(*node_id_bytes.as_bytes())
83        );
84
85        // Create DHT client with the user's node ID
86        let dht_client = DhtClient::with_node_id(node_id)?;
87
88        // Initialize all components
89        let store = MessageStore::new(dht_client.clone()).await?;
90        let threads = ThreadManager::new(store.clone());
91        let reactions = ReactionManager::new(store.clone());
92        let media = MediaProcessor::new()?;
93        let search = MessageSearch::new(store.clone()).await?;
94        let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
95        let sync = RealtimeSync::new(dht_client.clone()).await?;
96
97        Ok(Self {
98            store,
99            threads,
100            reactions,
101            media,
102            search,
103            encryption,
104            sync,
105            transport: None, // Will be initialized when network is available
106            webrtc: None,    // Will be initialized when needed
107            identity,
108        })
109    }
110
111    /// Create a new messaging service with an existing DHT client
112    pub async fn with_dht_client(
113        identity: FourWordAddress,
114        dht_client: DhtClient,
115    ) -> Result<Self> {
116        let store = MessageStore::new(dht_client.clone()).await?;
117        let threads = ThreadManager::new(store.clone());
118        let reactions = ReactionManager::new(store.clone());
119        let media = MediaProcessor::new()?;
120        let search = MessageSearch::new(store.clone()).await?;
121        let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
122        let sync = RealtimeSync::new(dht_client).await?;
123
124        Ok(Self {
125            store,
126            threads,
127            reactions,
128            media,
129            search,
130            encryption,
131            sync,
132            transport: None, // Will be initialized when network is available
133            webrtc: None,    // Will be initialized when needed
134            identity,
135        })
136    }
137
138    /// Connect to network transport
139    pub async fn connect_transport(&mut self, network: Arc<crate::network::P2PNode>) -> Result<()> {
140        let transport = MessageTransport::new(network, self.store.dht_client.clone()).await?;
141
142        // Start background tasks
143        transport.monitor_network_quality().await;
144        transport.process_message_queue().await;
145
146        self.transport = Some(transport);
147        Ok(())
148    }
149
150    /// Initialize WebRTC service
151    pub async fn initialize_webrtc(&mut self) -> Result<()> {
152        // Create WebRTC service using the DHT client
153        let dht_engine = self.store.dht_client.core_engine();
154        let webrtc = WebRtcService::new(
155            self.identity.clone(),
156            dht_engine,
157        ).await?;
158
159        // Start the WebRTC service
160        webrtc.start().await?;
161
162        self.webrtc = Some(webrtc);
163        Ok(())
164    }
165
166    /// Initiate a voice/video call
167    pub async fn initiate_call(
168        &self,
169        callee: FourWordAddress,
170        constraints: webrtc::MediaConstraints,
171    ) -> Result<webrtc::CallId> {
172        if let Some(ref webrtc) = self.webrtc {
173            webrtc.initiate_call(callee, constraints).await
174        } else {
175            Err(anyhow::anyhow!("WebRTC service not initialized"))
176        }
177    }
178
179    /// Accept an incoming call
180    pub async fn accept_call(
181        &self,
182        call_id: webrtc::CallId,
183        constraints: webrtc::MediaConstraints,
184    ) -> Result<()> {
185        if let Some(ref webrtc) = self.webrtc {
186            webrtc.accept_call(call_id, constraints).await
187        } else {
188            Err(anyhow::anyhow!("WebRTC service not initialized"))
189        }
190    }
191
192    /// Reject an incoming call
193    pub async fn reject_call(&self, call_id: webrtc::CallId) -> Result<()> {
194        if let Some(ref webrtc) = self.webrtc {
195            webrtc.reject_call(call_id).await
196        } else {
197            Err(anyhow::anyhow!("WebRTC service not initialized"))
198        }
199    }
200
201    /// End an active call
202    pub async fn end_call(&self, call_id: webrtc::CallId) -> Result<()> {
203        if let Some(ref webrtc) = self.webrtc {
204            webrtc.end_call(call_id).await
205        } else {
206            Err(anyhow::anyhow!("WebRTC service not initialized"))
207        }
208    }
209
210    /// Get call state
211    pub async fn get_call_state(&self, call_id: webrtc::CallId) -> Option<webrtc::CallState> {
212        if let Some(ref webrtc) = self.webrtc {
213            webrtc.get_call_state(call_id).await
214        } else {
215            None
216        }
217    }
218
219    /// Subscribe to WebRTC events
220    pub fn subscribe_webrtc_events(&self) -> Option<tokio::sync::broadcast::Receiver<WebRtcEvent>> {
221        self.webrtc.as_ref().map(|w| w.subscribe_events())
222    }
223
224    /// Get WebRTC service reference
225    pub fn webrtc(&self) -> Option<&WebRtcService> {
226        self.webrtc.as_ref()
227    }
228
229    /// Send a new message
230    pub async fn send_message(&mut self, request: SendMessageRequest) -> Result<RichMessage> {
231        // Create message
232        let mut message = RichMessage::new(
233            self.identity.clone(),
234            request.channel_id,
235            request.content,
236        );
237
238        // Add attachments if any
239        for attachment in request.attachments {
240            let processed = self.media.process_attachment(attachment).await?;
241            message.attachments.push(processed);
242        }
243
244        // Handle threading
245        if let Some(thread_id) = request.thread_id {
246            message.thread_id = Some(thread_id);
247            self.threads.add_to_thread(thread_id, &message).await?;
248        }
249
250        // Handle reply
251        if let Some(reply_to) = request.reply_to {
252            message.reply_to = Some(reply_to);
253        }
254
255        // Encrypt message
256        let encrypted = self.encryption.encrypt_message(&message).await?;
257
258        // Store message (we store the original, not encrypted version locally)
259        self.store.store_message(&message).await?;
260
261        // Send via transport if available, otherwise use sync
262        if let Some(ref transport) = self.transport {
263            // Extract recipients from channel members
264            let recipients = self.get_channel_members(request.channel_id).await?;
265            let receipt = transport.send_message(&encrypted, recipients).await?;
266
267            // Log delivery status
268            for (recipient, status) in receipt.delivery_status {
269                match status {
270                    DeliveryStatus::Delivered(_) => {
271                        debug!("Message delivered to {}", recipient);
272                    }
273                    DeliveryStatus::Queued => {
274                        debug!("Message queued for {}", recipient);
275                    }
276                    DeliveryStatus::Failed(e) => {
277                        warn!("Message delivery failed for {}: {}", recipient, e);
278                    }
279                    _ => {}
280                }
281            }
282        } else {
283            // Fallback to broadcast sync
284            self.sync.broadcast_message(&encrypted).await?;
285        }
286
287        Ok(message)
288    }
289
290    /// Receive and process an incoming message
291    pub async fn receive_message(&mut self, encrypted: EncryptedMessage) -> Result<RichMessage> {
292        // Decrypt message
293        let message = self.encryption.decrypt_message(encrypted).await?;
294
295        // Verify signature
296        if !self.encryption.verify_message(&message) {
297            return Err(anyhow::anyhow!("Invalid message signature"));
298        }
299
300        // Store message
301        self.store.store_message(&message).await?;
302
303        // Update thread if applicable
304        if let Some(thread_id) = &message.thread_id {
305            self.threads.update_thread(*thread_id, &message).await?;
306        }
307
308        // Process mentions
309        if message.mentions.contains(&self.identity) {
310            self.handle_mention(&message).await?;
311        }
312
313        Ok(message)
314    }
315
316    /// Add a reaction to a message
317    pub async fn add_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
318        self.reactions.add_reaction(
319            message_id,
320            emoji.clone(),
321            crate::messaging::user_resolver::resolve_handle(&self.identity),
322        ).await?;
323
324        // Sync reaction
325        self.sync.broadcast_reaction(message_id, emoji, true).await?;
326
327        Ok(())
328    }
329
330    /// Remove a reaction from a message
331    pub async fn remove_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
332        self.reactions.remove_reaction(
333            message_id,
334            emoji.clone(),
335            crate::messaging::user_resolver::resolve_handle(&self.identity),
336        ).await?;
337
338        // Sync reaction removal
339        self.sync.broadcast_reaction(message_id, emoji, false).await?;
340
341        Ok(())
342    }
343
344    /// Edit a message
345    pub async fn edit_message(
346        &mut self,
347        message_id: MessageId,
348        new_content: MessageContent,
349    ) -> Result<()> {
350        // Get original message
351        let mut message = self.store.get_message(message_id).await?;
352
353        // Verify sender
354        if message.sender != self.identity {
355            return Err(anyhow::anyhow!("Cannot edit message from another user"));
356        }
357
358        // Update content
359        message.content = new_content.clone();
360        message.edited_at = Some(Utc::now());
361
362        // Re-encrypt and store
363        let _encrypted = self.encryption.encrypt_message(&message).await?;
364        self.store.update_message(&message).await?;
365
366        // Sync edit
367        self.sync.broadcast_edit(message_id, new_content).await?;
368
369        Ok(())
370    }
371
372    /// Delete a message
373    pub async fn delete_message(&mut self, message_id: MessageId) -> Result<()> {
374        // Get message
375        let mut message = self.store.get_message(message_id).await?;
376
377        // Verify sender
378        if message.sender != self.identity {
379            return Err(anyhow::anyhow!("Cannot delete message from another user"));
380        }
381
382        // Soft delete
383        message.deleted_at = Some(Utc::now());
384
385        // Update storage
386        self.store.update_message(&message).await?;
387
388        // Sync deletion
389        self.sync.broadcast_deletion(message_id).await?;
390
391        Ok(())
392    }
393
394    /// Search messages
395    pub async fn search_messages(&self, query: SearchQuery) -> Result<Vec<RichMessage>> {
396        self.search.search(query).await
397    }
398
399    /// Get message history for a channel
400    pub async fn get_channel_messages(
401        &self,
402        channel_id: ChannelId,
403        limit: usize,
404        before: Option<DateTime<Utc>>,
405    ) -> Result<Vec<RichMessage>> {
406        self.store.get_channel_messages(channel_id, limit, before).await
407    }
408
409    /// Get thread messages
410    pub async fn get_thread_messages(
411        &self,
412        thread_id: ThreadId,
413    ) -> Result<ThreadView> {
414        self.threads.get_thread(thread_id).await
415    }
416
417    /// Mark messages as read
418    pub async fn mark_as_read(&mut self, message_ids: Vec<MessageId>) -> Result<()> {
419        for message_id in message_ids {
420            self.store.mark_as_read(
421                message_id,
422                crate::messaging::user_resolver::resolve_handle(&self.identity),
423            ).await?;
424            self.sync.broadcast_read_receipt(message_id).await?;
425        }
426        Ok(())
427    }
428
429    /// Start typing indicator
430    pub async fn start_typing(&mut self, channel_id: ChannelId) -> Result<()> {
431        self.sync
432            .broadcast_typing(
433                channel_id,
434                crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
435                true,
436            )
437            .await
438    }
439
440    /// Stop typing indicator
441    pub async fn stop_typing(&mut self, channel_id: ChannelId) -> Result<()> {
442        self.sync
443            .broadcast_typing(
444                channel_id,
445                crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
446                false,
447            )
448            .await
449    }
450
451    /// Initiate key exchange with a peer
452    pub async fn initiate_key_exchange(&self, peer: FourWordAddress) -> Result<KeyExchangeMessage> {
453        self.encryption.key_exchange.initiate_exchange(peer).await
454    }
455
456    /// Handle incoming key exchange message
457    pub async fn handle_key_exchange(&self, message: KeyExchangeMessage) -> Result<Option<KeyExchangeMessage>> {
458        use key_exchange::KeyExchangeType;
459
460        match message.message_type {
461            KeyExchangeType::Initiation => {
462                // Respond to initiation
463                let response = self.encryption.key_exchange.respond_to_exchange(message).await?;
464                Ok(Some(response))
465            }
466            KeyExchangeType::Response => {
467                // Complete the exchange
468                self.encryption.key_exchange.complete_exchange(message).await?;
469                Ok(None)
470            }
471            KeyExchangeType::PrekeyBundle => {
472                // Handle prekey bundle
473                Ok(None)
474            }
475        }
476    }
477
478    /// Get our prekey bundle for others
479    pub async fn get_prekey_bundle(&self) -> key_exchange::PrekeyBundle {
480        self.encryption.key_exchange.get_prekey_bundle().await
481    }
482
483    /// Rotate encryption keys
484    pub async fn rotate_keys(&self) -> Result<()> {
485        self.encryption.key_exchange.rotate_prekeys().await?;
486        self.encryption.key_exchange.cleanup_expired().await?;
487        Ok(())
488    }
489
490    /// Handle mention notification
491    async fn handle_mention(&self, message: &RichMessage) -> Result<()> {
492        // Create notification
493        tracing::info!("Mentioned in message: {:?}", message.id);
494        // TODO: Trigger system notification
495        Ok(())
496    }
497
498    /// Get channel members
499    async fn get_channel_members(&self, _channel_id: ChannelId) -> Result<Vec<FourWordAddress>> {
500        // TODO: Implement channel membership lookup
501        // For now, return empty list which will fallback to broadcast
502        Ok(Vec::new())
503    }
504}
505*/
506
// MessageStore is now a type alias in database.rs; the legacy wrapper below is kept
// commented out for reference only and is not compiled.
508
509/*
510/// Message store for persistence
511#[derive(Clone)]
512pub struct MessageStore {
513    inner: Arc<database::DatabaseMessageStore>,
514    dht_client: DhtClient,
515}
516
517impl MessageStore {
518    pub async fn new(dht_client: DhtClient) -> Result<Self> {
519        let inner = Arc::new(
520            database::DatabaseMessageStore::new(dht_client.clone(), None).await?
521        );
522
523        Ok(Self {
524            inner,
525            dht_client,
526        })
527    }
528
529    pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
530        self.inner.store_message(message).await
531    }
532
533    pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
534        self.inner.get_message(id).await
535    }
536
537    pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
538        self.inner.update_message(message).await
539    }
540
541    pub async fn get_channel_messages(
542        &self,
543        channel_id: ChannelId,
544        limit: usize,
545        before: Option<DateTime<Utc>>,
546    ) -> Result<Vec<RichMessage>> {
547        self.inner.get_channel_messages(channel_id, limit, before).await
548    }
549
550    pub async fn mark_as_read(
551        &self,
552        message_id: MessageId,
553        user: FourWordAddress,
554    ) -> Result<()> {
555        self.inner.mark_as_read(message_id, user).await
556    }
557
558    /// Search messages
559    pub async fn search_messages(&self, query: &str, channel_id: Option<ChannelId>) -> Result<Vec<RichMessage>> {
560        self.inner.search_messages(query, channel_id, 50).await
561    }
562
563    /// Get thread messages
564    pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
565        self.inner.get_thread_messages(thread_id).await
566    }
567
568    /// Add reaction
569    pub async fn add_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
570        self.inner.add_reaction(message_id, emoji, user).await
571    }
572
573    /// Remove reaction
574    pub async fn remove_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
575        self.inner.remove_reaction(message_id, emoji, user).await
576    }
577
578    /// Get database statistics
579    pub async fn get_stats(&self) -> Result<database::DatabaseStats> {
580        self.inner.get_stats().await
581    }
582
583    /// Clean up ephemeral messages
584    pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
585        self.inner.cleanup_ephemeral(ttl_seconds).await
586    }
587}
588*/
589
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::FourWordAddress;

    /// A freshly constructed `RichMessage` reports the sender and channel it
    /// was built with and keeps a text content variant.
    ///
    /// Nothing in this test awaits, so it is a plain synchronous `#[test]`
    /// and does not spin up an async runtime.
    #[test]
    fn test_message_creation() {
        let identity = crate::messaging::user_handle::UserHandle::from("ocean-forest-moon-star");
        let channel = ChannelId::new();
        let content = MessageContent::Text("Hello, world!".to_string());

        // `identity` is cloned because it is compared against the message
        // below; `content` is not used again, so it is moved in directly.
        let message = RichMessage::new(identity.clone(), channel, content);

        assert_eq!(message.sender, identity);
        assert_eq!(message.channel_id, channel);
        assert!(matches!(message.content, MessageContent::Text(_)));
    }

    /// Smoke check that a `FourWordAddress` can be built from a string and
    /// renders to a non-empty string.
    ///
    /// The full messaging-service flow needs a real DHT network and must be
    /// run separately with proper network setup; it is deliberately not
    /// exercised here. Only identity construction is covered, which needs no
    /// async runtime.
    #[test]
    fn test_messaging_service_with_real_dht() {
        let identity = FourWordAddress::from("ocean-forest-moon-star");
        assert!(!identity.to_string().is_empty());
    }
}