1use std::io::{Error, ErrorKind};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::net::UdpSocket;
12use tokio::time::timeout;
13
14use elara_core::{DegradationLevel, NodeId, PresenceVector};
15use rand::rngs::StdRng;
16use rand::{Rng, SeedableRng};
17
/// A loopback UDP endpoint used by the network tests, bundled with the
/// traffic counters and presence/degradation state that the tests inspect.
pub struct NetworkTestNode {
    /// Identifier supplied by the caller when the node is created.
    pub node_id: NodeId,

    /// UDP socket bound to `127.0.0.1` on an OS-assigned port.
    socket: Arc<UdpSocket>,

    /// Address the socket reported right after binding.
    local_addr: SocketAddr,

    /// Every datagram returned by `recv_timeout`, paired with its sender address.
    received: Vec<(Vec<u8>, SocketAddr)>,

    /// Number of `send_to` calls that completed without an I/O error.
    sent_count: usize,

    /// Reusable receive buffer (allocated as 65535 bytes by `new`).
    recv_buf: Vec<u8>,

    /// Presence state; starts at `PresenceVector::full()` and is rescaled by
    /// `update_presence`.
    presence: PresenceVector,

    /// Current degradation level; starts at `L0_FullPerception` and is advanced
    /// by `degrade`.
    degradation: DegradationLevel,
}
47
48impl NetworkTestNode {
49 pub async fn new(node_id: NodeId) -> std::io::Result<Self> {
51 let socket = UdpSocket::bind("127.0.0.1:0").await?;
52 let local_addr = socket.local_addr()?;
53
54 Ok(Self {
55 node_id,
56 socket: Arc::new(socket),
57 local_addr,
58 received: Vec::new(),
59 sent_count: 0,
60 recv_buf: vec![0u8; 65535],
61 presence: PresenceVector::full(),
62 degradation: DegradationLevel::L0_FullPerception,
63 })
64 }
65
66 pub fn local_addr(&self) -> SocketAddr {
68 self.local_addr
69 }
70
71 pub async fn send_to(&mut self, data: &[u8], dest: SocketAddr) -> std::io::Result<()> {
73 self.socket.send_to(data, dest).await?;
74 self.sent_count += 1;
75 Ok(())
76 }
77
78 pub async fn recv_timeout(&mut self, timeout_ms: u64) -> Option<(Vec<u8>, SocketAddr)> {
80 match timeout(
81 Duration::from_millis(timeout_ms),
82 self.socket.recv_from(&mut self.recv_buf),
83 )
84 .await
85 {
86 Ok(Ok((len, addr))) => {
87 let data = self.recv_buf[..len].to_vec();
88 self.received.push((data.clone(), addr));
89 Some((data, addr))
90 }
91 _ => None,
92 }
93 }
94
95 pub fn received_count(&self) -> usize {
97 self.received.len()
98 }
99
100 pub fn sent_count(&self) -> usize {
102 self.sent_count
103 }
104
105 pub fn update_presence(&mut self, factor: f32) {
107 self.presence = PresenceVector::new(
108 self.presence.liveness * factor,
109 self.presence.immediacy * factor,
110 self.presence.coherence * factor,
111 self.presence.relational_continuity * factor,
112 self.presence.emotional_bandwidth * factor,
113 );
114 }
115
116 pub fn degrade(&mut self) -> bool {
118 if let Some(next) = self.degradation.degrade() {
119 self.degradation = next;
120 true
121 } else {
122 false
123 }
124 }
125
126 pub fn is_alive(&self) -> bool {
128 self.presence.is_alive()
129 }
130
131 pub fn presence(&self) -> &PresenceVector {
133 &self.presence
134 }
135
136 pub fn degradation_level(&self) -> DegradationLevel {
138 self.degradation
139 }
140}
141
/// Tunable parameters for one `NetworkTestHarness` run.
#[derive(Debug, Clone)]
pub struct NetworkTestConfig {
    /// Number of nodes the harness creates; must be greater than zero.
    pub node_count: usize,

