//! Implementation of the iroh-gossip protocol, as an IO-less state machine
//!
//! This module implements the iroh-gossip protocol. The entry point is [`State`], which contains
//! the protocol state for a node.
//!
//! The iroh-gossip protocol is made up of two parts: a swarm membership protocol, based on
//! [HyParView][hyparview], and a gossip broadcasting protocol, based on [PlumTree][plumtree].
//!
//! For a full explanation it is recommended to read the two papers. What follows is a brief
//! outline of the protocols.
//!
//! All protocol messages are namespaced by a [`TopicId`], a 32-byte identifier. Topics are
//! separate swarms and broadcast scopes. The HyParView and PlumTree algorithms both work in the
//! scope of a single topic. Thus, joining multiple topics increases the number of open connections
//! to peers and the size of the local routing table.
//!
//! The **membership protocol** ([HyParView][hyparview]) is a cluster protocol where each peer
//! maintains a partial view of all nodes in the swarm.
//! A peer joins the swarm for a topic by connecting to any known peer that is a member of this
//! topic's swarm. Obtaining this initial contact info happens out of band. The peer then sends
//! a `Join` message to that initial peer. All peers maintain a list of
//! `active` and `passive` peers. Active peers are those that you maintain active connections to.
//! The passive peers form an address book of additional peers. If one of your active peers goes
//! offline, its slot is filled with a random peer from the passive set. In the default
//! configuration, the active view has a size of 5 and the passive view a size of 30.
//! The HyParView protocol ensures that active connections are always bidirectional, and regularly
//! exchanges nodes for the passive view in a `Shuffle` operation.
//! Thus, this protocol provides a high degree of reliability and auto-recovery in the case of
//! node failures.
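//!
//! As a rough illustration of the slot-refill step described above, the following is a minimal
//! sketch and not this module's implementation. The `Views` type, the `on_peer_failed` method
//! and the `pick` parameter (a stand-in for a random choice) are hypothetical names introduced
//! only for this example. The real protocol also has to negotiate the promotion with the chosen
//! peer, which is left out here.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical, simplified per-topic view state (assumption: peers are plain `u32` ids).
//! struct Views {
//!     active: BTreeSet<u32>,
//!     passive: BTreeSet<u32>,
//! }
//!
//! impl Views {
//!     /// Removes a failed peer from the active view and promotes one passive peer into
//!     /// the freed slot. `pick` stands in for a random index and is reduced modulo the
//!     /// size of the passive view. Returns the promoted peer, if any.
//!     fn on_peer_failed(&mut self, failed: u32, pick: usize) -> Option<u32> {
//!         // Nothing to do if the peer was not in the active view.
//!         if !self.active.remove(&failed) {
//!             return None;
//!         }
//!         // Without passive peers the slot stays empty.
//!         if self.passive.is_empty() {
//!             return None;
//!         }
//!         let candidate = *self.passive.iter().nth(pick % self.passive.len())?;
//!         self.passive.remove(&candidate);
//!         self.active.insert(candidate);
//!         Some(candidate)
//!     }
//! }
//! ```
//!
//! With an active view of `{1, 2}` and a passive view of `{7, 8, 9}`, a failure of peer `1`
//! moves exactly one of the passive peers into the active view and leaves peer `2` untouched.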
//!
//! The **gossip protocol** ([PlumTree][plumtree]) builds upon the membership protocol. It exposes
//! a method to broadcast messages to all peers in the swarm. On each node, it maintains two sets
//! of peers: an `eager` set and a `lazy` set. Both are subsets of the `active` view from the
//! membership protocol. When broadcasting a message from the local node, or upon receiving a
//! broadcast message, the message is pushed to all peers in the eager set. Additionally, the hash
//! of the message (which uniquely identifies it), but not the message content, is lazily pushed
//! to all peers in the `lazy` set. When receiving such lazy pushes (called `Ihaves`), those peers
//! may request the message content after a timeout if they have not received the message from one
//! of their eager peers by then. When requesting a message from a currently-lazy peer, this peer
//! is also upgraded to be an eager peer from that moment on. This strategy self-optimizes the
//! messaging graph by latency. Note however that this optimization will work best if the messaging
//! paths are stable, i.e. if it's always the same peer that broadcasts. If not, the relative
//! message redundancy will grow and the ideal messaging graph might change frequently.
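//!
//! The eager/lazy split described above can be sketched as follows. This is a simplified
//! illustration under stated assumptions, not the code in this module: `Push`, `Peers`,
//! `broadcast` and `graft` are hypothetical names, peers are plain `u32` ids, and a `u64`
//! stands in for the message hash. Timeouts and message deduplication are omitted.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// What is sent to a single peer for one broadcast (hypothetical type).
//! #[derive(Debug, PartialEq)]
//! enum Push {
//!     /// Full message content, pushed to an eager peer.
//!     Gossip(u32, Vec<u8>),
//!     /// Only the message id, lazily pushed to a lazy peer.
//!     IHave(u32, u64),
//! }
//!
//! /// Hypothetical eager and lazy peer sets of one node.
//! struct Peers {
//!     eager: BTreeSet<u32>,
//!     lazy: BTreeSet<u32>,
//! }
//!
//! impl Peers {
//!     /// Eager peers get the content, lazy peers only get the message id.
//!     fn broadcast(&self, id: u64, content: &[u8]) -> Vec<Push> {
//!         let eager = self.eager.iter().map(|p| Push::Gossip(*p, content.to_vec()));
//!         let lazy = self.lazy.iter().map(|p| Push::IHave(*p, id));
//!         eager.chain(lazy).collect()
//!     }
//!
//!     /// Requesting a missing message from a lazy peer upgrades that peer to eager.
//!     /// Returns `false` if the peer was not in the lazy set.
//!     fn graft(&mut self, peer: u32) -> bool {
//!         if self.lazy.remove(&peer) {
//!             self.eager.insert(peer);
//!             true
//!         } else {
//!             false
//!         }
//!     }
//! }
//! ```
//!
//! After `graft(peer)` succeeds, later broadcasts push the full content to that peer, which is
//! how the eager set adapts towards the lowest-latency paths.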
//!
//! [hyparview]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
//! [plumtree]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf

use std::{fmt, hash::Hash};

use bytes::Bytes;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

mod hyparview;
mod plumtree;
pub mod state;
pub mod topic;
pub mod util;

#[cfg(any(test, feature = "test-utils"))]
pub mod sim;

pub use hyparview::Config as HyparviewConfig;
pub use plumtree::{Config as PlumtreeConfig, DeliveryScope, Scope};
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};

/// The default maximum size in bytes for a gossip message.
/// This is a sane but arbitrary default and can be changed in the [`Config`].
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096;

/// The minimum allowed value for [`Config::max_message_size`].
pub const MIN_MAX_MESSAGE_SIZE: usize = 512;

/// The identifier for a peer.
///
/// The protocol implementation is generic over this trait. When implementing the protocol,
/// a concrete type must be chosen that will then be used throughout the implementation to identify
/// and index individual peers.
///
/// Note that the concrete type will be used in protocol messages. Therefore, implementations of
/// the protocol are only compatible if the same concrete type is supplied for this trait.
///
/// TODO: Rename to `PeerId`? It does not necessarily refer to a peer's address, as long as the
/// networking layer can translate the value of its concrete type into an address.
pub trait PeerIdentity: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned {}
impl<T> PeerIdentity for T where
    T: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned
{
}

/// Opaque binary data that is transmitted on messages that introduce new peers.
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(Bytes);

impl PeerData {
    /// Create a new [`PeerData`] from a byte buffer.
    pub fn new(data: impl Into<Bytes>) -> Self {
        Self(data.into())
    }

    /// Get a reference to the contained [`bytes::Bytes`].
    pub fn inner(&self) -> &bytes::Bytes {
        &self.0
    }

    /// Get the peer data as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

/// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct PeerInfo<PI> {
    pub id: PI,
    pub data: Option<PeerData>,
}

impl<PI> From<(PI, Option<PeerData>)> for PeerInfo<PI> {
    fn from((id, data): (PI, Option<PeerData>)) -> Self {
        Self { id, data }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashSet, env, fmt, str::FromStr};

    use n0_tracing_test::traced_test;
    use rand::{rngs::ChaCha12Rng, SeedableRng};

    use super::{Command, Config, Event};
    use crate::proto::{
        sim::{LatencyConfig, Network, NetworkConfig},
        Scope, TopicId,
    };

    #[test]
    #[traced_test]
    fn hyparview_smoke() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let network_config = NetworkConfig {
            proto: config,
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        for i in 0..4 {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // Do some joins between nodes 0,1,2
        network.command(0, t, Command::Join(vec![1, 2]));
        network.command(1, t, Command::Join(vec![2]));
        network.command(2, t, Command::Join(vec![]));

        network.run_trips(3);

        // Confirm emitted events
        let actual = network.events_sorted();
        let expected = sort(vec![
            (0, t, Event::NeighborUp(1)),
            (0, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(1)),
        ]);
        assert_eq!(actual, expected);

        // Confirm active connections
        assert_eq!(network.conns(), vec![(0, 1), (0, 2), (1, 2)]);

        // Now let node 3 join node 0.
        // Node 0 is full, so it will disconnect from either node 1 or node 2.
        network.command(3, t, Command::Join(vec![0]));

        network.run_trips(2);

        // Confirm emitted events. There are two options because whether node 0 disconnects from
        // node 1 or node 2 is random.
        let actual = network.events_sorted();
        eprintln!("actual {actual:#?}");
        let expected1 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(1)),
            (1, t, Event::NeighborDown(0)),
        ]);
        let expected2 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(2)),
            (2, t, Event::NeighborDown(0)),
        ]);
        assert!((actual == expected1) || (actual == expected2));

        // Confirm active connections.
        if actual == expected1 {
            assert_eq!(network.conns(), vec![(0, 2), (0, 3), (1, 2)]);
        } else {
            assert_eq!(network.conns(), vec![(0, 1), (0, 3), (1, 2)]);
        }
        assert!(network.check_synchronicity());
    }

    #[test]
    #[traced_test]
    fn plumtree_smoke() {
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let network_config = NetworkConfig {
            proto: Config::default(),
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        // build a network with 6 nodes
        for i in 0..6 {
            network.insert(i);
        }

        let t = [0u8; 32].into();

        // let node 0 join the topic but do not connect to any peers
        network.command(0, t, Command::Join(vec![]));
        // connect nodes 1 and 2 to node 0
        (1..3).for_each(|i| network.command(i, t, Command::Join(vec![0])));
        // let node 3 join the topic on its own, then connect nodes 4 and 5 to node 3
        network.command(3, t, Command::Join(vec![]));
        (4..6).for_each(|i| network.command(i, t, Command::Join(vec![3])));

        // run round trips and drain events
        network.run_trips(4);

        let _ = network.events();
        assert!(network.check_synchronicity());

        // now broadcast a first message
        network.command(
            1,
            t,
            Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
        );

        network.run_trips(4);

        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by the two other nodes in the sender's section (0 and 2)
        assert_eq!(received.count(), 2);
        assert!(network.check_synchronicity());

        // now connect the two sections of the swarm
        network.command(2, t, Command::Join(vec![5]));
        network.run_trips(3);
        let _ = network.events();
        println!("{}", network.report());

        // now broadcast again
        network.command(
            1,
            t,
            Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
        );
        network.run_trips(5);
        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by all 5 other nodes
        assert_eq!(received.count(), 5);
        assert!(network.check_synchronicity());
        println!("{}", network.report());
    }

    #[test]
    #[traced_test]
    fn quit() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let mut network = Network::new(config.into(), rng);
        let num = 4;
        for i in 0..num {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // join all nodes
        network.command(0, t, Command::Join(vec![]));
        network.command(1, t, Command::Join(vec![0]));
        network.command(2, t, Command::Join(vec![1]));
        network.command(3, t, Command::Join(vec![2]));
        network.run_trips(2);

        // assert all peers appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0u64..4).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2, 3]));
        assert!(network.check_synchronicity());

        // let node 3 leave the swarm
        network.command(3, t, Command::Quit);
        network.run_trips(4);
        assert!(network.peer(&3).unwrap().state(&t).is_none());

        // assert all peers except peer 3 appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0..num).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2]));
        assert!(network.check_synchronicity());
    }

    fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
        env::var(name)
            .map(|x| {
                x.parse()
                    .unwrap_or_else(|_| panic!("Failed to parse environment variable {name}"))
            })
            .unwrap_or(default)
    }

    fn sort<T: Ord + Clone>(items: Vec<T>) -> Vec<T> {
        let mut sorted = items;
        sorted.sort();
        sorted
    }
}