nakamoto_net/
simulator.rs

1//! A simple P2P network simulator. Acts as the _reactor_, but without doing any I/O.
2#![allow(clippy::collapsible_if)]
3
4use crate::{Disconnect, Io, Link, LocalDuration, LocalTime};
5use log::*;
6
7use std::borrow::Cow;
8use std::collections::{BTreeMap, BTreeSet, VecDeque};
9use std::ops::{Deref, DerefMut, Range};
10use std::{fmt, io, net};
11
12use crate::StateMachine;
13
14#[cfg(feature = "quickcheck")]
15pub mod arbitrary;
16
17/// Minimum latency between peers.
18pub const MIN_LATENCY: LocalDuration = LocalDuration::from_millis(1);
19/// Maximum number of events buffered per peer.
20pub const MAX_EVENTS: usize = 2048;
21
22/// Identifier for a simulated node/peer.
23/// The simulator requires each peer to have a distinct IP address.
24type NodeId = net::IpAddr;
25
26/// A simulated peer. Protocol instances have to be wrapped in this type to be simulated.
27pub trait Peer<P>: Deref<Target = P> + DerefMut<Target = P> + 'static
28where
29    P: StateMachine,
30{
31    /// Initialize the peer. This should at minimum initialize the protocol with the
32    /// current time.
33    fn init(&mut self);
34    /// Get the peer address.
35    fn addr(&self) -> net::SocketAddr;
36}
37
38/// Simulated protocol input.
39#[derive(Debug, Clone)]
40pub enum Input<M, D> {
41    /// Connection attempt underway.
42    Connecting {
43        /// Remote peer address.
44        addr: net::SocketAddr,
45    },
46    /// New connection with a peer.
47    Connected {
48        /// Remote peer id.
49        addr: net::SocketAddr,
50        /// Local peer id.
51        local_addr: net::SocketAddr,
52        /// Link direction.
53        link: Link,
54    },
55    /// Disconnected from peer.
56    Disconnected(net::SocketAddr, Disconnect<D>),
57    /// Received a message from a remote peer.
58    Received(net::SocketAddr, M),
59    /// Used to advance the state machine after some wall time has passed.
60    Wake,
61}
62
63/// A scheduled protocol input.
64#[derive(Debug, Clone)]
65pub struct Scheduled<M, D> {
66    /// The node for which this input is scheduled.
67    pub node: NodeId,
68    /// The remote peer from which this input originates.
69    /// If the input originates from the local node, this should be set to the zero address.
70    pub remote: net::SocketAddr,
71    /// The input being scheduled.
72    pub input: Input<M, D>,
73}
74
75impl<M: fmt::Debug, D: fmt::Display> fmt::Display for Scheduled<M, D> {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        match &self.input {
78            Input::Received(from, msg) => {
79                write!(f, "{} <- {} ({:?})", self.node, from, msg)
80            }
81            Input::Connected {
82                addr,
83                local_addr,
84                link: Link::Inbound,
85                ..
86            } => write!(f, "{} <== {}: Connected", local_addr, addr),
87            Input::Connected {
88                local_addr,
89                addr,
90                link: Link::Outbound,
91                ..
92            } => write!(f, "{} ==> {}: Connected", local_addr, addr),
93            Input::Connecting { addr } => {
94                write!(f, "{} => {}: Connecting", self.node, addr)
95            }
96            Input::Disconnected(addr, reason) => {
97                write!(f, "{} =/= {}: Disconnected: {}", self.node, addr, reason)
98            }
99            Input::Wake => {
100                write!(f, "{}: Tock", self.node)
101            }
102        }
103    }
104}
105
106/// Inbox of scheduled state machine inputs to be delivered to the simulated nodes.
107#[derive(Debug)]
108pub struct Inbox<M, D> {
109    /// The set of scheduled inputs. We use a `BTreeMap` to ensure inputs are always
110    /// ordered by scheduled delivery time.
111    messages: BTreeMap<LocalTime, Scheduled<M, D>>,
112}
113
114impl<M: Clone, D: Clone> Inbox<M, D> {
115    /// Add a scheduled input to the inbox.
116    fn insert(&mut self, mut time: LocalTime, msg: Scheduled<M, D>) {
117        // Make sure we don't overwrite an existing message by using the same time slot.
118        while self.messages.contains_key(&time) {
119            time = time + MIN_LATENCY;
120        }
121        self.messages.insert(time, msg);
122    }
123
124    /// Get the next scheduled input to be delivered.
125    fn next(&mut self) -> Option<(LocalTime, Scheduled<M, D>)> {
126        self.messages
127            .iter()
128            .next()
129            .map(|(time, scheduled)| (*time, scheduled.clone()))
130    }
131
132    /// Get the last message sent between two peers. Only checks one direction.
133    fn last(
134        &self,
135        node: &NodeId,
136        remote: &net::SocketAddr,
137    ) -> Option<(&LocalTime, &Scheduled<M, D>)> {
138        self.messages
139            .iter()
140            .rev()
141            .find(|(_, v)| &v.node == node && &v.remote == remote)
142    }
143}
144
/// Simulation options.
///
/// The default options use an empty latency range and a failure rate of zero.
#[derive(Debug, Clone, Default)]
pub struct Options {
    /// Minimum and maximum latency between nodes, in seconds.
    /// When the range is empty, no per-pair latencies are configured.
    pub latency: Range<u64>,
    /// Probability that network I/O fails.
    /// A rate of `1.0` means 100% of I/O fails.
    pub failure_rate: f64,
}
163
164/// A peer-to-peer node simulation.
165pub struct Simulation<T>
166where
167    T: StateMachine,
168{
169    /// Inbox of inputs to be delivered by the simulation.
170    inbox: Inbox<<T::Message as ToOwned>::Owned, T::DisconnectReason>,
171    /// Events emitted during simulation.
172    events: BTreeMap<NodeId, VecDeque<T::Event>>,
173    /// Priority events that should happen immediately.
174    priority: VecDeque<Scheduled<<T::Message as ToOwned>::Owned, T::DisconnectReason>>,
175    /// Simulated latencies between nodes.
176    latencies: BTreeMap<(NodeId, NodeId), LocalDuration>,
177    /// Network partitions between two nodes.
178    partitions: BTreeSet<(NodeId, NodeId)>,
179    /// Set of existing connections between nodes.
180    connections: BTreeMap<(NodeId, NodeId), u16>,
181    /// Set of connection attempts.
182    attempts: BTreeSet<(NodeId, NodeId)>,
183    /// Simulation options.
184    opts: Options,
185    /// Start time of simulation.
186    start_time: LocalTime,
187    /// Current simulation time. Updated when a scheduled message is processed.
188    time: LocalTime,
189    /// RNG.
190    rng: fastrand::Rng,
191}
192
193impl<T> Simulation<T>
194where
195    T: StateMachine + 'static,
196    T::DisconnectReason: Clone + Into<Disconnect<T::DisconnectReason>>,
197
198    <T::Message as ToOwned>::Owned: fmt::Debug + Clone,
199{
200    /// Create a new simulation.
201    pub fn new(time: LocalTime, rng: fastrand::Rng, opts: Options) -> Self {
202        Self {
203            inbox: Inbox {
204                messages: BTreeMap::new(),
205            },
206            events: BTreeMap::new(),
207            priority: VecDeque::new(),
208            partitions: BTreeSet::new(),
209            latencies: BTreeMap::new(),
210            connections: BTreeMap::new(),
211            attempts: BTreeSet::new(),
212            opts,
213            start_time: time,
214            time,
215            rng,
216        }
217    }
218
219    /// Check whether the simulation is done, ie. there are no more messages to process.
220    pub fn is_done(&self) -> bool {
221        self.inbox.messages.is_empty()
222    }
223
224    /// Total amount of simulated time elapsed.
225    #[allow(dead_code)]
226    pub fn elapsed(&self) -> LocalDuration {
227        self.time - self.start_time
228    }
229
230    /// Check whether the simulation has settled, ie. the only messages left to process
231    /// are (periodic) timeouts.
232    pub fn is_settled(&self) -> bool {
233        self.inbox
234            .messages
235            .iter()
236            .all(|(_, s)| matches!(s.input, Input::Wake))
237    }
238
239    /// Get a node's emitted events.
240    pub fn events(&mut self, node: &NodeId) -> impl Iterator<Item = T::Event> + '_ {
241        self.events.entry(*node).or_default().drain(..)
242    }
243
244    /// Get the latency between two nodes. The minimum latency between nodes is 1 millisecond.
245    pub fn latency(&self, from: NodeId, to: NodeId) -> LocalDuration {
246        self.latencies
247            .get(&(from, to))
248            .cloned()
249            .map(|l| {
250                if l <= MIN_LATENCY {
251                    l
252                } else {
253                    // Create variance in the latency. The resulting latency
254                    // will be between half, and two times the base latency.
255                    let millis = l.as_millis();
256
257                    if self.rng.bool() {
258                        // More latency.
259                        LocalDuration::from_millis(millis + self.rng.u128(0..millis))
260                    } else {
261                        // Less latency.
262                        LocalDuration::from_millis(millis - self.rng.u128(0..millis / 2))
263                    }
264                }
265            })
266            .unwrap_or_else(|| MIN_LATENCY)
267    }
268
269    /// Initialize peers.
270    pub fn initialize<'a, P: Peer<T>>(self, peers: impl IntoIterator<Item = &'a mut P>) -> Self {
271        for peer in peers.into_iter() {
272            peer.init();
273        }
274        self
275    }
276
277    /// Run the simulation while the given predicate holds.
278    pub fn run_while<'a, P: Peer<T>>(
279        &mut self,
280        peers: impl IntoIterator<Item = &'a mut P>,
281        pred: impl Fn(&Self) -> bool,
282    ) {
283        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.addr().ip(), p)).collect();
284
285        while self.step_(&mut nodes) {
286            if !pred(self) {
287                break;
288            }
289        }
290    }
291
292    /// Process one scheduled input from the inbox, using the provided peers.
293    /// This function should be called until it returns `false`, or some desired state is reached.
294    /// Returns `true` if there are more messages to process.
295    pub fn step<'a, P: Peer<T>>(&mut self, peers: impl IntoIterator<Item = &'a mut P>) -> bool {
296        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.addr().ip(), p)).collect();
297        self.step_(&mut nodes)
298    }
299
300    fn step_<P: Peer<T>>(&mut self, nodes: &mut BTreeMap<NodeId, &mut P>) -> bool {
301        if !self.opts.latency.is_empty() {
302            // Configure latencies.
303            for (i, from) in nodes.keys().enumerate() {
304                for to in nodes.keys().skip(i + 1) {
305                    let range = self.opts.latency.clone();
306                    let latency = LocalDuration::from_millis(
307                        self.rng
308                            .u128(range.start as u128 * 1_000..range.end as u128 * 1_000),
309                    );
310
311                    self.latencies.entry((*from, *to)).or_insert(latency);
312                    self.latencies.entry((*to, *from)).or_insert(latency);
313                }
314            }
315        }
316
317        // Create and heal partitions.
318        // TODO: These aren't really "network" partitions, as they are only
319        // between individual nodes. We need to think about more realistic
320        // scenarios. We should also think about creating various network
321        // topologies.
322        if self.time.as_secs() % 10 == 0 {
323            for (i, x) in nodes.keys().enumerate() {
324                for y in nodes.keys().skip(i + 1) {
325                    if self.is_fallible() {
326                        self.partitions.insert((*x, *y));
327                    } else {
328                        self.partitions.remove(&(*x, *y));
329                    }
330                }
331            }
332        }
333
334        // Schedule any messages in the pipes.
335        for peer in nodes.values_mut() {
336            let ip = peer.addr().ip();
337
338            for o in peer.by_ref() {
339                self.schedule(&ip, o);
340            }
341        }
342        // Next high-priority message.
343        let priority = self.priority.pop_front().map(|s| (self.time, s));
344
345        if let Some((time, next)) = priority.or_else(|| self.inbox.next()) {
346            let elapsed = (time - self.start_time).as_millis();
347            if matches!(next.input, Input::Wake) {
348                trace!(target: "sim", "{:05} {}", elapsed, next);
349            } else {
350                // TODO: This can be confusing, since this event may not actually be passed to
351                // the protocol. It would be best to only log the events that are being sent
352                // to the protocol, or to log when an input is being dropped.
353                info!(target: "sim", "{:05} {} ({})", elapsed, next, self.inbox.messages.len());
354            }
355            assert!(time >= self.time, "Time only moves forwards!");
356
357            self.time = time;
358            self.inbox.messages.remove(&time);
359
360            let Scheduled { input, node, .. } = next;
361
362            if let Some(ref mut p) = nodes.get_mut(&node) {
363                p.tick(time);
364
365                match input {
366                    Input::Connecting { addr } => {
367                        if self.attempts.insert((node, addr.ip())) {
368                            p.attempted(&addr);
369                        }
370                    }
371                    Input::Connected {
372                        addr,
373                        local_addr,
374                        link,
375                    } => {
376                        let conn = (node, addr.ip());
377
378                        let attempted = link.is_outbound() && self.attempts.remove(&conn);
379                        if attempted || link.is_inbound() {
380                            if self.connections.insert(conn, local_addr.port()).is_none() {
381                                p.connected(addr, &local_addr, link);
382                            }
383                        }
384                    }
385                    Input::Disconnected(addr, reason) => {
386                        let conn = (node, addr.ip());
387                        let attempt = self.attempts.remove(&conn);
388                        let connection = self.connections.remove(&conn).is_some();
389
390                        // Can't be both attempting and connected.
391                        assert!(!(attempt && connection));
392
393                        if attempt || connection {
394                            p.disconnected(&addr, reason);
395                        }
396                    }
397                    Input::Wake => p.timer_expired(),
398                    Input::Received(addr, msg) => {
399                        p.message_received(&addr, Cow::Owned(msg));
400                    }
401                }
402                for o in p.by_ref() {
403                    self.schedule(&node, o);
404                }
405            } else {
406                panic!(
407                    "Node {} not found when attempting to schedule {:?}",
408                    node, input
409                );
410            }
411        }
412        !self.is_done()
413    }
414
415    /// Process a protocol output event from a node.
416    pub fn schedule(
417        &mut self,
418        node: &NodeId,
419        out: Io<<T::Message as ToOwned>::Owned, T::Event, T::DisconnectReason, net::SocketAddr>,
420    ) {
421        let node = *node;
422
423        match out {
424            Io::Write(receiver, msg) => {
425                // If the other end has disconnected the sender with some latency, there may not be
426                // a connection remaining to use.
427                let port = if let Some(port) = self.connections.get(&(node, receiver.ip())) {
428                    *port
429                } else {
430                    return;
431                };
432
433                let sender: net::SocketAddr = (node, port).into();
434                if self.is_partitioned(sender.ip(), receiver.ip()) {
435                    // Drop message if nodes are partitioned.
436                    info!(
437                        target: "sim",
438                        "{} -> {} (DROPPED)",
439                         sender, receiver,
440                    );
441                    return;
442                }
443
444                // Schedule message in the future, ensuring messages don't arrive out-of-order
445                // between two peers.
446                let latency = self.latency(node, receiver.ip());
447                let time = self
448                    .inbox
449                    .last(&receiver.ip(), &sender)
450                    .map(|(k, _)| *k)
451                    .unwrap_or_else(|| self.time);
452                let time = time + latency;
453                let elapsed = (time - self.start_time).as_millis();
454
455                info!(
456                    target: "sim",
457                    "{:05} {} -> {}: ({:?}) ({})",
458                    elapsed, sender, receiver, &msg, latency
459                );
460
461                self.inbox.insert(
462                    time,
463                    Scheduled {
464                        remote: sender,
465                        node: receiver.ip(),
466                        input: Input::Received(sender, msg),
467                    },
468                );
469            }
470            Io::Connect(remote) => {
471                assert!(remote.ip() != node, "self-connections are not allowed");
472
473                // Create an ephemeral sockaddr for the connecting (local) node.
474                let local_addr: net::SocketAddr = net::SocketAddr::new(node, self.rng.u16(8192..));
475                let latency = self.latency(node, remote.ip());
476
477                self.inbox.insert(
478                    self.time + MIN_LATENCY,
479                    Scheduled {
480                        node,
481                        remote,
482                        input: Input::Connecting { addr: remote },
483                    },
484                );
485
486                // Fail to connect if the nodes are partitioned.
487                if self.is_partitioned(node, remote.ip()) {
488                    log::info!(target: "sim", "{} -/-> {} (partitioned)", node, remote.ip());
489
490                    // Sometimes, the protocol gets a failure input, other times it just hangs.
491                    if self.rng.bool() {
492                        self.inbox.insert(
493                            self.time + MIN_LATENCY,
494                            Scheduled {
495                                node,
496                                remote,
497                                input: Input::Disconnected(
498                                    remote,
499                                    Disconnect::ConnectionError(
500                                        io::Error::from(io::ErrorKind::UnexpectedEof).into(),
501                                    ),
502                                ),
503                            },
504                        );
505                    }
506                    return;
507                }
508
509                self.inbox.insert(
510                    // The remote will get the connection attempt with some latency.
511                    self.time + latency,
512                    Scheduled {
513                        node: remote.ip(),
514                        remote: local_addr,
515                        input: Input::Connected {
516                            addr: local_addr,
517                            local_addr: remote,
518                            link: Link::Inbound,
519                        },
520                    },
521                );
522                self.inbox.insert(
523                    // The local node will have established the connection after some latency.
524                    self.time + latency,
525                    Scheduled {
526                        remote,
527                        node,
528                        input: Input::Connected {
529                            addr: remote,
530                            local_addr,
531                            link: Link::Outbound,
532                        },
533                    },
534                );
535            }
536            Io::Disconnect(remote, reason) => {
537                // The local node is immediately disconnected.
538                self.priority.push_back(Scheduled {
539                    remote,
540                    node,
541                    input: Input::Disconnected(remote, reason.into()),
542                });
543
544                // Nb. It's possible for disconnects to happen simultaneously from both ends, hence
545                // it can be that a node will try to disconnect a remote that is already
546                // disconnected from the other side.
547                //
548                // It's also possible that the connection was only attempted and never succeeded,
549                // in which case we would return here.
550                let port = if let Some(port) = self.connections.get(&(node, remote.ip())) {
551                    *port
552                } else {
553                    debug!(target: "sim", "Ignoring disconnect of {remote} from {node}");
554                    return;
555                };
556                let local_addr: net::SocketAddr = (node, port).into();
557                let latency = self.latency(node, remote.ip());
558
559                // The remote node receives the disconnection with some delay.
560                self.inbox.insert(
561                    self.time + latency,
562                    Scheduled {
563                        node: remote.ip(),
564                        remote: local_addr,
565                        input: Input::Disconnected(
566                            local_addr,
567                            Disconnect::ConnectionError(
568                                io::Error::from(io::ErrorKind::ConnectionReset).into(),
569                            ),
570                        ),
571                    },
572                );
573            }
574            Io::SetTimer(duration) => {
575                let time = self.time + duration;
576
577                if !matches!(
578                    self.inbox.messages.get(&time),
579                    Some(Scheduled {
580                        input: Input::Wake,
581                        ..
582                    })
583                ) {
584                    self.inbox.insert(
585                        time,
586                        Scheduled {
587                            node,
588                            // The remote is not applicable for this type of output.
589                            remote: ([0, 0, 0, 0], 0).into(),
590                            input: Input::Wake,
591                        },
592                    );
593                }
594            }
595            Io::Event(event) => {
596                let events = self.events.entry(node).or_insert_with(VecDeque::new);
597                if events.len() >= MAX_EVENTS {
598                    warn!(target: "sim", "Dropping event: buffer is full");
599                } else {
600                    events.push_back(event);
601                }
602            }
603        }
604    }
605
606    /// Check whether we should fail the next operation.
607    fn is_fallible(&self) -> bool {
608        self.rng.f64() % 1.0 < self.opts.failure_rate
609    }
610
611    /// Check whether two nodes are partitioned.
612    fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
613        self.partitions.contains(&(a, b)) || self.partitions.contains(&(b, a))
614    }
615}