    /// Number of send rounds per node; in each round a node sends one message
    /// to every other node. Must be greater than zero.
    pub messages_per_node: usize,

    /// Per-receive timeout, in milliseconds, used while draining each node.
    /// Must be greater than zero.
    pub recv_timeout_ms: u64,

    /// Pause, in milliseconds, after each send round (0 disables the pause).
    pub send_delay_ms: u64,

    /// Probability that a message is dropped before it is sent. Must be
    /// finite; the harness clamps it to `[0.0, 1.0]`.
    pub loss_rate: f32,

    /// Upper bound, in milliseconds, of the random delay inserted before each
    /// send (0 disables jitter).
    pub jitter_ms: u64,

    /// Seed for the RNG that drives simulated loss and jitter.
    pub rng_seed: u64,

    /// When `true`, messages are routed through a local relay instead of being
    /// sent directly to the destination node.
    pub nat_relay: bool,
}

impl Default for NetworkTestConfig {
    /// Three nodes, five rounds each, no loss, no jitter, direct delivery.
    fn default() -> Self {
        Self {
            node_count: 3,
            messages_per_node: 5,
            recv_timeout_ms: 100,
            send_delay_ms: 10,
            loss_rate: 0.0,
            jitter_ms: 0,
            rng_seed: 42,
            nat_relay: false,
        }
    }
}

impl NetworkTestConfig {
    /// Checks that the configuration can be run.
    ///
    /// # Errors
    /// Returns `ErrorKind::InvalidInput` when `node_count`,
    /// `messages_per_node` or `recv_timeout_ms` is zero, when `loss_rate` is
    /// not finite, or when `nat_relay` is enabled with more nodes than the
    /// relay's 16-bit destination index can address.
    fn validate(&self) -> std::io::Result<()> {
        // The relay header encodes the destination node index as a `u16`, so
        // only indices 0..=65535 are addressable. Beyond that the index would
        // be truncated and packets delivered to the wrong node.
        const MAX_RELAY_NODES: usize = u16::MAX as usize + 1;

        let invalid = |msg: &'static str| -> std::io::Result<()> {
            Err(Error::new(ErrorKind::InvalidInput, msg))
        };

        if self.node_count == 0 {
            return invalid("node_count must be greater than 0");
        }
        if self.messages_per_node == 0 {
            return invalid("messages_per_node must be greater than 0");
        }
        if self.recv_timeout_ms == 0 {
            return invalid("recv_timeout_ms must be greater than 0");
        }
        if !self.loss_rate.is_finite() {
            return invalid("loss_rate must be finite");
        }
        if self.nat_relay && self.node_count > MAX_RELAY_NODES {
            return invalid("node_count must not exceed 65536 when nat_relay is enabled");
        }
        Ok(())
    }
}
214
/// Outcome of a single harness run.
#[derive(Debug, Clone)]
pub struct NetworkTestResult {
    /// Messages the run attempted to send, including those dropped by
    /// simulated loss.
    pub messages_sent: usize,

    /// Datagrams collected from all nodes after sending finished.
    pub messages_received: usize,

    /// `messages_received / messages_sent`.
    pub delivery_rate: f64,

    /// Whether every node reported itself alive at the end of the run.
    pub all_alive: bool,

    /// Whether the run finished without recording any violation.
    pub invariants_maintained: bool,

    /// Human-readable descriptions of each recorded violation.
    pub violations: Vec<String>,
}

impl NetworkTestResult {
    /// Delivery rate a run must strictly exceed to count as passed.
    const MIN_PASSING_DELIVERY_RATE: f64 = 0.9;

    /// Returns `true` when all nodes are alive, no invariant was violated and
    /// the delivery rate is strictly above 90%.
    pub fn passed(&self) -> bool {
        if !self.all_alive || !self.invariants_maintained {
            return false;
        }
        self.delivery_rate > Self::MIN_PASSING_DELIVERY_RATE
    }

