1use super::DhtClient;
5use super::types::*;
6use crate::identity::FourWordAddress;
7use crate::network::P2PNode;
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, broadcast};
14use tokio::time::{Duration, interval};
15use tracing::{debug, info, warn};
16
17pub struct MessageTransport {
19 network: Arc<P2PNode>,
21 dht_client: DhtClient,
23 connections: Arc<RwLock<ConnectionPool>>,
25 message_queue: Arc<RwLock<MessageQueue>>,
27 confirmations: Arc<RwLock<HashMap<MessageId, DeliveryStatus>>>,
29 metrics: Arc<RwLock<NetworkMetrics>>,
31 event_tx: broadcast::Sender<TransportEvent>,
33}
34
35impl MessageTransport {
36 pub async fn new(network: Arc<P2PNode>, dht_client: DhtClient) -> Result<Self> {
38 let (event_tx, _) = broadcast::channel(1000);
39
40 Ok(Self {
41 network,
42 dht_client,
43 connections: Arc::new(RwLock::new(ConnectionPool::new())),
44 message_queue: Arc::new(RwLock::new(MessageQueue::new())),
45 confirmations: Arc::new(RwLock::new(HashMap::new())),
46 metrics: Arc::new(RwLock::new(NetworkMetrics::default())),
47 event_tx,
48 })
49 }
50
51 pub async fn send_message(
53 &self,
54 message: &EncryptedMessage,
55 recipients: Vec<FourWordAddress>,
56 ) -> Result<DeliveryReceipt> {
57 debug!(
58 "Sending message {} to {} recipients",
59 message.id,
60 recipients.len()
61 );
62
63 let mut delivery_results = Vec::new();
64 let mut metrics = self.metrics.write().await;
65
66 for recipient in recipients {
67 match self.try_direct_delivery(&recipient, message).await {
69 Ok(status) => {
70 delivery_results.push((recipient.clone(), status));
71 metrics.messages_sent += 1;
72 }
73 Err(e) => {
74 debug!("Direct delivery failed for {}: {}, queuing", recipient, e);
75
76 self.queue_message(&recipient, message).await?;
78 delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
79 metrics.messages_queued += 1;
80 }
81 }
82 }
83
84 self.store_in_dht(message).await?;
86
87 let receipt = DeliveryReceipt {
89 message_id: message.id,
90 timestamp: Utc::now(),
91 delivery_status: delivery_results,
92 };
93
94 let mut confirmations = self.confirmations.write().await;
96 for (_recipient, status) in &receipt.delivery_status {
97 confirmations.insert(message.id, status.clone());
98 }
99
100 Ok(receipt)
101 }
102
103 pub async fn receive_messages(&self) -> broadcast::Receiver<ReceivedMessage> {
105 let (tx, rx) = broadcast::channel(256);
106
107 let mut events = self.network.subscribe_events();
109 tokio::spawn(async move {
110 while let Ok(event) = events.recv().await {
111 #[allow(clippy::collapsible_if)]
112 if let crate::network::P2PEvent::Message {
113 topic,
114 source,
115 data,
116 } = event
117 {
118 if topic == "messaging" {
119 let encrypted_msg = EncryptedMessage {
121 id: MessageId::new(),
122 channel_id: ChannelId::new(),
123 sender: FourWordAddress::parse_str(&source)
124 .unwrap_or_else(|_| FourWordAddress("unknown".to_string())),
125 ciphertext: data,
126 nonce: vec![], key_id: "default".to_string(),
128 };
129 let _ = tx.send(ReceivedMessage {
130 message: encrypted_msg,
131 received_at: Utc::now(),
132 });
133 }
134 }
135 }
136 });
137
138 rx
139 }
140
141 pub async fn connect_to_peer(&self, peer: &FourWordAddress) -> Result<()> {
143 debug!("Establishing connection to {}", peer);
144
145 let peer_info = self.resolve_peer_address(peer).await?;
147
148 let mut pool = self.connections.write().await;
150 pool.add_connection(peer.clone(), peer_info).await?;
151
152 self.broadcast_presence(PresenceStatus::Online).await?;
154
155 Ok(())
156 }
157
158 pub async fn monitor_network_quality(&self) {
160 let metrics = self.metrics.clone();
161 let connections = self.connections.clone();
162
163 tokio::spawn(async move {
164 let mut ticker = interval(Duration::from_secs(10));
165
166 loop {
167 ticker.tick().await;
168
169 let mut metrics = metrics.write().await;
171 let pool = connections.read().await;
172
173 metrics.update_quality(&pool);
174
175 if metrics.average_latency > Duration::from_millis(500) {
177 debug!("High latency detected, adjusting parameters");
178 }
180
181 if metrics.packet_loss > 0.05 {
182 warn!("High packet loss: {:.2}%", metrics.packet_loss * 100.0);
183 }
185 }
186 });
187 }
188
189 pub async fn process_message_queue(&self) {
191 let queue = self.message_queue.clone();
192 let transport = Arc::new(self.clone());
193
194 tokio::spawn(async move {
195 let mut ticker = interval(Duration::from_secs(30));
196
197 loop {
198 ticker.tick().await;
199
200 let mut queue = queue.write().await;
201 let messages = queue.get_pending_messages();
202
203 for (recipient, message) in messages {
204 if let Ok(_status) = transport.try_direct_delivery(&recipient, &message).await {
206 queue.mark_delivered(&message.id);
207 info!("Delivered queued message {} to {}", message.id, recipient);
208 }
209 }
210
211 queue.cleanup_expired().await;
213 }
214 });
215 }
216
217 async fn try_direct_delivery(
219 &self,
220 recipient: &FourWordAddress,
221 message: &EncryptedMessage,
222 ) -> Result<DeliveryStatus> {
223 let pool = self.connections.read().await;
225
226 let data = serde_json::to_vec(message)?;
228
229 if let Some(_connection) = pool.get_connection(recipient) {
231 let peer_info = self.resolve_peer_address(recipient).await?;
234 for addr in &peer_info.addresses {
235 if let Ok(peer_id) = self.network.connect_peer(addr).await {
236 if let Err(e) = self
238 .network
239 .send_message(&peer_id, "messaging", data.clone())
240 .await
241 {
242 warn!("Failed sending to {} via {}: {}", recipient, addr, e);
243 continue;
244 }
245 debug!(
246 "Message {} delivered to {} (peer {})",
247 message.id, recipient, peer_id
248 );
249 return Ok(DeliveryStatus::Delivered(Utc::now()));
250 }
251 }
252 return Err(anyhow::anyhow!("All endpoints failed for {recipient}"));
254 }
255
256 let peer_info = self.resolve_peer_address(recipient).await?;
258 for addr in &peer_info.addresses {
259 match self.network.connect_peer(addr).await {
260 Ok(peer_id) => {
261 if let Err(e) = self
262 .network
263 .send_message(&peer_id, "messaging", data.clone())
264 .await
265 {
266 warn!("Failed sending to {} via {}: {}", recipient, addr, e);
267 continue;
268 }
269 debug!(
270 "Message {} delivered to {} (peer {})",
271 message.id, recipient, peer_id
272 );
273 return Ok(DeliveryStatus::Delivered(Utc::now()));
274 }
275 Err(e) => {
276 debug!("Cannot connect to {} at {}: {}", recipient, addr, e);
277 }
278 }
279 }
280
281 Err(anyhow::anyhow!("Delivery failed: no reachable endpoints"))
282 }
283
284 async fn queue_message(
286 &self,
287 recipient: &FourWordAddress,
288 message: &EncryptedMessage,
289 ) -> Result<()> {
290 let mut queue = self.message_queue.write().await;
291 queue.add_message(recipient.clone(), message.clone());
292 debug!("Queued message {} for {}", message.id, recipient);
293 Ok(())
294 }
295
296 async fn store_in_dht(&self, message: &EncryptedMessage) -> Result<()> {
298 let key = format!("msg:{}", message.id);
299 let value = serde_json::to_vec(message)?;
300
301 self.dht_client.put(key, value).await?;
302 debug!("Stored message {} in DHT", message.id);
303
304 Ok(())
305 }
306
307 async fn resolve_peer_address(&self, peer: &FourWordAddress) -> Result<PeerInfo> {
309 let key = format!("peer:{}", peer);
310
311 if let Some(data) = self.dht_client.get(key).await? {
312 let info: PeerInfo = serde_json::from_slice(&data)?;
313 Ok(info)
314 } else {
315 Err(anyhow::anyhow!("Peer {} not found in DHT", peer))
316 }
317 }
318
319 async fn broadcast_presence(&self, status: PresenceStatus) -> Result<()> {
321 let event = TransportEvent::PresenceUpdate {
322 status,
323 timestamp: Utc::now(),
324 };
325
326 let _ = self.event_tx.send(event);
327 Ok(())
328 }
329
330 pub fn subscribe_events(&self) -> broadcast::Receiver<TransportEvent> {
332 self.event_tx.subscribe()
333 }
334
335 pub async fn get_metrics(&self) -> NetworkMetrics {
337 self.metrics.read().await.clone()
338 }
339}
340
341#[derive(Debug, Clone)]
343struct ConnectionPool {
344 connections: HashMap<FourWordAddress, PeerConnection>,
345 max_connections: usize,
346}
347
348impl ConnectionPool {
349 fn new() -> Self {
350 Self {
351 connections: HashMap::new(),
352 max_connections: 100,
353 }
354 }
355
356 async fn add_connection(&mut self, peer: FourWordAddress, info: PeerInfo) -> Result<()> {
357 if self.connections.len() >= self.max_connections {
359 self.evict_lru();
361 }
362
363 let connection = PeerConnection {
364 _peer: peer.clone(),
365 _info: info,
366 _established_at: Utc::now(),
367 last_activity: Utc::now(),
368 quality: ConnectionQuality::default(),
369 };
370
371 self.connections.insert(peer, connection);
372 Ok(())
373 }
374
375 fn get_connection(&self, peer: &FourWordAddress) -> Option<&PeerConnection> {
376 self.connections.get(peer)
377 }
378
379 fn evict_lru(&mut self) {
380 if let Some((peer, _)) = self
382 .connections
383 .iter()
384 .min_by_key(|(_, conn)| conn.last_activity)
385 {
386 let peer = peer.clone();
387 self.connections.remove(&peer);
388 }
389 }
390}
391
392#[derive(Debug, Clone)]
394struct PeerConnection {
395 _peer: FourWordAddress,
396 _info: PeerInfo,
397 _established_at: DateTime<Utc>,
398 last_activity: DateTime<Utc>,
399 quality: ConnectionQuality,
400}
401
402impl PeerConnection {
403 #[allow(dead_code)]
404 async fn send(&self, _data: Vec<u8>) -> Result<()> {
405 Ok(())
406 }
407}
408
409#[derive(Debug, Clone, Serialize, Deserialize)]
411struct PeerInfo {
412 addresses: Vec<String>,
413 public_key: Vec<u8>,
414 capabilities: Vec<String>,
415 last_seen: DateTime<Utc>,
416}
417
418#[derive(Debug, Clone, Default)]
420struct ConnectionQuality {
421 latency: Duration,
422 packet_loss: f32,
423 _bandwidth: u64,
424}
425
426#[derive(Debug)]
428struct MessageQueue {
429 messages: HashMap<MessageId, QueuedMessage>,
430 by_recipient: HashMap<FourWordAddress, Vec<MessageId>>,
431}
432
433impl MessageQueue {
434 fn new() -> Self {
435 Self {
436 messages: HashMap::new(),
437 by_recipient: HashMap::new(),
438 }
439 }
440
441 fn add_message(&mut self, recipient: FourWordAddress, message: EncryptedMessage) {
442 let queued = QueuedMessage {
443 message: message.clone(),
444 recipient: recipient.clone(),
445 queued_at: Utc::now(),
446 retry_count: 0,
447 };
448
449 self.messages.insert(message.id, queued);
450 self.by_recipient
451 .entry(recipient)
452 .or_default()
453 .push(message.id);
454 }
455
456 fn get_pending_messages(&self) -> Vec<(FourWordAddress, EncryptedMessage)> {
457 self.messages
458 .values()
459 .filter(|q| q.retry_count < 5)
460 .map(|q| (q.recipient.clone(), q.message.clone()))
461 .collect()
462 }
463
464 fn mark_delivered(&mut self, message_id: &MessageId) {
465 self.messages.remove(message_id);
466
467 for ids in self.by_recipient.values_mut() {
469 ids.retain(|id| id != message_id);
470 }
471 }
472
473 async fn cleanup_expired(&mut self) {
474 let cutoff = Utc::now() - chrono::Duration::days(7);
475
476 self.messages.retain(|_, q| q.queued_at > cutoff);
477
478 self.by_recipient.retain(|_, ids| !ids.is_empty());
480 }
481}
482
483#[derive(Debug, Clone)]
485struct QueuedMessage {
486 message: EncryptedMessage,
487 recipient: FourWordAddress,
488 queued_at: DateTime<Utc>,
489 retry_count: u32,
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize)]
494pub enum DeliveryStatus {
495 Delivered(DateTime<Utc>),
496 Queued,
497 Failed(String),
498 Pending,
499}
500
501#[derive(Debug, Clone, Serialize, Deserialize)]
503pub struct DeliveryReceipt {
504 pub message_id: MessageId,
505 pub timestamp: DateTime<Utc>,
506 pub delivery_status: Vec<(FourWordAddress, DeliveryStatus)>,
507}
508
509#[derive(Debug, Clone)]
511pub struct ReceivedMessage {
512 pub message: EncryptedMessage,
513 pub received_at: DateTime<Utc>,
514}
515
516#[derive(Debug, Clone)]
518pub enum TransportEvent {
519 MessageReceived(ReceivedMessage),
520 MessageDelivered(MessageId),
521 ConnectionEstablished(FourWordAddress),
522 ConnectionLost(FourWordAddress),
523 PresenceUpdate {
524 status: PresenceStatus,
525 timestamp: DateTime<Utc>,
526 },
527}
528
529#[derive(Debug, Clone, Default)]
531pub struct NetworkMetrics {
532 pub messages_sent: u64,
533 pub messages_received: u64,
534 pub messages_queued: u64,
535 pub active_connections: usize,
536 pub average_latency: Duration,
537 pub packet_loss: f32,
538 pub bandwidth_used: u64,
539}
540
541impl NetworkMetrics {
542 fn update_quality(&mut self, pool: &ConnectionPool) {
543 self.active_connections = pool.connections.len();
544
545 if !pool.connections.is_empty() {
546 let total_latency: Duration =
547 pool.connections.values().map(|c| c.quality.latency).sum();
548
549 self.average_latency = total_latency / pool.connections.len() as u32;
550
551 let total_loss: f32 = pool
552 .connections
553 .values()
554 .map(|c| c.quality.packet_loss)
555 .sum();
556
557 self.packet_loss = total_loss / pool.connections.len() as f32;
558 }
559 }
560}
561
562impl Clone for MessageTransport {
564 fn clone(&self) -> Self {
565 Self {
566 network: self.network.clone(),
567 dht_client: self.dht_client.clone(),
568 connections: self.connections.clone(),
569 message_queue: self.message_queue.clone(),
570 confirmations: self.confirmations.clone(),
571 metrics: self.metrics.clone(),
572 event_tx: self.event_tx.clone(),
573 }
574 }
575}
576
#[cfg(test)]
mod tests {
    use super::*;

