saorsa_core/messaging/mod.rs
1// Rich Messaging Module for P2P Foundation
2// Implements WhatsApp/Slack-style messaging with full decentralization
3
4pub mod composer;
5pub mod database;
6pub mod encryption;
7pub mod key_exchange;
8pub mod media;
9pub mod user_handle;
10pub mod user_resolver;
11pub mod mocks;
12pub mod quic_media_streams;
13pub mod reactions;
14pub mod search;
15pub mod service;
16pub mod sync;
17pub mod threads;
18pub mod transport;
19pub mod types;
20pub mod webrtc;
21pub mod webrtc_quic_bridge;
22
23use user_handle::UserHandle;
24// Removed unused imports
25// use anyhow::Result;
26use serde::{Deserialize, Serialize};
27// use std::sync::Arc;
28// use chrono::{DateTime, Utc};
29// Removed unused imports: use tracing::{debug, warn};
30
31pub use composer::MessageComposer;
32pub use database::MessageStore;
33pub use encryption::SecureMessaging;
34pub use key_exchange::{KeyExchange, KeyExchangeMessage};
35pub use media::MediaProcessor;
36pub use quic_media_streams::{QosParameters, QuicMediaStreamManager, StreamStats};
37pub use reactions::ReactionManager;
38pub use search::MessageSearch;
39pub use service::{MessagingService, SendOptions};
40pub use sync::RealtimeSync;
41pub use threads::ThreadManager;
42pub use transport::{DeliveryReceipt, DeliveryStatus, MessageTransport, ReceivedMessage};
43pub use types::*;
44pub use webrtc::{CallEvent, CallManager, WebRtcEvent, WebRtcService};
45pub use webrtc_quic_bridge::{RtpPacket, StreamConfig, StreamType, WebRtcQuicBridge};
46
47// Import the real DHT client
48pub use crate::dht::client::DhtClient;
49
50/// Request to send a message
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct SendMessageRequest {
53 pub channel_id: ChannelId,
54 pub content: MessageContent,
55 pub attachments: Vec<Vec<u8>>,
56 pub thread_id: Option<ThreadId>,
57 pub reply_to: Option<MessageId>,
58 pub mentions: Vec<UserHandle>,
59 pub ephemeral: bool,
60}
61
62// MessagingService is now defined in service.rs
63
// The legacy implementation is kept below, commented out, for reference only - see service.rs for the current implementation
65
66/*
67impl MessagingService {
68 /// Create a new messaging service with a real DHT client
69 pub async fn new(identity: FourWordAddress) -> Result<Self> {
70 // Create DHT client based on the user's identity
71 // Convert four-word address to a node ID
72 let node_id_bytes = blake3::hash(identity.to_string().as_bytes());
73 let node_id = crate::dht::core_engine::NodeId::from_key(
74 crate::dht::core_engine::DhtKey::from_bytes(*node_id_bytes.as_bytes())
75 );
76
77 // Create DHT client with the user's node ID
78 let dht_client = DhtClient::with_node_id(node_id)?;
79
80 // Initialize all components
81 let store = MessageStore::new(dht_client.clone()).await?;
82 let threads = ThreadManager::new(store.clone());
83 let reactions = ReactionManager::new(store.clone());
84 let media = MediaProcessor::new()?;
85 let search = MessageSearch::new(store.clone()).await?;
86 let encryption = SecureMessaging::new(identity.clone())?;
87 let sync = RealtimeSync::new(dht_client.clone()).await?;
88
89 Ok(Self {
90 store,
91 threads,
92 reactions,
93 media,
94 search,
95 encryption,
96 sync,
97 transport: None, // Will be initialized when network is available
98 webrtc: None, // Will be initialized when needed
99 identity,
100 })
101 }
102
103 /// Create a new messaging service with an existing DHT client
104 pub async fn with_dht_client(
105 identity: FourWordAddress,
106 dht_client: DhtClient,
107 ) -> Result<Self> {
108 let store = MessageStore::new(dht_client.clone()).await?;
109 let threads = ThreadManager::new(store.clone());
110 let reactions = ReactionManager::new(store.clone());
111 let media = MediaProcessor::new()?;
112 let search = MessageSearch::new(store.clone()).await?;
113 let encryption = SecureMessaging::new(identity.clone())?;
114 let sync = RealtimeSync::new(dht_client).await?;
115
116 Ok(Self {
117 store,
118 threads,
119 reactions,
120 media,
121 search,
122 encryption,
123 sync,
124 transport: None, // Will be initialized when network is available
125 webrtc: None, // Will be initialized when needed
126 identity,
127 })
128 }
129
130 /// Connect to network transport
131 pub async fn connect_transport(&mut self, network: Arc<crate::network::P2PNode>) -> Result<()> {
132 let transport = MessageTransport::new(network, self.store.dht_client.clone()).await?;
133
134 // Start background tasks
135 transport.monitor_network_quality().await;
136 transport.process_message_queue().await;
137
138 self.transport = Some(transport);
139 Ok(())
140 }
141
142 /// Initialize WebRTC service
143 pub async fn initialize_webrtc(&mut self) -> Result<()> {
144 // Create WebRTC service using the DHT client
145 let dht_engine = self.store.dht_client.core_engine();
146 let webrtc = WebRtcService::new(
147 self.identity.clone(),
148 dht_engine,
149 ).await?;
150
151 // Start the WebRTC service
152 webrtc.start().await?;
153
154 self.webrtc = Some(webrtc);
155 Ok(())
156 }
157
158 /// Initiate a voice/video call
159 pub async fn initiate_call(
160 &self,
161 callee: FourWordAddress,
162 constraints: webrtc::MediaConstraints,
163 ) -> Result<webrtc::CallId> {
164 if let Some(ref webrtc) = self.webrtc {
165 webrtc.initiate_call(callee, constraints).await
166 } else {
167 Err(anyhow::anyhow!("WebRTC service not initialized"))
168 }
169 }
170
171 /// Accept an incoming call
172 pub async fn accept_call(
173 &self,
174 call_id: webrtc::CallId,
175 constraints: webrtc::MediaConstraints,
176 ) -> Result<()> {
177 if let Some(ref webrtc) = self.webrtc {
178 webrtc.accept_call(call_id, constraints).await
179 } else {
180 Err(anyhow::anyhow!("WebRTC service not initialized"))
181 }
182 }
183
184 /// Reject an incoming call
185 pub async fn reject_call(&self, call_id: webrtc::CallId) -> Result<()> {
186 if let Some(ref webrtc) = self.webrtc {
187 webrtc.reject_call(call_id).await
188 } else {
189 Err(anyhow::anyhow!("WebRTC service not initialized"))
190 }
191 }
192
193 /// End an active call
194 pub async fn end_call(&self, call_id: webrtc::CallId) -> Result<()> {
195 if let Some(ref webrtc) = self.webrtc {
196 webrtc.end_call(call_id).await
197 } else {
198 Err(anyhow::anyhow!("WebRTC service not initialized"))
199 }
200 }
201
202 /// Get call state
203 pub async fn get_call_state(&self, call_id: webrtc::CallId) -> Option<webrtc::CallState> {
204 if let Some(ref webrtc) = self.webrtc {
205 webrtc.get_call_state(call_id).await
206 } else {
207 None
208 }
209 }
210
211 /// Subscribe to WebRTC events
212 pub fn subscribe_webrtc_events(&self) -> Option<tokio::sync::broadcast::Receiver<WebRtcEvent>> {
213 self.webrtc.as_ref().map(|w| w.subscribe_events())
214 }
215
216 /// Get WebRTC service reference
217 pub fn webrtc(&self) -> Option<&WebRtcService> {
218 self.webrtc.as_ref()
219 }
220
221 /// Send a new message
222 pub async fn send_message(&mut self, request: SendMessageRequest) -> Result<RichMessage> {
223 // Create message
224 let mut message = RichMessage::new(
225 self.identity.clone(),
226 request.channel_id,
227 request.content,
228 );
229
230 // Add attachments if any
231 for attachment in request.attachments {
232 let processed = self.media.process_attachment(attachment).await?;
233 message.attachments.push(processed);
234 }
235
236 // Handle threading
237 if let Some(thread_id) = request.thread_id {
238 message.thread_id = Some(thread_id);
239 self.threads.add_to_thread(thread_id, &message).await?;
240 }
241
242 // Handle reply
243 if let Some(reply_to) = request.reply_to {
244 message.reply_to = Some(reply_to);
245 }
246
247 // Encrypt message
248 let encrypted = self.encryption.encrypt_message(&message).await?;
249
250 // Store message (we store the original, not encrypted version locally)
251 self.store.store_message(&message).await?;
252
253 // Send via transport if available, otherwise use sync
254 if let Some(ref transport) = self.transport {
255 // Extract recipients from channel members
256 let recipients = self.get_channel_members(request.channel_id).await?;
257 let receipt = transport.send_message(&encrypted, recipients).await?;
258
259 // Log delivery status
260 for (recipient, status) in receipt.delivery_status {
261 match status {
262 DeliveryStatus::Delivered(_) => {
263 debug!("Message delivered to {}", recipient);
264 }
265 DeliveryStatus::Queued => {
266 debug!("Message queued for {}", recipient);
267 }
268 DeliveryStatus::Failed(e) => {
269 warn!("Message delivery failed for {}: {}", recipient, e);
270 }
271 _ => {}
272 }
273 }
274 } else {
275 // Fallback to broadcast sync
276 self.sync.broadcast_message(&encrypted).await?;
277 }
278
279 Ok(message)
280 }
281
282 /// Receive and process an incoming message
283 pub async fn receive_message(&mut self, encrypted: EncryptedMessage) -> Result<RichMessage> {
284 // Decrypt message
285 let message = self.encryption.decrypt_message(encrypted).await?;
286
287 // Verify signature
288 if !self.encryption.verify_message(&message) {
289 return Err(anyhow::anyhow!("Invalid message signature"));
290 }
291
292 // Store message
293 self.store.store_message(&message).await?;
294
295 // Update thread if applicable
296 if let Some(thread_id) = &message.thread_id {
297 self.threads.update_thread(*thread_id, &message).await?;
298 }
299
300 // Process mentions
301 if message.mentions.contains(&self.identity) {
302 self.handle_mention(&message).await?;
303 }
304
305 Ok(message)
306 }
307
308 /// Add a reaction to a message
309 pub async fn add_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
310 self.reactions.add_reaction(
311 message_id,
312 emoji.clone(),
313 crate::messaging::user_resolver::resolve_handle(&self.identity),
314 ).await?;
315
316 // Sync reaction
317 self.sync.broadcast_reaction(message_id, emoji, true).await?;
318
319 Ok(())
320 }
321
322 /// Remove a reaction from a message
323 pub async fn remove_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
324 self.reactions.remove_reaction(
325 message_id,
326 emoji.clone(),
327 crate::messaging::user_resolver::resolve_handle(&self.identity),
328 ).await?;
329
330 // Sync reaction removal
331 self.sync.broadcast_reaction(message_id, emoji, false).await?;
332
333 Ok(())
334 }
335
336 /// Edit a message
337 pub async fn edit_message(
338 &mut self,
339 message_id: MessageId,
340 new_content: MessageContent,
341 ) -> Result<()> {
342 // Get original message
343 let mut message = self.store.get_message(message_id).await?;
344
345 // Verify sender
346 if message.sender != self.identity {
347 return Err(anyhow::anyhow!("Cannot edit message from another user"));
348 }
349
350 // Update content
351 message.content = new_content.clone();
352 message.edited_at = Some(Utc::now());
353
354 // Re-encrypt and store
355 let _encrypted = self.encryption.encrypt_message(&message).await?;
356 self.store.update_message(&message).await?;
357
358 // Sync edit
359 self.sync.broadcast_edit(message_id, new_content).await?;
360
361 Ok(())
362 }
363
364 /// Delete a message
365 pub async fn delete_message(&mut self, message_id: MessageId) -> Result<()> {
366 // Get message
367 let mut message = self.store.get_message(message_id).await?;
368
369 // Verify sender
370 if message.sender != self.identity {
371 return Err(anyhow::anyhow!("Cannot delete message from another user"));
372 }
373
374 // Soft delete
375 message.deleted_at = Some(Utc::now());
376
377 // Update storage
378 self.store.update_message(&message).await?;
379
380 // Sync deletion
381 self.sync.broadcast_deletion(message_id).await?;
382
383 Ok(())
384 }
385
386 /// Search messages
387 pub async fn search_messages(&self, query: SearchQuery) -> Result<Vec<RichMessage>> {
388 self.search.search(query).await
389 }
390
391 /// Get message history for a channel
392 pub async fn get_channel_messages(
393 &self,
394 channel_id: ChannelId,
395 limit: usize,
396 before: Option<DateTime<Utc>>,
397 ) -> Result<Vec<RichMessage>> {
398 self.store.get_channel_messages(channel_id, limit, before).await
399 }
400
401 /// Get thread messages
402 pub async fn get_thread_messages(
403 &self,
404 thread_id: ThreadId,
405 ) -> Result<ThreadView> {
406 self.threads.get_thread(thread_id).await
407 }
408
409 /// Mark messages as read
410 pub async fn mark_as_read(&mut self, message_ids: Vec<MessageId>) -> Result<()> {
411 for message_id in message_ids {
412 self.store.mark_as_read(
413 message_id,
414 crate::messaging::user_resolver::resolve_handle(&self.identity),
415 ).await?;
416 self.sync.broadcast_read_receipt(message_id).await?;
417 }
418 Ok(())
419 }
420
421 /// Start typing indicator
422 pub async fn start_typing(&mut self, channel_id: ChannelId) -> Result<()> {
423 self.sync
424 .broadcast_typing(
425 channel_id,
426 crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
427 true,
428 )
429 .await
430 }
431
432 /// Stop typing indicator
433 pub async fn stop_typing(&mut self, channel_id: ChannelId) -> Result<()> {
434 self.sync
435 .broadcast_typing(
436 channel_id,
437 crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
438 false,
439 )
440 .await
441 }
442
443 /// Initiate key exchange with a peer
444 pub async fn initiate_key_exchange(&self, peer: FourWordAddress) -> Result<KeyExchangeMessage> {
445 self.encryption.key_exchange.initiate_exchange(peer).await
446 }
447
448 /// Handle incoming key exchange message
449 pub async fn handle_key_exchange(&self, message: KeyExchangeMessage) -> Result<Option<KeyExchangeMessage>> {
450 use key_exchange::KeyExchangeType;
451
452 match message.message_type {
453 KeyExchangeType::Initiation => {
454 // Respond to initiation
455 let response = self.encryption.key_exchange.respond_to_exchange(message).await?;
456 Ok(Some(response))
457 }
458 KeyExchangeType::Response => {
459 // Complete the exchange
460 self.encryption.key_exchange.complete_exchange(message).await?;
461 Ok(None)
462 }
463 KeyExchangeType::PrekeyBundle => {
464 // Handle prekey bundle
465 Ok(None)
466 }
467 }
468 }
469
470 /// Get our prekey bundle for others
471 pub async fn get_prekey_bundle(&self) -> key_exchange::PrekeyBundle {
472 self.encryption.key_exchange.get_prekey_bundle().await
473 }
474
475 /// Rotate encryption keys
476 pub async fn rotate_keys(&self) -> Result<()> {
477 self.encryption.key_exchange.rotate_prekeys().await?;
478 self.encryption.key_exchange.cleanup_expired().await?;
479 Ok(())
480 }
481
482 /// Handle mention notification
483 async fn handle_mention(&self, message: &RichMessage) -> Result<()> {
484 // Create notification
485 log::info!("Mentioned in message: {:?}", message.id);
486 // TODO: Trigger system notification
487 Ok(())
488 }
489
490 /// Get channel members
491 async fn get_channel_members(&self, _channel_id: ChannelId) -> Result<Vec<FourWordAddress>> {
492 // TODO: Implement channel membership lookup
493 // For now, return empty list which will fallback to broadcast
494 Ok(Vec::new())
495 }
496}
497*/
498
499// MessageStore is now a type alias in database.rs
500
501/*
502/// Message store for persistence
503#[derive(Clone)]
504pub struct MessageStore {
505 inner: Arc<database::DatabaseMessageStore>,
506 dht_client: DhtClient,
507}
508
509impl MessageStore {
510 pub async fn new(dht_client: DhtClient) -> Result<Self> {
511 let inner = Arc::new(
512 database::DatabaseMessageStore::new(dht_client.clone(), None).await?
513 );
514
515 Ok(Self {
516 inner,
517 dht_client,
518 })
519 }
520
521 pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
522 self.inner.store_message(message).await
523 }
524
525 pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
526 self.inner.get_message(id).await
527 }
528
529 pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
530 self.inner.update_message(message).await
531 }
532
533 pub async fn get_channel_messages(
534 &self,
535 channel_id: ChannelId,
536 limit: usize,
537 before: Option<DateTime<Utc>>,
538 ) -> Result<Vec<RichMessage>> {
539 self.inner.get_channel_messages(channel_id, limit, before).await
540 }
541
542 pub async fn mark_as_read(
543 &self,
544 message_id: MessageId,
545 user: FourWordAddress,
546 ) -> Result<()> {
547 self.inner.mark_as_read(message_id, user).await
548 }
549
550 /// Search messages
551 pub async fn search_messages(&self, query: &str, channel_id: Option<ChannelId>) -> Result<Vec<RichMessage>> {
552 self.inner.search_messages(query, channel_id, 50).await
553 }
554
555 /// Get thread messages
556 pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
557 self.inner.get_thread_messages(thread_id).await
558 }
559
560 /// Add reaction
561 pub async fn add_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
562 self.inner.add_reaction(message_id, emoji, user).await
563 }
564
565 /// Remove reaction
566 pub async fn remove_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
567 self.inner.remove_reaction(message_id, emoji, user).await
568 }
569
570 /// Get database statistics
571 pub async fn get_stats(&self) -> Result<database::DatabaseStats> {
572 self.inner.get_stats().await
573 }
574
575 /// Clean up ephemeral messages
576 pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
577 self.inner.cleanup_ephemeral(ttl_seconds).await
578 }
579}
580*/
581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::FourWordAddress;

    /// A freshly constructed `RichMessage` keeps the sender, channel and text
    /// it was built with.
    #[tokio::test]
    async fn test_message_creation() {
        let identity = crate::messaging::user_handle::UserHandle::from("ocean-forest-moon-star");
        let channel = ChannelId::new();
        let content = MessageContent::Text("Hello, world!".to_string());

        // `identity` is cloned because it is compared below; `content` is not
        // needed again and is moved into the message.
        let message = RichMessage::new(identity.clone(), channel, content);

        assert_eq!(message.sender, identity);
        assert_eq!(message.channel_id, channel);
        // Check the text itself, not just the variant.
        assert!(matches!(
            &message.content,
            MessageContent::Text(text) if text == "Hello, world!"
        ));
    }

    /// A `SendMessageRequest` can be built field-by-field and exposes the
    /// values it was given.
    #[test]
    fn test_send_message_request_construction() {
        let request = SendMessageRequest {
            channel_id: ChannelId::new(),
            content: MessageContent::Text("Test with real DHT".to_string()),
            attachments: vec![],
            thread_id: None,
            reply_to: None,
            mentions: vec![],
            ephemeral: false,
        };

        assert!(matches!(
            &request.content,
            MessageContent::Text(text) if text == "Test with real DHT"
        ));
        assert!(request.attachments.is_empty());
        assert!(request.thread_id.is_none());
        assert!(request.reply_to.is_none());
        assert!(request.mentions.is_empty());
        assert!(!request.ephemeral);
    }

    /// The messaging service starts on top of a real DHT client and accepts a
    /// text message for a single recipient.
    #[tokio::test]
    async fn test_messaging_service_with_real_dht() {
        let identity = FourWordAddress::from("ocean-forest-moon-star");
        let dht_client = DhtClient::new().expect("DHT client should be constructible in tests");

        // `expect` reports the underlying error on failure, which a bare
        // `assert!(service.is_ok())` would hide.
        let service = MessagingService::new(identity, dht_client)
            .await
            .expect("messaging service should start with a real DHT client");

        let result = service
            .send_message(
                vec![FourWordAddress::from("recipient-alpha-bravo-charlie")],
                MessageContent::Text("Hello".to_string()),
                ChannelId::new(),
                SendOptions::default(),
            )
            .await;
        assert!(
            result.is_ok(),
            "sending a message through the service failed"
        );
    }
}
635}