    /// Builds a failed result that carries `violations` and reports zero
    /// traffic, used when a run could not be performed at all.
    pub fn failure(violations: Vec<String>) -> Self {
        Self {
            violations,
            all_alive: false,
            invariants_maintained: false,
            messages_sent: 0,
            messages_received: 0,
            delivery_rate: 0.0,
        }
    }
}
254
/// Drives a multi-node UDP exchange on the loopback interface and summarises
/// the outcome as a `NetworkTestResult`.
pub struct NetworkTestHarness {
    /// Parameters for the run; validated when the harness is created.
    config: NetworkTestConfig,
    /// The participating nodes, one per `config.node_count`.
    nodes: Vec<NetworkTestNode>,
}
260
/// A local UDP relay that forwards datagrams to a destination selected by a
/// two-byte index prefix; used to simulate traffic passing through a middlebox.
struct NatRelay {
    /// Address of the relay's own socket; senders target this address.
    addr: SocketAddr,
    /// Handle of the spawned forwarding task, aborted on shutdown.
    handle: tokio::task::JoinHandle<()>,
}
265
266impl NatRelay {
267 async fn start(routes: Vec<SocketAddr>) -> std::io::Result<Self> {
268 let socket = Arc::new(UdpSocket::bind("127.0.0.1:0").await?);
269 let addr = socket.local_addr()?;
270 let routes = Arc::new(routes);
271 let handle = tokio::spawn(async move {
272 let mut buf = vec![0u8; 65535];
273 loop {
274 let (len, _) = match socket.recv_from(&mut buf).await {
275 Ok(result) => result,
276 Err(_) => break,
277 };
278 let Some((dest_idx, payload_start)) = parse_nat_payload(&buf[..len]) else {
279 continue;
280 };
281 if let Some(dest) = routes.get(dest_idx) {
282 let _ = socket.send_to(&buf[payload_start..len], dest).await;
283 }
284 }
285 });
286
287 Ok(Self { addr, handle })
288 }
289
290 async fn shutdown(self) {
291 self.handle.abort();
292 let _ = self.handle.await;
293 }
294}
295
/// Decodes the relay header at the start of `buf`.
///
/// The header is a little-endian `u16` destination index. Returns the index
/// together with the offset at which the payload begins (always 2), or `None`
/// when `buf` is shorter than the header.
fn parse_nat_payload(buf: &[u8]) -> Option<(usize, usize)> {
    const HEADER_LEN: usize = 2;

    match buf {
        [lo, hi, ..] => {
            let dest_idx = u16::from_le_bytes([*lo, *hi]);
            Some((usize::from(dest_idx), HEADER_LEN))
        }
        _ => None,
    }
}
303
304impl NetworkTestHarness {
305 pub async fn new(config: NetworkTestConfig) -> std::io::Result<Self> {
307 config.validate()?;
308 let mut nodes = Vec::new();
309
310 for i in 0..config.node_count {
311 let node = NetworkTestNode::new(NodeId::new(i as u64 + 1)).await?;
312 nodes.push(node);
313 }
314
315 Ok(Self { config, nodes })
316 }
317
318 pub async fn run(&mut self) -> NetworkTestResult {
320 let mut messages_sent = 0;
321 let mut messages_received = 0;
322 let mut rng = StdRng::seed_from_u64(self.config.rng_seed);
323 let mut violations = Vec::new();
324
325 if self.nodes.len() < 2 {
326 violations.push("INV-0 violated: Insufficient nodes".to_string());
327 }
328
329 let addresses: Vec<_> = self.nodes.iter().map(|n| n.local_addr()).collect();
331
332 let mut relay = None;
333 let mut relay_addr = None;
334 if self.config.nat_relay {
335 match NatRelay::start(addresses.clone()).await {
336 Ok(nat) => {
337 relay_addr = Some(nat.addr);
338 relay = Some(nat);
339 }
340 Err(err) => {
341 violations.push(format!("NAT relay failed: {}", err));
342 }
343 }
344 }
345
346 let loss_rate = self.config.loss_rate.clamp(0.0, 1.0);
347
348 let mut send_failures = 0usize;
350 for (sender_idx, sender) in self.nodes.iter_mut().enumerate() {
351 for msg_num in 0..self.config.messages_per_node {
352 for (receiver_idx, dest) in addresses.iter().copied().enumerate() {
353 if sender_idx == receiver_idx {
354 continue;
355 }
356 let msg_bytes: Vec<u8> = if self.config.nat_relay {
357 let payload = format!("msg_{}_{}", sender_idx, msg_num).into_bytes();
358 let mut buf = Vec::with_capacity(2 + payload.len());
359 let idx_le = (receiver_idx as u16).to_le_bytes();
360 buf.extend_from_slice(&idx_le);
361 buf.extend_from_slice(&payload);
362 buf
363 } else {
364 format!("msg_{}_{}", sender_idx, msg_num).into_bytes()
365 };
366 messages_sent += 1;
367
368 if loss_rate > 0.0 && rng.gen::<f32>() < loss_rate {
369 continue;
370 }
371
372 if self.config.jitter_ms > 0 {
373 let jitter = rng.gen_range(0..=self.config.jitter_ms);
374 tokio::time::sleep(Duration::from_millis(jitter)).await;
375 }
376
377 let target = relay_addr.unwrap_or(dest);
378 if sender.send_to(&msg_bytes, target).await.is_err() {
379 send_failures += 1;
380 }
381 }
382
383 if self.config.send_delay_ms > 0 {
385 tokio::time::sleep(Duration::from_millis(self.config.send_delay_ms)).await;
386 }
387 }
388 }
389
390 tokio::time::sleep(Duration::from_millis(50)).await;
392
393 for node in &mut self.nodes {
395 loop {
396 if node
397 .recv_timeout(self.config.recv_timeout_ms)
398 .await
399 .is_none()
400 {
401 break;
402 }
403 messages_received += 1;
404 }
405 }
406
407 if let Some(relay) = relay {
408 relay.shutdown().await;
409 }
410
411 let all_alive = self.nodes.iter().all(|n| n.is_alive());
412
413 if !all_alive {
414 violations.push("INV-2 violated: Not all nodes are alive".to_string());
415 }
416
417 if send_failures > 0 {
418 violations.push(format!("Send failures: {}", send_failures));
419 }
420
421 let delivery_rate = if messages_sent > 0 {
422 messages_received as f64 / messages_sent as f64
423 } else {
424 1.0
425 };
426
427 NetworkTestResult {
428 messages_sent,
429 messages_received,
430 delivery_rate,
431 all_alive,
432 invariants_maintained: violations.is_empty(),
433 violations,
434 }
435 }
436
437 pub fn nodes(&self) -> &[NetworkTestNode] {
439 &self.nodes
440 }
441}
442
443pub async fn measure_rtt(
449 node_a: &mut NetworkTestNode,
450 node_b: &mut NetworkTestNode,
451 samples: usize,
452) -> Option<Duration> {
453 let rtts = measure_rtt_samples(node_a, node_b, samples).await;
454 if rtts.is_empty() {
455 None
456 } else {
457 let total: Duration = rtts.iter().sum();
458 Some(total / rtts.len() as u32)
459 }
460}
461
462pub async fn measure_rtt_samples(
463 node_a: &mut NetworkTestNode,
464 node_b: &mut NetworkTestNode,
465 samples: usize,
466) -> Vec<Duration> {
467 let mut rtts = Vec::new();
468 let addr_b = node_b.local_addr();
469 let addr_a = node_a.local_addr();
470
471 for i in 0..samples {
472 let ping_msg = format!("ping_{}", i);
473 let start = std::time::Instant::now();
474
475 if node_a.send_to(ping_msg.as_bytes(), addr_b).await.is_err() {
476 continue;
477 }
478
479 if let Some((data, _)) = node_b.recv_timeout(100).await {
480 let pong_msg = format!("pong_{}", String::from_utf8_lossy(&data));
481 if node_b.send_to(pong_msg.as_bytes(), addr_a).await.is_err() {
482 continue;
483 }
484
485 if node_a.recv_timeout(100).await.is_some() {
486 rtts.push(start.elapsed());
487 }
488 }
489 }
490
491 rtts
492}
493
494pub async fn measure_rtt_samples_with_conditions(
495 node_a: &mut NetworkTestNode,
496 node_b: &mut NetworkTestNode,
497 samples: usize,
498 loss_rate: f32,
499 jitter_ms: u64,
500 rng_seed: u64,
501) -> Vec<Duration> {
502 let mut rtts = Vec::new();
503 let addr_b = node_b.local_addr();
504 let addr_a = node_a.local_addr();
505 let mut rng = StdRng::seed_from_u64(rng_seed);
506 let clamped_loss = loss_rate.clamp(0.0, 1.0);
507
508 for i in 0..samples {
509 if clamped_loss > 0.0 && rng.gen::<f32>() < clamped_loss {
510 continue;
511 }
512
513 if jitter_ms > 0 {
514 let jitter = rng.gen_range(0..=jitter_ms);
515 tokio::time::sleep(Duration::from_millis(jitter)).await;
516 }
517
518 let ping_msg = format!("ping_{}", i);
519 let start = Instant::now();
520
521 if node_a.send_to(ping_msg.as_bytes(), addr_b).await.is_err() {
522 continue;
523 }
524
525 if let Some((data, _)) = node_b.recv_timeout(100).await {
526 if clamped_loss > 0.0 && rng.gen::<f32>() < clamped_loss {
527 continue;
528 }
529
530 if jitter_ms > 0 {
531 let jitter = rng.gen_range(0..=jitter_ms);
532 tokio::time::sleep(Duration::from_millis(jitter)).await;
533 }
534
535 let pong_msg = format!("pong_{}", String::from_utf8_lossy(&data));
536 if node_b.send_to(pong_msg.as_bytes(), addr_a).await.is_err() {
537 continue;
538 }
539
540 if node_a.recv_timeout(100).await.is_some() {
541 rtts.push(start.elapsed());
542 }
543 }
544 }
545
546 rtts
547}
548
549pub async fn measure_rtt_nat(samples: usize) -> Option<Duration> {
550 let rtts = measure_rtt_nat_samples(samples).await;
551 if rtts.is_empty() {
552 None
553 } else {
554 let total: Duration = rtts.iter().sum();
555 Some(total / rtts.len() as u32)
556 }
557}
558
559pub async fn measure_rtt_nat_samples(samples: usize) -> Vec<Duration> {
560 let mut node_a = match NetworkTestNode::new(NodeId::new(1)).await {
561 Ok(node) => node,
562 Err(_) => return Vec::new(),
563 };
564 let mut node_b = match NetworkTestNode::new(NodeId::new(2)).await {
565 Ok(node) => node,
566 Err(_) => return Vec::new(),
567 };
568 let routes = vec![node_a.local_addr(), node_b.local_addr()];
569 let relay = match NatRelay::start(routes).await {
570 Ok(relay) => relay,
571 Err(_) => return Vec::new(),
572 };
573 let relay_addr = relay.addr;
574 let mut rtts = Vec::new();
575
576 for i in 0..samples {
577 let payload = format!("ping_{}", i).into_bytes();
578 let mut buf = Vec::with_capacity(2 + payload.len());
579 buf.extend_from_slice(&1u16.to_le_bytes());
580 buf.extend_from_slice(&payload);
581 let start = Instant::now();
582
583 if node_a.send_to(&buf, relay_addr).await.is_err() {
584 continue;
585 }
586
587 if let Some((data, _)) = node_b.recv_timeout(100).await {
588 let pong_payload = format!("pong_{}", String::from_utf8_lossy(&data)).into_bytes();
589 let mut pong_buf = Vec::with_capacity(2 + pong_payload.len());
590 pong_buf.extend_from_slice(&0u16.to_le_bytes());
591 pong_buf.extend_from_slice(&pong_payload);
592 if node_b.send_to(&pong_buf, relay_addr).await.is_err() {
593 continue;
594 }
595
596 if node_a.recv_timeout(100).await.is_some() {
597 rtts.push(start.elapsed());
598 }
599 }
600 }
601
602 relay.shutdown().await;
603
604 rtts
605}
606
607pub async fn test_basic_connectivity() -> NetworkTestResult {
613 let config = NetworkTestConfig {
614 node_count: 2,
615 messages_per_node: 3,
616 recv_timeout_ms: 100,
617 send_delay_ms: 5,
618 loss_rate: 0.0,
619 jitter_ms: 0,
620 rng_seed: 1,
621 nat_relay: false,
622 };
623
624 match NetworkTestHarness::new(config).await {
625 Ok(mut harness) => harness.run().await,
626 Err(err) => {
627 NetworkTestResult::failure(vec![format!("network harness creation failed: {}", err)])
628 }
629 }
630}
631
632pub async fn test_multi_node_network() -> NetworkTestResult {
634 let config = NetworkTestConfig::default();
635 match NetworkTestHarness::new(config).await {
636 Ok(mut harness) => harness.run().await,
637 Err(err) => {
638 NetworkTestResult::failure(vec![format!("network harness creation failed: {}", err)])
639 }
640 }
641}
642
#[cfg(test)]
mod tests {
    use super::*;

