1mod ingress;
81mod metrics;
82mod network;
83
84use thiserror::Error;
85
86#[derive(Debug, Error)]
88pub enum Error {
89 #[error("message too large: {0}")]
90 MessageTooLarge(usize),
91 #[error("network closed")]
92 NetworkClosed,
93 #[error("not valid to link self")]
94 LinkingSelf,
95 #[error("link already exists")]
96 LinkExists,
97 #[error("link missing")]
98 LinkMissing,
99 #[error("invalid success rate (must be in [0, 1]): {0}")]
100 InvalidSuccessRate(f64),
101 #[error("channel already registered: {0}")]
102 ChannelAlreadyRegistered(u32),
103 #[error("send_frame failed")]
104 SendFrameFailed,
105 #[error("recv_frame failed")]
106 RecvFrameFailed,
107 #[error("bind failed")]
108 BindFailed,
109 #[error("accept failed")]
110 AcceptFailed,
111 #[error("dial failed")]
112 DialFailed,
113 #[error("peer missing")]
114 PeerMissing,
115 #[error("invalid connection definition: latency={0}, jitter={1}")]
116 InvalidBehavior(f64, f64),
117}
118
119pub use ingress::{Link, Oracle};
120pub use network::{Config, Network, Receiver, Sender};
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use crate::{Receiver, Recipients, Sender};
126 use bytes::Bytes;
127 use commonware_cryptography::{ed25519::PrivateKey, PrivateKeyExt as _, Signer as _};
128 use commonware_macros::select;
129 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
130 use futures::{channel::mpsc, SinkExt, StreamExt};
131 use rand::Rng;
132 use std::{
133 collections::{BTreeMap, HashMap},
134 time::Duration,
135 };
136
137 fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
138 let executor = deterministic::Runner::seeded(seed);
139 executor.start(|context| async move {
140 let (network, mut oracle) = Network::new(
142 context.with_label("network"),
143 Config {
144 max_size: 1024 * 1024,
145 },
146 );
147
148 network.start();
150
151 let mut agents = BTreeMap::new();
153 let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
154 for i in 0..size {
155 let pk = PrivateKey::from_seed(i as u64).public_key();
156 let (sender, mut receiver) = oracle.register(pk.clone(), 0).await.unwrap();
157 agents.insert(pk, sender);
158 let mut agent_sender = seen_sender.clone();
159 context
160 .with_label("agent_receiver")
161 .spawn(move |_| async move {
162 for _ in 0..size {
163 receiver.recv().await.unwrap();
164 }
165 agent_sender.send(i).await.unwrap();
166
167 });
169 }
170
171 let only_inbound = PrivateKey::from_seed(0).public_key();
173 for agent in agents.keys() {
174 if agent == &only_inbound {
175 continue;
177 }
178 for other in agents.keys() {
179 let result = oracle
180 .add_link(
181 agent.clone(),
182 other.clone(),
183 Link {
184 latency: 5.0,
185 jitter: 2.5,
186 success_rate: 0.75,
187 },
188 )
189 .await;
190 if agent == other {
191 assert!(matches!(result, Err(Error::LinkingSelf)));
192 } else {
193 assert!(result.is_ok());
194 }
195 }
196 }
197
198 context
200 .with_label("agent_sender")
201 .spawn(|mut context| async move {
202 let keys = agents.keys().collect::<Vec<_>>();
204
205 loop {
207 let index = context.gen_range(0..keys.len());
208 let sender = keys[index];
209 let msg = format!("hello from {sender:?}");
210 let msg = Bytes::from(msg);
211 let mut message_sender = agents.get(sender).unwrap().clone();
212 let sent = message_sender
213 .send(Recipients::All, msg.clone(), false)
214 .await
215 .unwrap();
216 if sender == &only_inbound {
217 assert_eq!(sent.len(), 0);
218 } else {
219 assert_eq!(sent.len(), keys.len() - 1);
220 }
221 }
222 });
223
224 let mut results = Vec::new();
226 for _ in 0..size {
227 results.push(seen_receiver.next().await.unwrap());
228 }
229 (context.auditor().state(), results)
230 })
231 }
232
233 fn compare_outputs(seeds: u64, size: usize) {
234 let mut outputs = Vec::new();
236 for seed in 0..seeds {
237 outputs.push(simulate_messages(seed, size));
238 }
239
240 for seed in 0..seeds {
242 let output = simulate_messages(seed, size);
243 assert_eq!(output, outputs[seed as usize]);
244 }
245 }
246
247 #[test]
248 fn test_determinism() {
249 compare_outputs(25, 25);
250 }
251
252 #[test]
253 fn test_message_too_big() {
254 let executor = deterministic::Runner::default();
255 executor.start(|mut context| async move {
256 let (network, mut oracle) = Network::new(
258 context.with_label("network"),
259 Config {
260 max_size: 1024 * 1024,
261 },
262 );
263
264 network.start();
266
267 let mut agents = HashMap::new();
269 for i in 0..10 {
270 let pk = PrivateKey::from_seed(i as u64).public_key();
271 let (sender, _) = oracle.register(pk.clone(), 0).await.unwrap();
272 agents.insert(pk, sender);
273 }
274
275 let keys = agents.keys().collect::<Vec<_>>();
277 let index = context.gen_range(0..keys.len());
278 let sender = keys[index];
279 let mut message_sender = agents.get(sender).unwrap().clone();
280 let mut msg = vec![0u8; 1024 * 1024 + 1];
281 context.fill(&mut msg[..]);
282 let result = message_sender
283 .send(Recipients::All, msg.into(), false)
284 .await
285 .unwrap_err();
286
287 assert!(matches!(result, Error::MessageTooLarge(_)));
289 });
290 }
291
292 #[test]
293 fn test_linking_self() {
294 let executor = deterministic::Runner::default();
295 executor.start(|context| async move {
296 let (network, mut oracle) = Network::new(
298 context.with_label("network"),
299 Config {
300 max_size: 1024 * 1024,
301 },
302 );
303
304 network.start();
306
307 let pk = PrivateKey::from_seed(0).public_key();
309 oracle.register(pk.clone(), 0).await.unwrap();
310
311 let result = oracle
313 .add_link(
314 pk.clone(),
315 pk,
316 Link {
317 latency: 5.0,
318 jitter: 2.5,
319 success_rate: 0.75,
320 },
321 )
322 .await;
323
324 assert!(matches!(result, Err(Error::LinkingSelf)));
326 });
327 }
328
329 #[test]
330 fn test_duplicate_channel() {
331 let executor = deterministic::Runner::default();
332 executor.start(|context| async move {
333 let (network, mut oracle) = Network::new(
335 context.with_label("network"),
336 Config {
337 max_size: 1024 * 1024,
338 },
339 );
340
341 network.start();
343
344 let pk = PrivateKey::from_seed(0).public_key();
346 oracle.register(pk.clone(), 0).await.unwrap();
347 let result = oracle.register(pk, 0).await;
348
349 assert!(matches!(result, Err(Error::ChannelAlreadyRegistered(0))));
351 });
352 }
353
354 #[test]
355 fn test_invalid_success_rate() {
356 let executor = deterministic::Runner::default();
357 executor.start(|context| async move {
358 let (network, mut oracle) = Network::new(
360 context.with_label("network"),
361 Config {
362 max_size: 1024 * 1024,
363 },
364 );
365
366 network.start();
368
369 let pk1 = PrivateKey::from_seed(0).public_key();
371 let pk2 = PrivateKey::from_seed(1).public_key();
372 oracle.register(pk1.clone(), 0).await.unwrap();
373 oracle.register(pk2.clone(), 0).await.unwrap();
374
375 let result = oracle
377 .add_link(
378 pk1,
379 pk2,
380 Link {
381 latency: 5.0,
382 jitter: 2.5,
383 success_rate: 1.5,
384 },
385 )
386 .await;
387
388 assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
390 });
391 }
392
393 #[test]
394 fn test_invalid_behavior() {
395 let executor = deterministic::Runner::default();
396 executor.start(|context| async move {
397 let (network, mut oracle) = Network::new(
399 context.with_label("network"),
400 Config {
401 max_size: 1024 * 1024,
402 },
403 );
404
405 network.start();
407
408 let pk1 = PrivateKey::from_seed(0).public_key();
410 let pk2 = PrivateKey::from_seed(1).public_key();
411 oracle.register(pk1.clone(), 0).await.unwrap();
412 oracle.register(pk2.clone(), 0).await.unwrap();
413
414 let result = oracle
416 .add_link(
417 pk1.clone(),
418 pk2.clone(),
419 Link {
420 latency: -5.0,
421 jitter: 2.5,
422 success_rate: 1.0,
423 },
424 )
425 .await;
426
427 assert!(matches!(result, Err(Error::InvalidBehavior(-5.0, 2.5))));
429
430 let result = oracle
432 .add_link(
433 pk1,
434 pk2,
435 Link {
436 latency: 5.0,
437 jitter: -2.5,
438 success_rate: 1.0,
439 },
440 )
441 .await;
442
443 assert!(matches!(result, Err(Error::InvalidBehavior(5.0, -2.5))));
445 });
446 }
447
448 #[test]
449 fn test_simple_message_delivery() {
450 let executor = deterministic::Runner::default();
451 executor.start(|context| async move {
452 let (network, mut oracle) = Network::new(
454 context.with_label("network"),
455 Config {
456 max_size: 1024 * 1024,
457 },
458 );
459
460 network.start();
462
463 let pk1 = PrivateKey::from_seed(0).public_key();
465 let pk2 = PrivateKey::from_seed(1).public_key();
466 let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
467 let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
468
469 let _ = oracle.register(pk1.clone(), 1).await.unwrap();
471 let _ = oracle.register(pk2.clone(), 2).await.unwrap();
472
473 oracle
475 .add_link(
476 pk1.clone(),
477 pk2.clone(),
478 Link {
479 latency: 5.0,
480 jitter: 2.5,
481 success_rate: 1.0,
482 },
483 )
484 .await
485 .unwrap();
486 oracle
487 .add_link(
488 pk2.clone(),
489 pk1.clone(),
490 Link {
491 latency: 5.0,
492 jitter: 2.5,
493 success_rate: 1.0,
494 },
495 )
496 .await
497 .unwrap();
498
499 let msg1 = Bytes::from("hello from pk1");
501 let msg2 = Bytes::from("hello from pk2");
502 sender1
503 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
504 .await
505 .unwrap();
506 sender2
507 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
508 .await
509 .unwrap();
510
511 let (sender, message) = receiver1.recv().await.unwrap();
513 assert_eq!(sender, pk2);
514 assert_eq!(message, msg2);
515 let (sender, message) = receiver2.recv().await.unwrap();
516 assert_eq!(sender, pk1);
517 assert_eq!(message, msg1);
518 });
519 }
520
521 #[test]
522 fn test_send_wrong_channel() {
523 let executor = deterministic::Runner::default();
524 executor.start(|context| async move {
525 let (network, mut oracle) = Network::new(
527 context.with_label("network"),
528 Config {
529 max_size: 1024 * 1024,
530 },
531 );
532
533 network.start();
535
536 let pk1 = PrivateKey::from_seed(0).public_key();
538 let pk2 = PrivateKey::from_seed(1).public_key();
539 let (mut sender1, _) = oracle.register(pk1.clone(), 0).await.unwrap();
540 let (_, mut receiver2) = oracle.register(pk2.clone(), 1).await.unwrap();
541
542 oracle
544 .add_link(
545 pk1,
546 pk2.clone(),
547 Link {
548 latency: 5.0,
549 jitter: 0.0,
550 success_rate: 1.0,
551 },
552 )
553 .await
554 .unwrap();
555
556 let msg = Bytes::from("hello from pk1");
558 sender1
559 .send(Recipients::One(pk2), msg, false)
560 .await
561 .unwrap();
562
563 select! {
565 _ = receiver2.recv() => {
566 panic!("unexpected message");
567 },
568 _ = context.sleep(Duration::from_secs(1)) => {},
569 }
570 });
571 }
572
573 #[test]
574 fn test_dynamic_peers() {
575 let executor = deterministic::Runner::default();
576 executor.start(|context| async move {
577 let (network, mut oracle) = Network::new(
579 context.with_label("network"),
580 Config {
581 max_size: 1024 * 1024,
582 },
583 );
584
585 network.start();
587
588 let pk1 = PrivateKey::from_seed(0).public_key();
590 let pk2 = PrivateKey::from_seed(1).public_key();
591 let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
592 let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
593
594 oracle
596 .add_link(
597 pk1.clone(),
598 pk2.clone(),
599 Link {
600 latency: 5.0,
601 jitter: 2.5,
602 success_rate: 1.0,
603 },
604 )
605 .await
606 .unwrap();
607 oracle
608 .add_link(
609 pk2.clone(),
610 pk1.clone(),
611 Link {
612 latency: 5.0,
613 jitter: 2.5,
614 success_rate: 1.0,
615 },
616 )
617 .await
618 .unwrap();
619
620 let msg1 = Bytes::from("attempt 1: hello from pk1");
622 let msg2 = Bytes::from("attempt 1: hello from pk2");
623 sender1
624 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
625 .await
626 .unwrap();
627 sender2
628 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
629 .await
630 .unwrap();
631
632 let (sender, message) = receiver1.recv().await.unwrap();
634 assert_eq!(sender, pk2);
635 assert_eq!(message, msg2);
636 let (sender, message) = receiver2.recv().await.unwrap();
637 assert_eq!(sender, pk1);
638 assert_eq!(message, msg1);
639 });
640 }
641
642 #[test]
643 fn test_dynamic_links() {
644 let executor = deterministic::Runner::default();
645 executor.start(|context| async move {
646 let (network, mut oracle) = Network::new(
648 context.with_label("network"),
649 Config {
650 max_size: 1024 * 1024,
651 },
652 );
653
654 network.start();
656
657 let pk1 = PrivateKey::from_seed(0).public_key();
659 let pk2 = PrivateKey::from_seed(1).public_key();
660 let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
661 let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
662
663 let msg1 = Bytes::from("attempt 1: hello from pk1");
665 let msg2 = Bytes::from("attempt 1: hello from pk2");
666 sender1
667 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
668 .await
669 .unwrap();
670 sender2
671 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
672 .await
673 .unwrap();
674
675 select! {
677 _ = receiver1.recv() => {
678 panic!("unexpected message");
679 },
680 _ = receiver2.recv() => {
681 panic!("unexpected message");
682 },
683 _ = context.sleep(Duration::from_secs(1)) => {},
684 }
685
686 oracle
688 .add_link(
689 pk1.clone(),
690 pk2.clone(),
691 Link {
692 latency: 5.0,
693 jitter: 2.5,
694 success_rate: 1.0,
695 },
696 )
697 .await
698 .unwrap();
699 oracle
700 .add_link(
701 pk2.clone(),
702 pk1.clone(),
703 Link {
704 latency: 5.0,
705 jitter: 2.5,
706 success_rate: 1.0,
707 },
708 )
709 .await
710 .unwrap();
711
712 let msg1 = Bytes::from("attempt 2: hello from pk1");
714 let msg2 = Bytes::from("attempt 2: hello from pk2");
715 sender1
716 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
717 .await
718 .unwrap();
719 sender2
720 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
721 .await
722 .unwrap();
723
724 let (sender, message) = receiver1.recv().await.unwrap();
726 assert_eq!(sender, pk2);
727 assert_eq!(message, msg2);
728 let (sender, message) = receiver2.recv().await.unwrap();
729 assert_eq!(sender, pk1);
730 assert_eq!(message, msg1);
731
732 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
734 oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
735
736 let msg1 = Bytes::from("attempt 3: hello from pk1");
738 let msg2 = Bytes::from("attempt 3: hello from pk2");
739 sender1
740 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
741 .await
742 .unwrap();
743 sender2
744 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
745 .await
746 .unwrap();
747
748 select! {
750 _ = receiver1.recv() => {
751 panic!("unexpected message");
752 },
753 _ = receiver2.recv() => {
754 panic!("unexpected message");
755 },
756 _ = context.sleep(Duration::from_secs(1)) => {},
757 }
758
759 let result = oracle.remove_link(pk1, pk2).await;
761 assert!(matches!(result, Err(Error::LinkMissing)));
762 });
763 }
764}