iroh_gossip/proto.rs
//! Implementation of the iroh-gossip protocol, as an IO-less state machine
//!
//! This module implements the iroh-gossip protocol. The entry point is [`State`], which contains
//! the protocol state for a node.
//!
//! The iroh-gossip protocol is made up of two parts: a swarm membership protocol, based on
//! [HyParView][hyparview], and a gossip broadcasting protocol, based on [PlumTree][plumtree].
//!
//! For a full explanation, it is recommended to read the two papers. What follows is a brief
//! outline of the protocols.
//!
//! All protocol messages are namespaced by a [`TopicId`], a 32-byte identifier. Topics are
//! separate swarms and broadcast scopes. The HyParView and PlumTree algorithms both work in the
//! scope of a single topic. Thus, joining multiple topics increases the number of open connections
//! to peers and the size of the local routing table.
//!
//! The **membership protocol** ([HyParView][hyparview]) is a cluster protocol where each peer
//! maintains a partial view of all nodes in the swarm.
//! A peer joins the swarm for a topic by connecting to any known peer that is a member of this
//! topic's swarm. Obtaining this initial contact info happens out of band. The peer then sends
//! a `Join` message to that initial peer. All peers maintain a list of
//! `active` and `passive` peers. Active peers are those that you maintain active connections to.
//! The passive peers form an address book of additional peers. If one of your active peers goes
//! offline, its slot is filled with a random peer from the passive set. In the default
//! configuration, the active view has a size of 5 and the passive view a size of 30.
//! The HyParView protocol ensures that active connections are always bidirectional, and regularly
//! exchanges nodes for the passive view in a `Shuffle` operation.
//! Thus, this protocol provides a high degree of reliability and auto-recovery in the case of node
//! failures.
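//!
//! The following is a minimal, self-contained sketch of the slot replacement described above. It
//! is not the implementation used by this module: the `Views` type, its fields, and the `draw`
//! parameter (which stands in for a random number) are hypothetical, peers are identified by
//! plain `u64` values, and any handshake with the promoted peer is omitted.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical, simplified membership views for a single topic.
//! struct Views {
//!     /// Peers we hold an open connection to.
//!     active: BTreeSet<u64>,
//!     /// Address book of additional known peers.
//!     passive: BTreeSet<u64>,
//! }
//!
//! impl Views {
//!     /// Removes `peer` from the active view and, if it was present, promotes one
//!     /// passive peer into the freed slot. `draw` stands in for a random number.
//!     ///
//!     /// Returns the promoted peer, if any.
//!     fn on_active_peer_down(&mut self, peer: u64, draw: usize) -> Option<u64> {
//!         if !self.active.remove(&peer) || self.passive.is_empty() {
//!             return None;
//!         }
//!         // Pick a passive peer by position; sets iterate in ascending order.
//!         let index = draw % self.passive.len();
//!         let promoted = *self.passive.iter().nth(index)?;
//!         self.passive.remove(&promoted);
//!         self.active.insert(promoted);
//!         Some(promoted)
//!     }
//! }
//! ```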
//!
//! The **gossip protocol** ([PlumTree][plumtree]) builds upon the membership protocol. It exposes
//! a method to broadcast messages to all peers in the swarm. On each node, it maintains two sets
//! of peers: an `eager` set and a `lazy` set. Both are subsets of the `active` view from the
//! membership protocol. When broadcasting a message from the local node, or upon receiving a
//! broadcast message, the message is pushed to all peers in the eager set. Additionally, the hash
//! of the message (which uniquely identifies it), but not the message content, is lazily pushed
//! to all peers in the `lazy` set. When receiving such lazy pushes (called `Ihaves`), those peers
//! may request the message content after a timeout if they have not received the message from one
//! of their eager peers in the meantime. When requesting a message from a currently-lazy peer,
//! this peer is also upgraded to be an eager peer from that moment on. This strategy
//! self-optimizes the messaging graph by latency. Note, however, that this optimization works
//! best if the messaging paths are stable, i.e. if it's always the same peer that broadcasts.
//! If not, the relative message redundancy will grow and the ideal messaging graph might change
//! frequently.
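//!
//! As a rough illustration of the eager and lazy push described above, the sketch below models
//! the forwarding decision on a single node. It is an assumption-laden simplification, not this
//! module's implementation: the `Peers`, `Out`, and `MessageId` names are hypothetical, peers
//! and message ids are plain `u64` values, and message payloads and timer scheduling are left
//! out.
//!
//! ```rust
//! use std::collections::{BTreeSet, HashSet};
//!
//! /// Hypothetical message id; stands in for the hash of the message content.
//! type MessageId = u64;
//!
//! /// Hypothetical outgoing actions produced by the sketch.
//! #[derive(Debug, PartialEq, Eq)]
//! enum Out {
//!     /// Full message pushed to an eager peer.
//!     Gossip { to: u64, id: MessageId },
//!     /// Id-only announcement pushed to a lazy peer.
//!     IHave { to: u64, id: MessageId },
//!     /// Request for the content of an announced message.
//!     Graft { to: u64, id: MessageId },
//! }
//!
//! struct Peers {
//!     eager: BTreeSet<u64>,
//!     lazy: BTreeSet<u64>,
//!     /// Ids of messages this node has already received.
//!     seen: HashSet<MessageId>,
//! }
//!
//! impl Peers {
//!     /// Handles a local broadcast (`from == None`) or a message received from `from`.
//!     /// New messages go in full to eager peers and as id-only announcements to lazy peers.
//!     fn on_message(&mut self, id: MessageId, from: Option<u64>) -> Vec<Out> {
//!         if !self.seen.insert(id) {
//!             return Vec::new(); // duplicate, nothing to forward
//!         }
//!         let eager = self.eager.iter().filter(|p| Some(**p) != from);
//!         let lazy = self.lazy.iter().filter(|p| Some(**p) != from);
//!         eager
//!             .map(|&to| Out::Gossip { to, id })
//!             .chain(lazy.map(|&to| Out::IHave { to, id }))
//!             .collect()
//!     }
//!
//!     /// Called when the timeout following an announcement of `id` by `from` expires.
//!     /// If the message is still missing, request it and upgrade `from` to eager.
//!     fn on_ihave_timeout(&mut self, id: MessageId, from: u64) -> Option<Out> {
//!         if self.seen.contains(&id) {
//!             return None;
//!         }
//!         self.lazy.remove(&from);
//!         self.eager.insert(from);
//!         Some(Out::Graft { to: from, id })
//!     }
//! }
//! ```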
//!
//! [hyparview]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
//! [plumtree]: https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf

use std::{fmt, hash::Hash};

use bytes::Bytes;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

mod hyparview;
mod plumtree;
pub mod state;
pub mod topic;
pub mod util;

#[cfg(any(test, feature = "test-utils"))]
pub mod sim;

pub use hyparview::Config as HyparviewConfig;
pub use plumtree::{Config as PlumtreeConfig, DeliveryScope, Scope};
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};

/// The default maximum size in bytes for a gossip message.
/// This is a sane but arbitrary default and can be changed in the [`Config`].
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096;

/// The minimum allowed value for [`Config::max_message_size`].
pub const MIN_MAX_MESSAGE_SIZE: usize = 512;

/// The identifier for a peer.
///
/// The protocol implementation is generic over this trait. When implementing the protocol,
/// a concrete type must be chosen that will then be used throughout the implementation to identify
/// and index individual peers.
///
/// Note that the concrete type will be used in protocol messages. Therefore, implementations of
/// the protocol are only compatible if the same concrete type is supplied for this trait.
///
/// TODO: Rename to `PeerId`? It does not necessarily refer to a peer's address, as long as the
/// networking layer can translate the value of its concrete type into an address.
pub trait PeerIdentity: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned {}
impl<T> PeerIdentity for T where
    T: Hash + Eq + Ord + Copy + fmt::Debug + Serialize + DeserializeOwned
{
}

/// Opaque binary data that is transmitted on messages that introduce new peers.
///
/// Implementations may use these bytes to supply addresses or other information needed to connect
/// to a peer that is not included in the peer's [`PeerIdentity`].
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(Bytes);

impl PeerData {
    /// Create a new [`PeerData`] from a byte buffer.
    pub fn new(data: impl Into<Bytes>) -> Self {
        Self(data.into())
    }

    /// Get a reference to the contained [`bytes::Bytes`].
    pub fn inner(&self) -> &bytes::Bytes {
        &self.0
    }

    /// Get the peer data as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

/// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct PeerInfo<PI> {
    pub id: PI,
    pub data: Option<PeerData>,
}

impl<PI> From<(PI, Option<PeerData>)> for PeerInfo<PI> {
    fn from((id, data): (PI, Option<PeerData>)) -> Self {
        Self { id, data }
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashSet, env, fmt, str::FromStr};

    use n0_tracing_test::traced_test;
    use rand::{rngs::ChaCha12Rng, SeedableRng};

    use super::{Command, Config, Event};
    use crate::proto::{
        sim::{LatencyConfig, Network, NetworkConfig},
        Scope, TopicId,
    };

    #[test]
    #[traced_test]
    fn hyparview_smoke() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let network_config = NetworkConfig {
            proto: config,
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        for i in 0..4 {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // Do some joins between nodes 0,1,2
        network.command(0, t, Command::Join(vec![1, 2]));
        network.command(1, t, Command::Join(vec![2]));
        network.command(2, t, Command::Join(vec![]));

        network.run_trips(3);

        // Confirm emitted events
        let actual = network.events_sorted();
        let expected = sort(vec![
            (0, t, Event::NeighborUp(1)),
            (0, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(2)),
            (1, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(0)),
            (2, t, Event::NeighborUp(1)),
        ]);
        assert_eq!(actual, expected);

        // Confirm active connections
        assert_eq!(network.conns(), vec![(0, 1), (0, 2), (1, 2)]);

        // Now let node 3 join node 0.
        // Node 0 is full, so it will disconnect from either node 1 or node 2.
        network.command(3, t, Command::Join(vec![0]));

        network.run_trips(2);

        // Confirm emitted events. There are two options because whether node 0 disconnects from
        // node 1 or node 2 is random.
        let actual = network.events_sorted();
        eprintln!("actual {actual:#?}");
        let expected1 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(1)),
            (1, t, Event::NeighborDown(0)),
        ]);
        let expected2 = sort(vec![
            (3, t, Event::NeighborUp(0)),
            (0, t, Event::NeighborUp(3)),
            (0, t, Event::NeighborDown(2)),
            (2, t, Event::NeighborDown(0)),
        ]);
        assert!((actual == expected1) || (actual == expected2));

        // Confirm active connections.
        if actual == expected1 {
            assert_eq!(network.conns(), vec![(0, 2), (0, 3), (1, 2)]);
        } else {
            assert_eq!(network.conns(), vec![(0, 1), (0, 3), (1, 2)]);
        }
        assert!(network.check_synchronicity());
    }

    #[test]
    #[traced_test]
    fn plumtree_smoke() {
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let network_config = NetworkConfig {
            proto: Config::default(),
            latency: LatencyConfig::default_static(),
        };
        let mut network = Network::new(network_config, rng);
        // build a network with 6 nodes
        for i in 0..6 {
            network.insert(i);
        }

        let t = [0u8; 32].into();

        // let node 0 join the topic but do not connect to any peers
        network.command(0, t, Command::Join(vec![]));
        // connect nodes 1 and 2 to node 0
        (1..3).for_each(|i| network.command(i, t, Command::Join(vec![0])));
        // connect nodes 4 and 5 to node 3
        network.command(3, t, Command::Join(vec![]));
        (4..6).for_each(|i| network.command(i, t, Command::Join(vec![3])));
        // run ticks and drain events

        network.run_trips(4);

        let _ = network.events();
        assert!(network.check_synchronicity());

        // now broadcast a first message
        network.command(
            1,
            t,
            Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
        );

        network.run_trips(4);

        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by two other nodes
        assert_eq!(received.count(), 2);
        assert!(network.check_synchronicity());

        // now connect the two sections of the swarm
        network.command(2, t, Command::Join(vec![5]));
        network.run_trips(3);
        let _ = network.events();
        println!("{}", network.report());

        // now broadcast again
        network.command(
            1,
            t,
            Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
        );
        network.run_trips(5);
        let events = network.events();
        let received = events.filter(|x| matches!(x, (_, _, Event::Received(_))));
        // message should be received by all 5 other nodes
        assert_eq!(received.count(), 5);
        assert!(network.check_synchronicity());
        println!("{}", network.report());
    }

    #[test]
    #[traced_test]
    fn quit() {
        // Create a network with 4 nodes and active_view_capacity 2
        let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
        let mut config = Config::default();
        config.membership.active_view_capacity = 2;
        let mut network = Network::new(config.into(), rng);
        let num = 4;
        for i in 0..num {
            network.insert(i);
        }

        let t: TopicId = [0u8; 32].into();

        // join all nodes
        network.command(0, t, Command::Join(vec![]));
        network.command(1, t, Command::Join(vec![0]));
        network.command(2, t, Command::Join(vec![1]));
        network.command(3, t, Command::Join(vec![2]));
        network.run_trips(2);

        // assert all peers appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0u64..4).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2, 3]));
        assert!(network.check_synchronicity());

        // let node 3 leave the swarm
        network.command(3, t, Command::Quit);
        network.run_trips(4);
        assert!(network.peer(&3).unwrap().state(&t).is_none());

        // assert all peers without peer 3 appear in the connections
        let all_conns: HashSet<u64> = HashSet::from_iter((0..num).flat_map(|p| {
            network
                .neighbors(&p, &t)
                .into_iter()
                .flat_map(|x| x.into_iter())
        }));
        assert_eq!(all_conns, HashSet::from_iter([0, 1, 2]));
        assert!(network.check_synchronicity());
    }

    fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
        env::var(name)
            .map(|x| {
                x.parse()
                    .unwrap_or_else(|_| panic!("Failed to parse environment variable {name}"))
            })
            .unwrap_or(default)
    }

    fn sort<T: Ord + Clone>(items: Vec<T>) -> Vec<T> {
        let mut sorted = items;
        sorted.sort();
        sorted
    }
}
343}