commonware_p2p/simulated/
mod.rs

1//! Simulate networking between peers with configurable link behavior (i.e. drops, latency, corruption, etc.).
2//!
3//! Both peer and link modification can be performed dynamically over the lifetime of the simulated network. This
4//! can be used to mimic transient network partitions, offline nodes (that later connect), and/or degrading link
5//! quality.
6//!
7//! # Determinism
8//!
9//! `commonware-p2p::simulated` can be run deterministically when paired with `commonware-runtime::deterministic`.
10//! This makes it possible to reproduce an arbitrary order of delivered/dropped messages with a given seed.
11//!
12//! # Example
13//!
14//! ```rust
15//! use commonware_p2p::simulated::{Config, Link, Network};
16//! use commonware_cryptography::{ed25519, PrivateKey, Signer as _, PublicKey as _, PrivateKeyExt as _};
17//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
18//!
19//! // Generate peers
20//! let peers = vec![
21//!     ed25519::PrivateKey::from_seed(0).public_key(),
22//!     ed25519::PrivateKey::from_seed(1).public_key(),
23//!     ed25519::PrivateKey::from_seed(2).public_key(),
24//!     ed25519::PrivateKey::from_seed(3).public_key(),
25//! ];
26//!
27//! // Configure network
28//! let p2p_cfg = Config {
29//!     max_size: 1024 * 1024, // 1MB
30//! };
31//!
32//! // Start context
33//! let executor = deterministic::Runner::seeded(0);
34//! executor.start(|context| async move {
35//!     // Initialize network
36//!     let (network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
37//!
38//!     // Start network
39//!     let network_handler = network.start();
40//!
41//!     // Register some peers
42//!     let (sender, receiver) = oracle.register(peers[0].clone(), 0).await.unwrap();
43//!     let (sender, receiver) = oracle.register(peers[1].clone(), 0).await.unwrap();
44//!
45//!     // Link 2 peers
46//!     oracle.add_link(
47//!         peers[0].clone(),
48//!         peers[1].clone(),
49//!         Link {
50//!             latency: 5.0,
51//!             jitter: 2.5,
52//!             success_rate: 0.75,
53//!         },
54//!     ).await.unwrap();
55//!
56//!     // ... Use sender and receiver ...
57//!
58//!     // Update link
59//!     oracle.remove_link(
60//!         peers[0].clone(),
61//!         peers[1].clone(),
62//!     ).await.unwrap();
63//!     oracle.add_link(
64//!         peers[0].clone(),
65//!         peers[1].clone(),
66//!         Link {
67//!             latency: 100.0,
68//!             jitter: 25.0,
69//!             success_rate: 0.8,
70//!         },
71//!     ).await.unwrap();
72//!
73//!     // ... Use sender and receiver ...
74//!
75//!     // Shutdown network
76//!     network_handler.abort();
77//! });
78//! ```
79
80mod ingress;
81mod metrics;
82mod network;
83
84use thiserror::Error;
85
86/// Errors that can occur when interacting with the network.
87#[derive(Debug, Error)]
88pub enum Error {
89    #[error("message too large: {0}")]
90    MessageTooLarge(usize),
91    #[error("network closed")]
92    NetworkClosed,
93    #[error("not valid to link self")]
94    LinkingSelf,
95    #[error("link already exists")]
96    LinkExists,
97    #[error("link missing")]
98    LinkMissing,
99    #[error("invalid success rate (must be in [0, 1]): {0}")]
100    InvalidSuccessRate(f64),
101    #[error("channel already registered: {0}")]
102    ChannelAlreadyRegistered(u32),
103    #[error("send_frame failed")]
104    SendFrameFailed,
105    #[error("recv_frame failed")]
106    RecvFrameFailed,
107    #[error("bind failed")]
108    BindFailed,
109    #[error("accept failed")]
110    AcceptFailed,
111    #[error("dial failed")]
112    DialFailed,
113    #[error("peer missing")]
114    PeerMissing,
115    #[error("invalid connection definition: latency={0}, jitter={1}")]
116    InvalidBehavior(f64, f64),
117}
118
119pub use ingress::{Link, Oracle};
120pub use network::{Config, Network, Receiver, Sender};
121
122#[cfg(test)]
123mod tests {
124    use super::*;
125    use crate::{Receiver, Recipients, Sender};
126    use bytes::Bytes;
127    use commonware_cryptography::{ed25519::PrivateKey, PrivateKeyExt as _, Signer as _};
128    use commonware_macros::select;
129    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
130    use futures::{channel::mpsc, SinkExt, StreamExt};
131    use rand::Rng;
132    use std::{
133        collections::{BTreeMap, HashMap},
134        time::Duration,
135    };
136
137    fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
138        let executor = deterministic::Runner::seeded(seed);
139        executor.start(|context| async move {
140            // Create simulated network
141            let (network, mut oracle) = Network::new(
142                context.with_label("network"),
143                Config {
144                    max_size: 1024 * 1024,
145                },
146            );
147
148            // Start network
149            network.start();
150
151            // Register agents
152            let mut agents = BTreeMap::new();
153            let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
154            for i in 0..size {
155                let pk = PrivateKey::from_seed(i as u64).public_key();
156                let (sender, mut receiver) = oracle.register(pk.clone(), 0).await.unwrap();
157                agents.insert(pk, sender);
158                let mut agent_sender = seen_sender.clone();
159                context
160                    .with_label("agent_receiver")
161                    .spawn(move |_| async move {
162                        for _ in 0..size {
163                            receiver.recv().await.unwrap();
164                        }
165                        agent_sender.send(i).await.unwrap();
166
167                        // Exiting early here tests the case where the recipient end of an agent is dropped
168                    });
169            }
170
171            // Randomly link agents
172            let only_inbound = PrivateKey::from_seed(0).public_key();
173            for agent in agents.keys() {
174                if agent == &only_inbound {
175                    // Test that we can gracefully handle missing links
176                    continue;
177                }
178                for other in agents.keys() {
179                    let result = oracle
180                        .add_link(
181                            agent.clone(),
182                            other.clone(),
183                            Link {
184                                latency: 5.0,
185                                jitter: 2.5,
186                                success_rate: 0.75,
187                            },
188                        )
189                        .await;
190                    if agent == other {
191                        assert!(matches!(result, Err(Error::LinkingSelf)));
192                    } else {
193                        assert!(result.is_ok());
194                    }
195                }
196            }
197
198            // Send messages
199            context
200                .with_label("agent_sender")
201                .spawn(|mut context| async move {
202                    // Sort agents for deterministic output
203                    let keys = agents.keys().collect::<Vec<_>>();
204
205                    // Send messages
206                    loop {
207                        let index = context.gen_range(0..keys.len());
208                        let sender = keys[index];
209                        let msg = format!("hello from {sender:?}");
210                        let msg = Bytes::from(msg);
211                        let mut message_sender = agents.get(sender).unwrap().clone();
212                        let sent = message_sender
213                            .send(Recipients::All, msg.clone(), false)
214                            .await
215                            .unwrap();
216                        if sender == &only_inbound {
217                            assert_eq!(sent.len(), 0);
218                        } else {
219                            assert_eq!(sent.len(), keys.len() - 1);
220                        }
221                    }
222                });
223
224            // Wait for all recipients
225            let mut results = Vec::new();
226            for _ in 0..size {
227                results.push(seen_receiver.next().await.unwrap());
228            }
229            (context.auditor().state(), results)
230        })
231    }
232
233    fn compare_outputs(seeds: u64, size: usize) {
234        // Collect outputs
235        let mut outputs = Vec::new();
236        for seed in 0..seeds {
237            outputs.push(simulate_messages(seed, size));
238        }
239
240        // Confirm outputs are deterministic
241        for seed in 0..seeds {
242            let output = simulate_messages(seed, size);
243            assert_eq!(output, outputs[seed as usize]);
244        }
245    }
246
247    #[test]
248    fn test_determinism() {
249        compare_outputs(25, 25);
250    }
251
252    #[test]
253    fn test_message_too_big() {
254        let executor = deterministic::Runner::default();
255        executor.start(|mut context| async move {
256            // Create simulated network
257            let (network, mut oracle) = Network::new(
258                context.with_label("network"),
259                Config {
260                    max_size: 1024 * 1024,
261                },
262            );
263
264            // Start network
265            network.start();
266
267            // Register agents
268            let mut agents = HashMap::new();
269            for i in 0..10 {
270                let pk = PrivateKey::from_seed(i as u64).public_key();
271                let (sender, _) = oracle.register(pk.clone(), 0).await.unwrap();
272                agents.insert(pk, sender);
273            }
274
275            // Send invalid message
276            let keys = agents.keys().collect::<Vec<_>>();
277            let index = context.gen_range(0..keys.len());
278            let sender = keys[index];
279            let mut message_sender = agents.get(sender).unwrap().clone();
280            let mut msg = vec![0u8; 1024 * 1024 + 1];
281            context.fill(&mut msg[..]);
282            let result = message_sender
283                .send(Recipients::All, msg.into(), false)
284                .await
285                .unwrap_err();
286
287            // Confirm error is correct
288            assert!(matches!(result, Error::MessageTooLarge(_)));
289        });
290    }
291
292    #[test]
293    fn test_linking_self() {
294        let executor = deterministic::Runner::default();
295        executor.start(|context| async move {
296            // Create simulated network
297            let (network, mut oracle) = Network::new(
298                context.with_label("network"),
299                Config {
300                    max_size: 1024 * 1024,
301                },
302            );
303
304            // Start network
305            network.start();
306
307            // Register agents
308            let pk = PrivateKey::from_seed(0).public_key();
309            oracle.register(pk.clone(), 0).await.unwrap();
310
311            // Attempt to link self
312            let result = oracle
313                .add_link(
314                    pk.clone(),
315                    pk,
316                    Link {
317                        latency: 5.0,
318                        jitter: 2.5,
319                        success_rate: 0.75,
320                    },
321                )
322                .await;
323
324            // Confirm error is correct
325            assert!(matches!(result, Err(Error::LinkingSelf)));
326        });
327    }
328
329    #[test]
330    fn test_duplicate_channel() {
331        let executor = deterministic::Runner::default();
332        executor.start(|context| async move {
333            // Create simulated network
334            let (network, mut oracle) = Network::new(
335                context.with_label("network"),
336                Config {
337                    max_size: 1024 * 1024,
338                },
339            );
340
341            // Start network
342            network.start();
343
344            // Register agents
345            let pk = PrivateKey::from_seed(0).public_key();
346            oracle.register(pk.clone(), 0).await.unwrap();
347            let result = oracle.register(pk, 0).await;
348
349            // Confirm error is correct
350            assert!(matches!(result, Err(Error::ChannelAlreadyRegistered(0))));
351        });
352    }
353
354    #[test]
355    fn test_invalid_success_rate() {
356        let executor = deterministic::Runner::default();
357        executor.start(|context| async move {
358            // Create simulated network
359            let (network, mut oracle) = Network::new(
360                context.with_label("network"),
361                Config {
362                    max_size: 1024 * 1024,
363                },
364            );
365
366            // Start network
367            network.start();
368
369            // Register agents
370            let pk1 = PrivateKey::from_seed(0).public_key();
371            let pk2 = PrivateKey::from_seed(1).public_key();
372            oracle.register(pk1.clone(), 0).await.unwrap();
373            oracle.register(pk2.clone(), 0).await.unwrap();
374
375            // Attempt to link with invalid success rate
376            let result = oracle
377                .add_link(
378                    pk1,
379                    pk2,
380                    Link {
381                        latency: 5.0,
382                        jitter: 2.5,
383                        success_rate: 1.5,
384                    },
385                )
386                .await;
387
388            // Confirm error is correct
389            assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
390        });
391    }
392
393    #[test]
394    fn test_invalid_behavior() {
395        let executor = deterministic::Runner::default();
396        executor.start(|context| async move {
397            // Create simulated network
398            let (network, mut oracle) = Network::new(
399                context.with_label("network"),
400                Config {
401                    max_size: 1024 * 1024,
402                },
403            );
404
405            // Start network
406            network.start();
407
408            // Register agents
409            let pk1 = PrivateKey::from_seed(0).public_key();
410            let pk2 = PrivateKey::from_seed(1).public_key();
411            oracle.register(pk1.clone(), 0).await.unwrap();
412            oracle.register(pk2.clone(), 0).await.unwrap();
413
414            // Attempt to link with invalid jitter
415            let result = oracle
416                .add_link(
417                    pk1.clone(),
418                    pk2.clone(),
419                    Link {
420                        latency: -5.0,
421                        jitter: 2.5,
422                        success_rate: 1.0,
423                    },
424                )
425                .await;
426
427            // Confirm error is correct
428            assert!(matches!(result, Err(Error::InvalidBehavior(-5.0, 2.5))));
429
430            // Attempt to link with invalid jitter
431            let result = oracle
432                .add_link(
433                    pk1,
434                    pk2,
435                    Link {
436                        latency: 5.0,
437                        jitter: -2.5,
438                        success_rate: 1.0,
439                    },
440                )
441                .await;
442
443            // Confirm error is correct
444            assert!(matches!(result, Err(Error::InvalidBehavior(5.0, -2.5))));
445        });
446    }
447
448    #[test]
449    fn test_simple_message_delivery() {
450        let executor = deterministic::Runner::default();
451        executor.start(|context| async move {
452            // Create simulated network
453            let (network, mut oracle) = Network::new(
454                context.with_label("network"),
455                Config {
456                    max_size: 1024 * 1024,
457                },
458            );
459
460            // Start network
461            network.start();
462
463            // Register agents
464            let pk1 = PrivateKey::from_seed(0).public_key();
465            let pk2 = PrivateKey::from_seed(1).public_key();
466            let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
467            let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
468
469            // Register unused channels
470            let _ = oracle.register(pk1.clone(), 1).await.unwrap();
471            let _ = oracle.register(pk2.clone(), 2).await.unwrap();
472
473            // Link agents
474            oracle
475                .add_link(
476                    pk1.clone(),
477                    pk2.clone(),
478                    Link {
479                        latency: 5.0,
480                        jitter: 2.5,
481                        success_rate: 1.0,
482                    },
483                )
484                .await
485                .unwrap();
486            oracle
487                .add_link(
488                    pk2.clone(),
489                    pk1.clone(),
490                    Link {
491                        latency: 5.0,
492                        jitter: 2.5,
493                        success_rate: 1.0,
494                    },
495                )
496                .await
497                .unwrap();
498
499            // Send messages
500            let msg1 = Bytes::from("hello from pk1");
501            let msg2 = Bytes::from("hello from pk2");
502            sender1
503                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
504                .await
505                .unwrap();
506            sender2
507                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
508                .await
509                .unwrap();
510
511            // Confirm message delivery
512            let (sender, message) = receiver1.recv().await.unwrap();
513            assert_eq!(sender, pk2);
514            assert_eq!(message, msg2);
515            let (sender, message) = receiver2.recv().await.unwrap();
516            assert_eq!(sender, pk1);
517            assert_eq!(message, msg1);
518        });
519    }
520
521    #[test]
522    fn test_send_wrong_channel() {
523        let executor = deterministic::Runner::default();
524        executor.start(|context| async move {
525            // Create simulated network
526            let (network, mut oracle) = Network::new(
527                context.with_label("network"),
528                Config {
529                    max_size: 1024 * 1024,
530                },
531            );
532
533            // Start network
534            network.start();
535
536            // Register agents
537            let pk1 = PrivateKey::from_seed(0).public_key();
538            let pk2 = PrivateKey::from_seed(1).public_key();
539            let (mut sender1, _) = oracle.register(pk1.clone(), 0).await.unwrap();
540            let (_, mut receiver2) = oracle.register(pk2.clone(), 1).await.unwrap();
541
542            // Link agents
543            oracle
544                .add_link(
545                    pk1,
546                    pk2.clone(),
547                    Link {
548                        latency: 5.0,
549                        jitter: 0.0,
550                        success_rate: 1.0,
551                    },
552                )
553                .await
554                .unwrap();
555
556            // Send message
557            let msg = Bytes::from("hello from pk1");
558            sender1
559                .send(Recipients::One(pk2), msg, false)
560                .await
561                .unwrap();
562
563            // Confirm no message delivery
564            select! {
565                _ = receiver2.recv() => {
566                    panic!("unexpected message");
567                },
568                _ = context.sleep(Duration::from_secs(1)) => {},
569            }
570        });
571    }
572
573    #[test]
574    fn test_dynamic_peers() {
575        let executor = deterministic::Runner::default();
576        executor.start(|context| async move {
577            // Create simulated network
578            let (network, mut oracle) = Network::new(
579                context.with_label("network"),
580                Config {
581                    max_size: 1024 * 1024,
582                },
583            );
584
585            // Start network
586            network.start();
587
588            // Define agents
589            let pk1 = PrivateKey::from_seed(0).public_key();
590            let pk2 = PrivateKey::from_seed(1).public_key();
591            let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
592            let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
593
594            // Link agents
595            oracle
596                .add_link(
597                    pk1.clone(),
598                    pk2.clone(),
599                    Link {
600                        latency: 5.0,
601                        jitter: 2.5,
602                        success_rate: 1.0,
603                    },
604                )
605                .await
606                .unwrap();
607            oracle
608                .add_link(
609                    pk2.clone(),
610                    pk1.clone(),
611                    Link {
612                        latency: 5.0,
613                        jitter: 2.5,
614                        success_rate: 1.0,
615                    },
616                )
617                .await
618                .unwrap();
619
620            // Send messages
621            let msg1 = Bytes::from("attempt 1: hello from pk1");
622            let msg2 = Bytes::from("attempt 1: hello from pk2");
623            sender1
624                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
625                .await
626                .unwrap();
627            sender2
628                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
629                .await
630                .unwrap();
631
632            // Confirm message delivery
633            let (sender, message) = receiver1.recv().await.unwrap();
634            assert_eq!(sender, pk2);
635            assert_eq!(message, msg2);
636            let (sender, message) = receiver2.recv().await.unwrap();
637            assert_eq!(sender, pk1);
638            assert_eq!(message, msg1);
639        });
640    }
641
642    #[test]
643    fn test_dynamic_links() {
644        let executor = deterministic::Runner::default();
645        executor.start(|context| async move {
646            // Create simulated network
647            let (network, mut oracle) = Network::new(
648                context.with_label("network"),
649                Config {
650                    max_size: 1024 * 1024,
651                },
652            );
653
654            // Start network
655            network.start();
656
657            // Register agents
658            let pk1 = PrivateKey::from_seed(0).public_key();
659            let pk2 = PrivateKey::from_seed(1).public_key();
660            let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
661            let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
662
663            // Send messages
664            let msg1 = Bytes::from("attempt 1: hello from pk1");
665            let msg2 = Bytes::from("attempt 1: hello from pk2");
666            sender1
667                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
668                .await
669                .unwrap();
670            sender2
671                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
672                .await
673                .unwrap();
674
675            // Confirm no message delivery
676            select! {
677                _ = receiver1.recv() => {
678                    panic!("unexpected message");
679                },
680                _ = receiver2.recv() => {
681                    panic!("unexpected message");
682                },
683                _ = context.sleep(Duration::from_secs(1)) => {},
684            }
685
686            // Link agents
687            oracle
688                .add_link(
689                    pk1.clone(),
690                    pk2.clone(),
691                    Link {
692                        latency: 5.0,
693                        jitter: 2.5,
694                        success_rate: 1.0,
695                    },
696                )
697                .await
698                .unwrap();
699            oracle
700                .add_link(
701                    pk2.clone(),
702                    pk1.clone(),
703                    Link {
704                        latency: 5.0,
705                        jitter: 2.5,
706                        success_rate: 1.0,
707                    },
708                )
709                .await
710                .unwrap();
711
712            // Send messages
713            let msg1 = Bytes::from("attempt 2: hello from pk1");
714            let msg2 = Bytes::from("attempt 2: hello from pk2");
715            sender1
716                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
717                .await
718                .unwrap();
719            sender2
720                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
721                .await
722                .unwrap();
723
724            // Confirm message delivery
725            let (sender, message) = receiver1.recv().await.unwrap();
726            assert_eq!(sender, pk2);
727            assert_eq!(message, msg2);
728            let (sender, message) = receiver2.recv().await.unwrap();
729            assert_eq!(sender, pk1);
730            assert_eq!(message, msg1);
731
732            // Remove links
733            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
734            oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
735
736            // Send messages
737            let msg1 = Bytes::from("attempt 3: hello from pk1");
738            let msg2 = Bytes::from("attempt 3: hello from pk2");
739            sender1
740                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
741                .await
742                .unwrap();
743            sender2
744                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
745                .await
746                .unwrap();
747
748            // Confirm no message delivery
749            select! {
750                _ = receiver1.recv() => {
751                    panic!("unexpected message");
752                },
753                _ = receiver2.recv() => {
754                    panic!("unexpected message");
755                },
756                _ = context.sleep(Duration::from_secs(1)) => {},
757            }
758
759            // Remove non-existent links
760            let result = oracle.remove_link(pk1, pk2).await;
761            assert!(matches!(result, Err(Error::LinkMissing)));
762        });
763    }
764}