saorsa_core/messaging/
transport.rs

1// Network transport layer for messaging
2// Integrates with the existing P2P network infrastructure
3
4use super::DhtClient;
5use super::types::*;
6use crate::identity::FourWordAddress;
7use crate::network::P2PNode;
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, broadcast};
14use tokio::time::{Duration, interval};
15use tracing::{debug, info, warn};
16
17/// Message transport layer for real-time messaging
18pub struct MessageTransport {
19    /// Reference to the P2P network node
20    network: Arc<P2PNode>,
21    /// DHT client for distributed storage
22    dht_client: DhtClient,
23    /// Connection pool for efficient message delivery
24    connections: Arc<RwLock<ConnectionPool>>,
25    /// Message queue for offline delivery
26    message_queue: Arc<RwLock<MessageQueue>>,
27    /// Delivery confirmations tracking
28    confirmations: Arc<RwLock<HashMap<MessageId, DeliveryStatus>>>,
29    /// Network metrics
30    metrics: Arc<RwLock<NetworkMetrics>>,
31    /// Event broadcaster
32    event_tx: broadcast::Sender<TransportEvent>,
33}
34
35impl MessageTransport {
36    /// Create new message transport
37    pub async fn new(network: Arc<P2PNode>, dht_client: DhtClient) -> Result<Self> {
38        let (event_tx, _) = broadcast::channel(1000);
39
40        Ok(Self {
41            network,
42            dht_client,
43            connections: Arc::new(RwLock::new(ConnectionPool::new())),
44            message_queue: Arc::new(RwLock::new(MessageQueue::new())),
45            confirmations: Arc::new(RwLock::new(HashMap::new())),
46            metrics: Arc::new(RwLock::new(NetworkMetrics::default())),
47            event_tx,
48        })
49    }
50
51    /// Send a message to recipients
52    pub async fn send_message(
53        &self,
54        message: &EncryptedMessage,
55        recipients: Vec<FourWordAddress>,
56    ) -> Result<DeliveryReceipt> {
57        debug!(
58            "Sending message {} to {} recipients",
59            message.id,
60            recipients.len()
61        );
62
63        let mut delivery_results = Vec::new();
64        let mut metrics = self.metrics.write().await;
65
66        for recipient in recipients {
67            // Try direct delivery first
68            match self.try_direct_delivery(&recipient, message).await {
69                Ok(status) => {
70                    delivery_results.push((recipient.clone(), status));
71                    metrics.messages_sent += 1;
72                }
73                Err(e) => {
74                    debug!("Direct delivery failed for {}: {}, queuing", recipient, e);
75
76                    // Queue for later delivery
77                    self.queue_message(&recipient, message).await?;
78                    delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
79                    metrics.messages_queued += 1;
80                }
81            }
82        }
83
84        // Store in DHT for persistence
85        self.store_in_dht(message).await?;
86
87        // Create delivery receipt
88        let receipt = DeliveryReceipt {
89            message_id: message.id,
90            timestamp: Utc::now(),
91            delivery_status: delivery_results,
92        };
93
94        // Track confirmations
95        let mut confirmations = self.confirmations.write().await;
96        for (_recipient, status) in &receipt.delivery_status {
97            confirmations.insert(message.id, status.clone());
98        }
99
100        Ok(receipt)
101    }
102
103    /// Receive messages from the network
104    pub async fn receive_messages(&self) -> broadcast::Receiver<ReceivedMessage> {
105        let (tx, rx) = broadcast::channel(256);
106
107        // Subscribe to network events and forward "messaging" topic messages
108        let mut events = self.network.subscribe_events();
109        tokio::spawn(async move {
110            while let Ok(event) = events.recv().await {
111                #[allow(clippy::collapsible_if)]
112                if let crate::network::P2PEvent::Message {
113                    topic,
114                    source,
115                    data,
116                } = event
117                {
118                    if topic == "messaging" {
119                        // Repackage for messaging consumers
120                        let encrypted_msg = EncryptedMessage {
121                            id: MessageId::new(),
122                            channel_id: ChannelId::new(),
123                            sender: FourWordAddress::parse_str(&source)
124                                .unwrap_or_else(|_| FourWordAddress("unknown".to_string())),
125                            ciphertext: data,
126                            nonce: vec![], // TODO: Generate proper nonce
127                            key_id: "default".to_string(),
128                        };
129                        let _ = tx.send(ReceivedMessage {
130                            message: encrypted_msg,
131                            received_at: Utc::now(),
132                        });
133                    }
134                }
135            }
136        });
137
138        rx
139    }
140
141    /// Establish a direct connection to a peer
142    pub async fn connect_to_peer(&self, peer: &FourWordAddress) -> Result<()> {
143        debug!("Establishing connection to {}", peer);
144
145        // Resolve peer address through DHT
146        let peer_info = self.resolve_peer_address(peer).await?;
147
148        // Create connection
149        let mut pool = self.connections.write().await;
150        pool.add_connection(peer.clone(), peer_info).await?;
151
152        // Send presence update
153        self.broadcast_presence(PresenceStatus::Online).await?;
154
155        Ok(())
156    }
157
158    /// Monitor network quality and adapt behavior
159    pub async fn monitor_network_quality(&self) {
160        let metrics = self.metrics.clone();
161        let connections = self.connections.clone();
162
163        tokio::spawn(async move {
164            let mut ticker = interval(Duration::from_secs(10));
165
166            loop {
167                ticker.tick().await;
168
169                // Calculate network quality metrics
170                let mut metrics = metrics.write().await;
171                let pool = connections.read().await;
172
173                metrics.update_quality(&pool);
174
175                // Adapt behavior based on quality
176                if metrics.average_latency > Duration::from_millis(500) {
177                    debug!("High latency detected, adjusting parameters");
178                    // Implement adaptive behavior
179                }
180
181                if metrics.packet_loss > 0.05 {
182                    warn!("High packet loss: {:.2}%", metrics.packet_loss * 100.0);
183                    // Implement recovery strategies
184                }
185            }
186        });
187    }
188
189    /// Process queued messages
190    pub async fn process_message_queue(&self) {
191        let queue = self.message_queue.clone();
192        let transport = Arc::new(self.clone());
193
194        tokio::spawn(async move {
195            let mut ticker = interval(Duration::from_secs(30));
196
197            loop {
198                ticker.tick().await;
199
200                let mut queue = queue.write().await;
201                let messages = queue.get_pending_messages();
202
203                for (recipient, message) in messages {
204                    // Retry delivery
205                    if let Ok(_status) = transport.try_direct_delivery(&recipient, &message).await {
206                        queue.mark_delivered(&message.id);
207                        info!("Delivered queued message {} to {}", message.id, recipient);
208                    }
209                }
210
211                // Clean up old messages
212                queue.cleanup_expired().await;
213            }
214        });
215    }
216
217    /// Try direct delivery to a peer
218    async fn try_direct_delivery(
219        &self,
220        recipient: &FourWordAddress,
221        message: &EncryptedMessage,
222    ) -> Result<DeliveryStatus> {
223        // Check if peer is online
224        let pool = self.connections.read().await;
225
226        // Serialize payload
227        let data = serde_json::to_vec(message)?;
228
229        // Try existing pool connection first (best-effort info-only cache)
230        if let Some(_connection) = pool.get_connection(recipient) {
231            // We route via the network node to ensure real delivery
232            // Resolve and connect using DHT-discovered endpoints
233            let peer_info = self.resolve_peer_address(recipient).await?;
234            for addr in &peer_info.addresses {
235                if let Ok(peer_id) = self.network.connect_peer(addr).await {
236                    // Use a stable protocol label for routing
237                    if let Err(e) = self
238                        .network
239                        .send_message(&peer_id, "messaging", data.clone())
240                        .await
241                    {
242                        warn!("Failed sending to {} via {}: {}", recipient, addr, e);
243                        continue;
244                    }
245                    debug!(
246                        "Message {} delivered to {} (peer {})",
247                        message.id, recipient, peer_id
248                    );
249                    return Ok(DeliveryStatus::Delivered(Utc::now()));
250                }
251            }
252            // Fall through to queue if all endpoints failed
253            return Err(anyhow::anyhow!("All endpoints failed for {recipient}"));
254        }
255
256        // No cached connection: resolve and send via network
257        let peer_info = self.resolve_peer_address(recipient).await?;
258        for addr in &peer_info.addresses {
259            match self.network.connect_peer(addr).await {
260                Ok(peer_id) => {
261                    if let Err(e) = self
262                        .network
263                        .send_message(&peer_id, "messaging", data.clone())
264                        .await
265                    {
266                        warn!("Failed sending to {} via {}: {}", recipient, addr, e);
267                        continue;
268                    }
269                    debug!(
270                        "Message {} delivered to {} (peer {})",
271                        message.id, recipient, peer_id
272                    );
273                    return Ok(DeliveryStatus::Delivered(Utc::now()));
274                }
275                Err(e) => {
276                    debug!("Cannot connect to {} at {}: {}", recipient, addr, e);
277                }
278            }
279        }
280
281        Err(anyhow::anyhow!("Delivery failed: no reachable endpoints"))
282    }
283
284    /// Queue message for later delivery
285    async fn queue_message(
286        &self,
287        recipient: &FourWordAddress,
288        message: &EncryptedMessage,
289    ) -> Result<()> {
290        let mut queue = self.message_queue.write().await;
291        queue.add_message(recipient.clone(), message.clone());
292        debug!("Queued message {} for {}", message.id, recipient);
293        Ok(())
294    }
295
296    /// Store message in DHT for persistence
297    async fn store_in_dht(&self, message: &EncryptedMessage) -> Result<()> {
298        let key = format!("msg:{}", message.id);
299        let value = serde_json::to_vec(message)?;
300
301        self.dht_client.put(key, value).await?;
302        debug!("Stored message {} in DHT", message.id);
303
304        Ok(())
305    }
306
307    /// Resolve peer address through DHT
308    async fn resolve_peer_address(&self, peer: &FourWordAddress) -> Result<PeerInfo> {
309        let key = format!("peer:{}", peer);
310
311        if let Some(data) = self.dht_client.get(key).await? {
312            let info: PeerInfo = serde_json::from_slice(&data)?;
313            Ok(info)
314        } else {
315            Err(anyhow::anyhow!("Peer {} not found in DHT", peer))
316        }
317    }
318
319    /// Broadcast presence status
320    async fn broadcast_presence(&self, status: PresenceStatus) -> Result<()> {
321        let event = TransportEvent::PresenceUpdate {
322            status,
323            timestamp: Utc::now(),
324        };
325
326        let _ = self.event_tx.send(event);
327        Ok(())
328    }
329
330    /// Subscribe to transport events
331    pub fn subscribe_events(&self) -> broadcast::Receiver<TransportEvent> {
332        self.event_tx.subscribe()
333    }
334
335    /// Get network metrics
336    pub async fn get_metrics(&self) -> NetworkMetrics {
337        self.metrics.read().await.clone()
338    }
339}
340
341/// Connection pool for managing peer connections
342#[derive(Debug, Clone)]
343struct ConnectionPool {
344    connections: HashMap<FourWordAddress, PeerConnection>,
345    max_connections: usize,
346}
347
348impl ConnectionPool {
349    fn new() -> Self {
350        Self {
351            connections: HashMap::new(),
352            max_connections: 100,
353        }
354    }
355
356    async fn add_connection(&mut self, peer: FourWordAddress, info: PeerInfo) -> Result<()> {
357        // Check connection limit
358        if self.connections.len() >= self.max_connections {
359            // Remove least recently used
360            self.evict_lru();
361        }
362
363        let connection = PeerConnection {
364            _peer: peer.clone(),
365            _info: info,
366            _established_at: Utc::now(),
367            last_activity: Utc::now(),
368            quality: ConnectionQuality::default(),
369        };
370
371        self.connections.insert(peer, connection);
372        Ok(())
373    }
374
375    fn get_connection(&self, peer: &FourWordAddress) -> Option<&PeerConnection> {
376        self.connections.get(peer)
377    }
378
379    fn evict_lru(&mut self) {
380        // Find and remove least recently used connection
381        if let Some((peer, _)) = self
382            .connections
383            .iter()
384            .min_by_key(|(_, conn)| conn.last_activity)
385        {
386            let peer = peer.clone();
387            self.connections.remove(&peer);
388        }
389    }
390}
391
392/// Individual peer connection
393#[derive(Debug, Clone)]
394struct PeerConnection {
395    _peer: FourWordAddress,
396    _info: PeerInfo,
397    _established_at: DateTime<Utc>,
398    last_activity: DateTime<Utc>,
399    quality: ConnectionQuality,
400}
401
402impl PeerConnection {
403    #[allow(dead_code)]
404    async fn send(&self, _data: Vec<u8>) -> Result<()> {
405        Ok(())
406    }
407}
408
409/// Peer information stored in DHT
410#[derive(Debug, Clone, Serialize, Deserialize)]
411struct PeerInfo {
412    addresses: Vec<String>,
413    public_key: Vec<u8>,
414    capabilities: Vec<String>,
415    last_seen: DateTime<Utc>,
416}
417
418/// Connection quality metrics
419#[derive(Debug, Clone, Default)]
420struct ConnectionQuality {
421    latency: Duration,
422    packet_loss: f32,
423    _bandwidth: u64,
424}
425
426/// Message queue for offline delivery
427#[derive(Debug)]
428struct MessageQueue {
429    messages: HashMap<MessageId, QueuedMessage>,
430    by_recipient: HashMap<FourWordAddress, Vec<MessageId>>,
431}
432
433impl MessageQueue {
434    fn new() -> Self {
435        Self {
436            messages: HashMap::new(),
437            by_recipient: HashMap::new(),
438        }
439    }
440
441    fn add_message(&mut self, recipient: FourWordAddress, message: EncryptedMessage) {
442        let queued = QueuedMessage {
443            message: message.clone(),
444            recipient: recipient.clone(),
445            queued_at: Utc::now(),
446            retry_count: 0,
447        };
448
449        self.messages.insert(message.id, queued);
450        self.by_recipient
451            .entry(recipient)
452            .or_default()
453            .push(message.id);
454    }
455
456    fn get_pending_messages(&self) -> Vec<(FourWordAddress, EncryptedMessage)> {
457        self.messages
458            .values()
459            .filter(|q| q.retry_count < 5)
460            .map(|q| (q.recipient.clone(), q.message.clone()))
461            .collect()
462    }
463
464    fn mark_delivered(&mut self, message_id: &MessageId) {
465        self.messages.remove(message_id);
466
467        // Remove from recipient index
468        for ids in self.by_recipient.values_mut() {
469            ids.retain(|id| id != message_id);
470        }
471    }
472
473    async fn cleanup_expired(&mut self) {
474        let cutoff = Utc::now() - chrono::Duration::days(7);
475
476        self.messages.retain(|_, q| q.queued_at > cutoff);
477
478        // Clean up empty recipient entries
479        self.by_recipient.retain(|_, ids| !ids.is_empty());
480    }
481}
482
483/// Queued message
484#[derive(Debug, Clone)]
485struct QueuedMessage {
486    message: EncryptedMessage,
487    recipient: FourWordAddress,
488    queued_at: DateTime<Utc>,
489    retry_count: u32,
490}
491
492/// Delivery status for a message
493#[derive(Debug, Clone, Serialize, Deserialize)]
494pub enum DeliveryStatus {
495    Delivered(DateTime<Utc>),
496    Queued,
497    Failed(String),
498    Pending,
499}
500
501/// Delivery receipt
502#[derive(Debug, Clone, Serialize, Deserialize)]
503pub struct DeliveryReceipt {
504    pub message_id: MessageId,
505    pub timestamp: DateTime<Utc>,
506    pub delivery_status: Vec<(FourWordAddress, DeliveryStatus)>,
507}
508
509/// Received message wrapper
510#[derive(Debug, Clone)]
511pub struct ReceivedMessage {
512    pub message: EncryptedMessage,
513    pub received_at: DateTime<Utc>,
514}
515
516/// Transport event types
517#[derive(Debug, Clone)]
518pub enum TransportEvent {
519    MessageReceived(ReceivedMessage),
520    MessageDelivered(MessageId),
521    ConnectionEstablished(FourWordAddress),
522    ConnectionLost(FourWordAddress),
523    PresenceUpdate {
524        status: PresenceStatus,
525        timestamp: DateTime<Utc>,
526    },
527}
528
529/// Network metrics
530#[derive(Debug, Clone, Default)]
531pub struct NetworkMetrics {
532    pub messages_sent: u64,
533    pub messages_received: u64,
534    pub messages_queued: u64,
535    pub active_connections: usize,
536    pub average_latency: Duration,
537    pub packet_loss: f32,
538    pub bandwidth_used: u64,
539}
540
541impl NetworkMetrics {
542    fn update_quality(&mut self, pool: &ConnectionPool) {
543        self.active_connections = pool.connections.len();
544
545        if !pool.connections.is_empty() {
546            let total_latency: Duration =
547                pool.connections.values().map(|c| c.quality.latency).sum();
548
549            self.average_latency = total_latency / pool.connections.len() as u32;
550
551            let total_loss: f32 = pool
552                .connections
553                .values()
554                .map(|c| c.quality.packet_loss)
555                .sum();
556
557            self.packet_loss = total_loss / pool.connections.len() as f32;
558        }
559    }
560}
561
562// Implement Clone for MessageTransport (needed for spawning)
563impl Clone for MessageTransport {
564    fn clone(&self) -> Self {
565        Self {
566            network: self.network.clone(),
567            dht_client: self.dht_client.clone(),
568            connections: self.connections.clone(),
569            message_queue: self.message_queue.clone(),
570            confirmations: self.confirmations.clone(),
571            metrics: self.metrics.clone(),
572            event_tx: self.event_tx.clone(),
573        }
574    }
575}
576
577#[cfg(test)]
578mod tests {
579    use super::*;
580
581    #[tokio::test]
582    async fn test_transport_creation() {
583        // This would need a mock network and DHT client
584        // For now, just verify the types compile
585        assert!(std::mem::size_of::<MessageTransport>() > 0);
586    }
587
588    #[tokio::test]
589    async fn test_delivery_status() {
590        let status = DeliveryStatus::Delivered(Utc::now());
591
592        match status {
593            DeliveryStatus::Delivered(time) => {
594                assert!(time <= Utc::now());
595            }
596            _ => panic!("Expected Delivered status"),
597        }
598    }
599
600    #[tokio::test]
601    async fn test_message_queue() {
602        let mut queue = MessageQueue::new();
603
604        let recipient = FourWordAddress::from("test-user-address-here");
605        let message = EncryptedMessage {
606            id: MessageId::new(),
607            channel_id: ChannelId::new(),
608            sender: FourWordAddress::from("sender-address-here"),
609            ciphertext: vec![1, 2, 3],
610            nonce: vec![4, 5, 6],
611            key_id: "test-key".to_string(),
612        };
613
614        queue.add_message(recipient.clone(), message.clone());
615
616        let pending = queue.get_pending_messages();
617        assert_eq!(pending.len(), 1);
618        assert_eq!(pending[0].0, recipient);
619
620        queue.mark_delivered(&message.id);
621        let pending = queue.get_pending_messages();
622        assert_eq!(pending.len(), 0);
623    }
624
625    #[tokio::test]
626    async fn test_network_metrics() {
627        let metrics = NetworkMetrics {
628            messages_sent: 100,
629            messages_received: 95,
630            packet_loss: 0.02,
631            ..Default::default()
632        };
633        assert_eq!(metrics.messages_sent, 100);
634        assert!(metrics.packet_loss < 0.05); // Less than 5% loss
635    }
636}