    /// Binds a fresh loopback node with the given id, panicking on failure.
    async fn spawn_node(id: u64) -> NetworkTestNode {
        NetworkTestNode::new(NodeId::new(id)).await.unwrap()
    }

    /// Builds a harness from `config` (panicking on failure) and runs it once.
    async fn run_with(config: NetworkTestConfig) -> NetworkTestResult {
        let mut harness = NetworkTestHarness::new(config).await.unwrap();
        harness.run().await
    }

    /// A new node gets a real port and starts out alive.
    #[tokio::test]
    async fn test_network_node_creation() {
        let node = spawn_node(1).await;

        assert!(node.local_addr().port() > 0);
        assert!(node.is_alive());
    }

    /// A datagram sent directly to a node is received unchanged.
    #[tokio::test]
    async fn test_network_send_receive() {
        let mut node_a = spawn_node(1).await;
        let mut node_b = spawn_node(2).await;

        let target = node_b.local_addr();
        node_a.send_to(b"hello", target).await.unwrap();

        let result = node_b.recv_timeout(100).await;
        assert!(result.is_some());

        let (data, _) = result.unwrap();
        assert_eq!(&data, b"hello");
    }

    /// A configuration with zero nodes is rejected at harness creation.
    #[tokio::test]
    async fn test_invalid_config_rejected() {
        let config = NetworkTestConfig {
            node_count: 0,
            messages_per_node: 1,
            recv_timeout_ms: 100,
            send_delay_ms: 0,
            loss_rate: 0.0,
            jitter_ms: 0,
            rng_seed: 1,
            nat_relay: false,
        };

        assert!(NetworkTestHarness::new(config).await.is_err());
    }