    /// The transport type exists and is not zero-sized.
    #[tokio::test]
    async fn test_transport_creation() {
        let size = std::mem::size_of::<MessageTransport>();
        assert!(size > 0);
    }

    /// A `Delivered` status carries a timestamp that is not in the future.
    #[tokio::test]
    async fn test_delivery_status() {
        let status = DeliveryStatus::Delivered(Utc::now());

        if let DeliveryStatus::Delivered(time) = status {
            assert!(time <= Utc::now());
        } else {
            panic!("Expected Delivered status");
        }
    }

    /// A queued message is reported as pending until it is marked delivered.
    #[tokio::test]
    async fn test_message_queue() {
        let recipient = FourWordAddress::from("test-user-address-here");
        let message = EncryptedMessage {
            id: MessageId::new(),
            channel_id: ChannelId::new(),
            sender: FourWordAddress::from("sender-address-here"),
            ciphertext: vec![1, 2, 3],
            nonce: vec![4, 5, 6],
            key_id: "test-key".to_string(),
        };

        let mut queue = MessageQueue::new();
        queue.add_message(recipient.clone(), message.clone());

        let before = queue.get_pending_messages();
        assert_eq!(before.len(), 1);
        assert_eq!(before[0].0, recipient);

        queue.mark_delivered(&message.id);

        let after = queue.get_pending_messages();
        assert_eq!(after.len(), 0);
    }

    /// Struct-update construction keeps the explicitly set fields.
    #[tokio::test]
    async fn test_network_metrics() {
        let metrics = NetworkMetrics {
            messages_sent: 100,
            messages_received: 95,
            packet_loss: 0.02,
            ..Default::default()
        };

        assert_eq!(metrics.messages_sent, 100);
        assert!(metrics.packet_loss < 0.05);
    }
}