saorsa_core/messaging/mod.rs
1// Rich Messaging Module for P2P Foundation
2// Implements WhatsApp/Slack-style messaging with full decentralization
3
4pub mod composer;
5pub mod database;
6pub mod encryption;
7pub mod key_exchange;
8pub mod media;
9#[cfg(any(test, feature = "mocks"))]
10pub mod mocks;
11pub mod quic_media_streams;
12pub mod reactions;
13pub mod search;
14pub mod service;
15pub mod sync;
16pub mod threads;
17pub mod transport;
18pub mod types;
19pub mod user_handle;
20pub mod user_resolver;
21pub mod webrtc;
22pub mod webrtc_quic_bridge;
23
24use user_handle::UserHandle;
25// Removed unused imports
26// use anyhow::Result;
27use serde::{Deserialize, Serialize};
28// use std::sync::Arc;
29// use chrono::{DateTime, Utc};
30// Removed unused imports: use tracing::{debug, warn};
31
32pub use composer::MessageComposer;
33pub use database::MessageStore;
34pub use encryption::SecureMessaging;
35pub use key_exchange::{KeyExchange, KeyExchangeMessage};
36pub use media::MediaProcessor;
37pub use quic_media_streams::{QosParameters, QuicMediaStreamManager, StreamStats};
38pub use reactions::ReactionManager;
39pub use search::MessageSearch;
40pub use service::{MessagingService, SendOptions};
41pub use sync::RealtimeSync;
42pub use threads::ThreadManager;
43pub use transport::{DeliveryReceipt, DeliveryStatus, MessageTransport, ReceivedMessage};
44pub use types::*;
45pub use webrtc::{CallEvent, CallManager, WebRtcEvent, WebRtcService};
46pub use webrtc_quic_bridge::{RtpPacket, StreamConfig, StreamType, WebRtcQuicBridge};
47
48// Import the real DHT client
49pub use crate::dht::client::DhtClient;
50
51/// Request to send a message
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SendMessageRequest {
54 pub channel_id: ChannelId,
55 pub content: MessageContent,
56 pub attachments: Vec<Vec<u8>>,
57 pub thread_id: Option<ThreadId>,
58 pub reply_to: Option<MessageId>,
59 pub mentions: Vec<UserHandle>,
60 pub ephemeral: bool,
61}
62
63// MessagingService is now defined in service.rs
64
// Legacy implementation disabled - it is kept below only as a commented-out reference; see service.rs for the new implementation
66
67/*
68impl MessagingService {
69 /// Create a new messaging service with a real DHT client
70 pub async fn new(identity: FourWordAddress) -> Result<Self> {
71 // Create DHT client based on the user's identity
72 // Convert four-word address to a node ID
73 let node_id_bytes = blake3::hash(identity.to_string().as_bytes());
74 let node_id = crate::dht::core_engine::NodeId::from_key(
75 crate::dht::core_engine::DhtKey::from_bytes(*node_id_bytes.as_bytes())
76 );
77
78 // Create DHT client with the user's node ID
79 let dht_client = DhtClient::with_node_id(node_id)?;
80
81 // Initialize all components
82 let store = MessageStore::new(dht_client.clone()).await?;
83 let threads = ThreadManager::new(store.clone());
84 let reactions = ReactionManager::new(store.clone());
85 let media = MediaProcessor::new()?;
86 let search = MessageSearch::new(store.clone()).await?;
87 let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
88 let sync = RealtimeSync::new(dht_client.clone()).await?;
89
90 Ok(Self {
91 store,
92 threads,
93 reactions,
94 media,
95 search,
96 encryption,
97 sync,
98 transport: None, // Will be initialized when network is available
99 webrtc: None, // Will be initialized when needed
100 identity,
101 })
102 }
103
104 /// Create a new messaging service with an existing DHT client
105 pub async fn with_dht_client(
106 identity: FourWordAddress,
107 dht_client: DhtClient,
108 ) -> Result<Self> {
109 let store = MessageStore::new(dht_client.clone()).await?;
110 let threads = ThreadManager::new(store.clone());
111 let reactions = ReactionManager::new(store.clone());
112 let media = MediaProcessor::new()?;
113 let search = MessageSearch::new(store.clone()).await?;
114 let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
115 let sync = RealtimeSync::new(dht_client).await?;
116
117 Ok(Self {
118 store,
119 threads,
120 reactions,
121 media,
122 search,
123 encryption,
124 sync,
125 transport: None, // Will be initialized when network is available
126 webrtc: None, // Will be initialized when needed
127 identity,
128 })
129 }
130
131 /// Connect to network transport
132 pub async fn connect_transport(&mut self, network: Arc<crate::network::P2PNode>) -> Result<()> {
133 let transport = MessageTransport::new(network, self.store.dht_client.clone()).await?;
134
135 // Start background tasks
136 transport.monitor_network_quality().await;
137 transport.process_message_queue().await;
138
139 self.transport = Some(transport);
140 Ok(())
141 }
142
143 /// Initialize WebRTC service
144 pub async fn initialize_webrtc(&mut self) -> Result<()> {
145 // Create WebRTC service using the DHT client
146 let dht_engine = self.store.dht_client.core_engine();
147 let webrtc = WebRtcService::new(
148 self.identity.clone(),
149 dht_engine,
150 ).await?;
151
152 // Start the WebRTC service
153 webrtc.start().await?;
154
155 self.webrtc = Some(webrtc);
156 Ok(())
157 }
158
159 /// Initiate a voice/video call
160 pub async fn initiate_call(
161 &self,
162 callee: FourWordAddress,
163 constraints: webrtc::MediaConstraints,
164 ) -> Result<webrtc::CallId> {
165 if let Some(ref webrtc) = self.webrtc {
166 webrtc.initiate_call(callee, constraints).await
167 } else {
168 Err(anyhow::anyhow!("WebRTC service not initialized"))
169 }
170 }
171
172 /// Accept an incoming call
173 pub async fn accept_call(
174 &self,
175 call_id: webrtc::CallId,
176 constraints: webrtc::MediaConstraints,
177 ) -> Result<()> {
178 if let Some(ref webrtc) = self.webrtc {
179 webrtc.accept_call(call_id, constraints).await
180 } else {
181 Err(anyhow::anyhow!("WebRTC service not initialized"))
182 }
183 }
184
185 /// Reject an incoming call
186 pub async fn reject_call(&self, call_id: webrtc::CallId) -> Result<()> {
187 if let Some(ref webrtc) = self.webrtc {
188 webrtc.reject_call(call_id).await
189 } else {
190 Err(anyhow::anyhow!("WebRTC service not initialized"))
191 }
192 }
193
194 /// End an active call
195 pub async fn end_call(&self, call_id: webrtc::CallId) -> Result<()> {
196 if let Some(ref webrtc) = self.webrtc {
197 webrtc.end_call(call_id).await
198 } else {
199 Err(anyhow::anyhow!("WebRTC service not initialized"))
200 }
201 }
202
203 /// Get call state
204 pub async fn get_call_state(&self, call_id: webrtc::CallId) -> Option<webrtc::CallState> {
205 if let Some(ref webrtc) = self.webrtc {
206 webrtc.get_call_state(call_id).await
207 } else {
208 None
209 }
210 }
211
212 /// Subscribe to WebRTC events
213 pub fn subscribe_webrtc_events(&self) -> Option<tokio::sync::broadcast::Receiver<WebRtcEvent>> {
214 self.webrtc.as_ref().map(|w| w.subscribe_events())
215 }
216
217 /// Get WebRTC service reference
218 pub fn webrtc(&self) -> Option<&WebRtcService> {
219 self.webrtc.as_ref()
220 }
221
222 /// Send a new message
223 pub async fn send_message(&mut self, request: SendMessageRequest) -> Result<RichMessage> {
224 // Create message
225 let mut message = RichMessage::new(
226 self.identity.clone(),
227 request.channel_id,
228 request.content,
229 );
230
231 // Add attachments if any
232 for attachment in request.attachments {
233 let processed = self.media.process_attachment(attachment).await?;
234 message.attachments.push(processed);
235 }
236
237 // Handle threading
238 if let Some(thread_id) = request.thread_id {
239 message.thread_id = Some(thread_id);
240 self.threads.add_to_thread(thread_id, &message).await?;
241 }
242
243 // Handle reply
244 if let Some(reply_to) = request.reply_to {
245 message.reply_to = Some(reply_to);
246 }
247
248 // Encrypt message
249 let encrypted = self.encryption.encrypt_message(&message).await?;
250
251 // Store message (we store the original, not encrypted version locally)
252 self.store.store_message(&message).await?;
253
254 // Send via transport if available, otherwise use sync
255 if let Some(ref transport) = self.transport {
256 // Extract recipients from channel members
257 let recipients = self.get_channel_members(request.channel_id).await?;
258 let receipt = transport.send_message(&encrypted, recipients).await?;
259
260 // Log delivery status
261 for (recipient, status) in receipt.delivery_status {
262 match status {
263 DeliveryStatus::Delivered(_) => {
264 debug!("Message delivered to {}", recipient);
265 }
266 DeliveryStatus::Queued => {
267 debug!("Message queued for {}", recipient);
268 }
269 DeliveryStatus::Failed(e) => {
270 warn!("Message delivery failed for {}: {}", recipient, e);
271 }
272 _ => {}
273 }
274 }
275 } else {
276 // Fallback to broadcast sync
277 self.sync.broadcast_message(&encrypted).await?;
278 }
279
280 Ok(message)
281 }
282
283 /// Receive and process an incoming message
284 pub async fn receive_message(&mut self, encrypted: EncryptedMessage) -> Result<RichMessage> {
285 // Decrypt message
286 let message = self.encryption.decrypt_message(encrypted).await?;
287
288 // Verify signature
289 if !self.encryption.verify_message(&message) {
290 return Err(anyhow::anyhow!("Invalid message signature"));
291 }
292
293 // Store message
294 self.store.store_message(&message).await?;
295
296 // Update thread if applicable
297 if let Some(thread_id) = &message.thread_id {
298 self.threads.update_thread(*thread_id, &message).await?;
299 }
300
301 // Process mentions
302 if message.mentions.contains(&self.identity) {
303 self.handle_mention(&message).await?;
304 }
305
306 Ok(message)
307 }
308
309 /// Add a reaction to a message
310 pub async fn add_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
311 self.reactions.add_reaction(
312 message_id,
313 emoji.clone(),
314 crate::messaging::user_resolver::resolve_handle(&self.identity),
315 ).await?;
316
317 // Sync reaction
318 self.sync.broadcast_reaction(message_id, emoji, true).await?;
319
320 Ok(())
321 }
322
323 /// Remove a reaction from a message
324 pub async fn remove_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
325 self.reactions.remove_reaction(
326 message_id,
327 emoji.clone(),
328 crate::messaging::user_resolver::resolve_handle(&self.identity),
329 ).await?;
330
331 // Sync reaction removal
332 self.sync.broadcast_reaction(message_id, emoji, false).await?;
333
334 Ok(())
335 }
336
337 /// Edit a message
338 pub async fn edit_message(
339 &mut self,
340 message_id: MessageId,
341 new_content: MessageContent,
342 ) -> Result<()> {
343 // Get original message
344 let mut message = self.store.get_message(message_id).await?;
345
346 // Verify sender
347 if message.sender != self.identity {
348 return Err(anyhow::anyhow!("Cannot edit message from another user"));
349 }
350
351 // Update content
352 message.content = new_content.clone();
353 message.edited_at = Some(Utc::now());
354
355 // Re-encrypt and store
356 let _encrypted = self.encryption.encrypt_message(&message).await?;
357 self.store.update_message(&message).await?;
358
359 // Sync edit
360 self.sync.broadcast_edit(message_id, new_content).await?;
361
362 Ok(())
363 }
364
365 /// Delete a message
366 pub async fn delete_message(&mut self, message_id: MessageId) -> Result<()> {
367 // Get message
368 let mut message = self.store.get_message(message_id).await?;
369
370 // Verify sender
371 if message.sender != self.identity {
372 return Err(anyhow::anyhow!("Cannot delete message from another user"));
373 }
374
375 // Soft delete
376 message.deleted_at = Some(Utc::now());
377
378 // Update storage
379 self.store.update_message(&message).await?;
380
381 // Sync deletion
382 self.sync.broadcast_deletion(message_id).await?;
383
384 Ok(())
385 }
386
387 /// Search messages
388 pub async fn search_messages(&self, query: SearchQuery) -> Result<Vec<RichMessage>> {
389 self.search.search(query).await
390 }
391
392 /// Get message history for a channel
393 pub async fn get_channel_messages(
394 &self,
395 channel_id: ChannelId,
396 limit: usize,
397 before: Option<DateTime<Utc>>,
398 ) -> Result<Vec<RichMessage>> {
399 self.store.get_channel_messages(channel_id, limit, before).await
400 }
401
402 /// Get thread messages
403 pub async fn get_thread_messages(
404 &self,
405 thread_id: ThreadId,
406 ) -> Result<ThreadView> {
407 self.threads.get_thread(thread_id).await
408 }
409
410 /// Mark messages as read
411 pub async fn mark_as_read(&mut self, message_ids: Vec<MessageId>) -> Result<()> {
412 for message_id in message_ids {
413 self.store.mark_as_read(
414 message_id,
415 crate::messaging::user_resolver::resolve_handle(&self.identity),
416 ).await?;
417 self.sync.broadcast_read_receipt(message_id).await?;
418 }
419 Ok(())
420 }
421
422 /// Start typing indicator
423 pub async fn start_typing(&mut self, channel_id: ChannelId) -> Result<()> {
424 self.sync
425 .broadcast_typing(
426 channel_id,
427 crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
428 true,
429 )
430 .await
431 }
432
433 /// Stop typing indicator
434 pub async fn stop_typing(&mut self, channel_id: ChannelId) -> Result<()> {
435 self.sync
436 .broadcast_typing(
437 channel_id,
438 crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
439 false,
440 )
441 .await
442 }
443
444 /// Initiate key exchange with a peer
445 pub async fn initiate_key_exchange(&self, peer: FourWordAddress) -> Result<KeyExchangeMessage> {
446 self.encryption.key_exchange.initiate_exchange(peer).await
447 }
448
449 /// Handle incoming key exchange message
450 pub async fn handle_key_exchange(&self, message: KeyExchangeMessage) -> Result<Option<KeyExchangeMessage>> {
451 use key_exchange::KeyExchangeType;
452
453 match message.message_type {
454 KeyExchangeType::Initiation => {
455 // Respond to initiation
456 let response = self.encryption.key_exchange.respond_to_exchange(message).await?;
457 Ok(Some(response))
458 }
459 KeyExchangeType::Response => {
460 // Complete the exchange
461 self.encryption.key_exchange.complete_exchange(message).await?;
462 Ok(None)
463 }
464 KeyExchangeType::PrekeyBundle => {
465 // Handle prekey bundle
466 Ok(None)
467 }
468 }
469 }
470
471 /// Get our prekey bundle for others
472 pub async fn get_prekey_bundle(&self) -> key_exchange::PrekeyBundle {
473 self.encryption.key_exchange.get_prekey_bundle().await
474 }
475
476 /// Rotate encryption keys
477 pub async fn rotate_keys(&self) -> Result<()> {
478 self.encryption.key_exchange.rotate_prekeys().await?;
479 self.encryption.key_exchange.cleanup_expired().await?;
480 Ok(())
481 }
482
483 /// Handle mention notification
484 async fn handle_mention(&self, message: &RichMessage) -> Result<()> {
485 // Create notification
486 tracing::info!("Mentioned in message: {:?}", message.id);
487 // TODO: Trigger system notification
488 Ok(())
489 }
490
491 /// Get channel members
492 async fn get_channel_members(&self, _channel_id: ChannelId) -> Result<Vec<FourWordAddress>> {
493 // TODO: Implement channel membership lookup
494 // For now, return empty list which will fallback to broadcast
495 Ok(Vec::new())
496 }
497}
498*/
499
500// MessageStore is now a type alias in database.rs
501
502/*
503/// Message store for persistence
504#[derive(Clone)]
505pub struct MessageStore {
506 inner: Arc<database::DatabaseMessageStore>,
507 dht_client: DhtClient,
508}
509
510impl MessageStore {
511 pub async fn new(dht_client: DhtClient) -> Result<Self> {
512 let inner = Arc::new(
513 database::DatabaseMessageStore::new(dht_client.clone(), None).await?
514 );
515
516 Ok(Self {
517 inner,
518 dht_client,
519 })
520 }
521
522 pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
523 self.inner.store_message(message).await
524 }
525
526 pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
527 self.inner.get_message(id).await
528 }
529
530 pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
531 self.inner.update_message(message).await
532 }
533
534 pub async fn get_channel_messages(
535 &self,
536 channel_id: ChannelId,
537 limit: usize,
538 before: Option<DateTime<Utc>>,
539 ) -> Result<Vec<RichMessage>> {
540 self.inner.get_channel_messages(channel_id, limit, before).await
541 }
542
543 pub async fn mark_as_read(
544 &self,
545 message_id: MessageId,
546 user: FourWordAddress,
547 ) -> Result<()> {
548 self.inner.mark_as_read(message_id, user).await
549 }
550
551 /// Search messages
552 pub async fn search_messages(&self, query: &str, channel_id: Option<ChannelId>) -> Result<Vec<RichMessage>> {
553 self.inner.search_messages(query, channel_id, 50).await
554 }
555
556 /// Get thread messages
557 pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
558 self.inner.get_thread_messages(thread_id).await
559 }
560
561 /// Add reaction
562 pub async fn add_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
563 self.inner.add_reaction(message_id, emoji, user).await
564 }
565
566 /// Remove reaction
567 pub async fn remove_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
568 self.inner.remove_reaction(message_id, emoji, user).await
569 }
570
571 /// Get database statistics
572 pub async fn get_stats(&self) -> Result<database::DatabaseStats> {
573 self.inner.get_stats().await
574 }
575
576 /// Clean up ephemeral messages
577 pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
578 self.inner.cleanup_ephemeral(ttl_seconds).await
579 }
580}
581*/
582
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::FourWordAddress;
    use crate::messaging::user_handle::UserHandle;

    /// A freshly constructed `RichMessage` must carry the sender, channel and
    /// text content it was built from.
    #[tokio::test]
    async fn test_message_creation() {
        let identity = UserHandle::from("ocean-forest-moon-star");
        let channel = ChannelId::new();
        let content = MessageContent::Text("Hello, world!".to_string());

        // `identity` is compared against below, so it is cloned; `content` is
        // not needed again and is moved into the message instead of cloned.
        let message = RichMessage::new(identity.clone(), channel, content);

        assert_eq!(message.sender, identity);
        assert_eq!(message.channel_id, channel);
        // Check the payload itself, not just the variant, so a constructor that
        // dropped or altered the text would be caught.
        assert!(matches!(
            &message.content,
            MessageContent::Text(text) if text == "Hello, world!"
        ));
    }

    /// Placeholder for the DHT-backed messaging service test.
    ///
    /// The full scenario requires a real DHT network and can cause nested
    /// runtime issues, so it is meant to be run separately with proper network
    /// setup. Only identity construction is exercised here.
    #[tokio::test]
    async fn test_messaging_service_with_real_dht() {
        println!("Skipping test_messaging_service_with_real_dht - requires separate network setup");

        // For now, only verify that an identity can be created and renders to a
        // non-empty string.
        let identity = FourWordAddress::from("ocean-forest-moon-star");
        assert!(!identity.to_string().is_empty());
    }
}
612}