    /// The two-node scenario delivers (almost) everything and passes.
    #[tokio::test]
    async fn test_basic_connectivity_test() {
        let result = test_basic_connectivity().await;

        assert!(result.all_alive, "All nodes should be alive");
        assert!(result.delivery_rate > 0.9, "Delivery rate should be > 90%");
        assert!(result.passed(), "Basic connectivity test should pass");
    }

    /// The default multi-node scenario moves traffic in both directions.
    #[tokio::test]
    async fn test_multi_node_network_test() {
        let result = test_multi_node_network().await;

        assert!(result.all_alive, "All nodes should be alive");
        assert!(result.messages_sent > 0, "Should have sent messages");
        assert!(
            result.messages_received > 0,
            "Should have received messages"
        );
    }

    /// Mean loopback RTT stays under a generous platform-specific bound.
    #[tokio::test]
    async fn test_rtt_measurement() {
        let mut node_a = spawn_node(1).await;
        let mut node_b = spawn_node(2).await;

        let rtt = measure_rtt(&mut node_a, &mut node_b, 3).await;

        // No assertion is made when no sample completed.
        if let Some(rtt) = rtt {
            let max_rtt_ms: u64 = if cfg!(target_os = "windows") { 500 } else { 200 };
            assert!(
                rtt < Duration::from_millis(max_rtt_ms),
                "Localhost RTT should be < {max_rtt_ms}ms"
            );
        }
    }

