saorsa_core/messaging/
service.rs

1// High-level messaging service API
2use super::transport::{DeliveryReceipt, DeliveryStatus, ReceivedMessage};
3use super::types::*;
4use super::{DhtClient, KeyExchange, MessageStore, MessageTransport};
5use crate::control::ControlMessageHandler;
6use crate::identity::FourWordAddress;
7use crate::identity::restart::RestartManager;
8use crate::messaging::user_handle::UserHandle;
9use anyhow::Result;
10use chrono::{Duration, Utc};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, broadcast};
14use tracing::{info, warn};
15
16/// Resolve channel members to their FourWordAddress recipients
17/// This maps channel member user_ids to their FourWordAddress representation
18pub async fn channel_recipients(_channel_id: &ChannelId) -> Result<Vec<FourWordAddress>> {
19    // Load channel metadata from storage/database
20    // Note: This is a placeholder implementation that should be connected to
21    // the actual channel storage system (e.g., ChatManager)
22
23    // For now, we'll return an empty list. In production, this would:
24    // 1. Load channel from storage
25    // 2. Get member list
26    // 3. Map each member's user_id to their FourWordAddress
27    // 4. Return the list of addresses
28
29    // Example production implementation:
30    // let channel = chat_manager.get_channel(channel_id).await?;
31    // let mut recipients = Vec::new();
32    // for member_id in channel.members {
33    //     if let Ok(addr) = FourWordAddress::from_user_id(&member_id) {
34    //         recipients.push(addr);
35    //     }
36    // }
37    // Ok(recipients)
38
39    // TODO: Integrate with actual channel storage
40    Ok(Vec::new())
41}
42
43/// High-level messaging service that coordinates all messaging components
44pub struct MessagingService {
45    /// Local user identity
46    identity: FourWordAddress,
47    /// Message store for persistence
48    store: MessageStore,
49    /// Transport layer for network communication
50    transport: Arc<MessageTransport>,
51    /// Key exchange for E2E encryption
52    key_exchange: Arc<KeyExchange>,
53    /// DHT client for distributed storage
54    _dht_client: DhtClient,
55    /// Message event broadcaster
56    event_tx: broadcast::Sender<ReceivedMessage>,
57    /// Online users tracking
58    online_users: Arc<RwLock<HashMap<FourWordAddress, chrono::DateTime<Utc>>>>,
59}
60
61/// Options for sending messages
62#[derive(Debug, Clone, Default)]
63pub struct SendOptions {
64    pub ephemeral: bool,
65    pub expiry_seconds: Option<u64>,
66    pub reply_to: Option<MessageId>,
67    pub thread_id: Option<ThreadId>,
68    pub attachments: Vec<Attachment>,
69}
70
71impl MessagingService {
72    /// Create a new messaging service with default configuration
73    ///
74    /// Uses OS-assigned port (port 0) by default to avoid port conflicts.
75    /// This is the recommended way to create a MessagingService for most use cases.
76    ///
77    /// # Example
78    /// ```no_run
79    /// # use saorsa_core::messaging::{MessagingService, DhtClient};
80    /// # use saorsa_core::identity::FourWordAddress;
81    /// # async fn example() -> anyhow::Result<()> {
82    /// let dht = DhtClient::new()?;
83    /// let address = FourWordAddress("test-user-one-alpha".to_string());
84    /// let service = MessagingService::new(address, dht).await?;
85    ///
86    /// // Get actual bound port
87    /// let addrs = service.listen_addrs().await;
88    /// println!("Listening on: {:?}", addrs);
89    /// # Ok(())
90    /// # }
91    /// ```
92    pub async fn new(identity: FourWordAddress, dht_client: DhtClient) -> Result<Self> {
93        // Use default NetworkConfig (OS-assigned port, IPv4-only)
94        Self::new_with_config(identity, dht_client, super::NetworkConfig::default()).await
95    }
96
97    /// Create a new messaging service with custom network configuration
98    ///
99    /// This allows fine-grained control over port binding, IP mode, and retry behavior.
100    ///
101    /// # Arguments
102    /// * `identity` - Four-word address for this node
103    /// * `dht_client` - DHT client for distributed operations
104    /// * `config` - Network configuration (port, IP mode, retry behavior)
105    ///
106    /// # Example with OS-Assigned Port
107    /// ```no_run
108    /// # use saorsa_core::messaging::{MessagingService, DhtClient, NetworkConfig};
109    /// # use saorsa_core::identity::FourWordAddress;
110    /// # async fn example() -> anyhow::Result<()> {
111    /// let dht = DhtClient::new()?;
112    /// let address = FourWordAddress("test-user-one-alpha".to_string());
113    ///
114    /// // Use default config (OS-assigned port)
115    /// let config = NetworkConfig::default();
116    /// let service = MessagingService::new_with_config(address, dht, config).await?;
117    /// # Ok(())
118    /// # }
119    /// ```
120    ///
121    /// # Example with Explicit Port
122    /// ```no_run
123    /// # use saorsa_core::messaging::{MessagingService, DhtClient, NetworkConfig, PortConfig, IpMode, RetryBehavior};
124    /// # use saorsa_core::identity::FourWordAddress;
125    /// # async fn example() -> anyhow::Result<()> {
126    /// let dht = DhtClient::new()?;
127    /// let address = FourWordAddress("test-user-two-alpha".to_string());
128    ///
129    /// // Use explicit port
130    /// let config = NetworkConfig {
131    ///     port: PortConfig::Explicit(12345),
132    ///     ip_mode: IpMode::IPv4Only,
133    ///     retry_behavior: RetryBehavior::FailFast,
134    /// };
135    /// let service = MessagingService::new_with_config(address, dht, config).await?;
136    /// # Ok(())
137    /// # }
138    /// ```
139    ///
140    /// # Note
141    /// Currently, only `PortConfig::OsAssigned` and `PortConfig::Explicit` are fully supported.
142    /// Full configuration support (port ranges, dual-stack separate ports) requires ant-quic 0.10.0.
143    pub async fn new_with_config(
144        identity: FourWordAddress,
145        dht_client: DhtClient,
146        config: super::NetworkConfig,
147    ) -> Result<Self> {
148        // Log NAT traversal configuration
149        match &config.nat_traversal {
150            Some(super::NatTraversalMode::P2PNode { concurrency_limit }) => {
151                info!(
152                    "Initializing MessagingService with P2P NAT traversal (concurrency limit: {})",
153                    concurrency_limit
154                );
155            }
156            Some(super::NatTraversalMode::ClientOnly) => {
157                info!("Initializing MessagingService with client-only NAT traversal");
158            }
159            None => {
160                warn!("Initializing MessagingService with NAT traversal disabled");
161            }
162        }
163
164        // Initialize components
165        let store = MessageStore::new(dht_client.clone(), None).await?;
166
167        // Create mock network for testing
168        #[cfg(test)]
169        let network = Arc::new(crate::network::P2PNode::new_for_tests()?);
170
171        #[cfg(not(test))]
172        let network = {
173            // Convert NetworkConfig to NodeConfig
174            let mut node_config = crate::network::NodeConfig::new()?;
175
176            // Apply port configuration
177            match &config.port {
178                super::PortConfig::OsAssigned => {
179                    // Use port 0 for OS-assigned
180                    let bind_addr = std::net::SocketAddr::new(
181                        std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
182                        0, // Port 0 = OS-assigned
183                    );
184                    node_config.listen_addr = bind_addr;
185                    node_config.listen_addrs = vec![bind_addr];
186                }
187                super::PortConfig::Explicit(port) => {
188                    // Use explicit port
189                    let bind_addr = match &config.ip_mode {
190                        super::IpMode::IPv6Only => std::net::SocketAddr::new(
191                            std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
192                            *port,
193                        ),
194                        _ => std::net::SocketAddr::new(
195                            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
196                            *port,
197                        ),
198                    };
199                    node_config.listen_addr = bind_addr;
200                    node_config.listen_addrs = vec![bind_addr];
201                }
202                super::PortConfig::Range(start, _end) => {
203                    // For now, use the start of the range
204                    // Full range support requires ant-quic 0.10.0
205                    warn!(
206                        "Port range configuration not fully supported yet, using port {}",
207                        start
208                    );
209                    let bind_addr = std::net::SocketAddr::new(
210                        std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
211                        *start,
212                    );
213                    node_config.listen_addr = bind_addr;
214                    node_config.listen_addrs = vec![bind_addr];
215                }
216            }
217
218            // Apply IP mode configuration
219            match &config.ip_mode {
220                super::IpMode::IPv4Only => {
221                    node_config.enable_ipv6 = false;
222                }
223                super::IpMode::IPv6Only => {
224                    node_config.enable_ipv6 = true;
225                    // Only include IPv6 address
226                    let port = node_config.listen_addr.port();
227                    node_config.listen_addrs = vec![std::net::SocketAddr::new(
228                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
229                        port,
230                    )];
231                }
232                super::IpMode::DualStack => {
233                    node_config.enable_ipv6 = true;
234                    // Add both IPv4 and IPv6
235                    let port = node_config.listen_addr.port();
236                    node_config.listen_addrs = vec![
237                        std::net::SocketAddr::new(
238                            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
239                            port,
240                        ),
241                        std::net::SocketAddr::new(
242                            std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
243                            port,
244                        ),
245                    ];
246                }
247                super::IpMode::DualStackSeparate {
248                    ipv4_port,
249                    ipv6_port,
250                } => {
251                    // Separate ports for IPv4 and IPv6
252                    node_config.enable_ipv6 = true;
253
254                    let ipv4_port_num = match ipv4_port {
255                        super::PortConfig::OsAssigned => 0,
256                        super::PortConfig::Explicit(p) => *p,
257                        super::PortConfig::Range(start, _) => *start,
258                    };
259
260                    let ipv6_port_num = match ipv6_port {
261                        super::PortConfig::OsAssigned => 0,
262                        super::PortConfig::Explicit(p) => *p,
263                        super::PortConfig::Range(start, _) => *start,
264                    };
265
266                    node_config.listen_addrs = vec![
267                        std::net::SocketAddr::new(
268                            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
269                            ipv4_port_num,
270                        ),
271                        std::net::SocketAddr::new(
272                            std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
273                            ipv6_port_num,
274                        ),
275                    ];
276
277                    // Set primary listen_addr to IPv4
278                    node_config.listen_addr = node_config.listen_addrs[0];
279                }
280            }
281
282            let node = crate::network::P2PNode::new(node_config).await?;
283            Arc::new(node)
284        };
285        let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?);
286        let key_exchange = Arc::new(KeyExchange::new(identity.clone(), dht_client.clone()).await?);
287
288        let (event_tx, _) = broadcast::channel(1000);
289
290        Ok(Self {
291            identity,
292            store,
293            transport,
294            key_exchange,
295            _dht_client: dht_client,
296            event_tx,
297            online_users: Arc::new(RwLock::new(HashMap::new())),
298        })
299    }
300
301    /// Enable restart management for this service
302    ///
303    /// This integrates the RestartManager to handle network rejections and identity regeneration.
304    pub async fn enable_restart_management(&self, restart_manager: Arc<RestartManager>) {
305        let handler = Arc::new(ControlMessageHandler::new(restart_manager));
306        let events = self.transport.network().subscribe_events();
307        handler.start(events).await;
308        info!("Restart management enabled for MessagingService");
309    }
310
311    /// Send a message to recipients
312    pub async fn send_message(
313        &self,
314        recipients: Vec<FourWordAddress>,
315        content: MessageContent,
316        channel_id: ChannelId,
317        options: SendOptions,
318    ) -> Result<(MessageId, DeliveryReceipt)> {
319        // Create rich message
320        let mut message = RichMessage::new(
321            UserHandle::from(self.identity.to_string()),
322            channel_id,
323            content,
324        );
325
326        // Apply options
327        message.ephemeral = options.ephemeral;
328        if let Some(seconds) = options.expiry_seconds {
329            message.expires_at = Some(Utc::now() + Duration::seconds(seconds as i64));
330        }
331        message.reply_to = options.reply_to;
332        message.thread_id = options.thread_id;
333        message.attachments = options.attachments;
334
335        // Store locally first
336        self.store.store_message(&message).await?;
337
338        // Encrypt for each recipient
339        let mut delivery_results = Vec::new();
340
341        for recipient in &recipients {
342            // Get or establish encryption key
343            let encryption_key = match self.key_exchange.get_session_key(recipient).await {
344                Ok(key) => key,
345                Err(_) => {
346                    // Initiate PQC key exchange
347                    info!("No session key for {}, initiating key exchange", recipient);
348                    let kex_msg = self
349                        .key_exchange
350                        .initiate_exchange(recipient.clone())
351                        .await?;
352
353                    // Send the key exchange message
354                    self.transport
355                        .send_key_exchange_message(recipient, kex_msg)
356                        .await?;
357
358                    // Wait for session establishment with timeout
359                    let wait_result = tokio::time::timeout(
360                        tokio::time::Duration::from_secs(5),
361                        self.wait_for_session_key(recipient),
362                    )
363                    .await;
364
365                    match wait_result {
366                        Ok(Ok(key)) => {
367                            info!("Key exchange completed for {}", recipient);
368                            key
369                        }
370                        Ok(Err(e)) => {
371                            return Err(anyhow::anyhow!(
372                                "Key exchange failed for {}: {}",
373                                recipient,
374                                e
375                            ));
376                        }
377                        Err(_) => {
378                            return Err(anyhow::anyhow!("Key exchange timeout for {}", recipient));
379                        }
380                    }
381                }
382            };
383
384            // Encrypt message
385            let encrypted = self
386                .encrypt_message_with_key(&message, &encryption_key)
387                .await?;
388
389            // Send via transport
390            match self
391                .transport
392                .send_message(&encrypted, vec![recipient.clone()])
393                .await
394            {
395                Ok(_receipt) => {
396                    delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
397                }
398                Err(e) => {
399                    warn!("Failed to send to {}: {}", recipient, e);
400                    delivery_results
401                        .push((recipient.clone(), DeliveryStatus::Failed(e.to_string())));
402                }
403            }
404        }
405
406        // Create delivery receipt
407        let receipt = DeliveryReceipt {
408            message_id: message.id,
409            timestamp: Utc::now(),
410            delivery_status: delivery_results,
411        };
412
413        info!(
414            "Sent message {} to {} recipients",
415            message.id,
416            recipients.len()
417        );
418
419        Ok((message.id, receipt))
420    }
421
422    /// Send a message to a channel
423    pub async fn send_message_to_channel(
424        &self,
425        channel_id: ChannelId,
426        content: MessageContent,
427        options: SendOptions,
428    ) -> Result<(MessageId, DeliveryReceipt)> {
429        // Resolve recipients from channel members
430        let recipients = channel_recipients(&channel_id).await?;
431
432        if recipients.is_empty() {
433            return Err(anyhow::anyhow!(
434                "No recipients found for channel {}",
435                channel_id
436            ));
437        }
438
439        // Call existing send_message with resolved recipients
440        self.send_message(recipients, content, channel_id, options)
441            .await
442    }
443
444    /// Subscribe to incoming messages
445    pub async fn subscribe_messages(
446        &self,
447        channel_filter: Option<ChannelId>,
448    ) -> broadcast::Receiver<ReceivedMessage> {
449        let rx = self.event_tx.subscribe();
450
451        // Start message receiver if not already running
452        let transport = self.transport.clone();
453        let event_tx = self.event_tx.clone();
454        let key_exchange = self.key_exchange.clone();
455        let store = self.store.clone();
456
457        // Spawn message receiver task
458        tokio::spawn(async move {
459            let mut receiver = transport.receive_messages().await;
460
461            while let Ok(received) = receiver.recv().await {
462                // Decrypt message
463                if let Ok(decrypted) =
464                    Self::decrypt_received_message(&received.message, &key_exchange).await
465                {
466                    // Store in database
467                    let _ = store.store_message(&decrypted).await;
468
469                    // Apply channel filter if specified
470                    if let Some(filter) = channel_filter
471                        && decrypted.channel_id != filter
472                    {
473                        continue;
474                    }
475
476                    // Broadcast to subscribers
477                    let _ = event_tx.send(ReceivedMessage {
478                        message: received.message,
479                        received_at: received.received_at,
480                    });
481                }
482            }
483        });
484
485        // Spawn key exchange handler task
486        let transport_kex = self.transport.clone();
487        let key_exchange_kex = self.key_exchange.clone();
488        tokio::spawn(async move {
489            let mut kex_receiver = transport_kex.subscribe_key_exchange();
490
491            while let Ok(kex_msg) = kex_receiver.recv().await {
492                use super::key_exchange::KeyExchangeType;
493
494                match kex_msg.message_type {
495                    KeyExchangeType::Initiation => {
496                        // Received key exchange initiation - respond
497                        info!("Received key exchange initiation from {}", kex_msg.sender);
498                        match key_exchange_kex.respond_to_exchange(kex_msg).await {
499                            Ok(response) => {
500                                // Send response back
501                                let recipient = response.recipient.clone();
502                                if let Err(e) = transport_kex
503                                    .send_key_exchange_message(&recipient, response)
504                                    .await
505                                {
506                                    warn!(
507                                        "Failed to send key exchange response to {}: {}",
508                                        recipient, e
509                                    );
510                                }
511                            }
512                            Err(e) => {
513                                warn!("Failed to respond to key exchange: {}", e);
514                            }
515                        }
516                    }
517                    KeyExchangeType::Response => {
518                        // Received key exchange response - complete
519                        info!("Received key exchange response from {}", kex_msg.sender);
520                        if let Err(e) = key_exchange_kex.complete_exchange(kex_msg).await {
521                            warn!("Failed to complete key exchange: {}", e);
522                        }
523                    }
524                }
525            }
526        });
527
528        rx
529    }
530
531    /// Get message delivery status
532    pub async fn get_message_status(&self, message_id: MessageId) -> Result<DeliveryStatus> {
533        // Check local confirmations first
534        // In production, this would query the transport layer's confirmation tracking
535
536        // For now, check if message exists in store
537        if let Ok(_msg) = self.store.get_message(message_id).await {
538            // Check if delivered (simplified logic)
539            let online = self.online_users.read().await;
540            if !online.is_empty() {
541                Ok(DeliveryStatus::Delivered(Utc::now()))
542            } else {
543                Ok(DeliveryStatus::Queued)
544            }
545        } else {
546            Ok(DeliveryStatus::Failed("Message not found".to_string()))
547        }
548    }
549
550    /// Retrieve a message by ID
551    pub async fn get_message(&self, message_id: MessageId) -> Result<RichMessage> {
552        self.store.get_message(message_id).await
553    }
554
555    /// Get messages for a channel with pagination
556    ///
557    /// # Arguments
558    /// * `channel_id` - The channel to retrieve messages from
559    /// * `limit` - Maximum number of messages to return
560    /// * `before` - Optional timestamp to get messages before (for pagination)
561    ///
562    /// # Returns
563    /// Vector of messages ordered by creation time (newest first)
564    pub async fn get_channel_messages(
565        &self,
566        channel_id: ChannelId,
567        limit: usize,
568        before: Option<chrono::DateTime<chrono::Utc>>,
569    ) -> Result<Vec<RichMessage>> {
570        self.store
571            .get_channel_messages(channel_id, limit, before)
572            .await
573    }
574
575    /// Get all messages in a thread
576    ///
577    /// # Arguments
578    /// * `thread_id` - The thread to retrieve messages from
579    ///
580    /// # Returns
581    /// Vector of all messages in the thread
582    pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
583        self.store.get_thread_messages(thread_id).await
584    }
585
586    /// Mark a user as online
587    pub async fn mark_user_online(&self, user: FourWordAddress) -> Result<()> {
588        let mut online = self.online_users.write().await;
589        online.insert(user, Utc::now());
590        Ok(())
591    }
592
593    /// Mark message as delivered
594    pub async fn mark_delivered(
595        &self,
596        message_id: MessageId,
597        recipient: FourWordAddress,
598    ) -> Result<()> {
599        // Update delivery status in store
600        if let Ok(mut msg) = self.store.get_message(message_id).await {
601            msg.delivered_to.insert(
602                crate::messaging::user_resolver::resolve_handle(&recipient),
603                Utc::now(),
604            );
605            self.store.update_message(&msg).await?;
606        }
607        Ok(())
608    }
609
610    /// Process queued messages
611    pub async fn process_message_queue(&self) -> Result<()> {
612        // Trigger transport layer queue processing
613        self.transport.process_message_queue().await;
614        Ok(())
615    }
616
617    /// Encrypt a message for a recipient
618    pub async fn encrypt_message(
619        &self,
620        recipient: FourWordAddress,
621        channel_id: ChannelId,
622        content: MessageContent,
623    ) -> Result<EncryptedMessage> {
624        let message = RichMessage::new(
625            UserHandle::from(self.identity.to_string()),
626            channel_id,
627            content,
628        );
629
630        // Get encryption key
631        let key = self
632            .key_exchange
633            .get_session_key(&recipient)
634            .await
635            .unwrap_or_else(|_| vec![0u8; 32]); // Placeholder
636
637        self.encrypt_message_with_key(&message, &key).await
638    }
639
640    /// Decrypt a message
641    pub async fn decrypt_message(&self, encrypted: EncryptedMessage) -> Result<RichMessage> {
642        Self::decrypt_received_message(&encrypted, &self.key_exchange).await
643    }
644
645    // Helper: Encrypt message with key
646    async fn encrypt_message_with_key(
647        &self,
648        message: &RichMessage,
649        key: &[u8],
650    ) -> Result<EncryptedMessage> {
651        use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
652
653        let plaintext = serde_json::to_vec(message)?;
654        let mut k = [0u8; 32];
655        if key.len() != 32 {
656            return Err(anyhow::anyhow!("Invalid session key length"));
657        }
658        k.copy_from_slice(&key[..32]);
659        let sk = SymmetricKey::from_bytes(k);
660        let cipher = ChaCha20Poly1305Cipher::new(&sk);
661        let (ciphertext, nonce) = cipher
662            .encrypt(&plaintext, None)
663            .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
664
665        Ok(EncryptedMessage {
666            id: message.id,
667            channel_id: message.channel_id,
668            sender: self.identity.clone(),
669            ciphertext,
670            nonce: nonce.to_vec(),
671            key_id: format!("key_{}", self.identity),
672        })
673    }
674
675    // Helper: Decrypt received message
676    async fn decrypt_received_message(
677        encrypted: &EncryptedMessage,
678        key_exchange: &Arc<KeyExchange>,
679    ) -> Result<RichMessage> {
680        use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
681
682        // Get decryption key
683        let key = key_exchange
684            .get_session_key(&encrypted.sender)
685            .await
686            .map_err(|e| anyhow::anyhow!("No session key for {}: {}", encrypted.sender, e))?;
687        if key.len() != 32 {
688            return Err(anyhow::anyhow!("Invalid session key length"));
689        }
690        let mut k = [0u8; 32];
691        k.copy_from_slice(&key[..32]);
692        let sk = SymmetricKey::from_bytes(k);
693        let cipher = ChaCha20Poly1305Cipher::new(&sk);
694        // Convert Vec<u8> nonce back to [u8; 12] array
695        if encrypted.nonce.len() != 12 {
696            return Err(anyhow::anyhow!(
697                "Invalid nonce length: expected 12, got {}",
698                encrypted.nonce.len()
699            ));
700        }
701        let mut nonce_array = [0u8; 12];
702        nonce_array.copy_from_slice(&encrypted.nonce);
703
704        let plaintext = cipher
705            .decrypt(&encrypted.ciphertext, &nonce_array, None)
706            .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
707
708        // Deserialize
709        let message: RichMessage = serde_json::from_slice(&plaintext)?;
710
711        Ok(message)
712    }
713
714    // ===== P2P Networking Methods =====
715
716    /// Get the local network address(es) this node is listening on
717    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
718        self.transport.listen_addrs().await
719    }
720
721    /// Get the list of currently connected peer IDs
722    pub async fn connected_peers(&self) -> Vec<String> {
723        self.transport
724            .connected_peers()
725            .await
726            .into_iter()
727            .map(|peer_id| peer_id.to_string())
728            .collect()
729    }
730
731    /// Get the count of currently connected peers
732    pub async fn peer_count(&self) -> usize {
733        self.transport.peer_count().await
734    }
735
736    /// Check if the P2P node is running
737    pub async fn is_running(&self) -> bool {
738        // If we have a transport, we're running
739        // The transport is created during initialization and stays active
740        true
741    }
742
743    /// Connect to a peer via their network address
744    ///
745    /// # Arguments
746    /// * `address` - Network address in format "ip:port" or "[ipv6]:port"
747    ///
748    /// # Returns
749    /// The peer ID of the connected peer
750    pub async fn connect_peer(&self, address: &str) -> Result<String> {
751        let peer_id = self.transport.connect_peer(address).await?;
752        Ok(peer_id.to_string())
753    }
754
755    /// Disconnect from a specific peer
756    ///
757    /// # Arguments
758    /// * `peer_id` - The peer ID to disconnect from
759    pub async fn disconnect_peer(&self, peer_id: &str) -> Result<()> {
760        // Parse peer ID from string
761        let peer_id_parsed = peer_id
762            .parse()
763            .map_err(|e| anyhow::anyhow!("Invalid peer ID: {}", e))?;
764        self.transport.disconnect_peer(&peer_id_parsed).await
765    }
766
767    /// Wait for a session key to be established with a peer
768    ///
769    /// Polls the key exchange system for up to the specified duration.
770    /// This is called after initiating key exchange to wait for the response.
771    async fn wait_for_session_key(&self, peer: &FourWordAddress) -> Result<Vec<u8>> {
772        // Poll with exponential backoff
773        let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
774        let max_attempts = 50; // 5 seconds total (100ms * 50)
775
776        for _ in 0..max_attempts {
777            interval.tick().await;
778
779            if let Ok(key) = self.key_exchange.get_session_key(peer).await {
780                return Ok(key);
781            }
782        }
783
784        Err(anyhow::anyhow!(
785            "Session key not established within timeout"
786        ))
787    }
788
789    // Test helpers
790    #[cfg(test)]
791    pub fn create_test_message(
792        &self,
793        sender: UserHandle,
794        channel_id: ChannelId,
795        content: MessageContent,
796    ) -> RichMessage {
797        RichMessage::new(sender, channel_id, content)
798    }
799
800    #[cfg(test)]
801    pub async fn inject_test_message(&self, message: RichMessage) -> Result<()> {
802        self.store.store_message(&message).await?;
803
804        // Create encrypted version for event
805        let encrypted = EncryptedMessage {
806            id: message.id,
807            channel_id: message.channel_id,
808            sender: self.identity.clone(),
809            ciphertext: vec![],
810            nonce: vec![],
811            key_id: "test".to_string(),
812        };
813
814        let _ = self.event_tx.send(ReceivedMessage {
815            message: encrypted,
816            received_at: Utc::now(),
817        });
818
819        Ok(())
820    }
821}
822
823// Use mock implementations from mocks module
824// These are now properly implemented in mocks.rs