saorsa_core/messaging/mod.rs
1// Rich Messaging Module for P2P Foundation
2// Implements WhatsApp/Slack-style messaging with full decentralization
3
4pub mod composer;
5pub mod database;
6pub mod encryption;
7pub mod key_exchange;
8pub mod media;
9#[cfg(any(test, feature = "mocks"))]
10pub mod mocks;
11pub mod network_config;
12pub mod quic_media_streams;
13pub mod reactions;
14pub mod search;
15pub mod service;
16pub mod sync;
17pub mod threads;
18pub mod transport;
19pub mod types;
20pub mod user_handle;
21pub mod user_resolver;
22
23use user_handle::UserHandle;
24// Removed unused imports
25// use anyhow::Result;
26use serde::{Deserialize, Serialize};
27// use std::sync::Arc;
28// use chrono::{DateTime, Utc};
29// Removed unused imports: use tracing::{debug, warn};
30
31pub use composer::MessageComposer;
32pub use database::MessageStore;
33pub use encryption::SecureMessaging;
34pub use key_exchange::{KeyExchange, KeyExchangeMessage};
35pub use media::MediaProcessor;
36pub use network_config::{
37 IpMode, NatTraversalMode, NetworkConfig, NetworkConfigError, PortConfig, RetryBehavior,
38};
39pub use quic_media_streams::{QosParameters, QuicMediaStreamManager, StreamStats};
40pub use reactions::ReactionManager;
41pub use search::MessageSearch;
42pub use service::{MessagingService, SendOptions};
43pub use sync::RealtimeSync;
44pub use threads::ThreadManager;
45pub use transport::{DeliveryReceipt, DeliveryStatus, MessageTransport, ReceivedMessage};
46pub use types::*;
47
48// Re-export WebRTC types from saorsa-webrtc crate
49pub use saorsa_webrtc::{
50 CallEvent, CallId, CallManager, CallState, MediaConstraints, MediaEvent, MediaStreamManager,
51 PeerIdentity, SignalingHandler, SignalingTransport, WebRtcEvent, WebRtcQuicBridge,
52 WebRtcService,
53};
54
55// Import the real DHT client
56pub use crate::dht::client::DhtClient;
57
58/// Request to send a message
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct SendMessageRequest {
61 pub channel_id: ChannelId,
62 pub content: MessageContent,
63 pub attachments: Vec<Vec<u8>>,
64 pub thread_id: Option<ThreadId>,
65 pub reply_to: Option<MessageId>,
66 pub mentions: Vec<UserHandle>,
67 pub ephemeral: bool,
68}
69
// MessagingService is now defined in service.rs.