    /// A clean three-node run records no violations.
    #[tokio::test]
    async fn test_network_harness() {
        let result = run_with(NetworkTestConfig {
            node_count: 3,
            messages_per_node: 2,
            recv_timeout_ms: 100,
            send_delay_ms: 5,
            loss_rate: 0.0,
            jitter_ms: 0,
            rng_seed: 5,
            nat_relay: false,
        })
        .await;

        assert!(
            result.invariants_maintained,
            "Invariants should be maintained"
        );
    }

    /// Moderate simulated loss lowers the delivery rate.
    #[tokio::test]
    async fn test_network_loss_and_jitter() {
        let result = run_with(NetworkTestConfig {
            node_count: 3,
            messages_per_node: 50,
            recv_timeout_ms: 150,
            send_delay_ms: 0,
            loss_rate: 0.2,
            jitter_ms: 25,
            rng_seed: 9,
            nat_relay: false,
        })
        .await;

        assert!(result.all_alive, "All nodes should be alive");
        assert!(result.delivery_rate < 0.95, "Delivery rate should drop");
    }

    /// Routing through the relay still delivers nearly everything.
    #[tokio::test]
    async fn test_network_nat_relay() {
        let result = run_with(NetworkTestConfig {
            node_count: 3,
            messages_per_node: 5,
            recv_timeout_ms: 150,
            send_delay_ms: 5,
            loss_rate: 0.0,
            jitter_ms: 0,
            rng_seed: 11,
            nat_relay: true,
        })
        .await;

        assert!(result.all_alive, "All nodes should be alive");
        assert!(result.delivery_rate > 0.9, "Delivery rate should be high");
    }

