p2panda_net/gossip/api.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::AtomicUsize;
6
7use futures_util::{Stream, StreamExt};
8use p2panda_core::Topic;
9use p2panda_store::address_book::NodeInfo as _;
10use ractor::{ActorRef, call};
11use thiserror::Error;
12use tokio::sync::mpsc::error::SendError;
13use tokio::sync::{RwLock, broadcast, mpsc};
14use tokio_stream::wrappers::BroadcastStream;
15use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
16use tracing::{trace, warn};
17
18use crate::NodeId;
19use crate::address_book::{AddressBook, AddressBookError};
20use crate::gossip::GossipConfig;
21use crate::gossip::actors::ToGossipManager;
22use crate::gossip::builder::Builder;
23use crate::gossip::events::GossipEvent;
24use crate::iroh_endpoint::Endpoint;
25use crate::utils::ShortFormat;
26
27/// Mapping of topic to the associated sender channels for getting messages into and out of the
28/// gossip overlay.
29type GossipSenders = HashMap<
30 Topic,
31 (
32 mpsc::Sender<Vec<u8>>,
33 broadcast::Sender<Vec<u8>>,
34 TopicDropGuard,
35 ),
36>;
37
38/// Gossip protocol to broadcast ephemeral messages to all online nodes interested in the same
39/// topic.
40///
41/// ## Example
42///
43/// ```rust
44/// # use std::error::Error;
45/// #
46/// # #[tokio::main]
47/// # async fn main() -> Result<(), Box<dyn Error>> {
48/// # use futures_util::StreamExt;
49/// # use p2panda_net::{AddressBook, Discovery, Endpoint, MdnsDiscovery, Gossip};
50/// # let address_book = AddressBook::builder().spawn().await?;
51/// # let endpoint = Endpoint::builder(address_book.clone())
52/// # .spawn()
53/// # .await?;
54/// #
55/// // Gossip uses the address book to watch for nodes interested in the same topic.
56/// let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
57///
58/// // Join overlay with given topic.
59/// let handle = gossip.stream([1; 32].into()).await?;
60///
61/// // Publish a message.
62/// handle.publish(b"Hello, Panda!").await?;
63///
64/// // Subscribe to messages.
65/// let mut rx = handle.subscribe();
66///
67/// tokio::spawn(async move {
68/// while let Some(Ok(_bytes)) = rx.next().await {
69/// // ..
70/// }
71/// });
72/// #
73/// # Ok(())
74/// # }
75/// ```
76///
77/// ## Ephemeral Messaging
78///
79/// These unreliable “ephemeral” streams are intended to be used for relatively short-lived
80/// messages without persistence and catch-up of past state, for example for "Awareness" or
81/// "Presence" features. In most cases, messages will only be received if they were published after
82/// the subscription was created.
83///
84/// Use [`LogSync`](crate::LogSync) if you wish to receive messages even after being offline for
85/// guaranteed eventual consistency.
86///
87/// ## Self-healing overlay
88///
89/// Gossip-based broadcast overlays rely on membership protocols like [HyParView] which do not heal
90/// from network fragmentation caused, for example, by bootstrap nodes going offline.
91///
92/// `p2panda-net` uses it's additional, confidential topic discovery layer in
93/// [`Discovery`](crate::Discovery) to automatically heal these partitions. Whenever possible, it
94/// allows nodes a higher chance to connect to every participating node, thereby decentralising the
95/// entrance points into the network.
96///
97/// ## Topic Discovery
98///
99/// For gossip to function correctly we need to inform it about discovered nodes who are interested
100/// in the same topic. Check out the [`Discovery`](crate::Discovery) module for more information.
101///
102/// [HyParView]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
103///
104/// ## Leaving overlays on Drop
105///
106/// We're staying connected to the overlay for a topic as long as there's a `GossipHandle` or
107/// `GossipSubscription` left for it. If all references to this topic are dropped, the gossip
108/// overlay will be automatically left.
109///
110/// If all `Gossip` instances are dropped then the associated handles and subscriptions will fail
111/// sending or receiving messages.
112#[derive(Clone, Debug)]
113pub struct Gossip {
114 my_node_id: NodeId,
115 address_book: AddressBook,
116 inner: Arc<RwLock<Inner>>,
117 senders: Arc<RwLock<GossipSenders>>,
118 config: GossipConfig,
119}
120
121#[derive(Debug)]
122struct Inner {
123 actor_ref: ActorRef<ToGossipManager>,
124}
125
126impl Gossip {
127 pub(crate) fn new(
128 actor_ref: ActorRef<ToGossipManager>,
129 my_node_id: NodeId,
130 address_book: AddressBook,
131 config: GossipConfig,
132 ) -> Self {
133 Self {
134 my_node_id,
135 address_book,
136 inner: Arc::new(RwLock::new(Inner { actor_ref })),
137 senders: Arc::new(RwLock::new(HashMap::new())),
138 config,
139 }
140 }
141
142 pub fn builder(address_book: AddressBook, endpoint: Endpoint) -> Builder {
143 Builder::new(address_book, endpoint)
144 }
145
146 /// Join gossip overlay for this topic and return a handle to publish messages to it or receive
147 /// messages from the network.
148 pub async fn stream(&self, topic: Topic) -> Result<GossipHandle, GossipError> {
149 let max_message_size = self.config.max_message_size;
150
151 // Check if there's already a handle for this topic and clone it.
152 //
153 // If this handle exists but the topic counter is zero we know that all previous handles
154 // have been dropped and we didn't clean up yet. In this case we'll ignore the existing
155 // entry in "senders" and continue to create a new gossip session, overwriting the "dead"
156 // entries.
157 if let Some((to_gossip_tx, from_gossip_tx, guard)) = self.senders.read().await.get(&topic)
158 && guard.has_subscriptions()
159 {
160 return Ok(GossipHandle::new(
161 topic,
162 max_message_size,
163 to_gossip_tx.clone(),
164 from_gossip_tx.clone(),
165 guard.clone(),
166 ));
167 }
168
169 // If there's no active handle for this topic we join the overlay from scratch.
170 let inner = self.inner.read().await;
171
172 // This guard counts the number of active handles and subscriptions for this topic. Like
173 // this we can determine if we can leave the overlay.
174 let guard = TopicDropGuard::new(topic, inner.actor_ref.clone());
175
176 // Identify the initial nodes we can use to bootstrap ourselves into the overlay.
177 let node_ids = {
178 let node_infos = self.address_book.node_infos_by_topics([topic]).await?;
179 node_infos
180 .iter()
181 .filter_map(|info| {
182 // Remove ourselves from list.
183 let node_id = info.id();
184 if node_id != self.my_node_id {
185 Some(node_id)
186 } else {
187 None
188 }
189 })
190 .collect()
191 };
192
193 // Register a new session with the gossip actor.
194 let (to_gossip_tx, from_gossip_tx) =
195 call!(inner.actor_ref, ToGossipManager::Subscribe, topic, node_ids)
196 .map_err(Box::new)?;
197
198 // Store the gossip senders.
199 //
200 // `from_gossip_tx` is used to create a broadcast receiver when the user calls
201 // `subscribe()` on `GossipHandle`.
202 let mut senders = self.senders.write().await;
203 senders.insert(
204 topic,
205 (
206 to_gossip_tx.clone(),
207 from_gossip_tx.clone(),
208 guard.clone_without_increment(),
209 ),
210 );
211
212 Ok(GossipHandle::new(
213 topic,
214 max_message_size,
215 to_gossip_tx,
216 from_gossip_tx,
217 guard,
218 ))
219 }
220
221 /// Subscribe to system events.
222 ///
223 /// NOTE: only events emitted _after_ calling this method will be received on the returned
224 /// channel.
225 pub async fn events(&self) -> Result<broadcast::Receiver<GossipEvent>, GossipError> {
226 let inner = self.inner.read().await;
227 let result = call!(inner.actor_ref, ToGossipManager::Events).map_err(Box::new)?;
228 Ok(result)
229 }
230}
231
232impl Drop for Inner {
233 fn drop(&mut self) {
234 trace!(
235 actor_id = %self.actor_ref.get_id(),
236 "drop gossip actor reference"
237 );
238
239 // Stop the gossip manager actor once all references to Gossip have been dropped.
240 //
241 // Process all messages which are already in the inboxes of the gossip actors which are the
242 // children of the gossip manager.
243 self.actor_ref.drain_children();
244
245 // Tell the gossip manager to shutdown.
246 if let Err(err) = self.actor_ref.send_message(ToGossipManager::Shutdown) {
247 warn!("failed to send shutdown event to gossip manager: {}", err)
248 }
249
250 // Proccess all messages in the inbox of the gossip manager before stopping it.
251 if let Err(err) = self.actor_ref.drain() {
252 warn!("failed to drain gossip manager: {}", err)
253 }
254 }
255}
256
257#[derive(Debug, Error)]
258pub enum GossipError {
259 /// Spawning the internal actor failed.
260 #[error(transparent)]
261 ActorSpawn(#[from] ractor::SpawnErr),
262
263 /// Messaging with internal actor via RPC failed.
264 #[error(transparent)]
265 ActorRpc(#[from] Box<ractor::RactorErr<ToGossipManager>>),
266
267 #[error(transparent)]
268 AddressBook(#[from] AddressBookError),
269}
270
271#[derive(Debug, Error, PartialEq)]
272pub enum GossipPublishError {
273 #[error("message size exceeds maximum limit ({} vs {})", .0.0, .0.1)]
274 MessageTooLarge((usize, usize)),
275
276 #[error(transparent)]
277 SendError(SendError<Vec<u8>>),
278}
279
280/// Handle for publishing ephemeral messages into the gossip overlay and receiving from the
281/// network for a specific topic.
282#[derive(Clone, Debug)]
283pub struct GossipHandle {
284 topic: Topic,
285 max_message_size: usize,
286 to_topic_tx: mpsc::Sender<Vec<u8>>,
287 from_gossip_tx: broadcast::Sender<Vec<u8>>,
288 _guard: TopicDropGuard,
289}
290
291impl GossipHandle {
292 fn new(
293 topic: Topic,
294 max_message_size: usize,
295 to_topic_tx: mpsc::Sender<Vec<u8>>,
296 from_gossip_tx: broadcast::Sender<Vec<u8>>,
297 _guard: TopicDropGuard,
298 ) -> Self {
299 Self {
300 topic,
301 max_message_size,
302 to_topic_tx,
303 from_gossip_tx,
304 _guard,
305 }
306 }
307
308 /// Publishes a message to the stream.
309 ///
310 /// An error will be returned if the size of the bytes exceeds the configured maximum message
311 /// size.
312 pub async fn publish(&self, bytes: impl Into<Vec<u8>>) -> Result<(), GossipPublishError> {
313 let bytes: Vec<u8> = bytes.into();
314 let message_size = bytes.len();
315
316 // NOTE(glyph): iroh-gossip currently fails silently when the message size exceeds the
317 // configured maximum; not even a warning is logged. We check the size here and return an
318 // error to guard against unexplained behaviour.
319 if message_size > self.max_message_size {
320 return Err(GossipPublishError::MessageTooLarge((
321 message_size,
322 self.max_message_size,
323 )));
324 }
325
326 self.to_topic_tx
327 .send(bytes)
328 .await
329 .map_err(GossipPublishError::SendError)
330 }
331
332 /// Subscribes to the stream.
333 ///
334 /// The returned [`GossipSubscription`] provides a means of receiving messages from the
335 /// stream.
336 pub fn subscribe(&self) -> GossipSubscription {
337 GossipSubscription::new(
338 self.topic,
339 self.from_gossip_tx.subscribe(),
340 self._guard.clone(),
341 )
342 }
343
344 /// Returns the topic of the stream.
345 pub fn topic(&self) -> Topic {
346 self.topic
347 }
348}
349
350/// A handle to an ephemeral messaging stream subscription.
351///
352/// The stream can be used to receive messages from the stream.
353#[derive(Debug)]
354pub struct GossipSubscription {
355 topic: Topic,
356 from_topic_rx: BroadcastStream<Vec<u8>>,
357 _guard: TopicDropGuard,
358}
359
360impl GossipSubscription {
361 /// Returns a handle to an ephemeral messaging stream subscriber.
362 fn new(
363 topic: Topic,
364 from_topic_rx: broadcast::Receiver<Vec<u8>>,
365 _guard: TopicDropGuard,
366 ) -> Self {
367 Self {
368 topic,
369 from_topic_rx: BroadcastStream::new(from_topic_rx),
370 _guard,
371 }
372 }
373
374 /// Returns the topic of the stream.
375 pub fn topic(&self) -> Topic {
376 self.topic
377 }
378}
379
380impl Stream for GossipSubscription {
381 type Item = Result<Vec<u8>, BroadcastStreamRecvError>;
382
383 fn poll_next(
384 mut self: std::pin::Pin<&mut Self>,
385 cx: &mut std::task::Context<'_>,
386 ) -> std::task::Poll<Option<Self::Item>> {
387 self.from_topic_rx.poll_next_unpin(cx)
388 }
389}
390
391/// Helper maintaining a counter of references using the same topic.
392///
393/// Check if we can unsubscribe from topic if all handles and subscriptions have been dropped for
394/// it. The gossip overlay will be left then for this topic.
395#[derive(Debug)]
396struct TopicDropGuard {
397 topic: Topic,
398 counter: Arc<AtomicUsize>,
399 actor_ref: ActorRef<ToGossipManager>,
400 ignore_drop: bool,
401}
402
403/// Initial value the reference counter starts with.
404///
405/// This is set to a non-zero value since the first reference exists already when creating the
406/// gossip stream.
407const INITIAL_COUNTER: usize = 1;
408
409impl TopicDropGuard {
410 fn new(topic: Topic, actor_ref: ActorRef<ToGossipManager>) -> Self {
411 trace!(
412 topic = topic.fmt_short(),
413 counter = INITIAL_COUNTER,
414 actor_id = %actor_ref.get_id(),
415 "new topic drop guard"
416 );
417
418 Self {
419 topic,
420 counter: Arc::new(AtomicUsize::new(INITIAL_COUNTER)),
421 actor_ref,
422 ignore_drop: false,
423 }
424 }
425
426 /// Returns current number of references to this topic.
427 fn counter(&self) -> usize {
428 self.counter.load(std::sync::atomic::Ordering::SeqCst)
429 }
430
431 /// Returns true if there's still one or more references for this topic used.
432 fn has_subscriptions(&self) -> bool {
433 self.counter() >= INITIAL_COUNTER
434 }
435
436 /// Clone guard, but don't increment reference counter.
437 ///
438 /// This is useful if we need to keep it around somewhere for further use without affecting the
439 /// drop logic.
440 fn clone_without_increment(&self) -> Self {
441 Self {
442 topic: self.topic,
443 counter: self.counter.clone(),
444 actor_ref: self.actor_ref.clone(),
445 ignore_drop: true,
446 }
447 }
448}
449
450impl Clone for TopicDropGuard {
451 fn clone(&self) -> Self {
452 let value = self
453 .counter
454 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
455
456 trace!(
457 topic = self.topic.fmt_short(),
458 counter = value + 1,
459 actor_id = %self.actor_ref.get_id(),
460 "clone topic drop guard +1"
461 );
462
463 Self {
464 topic: self.topic,
465 counter: self.counter.clone(),
466 actor_ref: self.actor_ref.clone(),
467 ignore_drop: false,
468 }
469 }
470}
471
472impl Drop for TopicDropGuard {
473 fn drop(&mut self) {
474 // This instance is not used to count references, we drop it without taking any action.
475 if self.ignore_drop {
476 return;
477 }
478
479 // Check if we can unsubscribe from topic if all handles and subscriptions have been
480 // dropped for it.
481 let previous_counter = self
482 .counter
483 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
484
485 trace!(
486 topic = self.topic.fmt_short(),
487 counter = previous_counter - 1,
488 actor_id = %self.actor_ref.get_id(),
489 "drop topic drop guard -1"
490 );
491
492 // If the previous value is equal the initial value, the last instance of the guard was
493 // dropped and the counter has no references to the topic anymore.
494 let no_references_left = previous_counter == INITIAL_COUNTER;
495
496 if no_references_left {
497 trace!(
498 topic = self.topic.fmt_short(),
499 actor_id = %self.actor_ref.get_id(),
500 "send unsubscribe message"
501 );
502
503 // Ignore this error, it could be that the actor has already stopped.
504 let _ = self
505 .actor_ref
506 .send_message(ToGossipManager::Unsubscribe(self.topic));
507 }
508 }
509}
510
#[cfg(test)]
mod tests {
    use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};

    use crate::gossip::GossipConfig;
    use crate::gossip::actors::GossipManager;
    use crate::{AddressBook, Endpoint};

    use super::TopicDropGuard;

    #[tokio::test]
    async fn topic_drop_guard() {
        // A bit cumbersome, but that's the only way right now we get this actor ref from ractor.
        let (actor_ref, _) = {
            let address_book = AddressBook::builder().spawn().await.unwrap();
            let endpoint = Endpoint::builder(address_book.clone())
                .spawn()
                .await
                .unwrap();
            let thread_pool = ThreadLocalActorSpawner::new();
            let args = (GossipConfig::default(), address_book, endpoint);
            GossipManager::spawn(None, args, thread_pool).await.unwrap()
        };

        let guard_1 = TopicDropGuard::new([1; 32].into(), actor_ref);
        assert_eq!(guard_1.counter(), 1);
        assert!(guard_1.has_subscriptions());

        let guard_2 = guard_1.clone();
        assert_eq!(guard_1.counter(), 2);
        let guard_3 = guard_1.clone();
        assert_eq!(guard_1.counter(), 3);

        drop(guard_3);
        assert_eq!(guard_1.counter(), 2);

        // Untracked copies share the counter without changing it.
        let observer = guard_1.clone_without_increment();
        assert_eq!(observer.counter(), 2);

        // Cloning an untracked copy yields a counted guard again.
        let guard_4 = observer.clone();
        assert_eq!(observer.counter(), 3);
        drop(guard_4);
        assert_eq!(observer.counter(), 2);

        // Dropping all counted guards brings the counter to zero; the untracked observer does
        // not keep the topic alive.
        drop(guard_2);
        drop(guard_1);
        assert_eq!(observer.counter(), 0);
        assert!(!observer.has_subscriptions());
    }
}