// The legacy implementation is disabled: it is kept below, commented out, for
// reference only. See service.rs for the current implementation.
73
74/*
75impl MessagingService {
76 /// Create a new messaging service with a real DHT client
77 pub async fn new(identity: FourWordAddress) -> Result<Self> {
78 // Create DHT client based on the user's identity
79 // Convert four-word address to a node ID
80 let node_id_bytes = blake3::hash(identity.to_string().as_bytes());
81 let node_id = crate::dht::core_engine::NodeId::from_key(
82 crate::dht::core_engine::DhtKey::from_bytes(*node_id_bytes.as_bytes())
83 );
84
85 // Create DHT client with the user's node ID
86 let dht_client = DhtClient::with_node_id(node_id)?;
87
88 // Initialize all components
89 let store = MessageStore::new(dht_client.clone()).await?;
90 let threads = ThreadManager::new(store.clone());
91 let reactions = ReactionManager::new(store.clone());
92 let media = MediaProcessor::new()?;
93 let search = MessageSearch::new(store.clone()).await?;
94 let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
95 let sync = RealtimeSync::new(dht_client.clone()).await?;
96
97 Ok(Self {
98 store,
99 threads,
100 reactions,
101 media,
102 search,
103 encryption,
104 sync,
105 transport: None, // Will be initialized when network is available
106 webrtc: None, // Will be initialized when needed
107 identity,
108 })
109 }
110
111 /// Create a new messaging service with an existing DHT client
112 pub async fn with_dht_client(
113 identity: FourWordAddress,
114 dht_client: DhtClient,
115 ) -> Result<Self> {
116 let store = MessageStore::new(dht_client.clone()).await?;
117 let threads = ThreadManager::new(store.clone());
118 let reactions = ReactionManager::new(store.clone());
119 let media = MediaProcessor::new()?;
120 let search = MessageSearch::new(store.clone()).await?;
121 let encryption = SecureMessaging::new(identity.clone(), dht_client.clone()).await?;
122 let sync = RealtimeSync::new(dht_client).await?;
123
124 Ok(Self {
125 store,
126 threads,
127 reactions,
128 media,
129 search,
130 encryption,
131 sync,
132 transport: None, // Will be initialized when network is available
133 webrtc: None, // Will be initialized when needed
134 identity,
135 })
136 }
137
138 /// Connect to network transport
139 pub async fn connect_transport(&mut self, network: Arc<crate::network::P2PNode>) -> Result<()> {
140 let transport = MessageTransport::new(network, self.store.dht_client.clone()).await?;
141
142 // Start background tasks
143 transport.monitor_network_quality().await;
144 transport.process_message_queue().await;
145
146 self.transport = Some(transport);
147 Ok(())
148 }
149
150 /// Initialize WebRTC service
151 pub async fn initialize_webrtc(&mut self) -> Result<()> {
152 // Create WebRTC service using the DHT client
153 let dht_engine = self.store.dht_client.core_engine();
154 let webrtc = WebRtcService::new(
155 self.identity.clone(),
156 dht_engine,
157 ).await?;
158
159 // Start the WebRTC service
160 webrtc.start().await?;
161
162 self.webrtc = Some(webrtc);
163 Ok(())
164 }
165
166 /// Initiate a voice/video call
167 pub async fn initiate_call(
168 &self,
169 callee: FourWordAddress,
170 constraints: webrtc::MediaConstraints,
171 ) -> Result<webrtc::CallId> {
172 if let Some(ref webrtc) = self.webrtc {
173 webrtc.initiate_call(callee, constraints).await
174 } else {
175 Err(anyhow::anyhow!("WebRTC service not initialized"))
176 }
177 }
178
179 /// Accept an incoming call
180 pub async fn accept_call(
181 &self,
182 call_id: webrtc::CallId,
183 constraints: webrtc::MediaConstraints,
184 ) -> Result<()> {
185 if let Some(ref webrtc) = self.webrtc {
186 webrtc.accept_call(call_id, constraints).await
187 } else {
188 Err(anyhow::anyhow!("WebRTC service not initialized"))
189 }
190 }
191
192 /// Reject an incoming call
193 pub async fn reject_call(&self, call_id: webrtc::CallId) -> Result<()> {
194 if let Some(ref webrtc) = self.webrtc {
195 webrtc.reject_call(call_id).await
196 } else {
197 Err(anyhow::anyhow!("WebRTC service not initialized"))
198 }
199 }
200
201 /// End an active call
202 pub async fn end_call(&self, call_id: webrtc::CallId) -> Result<()> {
203 if let Some(ref webrtc) = self.webrtc {
204 webrtc.end_call(call_id).await
205 } else {
206 Err(anyhow::anyhow!("WebRTC service not initialized"))
207 }
208 }
209
210 /// Get call state
211 pub async fn get_call_state(&self, call_id: webrtc::CallId) -> Option<webrtc::CallState> {
212 if let Some(ref webrtc) = self.webrtc {
213 webrtc.get_call_state(call_id).await
214 } else {
215 None
216 }
217 }
218
219 /// Subscribe to WebRTC events
220 pub fn subscribe_webrtc_events(&self) -> Option<tokio::sync::broadcast::Receiver<WebRtcEvent>> {
221 self.webrtc.as_ref().map(|w| w.subscribe_events())
222 }
223
224 /// Get WebRTC service reference
225 pub fn webrtc(&self) -> Option<&WebRtcService> {
226 self.webrtc.as_ref()
227 }
228
229 /// Send a new message
230 pub async fn send_message(&mut self, request: SendMessageRequest) -> Result<RichMessage> {
231 // Create message
232 let mut message = RichMessage::new(
233 self.identity.clone(),
234 request.channel_id,
235 request.content,
236 );
237
238 // Add attachments if any
239 for attachment in request.attachments {
240 let processed = self.media.process_attachment(attachment).await?;
241 message.attachments.push(processed);
242 }
243
244 // Handle threading
245 if let Some(thread_id) = request.thread_id {
246 message.thread_id = Some(thread_id);
247 self.threads.add_to_thread(thread_id, &message).await?;
248 }
249
250 // Handle reply
251 if let Some(reply_to) = request.reply_to {
252 message.reply_to = Some(reply_to);
253 }
254
255 // Encrypt message
256 let encrypted = self.encryption.encrypt_message(&message).await?;
257
258 // Store message (we store the original, not encrypted version locally)
259 self.store.store_message(&message).await?;
260
261 // Send via transport if available, otherwise use sync
262 if let Some(ref transport) = self.transport {
263 // Extract recipients from channel members
264 let recipients = self.get_channel_members(request.channel_id).await?;
265 let receipt = transport.send_message(&encrypted, recipients).await?;
266
267 // Log delivery status
268 for (recipient, status) in receipt.delivery_status {
269 match status {
270 DeliveryStatus::Delivered(_) => {
271 debug!("Message delivered to {}", recipient);
272 }
273 DeliveryStatus::Queued => {
274 debug!("Message queued for {}", recipient);
275 }
276 DeliveryStatus::Failed(e) => {
277 warn!("Message delivery failed for {}: {}", recipient, e);
278 }
279 _ => {}
280 }
281 }
282 } else {
283 // Fallback to broadcast sync
284 self.sync.broadcast_message(&encrypted).await?;
285 }
286
287 Ok(message)
288 }
289
290 /// Receive and process an incoming message
291 pub async fn receive_message(&mut self, encrypted: EncryptedMessage) -> Result<RichMessage> {
292 // Decrypt message
293 let message = self.encryption.decrypt_message(encrypted).await?;
294
295 // Verify signature
296 if !self.encryption.verify_message(&message) {
297 return Err(anyhow::anyhow!("Invalid message signature"));
298 }
299
300 // Store message
301 self.store.store_message(&message).await?;
302
303 // Update thread if applicable
304 if let Some(thread_id) = &message.thread_id {
305 self.threads.update_thread(*thread_id, &message).await?;
306 }
307
308 // Process mentions
309 if message.mentions.contains(&self.identity) {
310 self.handle_mention(&message).await?;
311 }
312
313 Ok(message)
314 }
315
316 /// Add a reaction to a message
317 pub async fn add_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
318 self.reactions.add_reaction(
319 message_id,
320 emoji.clone(),
321 crate::messaging::user_resolver::resolve_handle(&self.identity),
322 ).await?;
323
324 // Sync reaction
325 self.sync.broadcast_reaction(message_id, emoji, true).await?;
326
327 Ok(())
328 }
329
330 /// Remove a reaction from a message
331 pub async fn remove_reaction(&mut self, message_id: MessageId, emoji: String) -> Result<()> {
332 self.reactions.remove_reaction(
333 message_id,
334 emoji.clone(),
335 crate::messaging::user_resolver::resolve_handle(&self.identity),
336 ).await?;
337
338 // Sync reaction removal
339 self.sync.broadcast_reaction(message_id, emoji, false).await?;
340
341 Ok(())
342 }
343
344 /// Edit a message
345 pub async fn edit_message(
346 &mut self,
347 message_id: MessageId,
348 new_content: MessageContent,
349 ) -> Result<()> {
350 // Get original message
351 let mut message = self.store.get_message(message_id).await?;
352
353 // Verify sender
354 if message.sender != self.identity {
355 return Err(anyhow::anyhow!("Cannot edit message from another user"));
356 }
357
358 // Update content
359 message.content = new_content.clone();
360 message.edited_at = Some(Utc::now());
361
362 // Re-encrypt and store
363 let _encrypted = self.encryption.encrypt_message(&message).await?;
364 self.store.update_message(&message).await?;
365
366 // Sync edit
367 self.sync.broadcast_edit(message_id, new_content).await?;
368
369 Ok(())
370 }
371
372 /// Delete a message
373 pub async fn delete_message(&mut self, message_id: MessageId) -> Result<()> {
374 // Get message
375 let mut message = self.store.get_message(message_id).await?;
376
377 // Verify sender
378 if message.sender != self.identity {
379 return Err(anyhow::anyhow!("Cannot delete message from another user"));
380 }
381
382 // Soft delete
383 message.deleted_at = Some(Utc::now());
384
385 // Update storage
386 self.store.update_message(&message).await?;
387
388 // Sync deletion
389 self.sync.broadcast_deletion(message_id).await?;
390
391 Ok(())
392 }
393
394 /// Search messages
395 pub async fn search_messages(&self, query: SearchQuery) -> Result<Vec<RichMessage>> {
396 self.search.search(query).await
397 }
398
399 /// Get message history for a channel
400 pub async fn get_channel_messages(
401 &self,
402 channel_id: ChannelId,
403 limit: usize,
404 before: Option<DateTime<Utc>>,
405 ) -> Result<Vec<RichMessage>> {
406 self.store.get_channel_messages(channel_id, limit, before).await
407 }
408
409 /// Get thread messages
410 pub async fn get_thread_messages(
411 &self,
412 thread_id: ThreadId,
413 ) -> Result<ThreadView> {
414 self.threads.get_thread(thread_id).await
415 }
416
417 /// Mark messages as read
418 pub async fn mark_as_read(&mut self, message_ids: Vec<MessageId>) -> Result<()> {
419 for message_id in message_ids {
420 self.store.mark_as_read(
421 message_id,
422 crate::messaging::user_resolver::resolve_handle(&self.identity),
423 ).await?;
424 self.sync.broadcast_read_receipt(message_id).await?;
425 }
426 Ok(())
427 }
428
429 /// Start typing indicator
430 pub async fn start_typing(&mut self, channel_id: ChannelId) -> Result<()> {
431 self.sync
432 .broadcast_typing(
433 channel_id,
434 crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
435 true,
436 )
437 .await
438 }
439
440 /// Stop typing indicator
441 pub async fn stop_typing(&mut self, channel_id: ChannelId) -> Result<()> {
442 self.sync
443 .broadcast_typing(
444 channel_id,
445 crate::messaging::user_handle::UserHandle::from(self.identity.to_string()),
446 false,
447 )
448 .await
449 }
450
451 /// Initiate key exchange with a peer
452 pub async fn initiate_key_exchange(&self, peer: FourWordAddress) -> Result<KeyExchangeMessage> {
453 self.encryption.key_exchange.initiate_exchange(peer).await
454 }
455
456 /// Handle incoming key exchange message
457 pub async fn handle_key_exchange(&self, message: KeyExchangeMessage) -> Result<Option<KeyExchangeMessage>> {
458 use key_exchange::KeyExchangeType;
459
460 match message.message_type {
461 KeyExchangeType::Initiation => {
462 // Respond to initiation
463 let response = self.encryption.key_exchange.respond_to_exchange(message).await?;
464 Ok(Some(response))
465 }
466 KeyExchangeType::Response => {
467 // Complete the exchange
468 self.encryption.key_exchange.complete_exchange(message).await?;
469 Ok(None)
470 }
471 KeyExchangeType::PrekeyBundle => {
472 // Handle prekey bundle
473 Ok(None)
474 }
475 }
476 }
477
478 /// Get our prekey bundle for others
479 pub async fn get_prekey_bundle(&self) -> key_exchange::PrekeyBundle {
480 self.encryption.key_exchange.get_prekey_bundle().await
481 }
482
483 /// Rotate encryption keys
484 pub async fn rotate_keys(&self) -> Result<()> {
485 self.encryption.key_exchange.rotate_prekeys().await?;
486 self.encryption.key_exchange.cleanup_expired().await?;
487 Ok(())
488 }
489
490 /// Handle mention notification
491 async fn handle_mention(&self, message: &RichMessage) -> Result<()> {
492 // Create notification
493 tracing::info!("Mentioned in message: {:?}", message.id);
494 // TODO: Trigger system notification
495 Ok(())
496 }
497
498 /// Get channel members
499 async fn get_channel_members(&self, _channel_id: ChannelId) -> Result<Vec<FourWordAddress>> {
500 // TODO: Implement channel membership lookup
501 // For now, return empty list which will fallback to broadcast
502 Ok(Vec::new())
503 }
504}
505*/
506
507// MessageStore is now a type alias in database.rs
508
509/*
510/// Message store for persistence
511#[derive(Clone)]
512pub struct MessageStore {
513 inner: Arc<database::DatabaseMessageStore>,
514 dht_client: DhtClient,
515}
516
517impl MessageStore {
518 pub async fn new(dht_client: DhtClient) -> Result<Self> {
519 let inner = Arc::new(
520 database::DatabaseMessageStore::new(dht_client.clone(), None).await?
521 );
522
523 Ok(Self {
524 inner,
525 dht_client,
526 })
527 }
528
529 pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
530 self.inner.store_message(message).await
531 }
532
533 pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
534 self.inner.get_message(id).await
535 }
536
537 pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
538 self.inner.update_message(message).await
539 }
540
541 pub async fn get_channel_messages(
542 &self,
543 channel_id: ChannelId,
544 limit: usize,
545 before: Option<DateTime<Utc>>,
546 ) -> Result<Vec<RichMessage>> {
547 self.inner.get_channel_messages(channel_id, limit, before).await
548 }
549
550 pub async fn mark_as_read(
551 &self,
552 message_id: MessageId,
553 user: FourWordAddress,
554 ) -> Result<()> {
555 self.inner.mark_as_read(message_id, user).await
556 }
557
558 /// Search messages
559 pub async fn search_messages(&self, query: &str, channel_id: Option<ChannelId>) -> Result<Vec<RichMessage>> {
560 self.inner.search_messages(query, channel_id, 50).await
561 }
562
563 /// Get thread messages
564 pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
565 self.inner.get_thread_messages(thread_id).await
566 }
567
568 /// Add reaction
569 pub async fn add_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
570 self.inner.add_reaction(message_id, emoji, user).await
571 }
572
573 /// Remove reaction
574 pub async fn remove_reaction(&self, message_id: MessageId, emoji: String, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
575 self.inner.remove_reaction(message_id, emoji, user).await
576 }
577
578 /// Get database statistics
579 pub async fn get_stats(&self) -> Result<database::DatabaseStats> {
580 self.inner.get_stats().await
581 }
582
583 /// Clean up ephemeral messages
584 pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
585 self.inner.cleanup_ephemeral(ttl_seconds).await
586 }
587}
588*/
589
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::FourWordAddress;

    /// A freshly constructed `RichMessage` keeps the sender and channel it was
    /// given and carries text content.
    #[tokio::test]
    async fn test_message_creation() {
        let identity = crate::messaging::user_handle::UserHandle::from("ocean-forest-moon-star");
        let channel = ChannelId::new();
        let content = MessageContent::Text("Hello, world!".to_string());

        // `content` is not used again, so it is moved instead of cloned.
        let message = RichMessage::new(identity.clone(), channel, content);

        assert_eq!(message.sender, identity);
        assert_eq!(message.channel_id, channel);
        assert!(matches!(message.content, MessageContent::Text(_)));
    }

    /// Placeholder for the real-DHT integration test.
    ///
    /// The full scenario requires a real DHT network and can cause nested
    /// runtime issues, so it is meant to be run separately with proper
    /// network setup. This test is not actually skipped by the harness: it
    /// prints a notice and then only checks that an identity can be created.
    #[tokio::test]
    async fn test_messaging_service_with_real_dht() {
        println!("Skipping test_messaging_service_with_real_dht - requires separate network setup");

        // For now, just test that we can create the identity.
        let identity = FourWordAddress::from("ocean-forest-moon-star");
        assert!(!identity.to_string().is_empty());
    }
}