    /// The relay strips the header and forwards only to the addressed node.
    #[tokio::test]
    async fn test_nat_relay_routes_to_destination() {
        let mut node_a = spawn_node(1).await;
        let mut node_b = spawn_node(2).await;

        let routes = vec![node_a.local_addr(), node_b.local_addr()];
        let relay = NatRelay::start(routes).await.unwrap();

        // Header selects route index 1 (node B), followed by the payload.
        let msg = [&(1u16).to_le_bytes()[..], &b"hello"[..]].concat();
        node_a.send_to(&msg, relay.addr).await.unwrap();

        let received_b = node_b.recv_timeout(100).await;
        assert!(received_b.is_some());
        let (data, _) = received_b.unwrap();
        assert_eq!(&data, b"hello");

        // The sender itself must not get a copy.
        let received_a = node_a.recv_timeout(50).await;
        assert!(received_a.is_none());

        relay.shutdown().await;
    }

    /// Heavy simulated loss keeps the delivery rate below one half.
    #[tokio::test]
    async fn test_network_high_loss_and_jitter() {
        let result = run_with(NetworkTestConfig {
            node_count: 3,
            messages_per_node: 20,
            recv_timeout_ms: 200,
            send_delay_ms: 0,
            loss_rate: 0.8,
            jitter_ms: 100,
            rng_seed: 17,
            nat_relay: false,
        })
        .await;

        assert!(result.all_alive, "All nodes should be alive");
        assert!(result.delivery_rate < 0.5, "Delivery rate should be low");
    }
}