saorsa_core/messaging/
mod.rs

1// Rich Messaging Module for P2P Foundation
2// Implements WhatsApp/Slack-style messaging with full decentralization
3
4pub mod composer;
5pub mod database;
6pub mod encryption;
7pub mod key_exchange;
8pub mod media;
9pub mod user_handle;
10pub mod user_resolver;
11pub mod mocks;
12pub mod quic_media_streams;
13pub mod reactions;
14pub mod search;
15pub mod service;
16pub mod sync;
17pub mod threads;
18pub mod transport;
19pub mod types;
20pub mod webrtc;
21pub mod webrtc_quic_bridge;
22
23use user_handle::UserHandle;
24// Removed unused imports
25// use anyhow::Result;
26use serde::{Deserialize, Serialize};
27// use std::sync::Arc;
28// use chrono::{DateTime, Utc};
29// Removed unused imports: use tracing::{debug, warn};
30
31pub use composer::MessageComposer;
32pub use database::MessageStore;
33pub use encryption::SecureMessaging;
34pub use key_exchange::{KeyExchange, KeyExchangeMessage};
35pub use media::MediaProcessor;
36pub use quic_media_streams::{QosParameters, QuicMediaStreamManager, StreamStats};
37pub use reactions::ReactionManager;
38pub use search::MessageSearch;
39pub use service::{MessagingService, SendOptions};
40pub use sync::RealtimeSync;
41pub use threads::ThreadManager;
42pub use transport::{DeliveryReceipt, DeliveryStatus, MessageTransport, ReceivedMessage};
43pub use types::*;
44pub use webrtc::{CallEvent, CallManager, WebRtcEvent, WebRtcService};
45pub use webrtc_quic_bridge::{RtpPacket, StreamConfig, StreamType, WebRtcQuicBridge};
46
// Re-export the real DHT client so it is reachable through this module
48pub use crate::dht::client::DhtClient;
49
50/// Request to send a message
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct SendMessageRequest {
53    pub channel_id: ChannelId,
54    pub content: MessageContent,
55    pub attachments: Vec<Vec<u8>>,
56    pub thread_id: Option<ThreadId>,
57    pub reply_to: Option<MessageId>,
58    pub mentions: Vec<UserHandle>,
59    pub ephemeral: bool,
60}
61
// MessagingService is now defined in service.rs.
//
// The legacy implementation below is commented out and kept for reference only;
// see service.rs for the current implementation.
65
66/*
67impl MessagingService {
68    /// Create a new messaging service with a real DHT client
69    pub async fn new(identity: FourWordAddress) -> Result<Self> {
70        // Create DHT client based on the user's identity
71        // Convert four-word address to a node ID
72        let node_id_bytes = blake3::hash(identity.to_string().as_bytes());
73        let node_id = crate::dht::core_engine::NodeId::from_key(
74            crate::dht::core_engine::DhtKey::from_bytes(*node_id_bytes.as_bytes())
75        );
76
77        // Create DHT client with the user's node ID
78        let dht_client = DhtClient::with_node_id(node_id)?;
79
80        // Initialize all components
81        let store = MessageStore::new(dht_client.clone()).await?;
82        let threads = ThreadManager::new(store.clone());
83        let reactions = ReactionManager::new(store.clone());
84        let media = MediaProcessor::new()?;
85        let search = MessageSearch::new(store.clone()).await?;
86        let encryption = SecureMessaging::new(identity.clone())?;
87        let sync = RealtimeSync::new(dht_client.clone()).await?;
88
89        Ok(Self {
90            store,
91            threads,
92            reactions,
93            media,
94            search,
95            encryption,
96            sync,
97            transport: None, // Will be initialized when network is available
98            webrtc: None,    // Will be initialized when needed
99            identity,
100        })
101    }
102
103    /// Create a new messaging service with an existing DHT client
104    pub async fn with_dht_client(
105        identity: FourWordAddress,
106        dht_client: DhtClient,
107    ) -> Result<Self> {
108        let store = MessageStore::new(dht_client.clone()).await?;
109        let threads = ThreadManager::new(store.clone());
110        let reactions = ReactionManager::new(store.clone());
111        let media = MediaProcessor::new()?;
112        let search = MessageSearch::new(store.clone()).await?;
113        let encryption = SecureMessaging::new(identity.clone())?;
114        let sync = RealtimeSync::new(dht_client).await?;
115
116        Ok(Self {
117            store,
118            threads,
119            reactions,
120            media,
121            search,
122            encryption,
123            sync,
124            transport: None, // Will be initialized when network is available
125            webrtc: None,    // Will be initialized when needed
126            identity,
127        })
128    }
129
130    /// Connect to network transport
131    pub async fn connect_transport(&mut self, network: Arc<crate::network::P2PNode>) -> Result<()> {
132        let transport = MessageTransport::new(network, self.store.dht_client.clone()).await?;
133
134        // Start background tasks
135        transport.monitor_network_quality().await;
136        transport.process_message_queue().await;
137
138        self.transport = Some(transport);
139        Ok(())
140    }
141
142    /// Initialize WebRTC service
143    pub async fn initialize_webrtc(&mut self) -> Result<()> {
144        // Create WebRTC service using the DHT client
145        let dht_engine = self.store.dht_client.core_engine();
146        let webrtc = WebRtcService::new(
147            self.identity.clone(),
148            dht_engine,
149        ).await?;
150
151        // Start the WebRTC service
152        webrtc.start().await?;
153
154        self.webrtc = Some(webrtc);
155        Ok(())
156    }
157
158    /// Initiate a voice/video call
159    pub async fn initiate_call(
160        &self,
161        callee: FourWordAddress,
162        constraints: webrtc::MediaConstraints,
163    ) -> Result<webrtc::CallId> {
164        if let Some(ref webrtc) = self.webrtc {
165            webrtc.initiate_call(callee, constraints).await
166        } else {
167            Err(anyhow::anyhow!("WebRTC service not initialized"))
168        }
169    }
170
171    /// Accept an incoming call
172    pub async fn accept_call(
173        &self,
174        call_id: webrtc::CallId,
175        constraints: webrtc::MediaConstraints,
176    ) -> Result<()> {
177        if let Some(ref webrtc) = self.webrtc {
178            webrtc.accept_call(call_id, constraints).await
179        } else {
180            Err(anyhow::anyhow!("WebRTC service not initialized"))
181        }
182    }
183
184    /// Reject an incoming call
185    pub async fn reject_call(&self, call_id: webrtc::CallId) -> Result<()> {
186        if let Some(ref webrtc) = self.webrtc {
187            webrtc.reject_call(call_id).await
188        } else {
189            Err(anyhow::anyhow!("WebRTC service not initialized"))
190        }
191    }
192
193    /// End an active call
194    pub async fn end_call(&self, call_id: webrtc::CallId) -> Result<()> {
195        if let Some(ref webrtc) = self.webrtc {
196            webrtc.end_call(call_id).await
197        } else {
198            Err(anyhow::anyhow!("WebRTC service not initialized"))
199        }
200    }
201
202    /// Get call state
203    pub async fn get_call_state(&self, call_id: webrtc::CallId) -> Option<webrtc::CallState> {
204        if let Some(ref webrtc) = self.webrtc {
205            webrtc.get_call_state(call_id).await
206        } else {
207            None
208        }
209    }
210
211    /// Subscribe to WebRTC events
212    pub fn subscribe_webrtc_events(&self) -> Option<tokio::sync::broadcast::Receiver<WebRtcEvent>> {
213        self.webrtc.as_ref().map(|w| w.subscribe_events())
214    }
215
216    /// Get WebRTC service reference
217    pub fn webrtc(&self) -> Option<&WebRtcService> {
218        self.webrtc.as_ref()
219    }
220
221    /// Send a new message
222    pub async fn send_message(&mut self, request: SendMessageRequest) -> Result<RichMessage> {
223        // Create message
224        let mut message = RichMessage::new(
225            self.identity.clone(),
226            request.channel_id,
227            request.content,
228        );
229
230        // Add attachments if any
231        for attachment in request.attachments {
232            let processed = self.media.process_attachment(attachment).await?;
233            message.attachments.push(processed);
234        }
235
236        // Handle threading
237        if let Some(thread_id) = request.thread_id {
238            message.thread_id = Some(thread_id);
239            self.threads.add_to_thread(thread_id, &message).await?;
240        }
241
242        // Handle reply
243        if let Some(reply_to) = request.reply_to {
244            message.reply_to = Some(reply_to);
245        }
246
247        // Encrypt message
248        let encrypted = self.encryption.encrypt_message(&message).await?;
249
250        // Store message (we store the original, not encrypted version locally)
251        self.store.store_message(&message).await?;
252
253        // Send via transport if available, otherwise use sync
254        if let Some(ref transport) = self.transport {
255            // Extract recipients from channel members
256            let recipients = self.get_channel_members(request.channel_id).await?;
257            let receipt = transport.send_message(&encrypted, recipients).await?;
258
259            // Log delivery status
260            for (recipient, status) in receipt.delivery_status {
261                match status {
262                    DeliveryStatus::Delivered(_) => {
263                        debug!("Message delivered to {}", recipient);
264                    }
265                    DeliveryStatus::Queued => {
266                        debug!("Message queued for {}", recipient);
267                    }
268                    DeliveryStatus::Failed(e) => {
269                        warn!("Message delivery failed for {}: {}", recipient, e);
270                    }
271                    _ => {}
272                }
273            }
274        } else {
275            // Fallback to broadcast sync
276            self.sync.broadcast_message(&encrypted).await?;
277        }
278
279        Ok(message)
280    }
281
282    /// Receive and process an incoming message
283    pub async fn receive_message(&mut self, encrypted: EncryptedMessage) -> Result<RichMessage> {
284        // Decrypt message
285        let message = self.encryption.decrypt_message(encrypted).await?;
286
287        // Verify signature
288        if !self.encryption.verify_message(&message) {
289            return Err(anyhow::anyhow!("Invalid message signature"));
290        }
291
292        // Store message
293        self.store.store_message(&message).await?;
294
295        // Update thread if applicable
296        if let Some(thread_id) = &message.thread_id {
297            self.threads.update_thread(*thread_id, &message).await?;
298        }
299
300        // Process mentions
301        if message.mentions.contains(&self.identity) {
302            self.handle_mention(&message).await?;
303        }
304
305        Ok(message)
306    }
307
308    /// Add a reaction to a message
309    pub async fn add_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
310        self.reactions.add_reaction(
311            message_id,
312            emoji.clone(),
313            crate::messaging::user_resolver::resolve_handle(&self.identity),
314        ).await?;
315
316        // Sync reaction
317        self.sync.broadcast_reaction(message_id, emoji, true).await?;
318
319        Ok(())
320    }
321
322    /// Remove a reaction from a message
323    pub async fn remove_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
324        self.reactions.remove_reaction(
325            message_id,
326            emoji.clone(),
327            crate::messaging::user_resolver::resolve_handle(&self.identity),
328        ).await?;
329
330        // Sync reaction removal
331        self.sync.broadcast_reaction(message_id, emoji, false).await?;
332
333        Ok(())
334    }
335
336    /// Edit a message
337    pub async fn edit_message(
338        &mut self,
339        message_id: MessageId,
340        new_content: MessageContent,
341    ) -> Result<()> {
342        // Get original message
343        let mut message = self.store.get_message(message_id).await?;
344
345        // Verify sender
346        if message.sender != self.identity {
347            return Err(anyhow::anyhow!("Cannot edit message from another user"));
348        }
349
350        // Update content
351        message.content = new_content.clone();
352        message.edited_at = Some(Utc::now());
353
354        // Re-encrypt and store
355        let _encrypted = self.encryption.encrypt_message(&message).await?;
356        self.store.update_message(&message).await?;
357
358        // Sync edit
359        self.sync.broadcast_edit(message_id, new_content).await?;
360
361        Ok(())
362    }
363
364    /// Delete a message
365    pub async fn delete_message(&mut self, message_id: MessageId) -> Result<()> {
366        // Get message
367        let mut message = self.store.get_message(message_id).await?;
368
369        // Verify sender
370        if message.sender != self.identity {
371            return Err(anyhow::anyhow!("Cannot delete message from another user"));
372        }
373
374        // Soft delete
375        message.deleted_at = Some(Utc::now());
376
377        // Update storage
378        self.store.update_message(&message).await?;
379
380        // Sync deletion
381        self.sync.broadcast_deletion(message_id).await?;
382
383        Ok(())
384    }
385
386    /// Search messages
387    pub async fn search_messages(&self, query: SearchQuery) -> Result<Vec<RichMessage>> {
388        self.search.search(query).await
389    }
390
391    /// Get message history for a channel
392    pub async fn get_channel_messages(
393        &self,
394        channel_id: ChannelId,
395        limit: usize,
396        before: Option<DateTime<Utc>>,
397    ) -> Result<Vec<RichMessage>> {
398        self.store.get_channel_messages(channel_id, limit, before).await
399    }
400
401    /// Get thread messages
402    pub async fn get_thread_messages(
403        &self,
404        thread_id: ThreadId,
405    ) -> Result<ThreadView> {
406        self.threads.get_thread(thread_id).await
407    }
408
409    /// Mark messages as read
410    pub async fn mark_as_read(&mut self, message_ids: Vec<MessageId>) -> Result<()> {
411        for message_id in message_ids {
412            self.store.mark_as_read(
413                message_id,
414                crate::messaging::user_resolver::resolve_handle(&self.identity),
415            ).await?;
416            self.sync.broadcast_read_receipt(message_id).await?;
417        }
418        Ok(())
419    }
420
421    /// Start typing indicator
422    pub async fn start_typing(&mut self, channel_id: ChannelId) -> Result<()> {
423        self.sync
424            .broadcast_typing(
425                channel_id,
426                crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
427                true,
428            )
429            .await
430    }
431
432    /// Stop typing indicator
433    pub async fn stop_typing(&mut self, channel_id: ChannelId) -> Result<()> {
434        self.sync
435            .broadcast_typing(
436                channel_id,
437                crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
438                false,
439            )
440            .await
441    }
442
443    /// Initiate key exchange with a peer
444    pub async fn initiate_key_exchange(&self, peer: FourWordAddress) -> Result<KeyExchangeMessage> {
445        self.encryption.key_exchange.initiate_exchange(peer).await
446    }
447
448    /// Handle incoming key exchange message
449    pub async fn handle_key_exchange(&self, message: KeyExchangeMessage) -> Result<Option<KeyExchangeMessage>> {
450        use key_exchange::KeyExchangeType;
451
452        match message.message_type {
453            KeyExchangeType::Initiation => {
454                // Respond to initiation
455                let response = self.encryption.key_exchange.respond_to_exchange(message).await?;
456                Ok(Some(response))
457            }
458            KeyExchangeType::Response => {
459                // Complete the exchange
460                self.encryption.key_exchange.complete_exchange(message).await?;
461                Ok(None)
462            }
463            KeyExchangeType::PrekeyBundle => {
464                // Handle prekey bundle
465                Ok(None)
466            }
467        }
468    }
469
470    /// Get our prekey bundle for others
471    pub async fn get_prekey_bundle(&self) -> key_exchange::PrekeyBundle {
472        self.encryption.key_exchange.get_prekey_bundle().await
473    }
474
475    /// Rotate encryption keys
476    pub async fn rotate_keys(&self) -> Result<()> {
477        self.encryption.key_exchange.rotate_prekeys().await?;
478        self.encryption.key_exchange.cleanup_expired().await?;
479        Ok(())
480    }
481
482    /// Handle mention notification
483    async fn handle_mention(&self, message: &RichMessage) -> Result<()> {
484        // Create notification
485        log::info!("Mentioned in message: {:?}", message.id);
486        // TODO: Trigger system notification
487        Ok(())
488    }
489
490    /// Get channel members
491    async fn get_channel_members(&self, _channel_id: ChannelId) -> Result<Vec<FourWordAddress>> {
492        // TODO: Implement channel membership lookup
493        // For now, return empty list which will fallback to broadcast
494        Ok(Vec::new())
495    }
496}
497*/
498
// MessageStore is now a type alias in database.rs; the old wrapper below is
// commented out and kept for reference only.
500
501/*
502/// Message store for persistence
503#[derive(Clone)]
504pub struct MessageStore {
505    inner: Arc<database::DatabaseMessageStore>,
506    dht_client: DhtClient,
507}
508
509impl MessageStore {
510    pub async fn new(dht_client: DhtClient) -> Result<Self> {
511        let inner = Arc::new(
512            database::DatabaseMessageStore::new(dht_client.clone(), None).await?
513        );
514
515        Ok(Self {
516            inner,
517            dht_client,
518        })
519    }
520
521    pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
522        self.inner.store_message(message).await
523    }
524
525    pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
526        self.inner.get_message(id).await
527    }
528
529    pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
530        self.inner.update_message(message).await
531    }
532
533    pub async fn get_channel_messages(
534        &self,
535        channel_id: ChannelId,
536        limit: usize,
537        before: Option<DateTime<Utc>>,
538    ) -> Result<Vec<RichMessage>> {
539        self.inner.get_channel_messages(channel_id, limit, before).await
540    }
541
542    pub async fn mark_as_read(
543        &self,
544        message_id: MessageId,
545        user: FourWordAddress,
546    ) -> Result<()> {
547        self.inner.mark_as_read(message_id, user).await
548    }
549
550    /// Search messages
551    pub async fn search_messages(&self, query: &str, channel_id: Option<ChannelId>) -> Result<Vec<RichMessage>> {
552        self.inner.search_messages(query, channel_id, 50).await
553    }
554
555    /// Get thread messages
556    pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
557        self.inner.get_thread_messages(thread_id).await
558    }
559
560    /// Add reaction
561    pub async fn add_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
562        self.inner.add_reaction(message_id, emoji, user).await
563    }
564
565    /// Remove reaction
566    pub async fn remove_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
567        self.inner.remove_reaction(message_id, emoji, user).await
568    }
569
570    /// Get database statistics
571    pub async fn get_stats(&self) -> Result<database::DatabaseStats> {
572        self.inner.get_stats().await
573    }
574
575    /// Clean up ephemeral messages
576    pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
577        self.inner.cleanup_ephemeral(ttl_seconds).await
578    }
579}
580*/
581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::FourWordAddress;
    use crate::messaging::user_handle::UserHandle;

    /// A freshly built `RichMessage` keeps the sender and channel it was given
    /// and carries text content.
    #[tokio::test]
    async fn test_message_creation() {
        let sender = UserHandle::from("ocean-forest-moon-star");
        let channel_id = ChannelId::new();

        let message = RichMessage::new(
            sender.clone(),
            channel_id,
            MessageContent::Text("Hello, world!".to_string()),
        );

        assert_eq!(message.sender, sender);
        assert_eq!(message.channel_id, channel_id);
        assert!(matches!(message.content, MessageContent::Text(_)));
    }

    /// Builds a `MessagingService` on top of `DhtClient::new()` and sends one
    /// text message to a single recipient.
    #[tokio::test]
    async fn test_messaging_service_with_real_dht() {
        let local_address = FourWordAddress::from("ocean-forest-moon-star");
        let dht_client = DhtClient::new().unwrap();

        // Construct the service and make sure construction succeeded before using it.
        let service = MessagingService::new(local_address, dht_client).await;
        assert!(service.is_ok());

        let service = service.unwrap();

        // The request struct is only constructed here; the call below goes
        // through the positional `send_message` API instead.
        let _unsent_request = SendMessageRequest {
            channel_id: ChannelId::new(),
            content: MessageContent::Text("Test with real DHT".to_string()),
            attachments: Vec::new(),
            thread_id: None,
            reply_to: None,
            mentions: Vec::new(),
            ephemeral: false,
        };

        let recipients = vec![FourWordAddress::from("recipient-alpha-bravo-charlie")];
        let body = MessageContent::Text("Hello".to_string());
        let result = service
            .send_message(recipients, body, ChannelId::new(), SendOptions::default())
            .await;
        assert!(result.is_ok());
    }
}