Skip to main content

p2panda_net/gossip/
api.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::AtomicUsize;
6
7use futures_util::{Stream, StreamExt};
8use p2panda_core::Topic;
9use p2panda_store::address_book::NodeInfo as _;
10use ractor::{ActorRef, call};
11use thiserror::Error;
12use tokio::sync::mpsc::error::SendError;
13use tokio::sync::{RwLock, broadcast, mpsc};
14use tokio_stream::wrappers::BroadcastStream;
15use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
16use tracing::{trace, warn};
17
18use crate::NodeId;
19use crate::address_book::{AddressBook, AddressBookError};
20use crate::gossip::GossipConfig;
21use crate::gossip::actors::ToGossipManager;
22use crate::gossip::builder::Builder;
23use crate::gossip::events::GossipEvent;
24use crate::iroh_endpoint::Endpoint;
25use crate::utils::ShortFormat;
26
27/// Mapping of topic to the associated sender channels for getting messages into and out of the
28/// gossip overlay.
29type GossipSenders = HashMap<
30    Topic,
31    (
32        mpsc::Sender<Vec<u8>>,
33        broadcast::Sender<Vec<u8>>,
34        TopicDropGuard,
35    ),
36>;
37
38/// Gossip protocol to broadcast ephemeral messages to all online nodes interested in the same
39/// topic.
40///
41/// ## Example
42///
43/// ```rust
44/// # use std::error::Error;
45/// #
46/// # #[tokio::main]
47/// # async fn main() -> Result<(), Box<dyn Error>> {
48/// # use futures_util::StreamExt;
49/// # use p2panda_net::{AddressBook, Discovery, Endpoint, MdnsDiscovery, Gossip};
50/// # let address_book = AddressBook::builder().spawn().await?;
51/// # let endpoint = Endpoint::builder(address_book.clone())
52/// #     .spawn()
53/// #     .await?;
54/// #
55/// // Gossip uses the address book to watch for nodes interested in the same topic.
56/// let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
57///
58/// // Join overlay with given topic.
59/// let handle = gossip.stream([1; 32].into()).await?;
60///
61/// // Publish a message.
62/// handle.publish(b"Hello, Panda!").await?;
63///
64/// // Subscribe to messages.
65/// let mut rx = handle.subscribe();
66///
67/// tokio::spawn(async move {
68///     while let Some(Ok(_bytes)) = rx.next().await {
69///         // ..
70///     }
71/// });
72/// #
73/// # Ok(())
74/// # }
75/// ```
76///
77/// ## Ephemeral Messaging
78///
79/// These unreliable “ephemeral” streams are intended to be used for relatively short-lived
80/// messages without persistence and catch-up of past state, for example for "Awareness" or
81/// "Presence" features. In most cases, messages will only be received if they were published after
82/// the subscription was created.
83///
84/// Use [`LogSync`](crate::LogSync) if you wish to receive messages even after being offline for
85/// guaranteed eventual consistency.
86///
87/// ## Self-healing overlay
88///
89/// Gossip-based broadcast overlays rely on membership protocols like [HyParView] which do not heal
90/// from network fragmentation caused, for example, by bootstrap nodes going offline.
91///
92/// `p2panda-net` uses it's additional, confidential topic discovery layer in
93/// [`Discovery`](crate::Discovery) to automatically heal these partitions. Whenever possible, it
94/// allows nodes a higher chance to connect to every participating node, thereby decentralising the
95/// entrance points into the network.
96///
97/// ## Topic Discovery
98///
99/// For gossip to function correctly we need to inform it about discovered nodes who are interested
100/// in the same topic. Check out the [`Discovery`](crate::Discovery) module for more information.
101///
102/// [HyParView]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
103///
104/// ## Leaving overlays on Drop
105///
106/// We're staying connected to the overlay for a topic as long as there's a `GossipHandle` or
107/// `GossipSubscription` left for it. If all references to this topic are dropped, the gossip
108/// overlay will be automatically left.
109///
110/// If all `Gossip` instances are dropped then the associated handles and subscriptions will fail
111/// sending or receiving messages.
112#[derive(Clone, Debug)]
113pub struct Gossip {
114    my_node_id: NodeId,
115    address_book: AddressBook,
116    inner: Arc<RwLock<Inner>>,
117    senders: Arc<RwLock<GossipSenders>>,
118    config: GossipConfig,
119}
120
121#[derive(Debug)]
122struct Inner {
123    actor_ref: ActorRef<ToGossipManager>,
124}
125
126impl Gossip {
127    pub(crate) fn new(
128        actor_ref: ActorRef<ToGossipManager>,
129        my_node_id: NodeId,
130        address_book: AddressBook,
131        config: GossipConfig,
132    ) -> Self {
133        Self {
134            my_node_id,
135            address_book,
136            inner: Arc::new(RwLock::new(Inner { actor_ref })),
137            senders: Arc::new(RwLock::new(HashMap::new())),
138            config,
139        }
140    }
141
142    pub fn builder(address_book: AddressBook, endpoint: Endpoint) -> Builder {
143        Builder::new(address_book, endpoint)
144    }
145
146    /// Join gossip overlay for this topic and return a handle to publish messages to it or receive
147    /// messages from the network.
148    pub async fn stream(&self, topic: Topic) -> Result<GossipHandle, GossipError> {
149        let max_message_size = self.config.max_message_size;
150
151        // Check if there's already a handle for this topic and clone it.
152        //
153        // If this handle exists but the topic counter is zero we know that all previous handles
154        // have been dropped and we didn't clean up yet. In this case we'll ignore the existing
155        // entry in "senders" and continue to create a new gossip session, overwriting the "dead"
156        // entries.
157        if let Some((to_gossip_tx, from_gossip_tx, guard)) = self.senders.read().await.get(&topic)
158            && guard.has_subscriptions()
159        {
160            return Ok(GossipHandle::new(
161                topic,
162                max_message_size,
163                to_gossip_tx.clone(),
164                from_gossip_tx.clone(),
165                guard.clone(),
166            ));
167        }
168
169        // If there's no active handle for this topic we join the overlay from scratch.
170        let inner = self.inner.read().await;
171
172        // This guard counts the number of active handles and subscriptions for this topic. Like
173        // this we can determine if we can leave the overlay.
174        let guard = TopicDropGuard::new(topic, inner.actor_ref.clone());
175
176        // Identify the initial nodes we can use to bootstrap ourselves into the overlay.
177        let node_ids = {
178            let node_infos = self.address_book.node_infos_by_topics([topic]).await?;
179            node_infos
180                .iter()
181                .filter_map(|info| {
182                    // Remove ourselves from list.
183                    let node_id = info.id();
184                    if node_id != self.my_node_id {
185                        Some(node_id)
186                    } else {
187                        None
188                    }
189                })
190                .collect()
191        };
192
193        // Register a new session with the gossip actor.
194        let (to_gossip_tx, from_gossip_tx) =
195            call!(inner.actor_ref, ToGossipManager::Subscribe, topic, node_ids)
196                .map_err(Box::new)?;
197
198        // Store the gossip senders.
199        //
200        // `from_gossip_tx` is used to create a broadcast receiver when the user calls
201        // `subscribe()` on `GossipHandle`.
202        let mut senders = self.senders.write().await;
203        senders.insert(
204            topic,
205            (
206                to_gossip_tx.clone(),
207                from_gossip_tx.clone(),
208                guard.clone_without_increment(),
209            ),
210        );
211
212        Ok(GossipHandle::new(
213            topic,
214            max_message_size,
215            to_gossip_tx,
216            from_gossip_tx,
217            guard,
218        ))
219    }
220
221    /// Subscribe to system events.
222    ///
223    /// NOTE: only events emitted _after_ calling this method will be received on the returned
224    /// channel.
225    pub async fn events(&self) -> Result<broadcast::Receiver<GossipEvent>, GossipError> {
226        let inner = self.inner.read().await;
227        let result = call!(inner.actor_ref, ToGossipManager::Events).map_err(Box::new)?;
228        Ok(result)
229    }
230}
231
232impl Drop for Inner {
233    fn drop(&mut self) {
234        trace!(
235            actor_id = %self.actor_ref.get_id(),
236            "drop gossip actor reference"
237        );
238
239        // Stop the gossip manager actor once all references to Gossip have been dropped.
240        //
241        // Process all messages which are already in the inboxes of the gossip actors which are the
242        // children of the gossip manager.
243        self.actor_ref.drain_children();
244
245        // Tell the gossip manager to shutdown.
246        if let Err(err) = self.actor_ref.send_message(ToGossipManager::Shutdown) {
247            warn!("failed to send shutdown event to gossip manager: {}", err)
248        }
249
250        // Proccess all messages in the inbox of the gossip manager before stopping it.
251        if let Err(err) = self.actor_ref.drain() {
252            warn!("failed to drain gossip manager: {}", err)
253        }
254    }
255}
256
257#[derive(Debug, Error)]
258pub enum GossipError {
259    /// Spawning the internal actor failed.
260    #[error(transparent)]
261    ActorSpawn(#[from] ractor::SpawnErr),
262
263    /// Messaging with internal actor via RPC failed.
264    #[error(transparent)]
265    ActorRpc(#[from] Box<ractor::RactorErr<ToGossipManager>>),
266
267    #[error(transparent)]
268    AddressBook(#[from] AddressBookError),
269}
270
271#[derive(Debug, Error, PartialEq)]
272pub enum GossipPublishError {
273    #[error("message size exceeds maximum limit ({} vs {})", .0.0, .0.1)]
274    MessageTooLarge((usize, usize)),
275
276    #[error(transparent)]
277    SendError(SendError<Vec<u8>>),
278}
279
280/// Handle for publishing ephemeral messages into the gossip overlay and receiving from the
281/// network for a specific topic.
282#[derive(Clone, Debug)]
283pub struct GossipHandle {
284    topic: Topic,
285    max_message_size: usize,
286    to_topic_tx: mpsc::Sender<Vec<u8>>,
287    from_gossip_tx: broadcast::Sender<Vec<u8>>,
288    _guard: TopicDropGuard,
289}
290
291impl GossipHandle {
292    fn new(
293        topic: Topic,
294        max_message_size: usize,
295        to_topic_tx: mpsc::Sender<Vec<u8>>,
296        from_gossip_tx: broadcast::Sender<Vec<u8>>,
297        _guard: TopicDropGuard,
298    ) -> Self {
299        Self {
300            topic,
301            max_message_size,
302            to_topic_tx,
303            from_gossip_tx,
304            _guard,
305        }
306    }
307
308    /// Publishes a message to the stream.
309    ///
310    /// An error will be returned if the size of the bytes exceeds the configured maximum message
311    /// size.
312    pub async fn publish(&self, bytes: impl Into<Vec<u8>>) -> Result<(), GossipPublishError> {
313        let bytes: Vec<u8> = bytes.into();
314        let message_size = bytes.len();
315
316        // NOTE(glyph): iroh-gossip currently fails silently when the message size exceeds the
317        // configured maximum; not even a warning is logged. We check the size here and return an
318        // error to guard against unexplained behaviour.
319        if message_size > self.max_message_size {
320            return Err(GossipPublishError::MessageTooLarge((
321                message_size,
322                self.max_message_size,
323            )));
324        }
325
326        self.to_topic_tx
327            .send(bytes)
328            .await
329            .map_err(GossipPublishError::SendError)
330    }
331
332    /// Subscribes to the stream.
333    ///
334    /// The returned [`GossipSubscription`] provides a means of receiving messages from the
335    /// stream.
336    pub fn subscribe(&self) -> GossipSubscription {
337        GossipSubscription::new(
338            self.topic,
339            self.from_gossip_tx.subscribe(),
340            self._guard.clone(),
341        )
342    }
343
344    /// Returns the topic of the stream.
345    pub fn topic(&self) -> Topic {
346        self.topic
347    }
348}
349
350/// A handle to an ephemeral messaging stream subscription.
351///
352/// The stream can be used to receive messages from the stream.
353#[derive(Debug)]
354pub struct GossipSubscription {
355    topic: Topic,
356    from_topic_rx: BroadcastStream<Vec<u8>>,
357    _guard: TopicDropGuard,
358}
359
360impl GossipSubscription {
361    /// Returns a handle to an ephemeral messaging stream subscriber.
362    fn new(
363        topic: Topic,
364        from_topic_rx: broadcast::Receiver<Vec<u8>>,
365        _guard: TopicDropGuard,
366    ) -> Self {
367        Self {
368            topic,
369            from_topic_rx: BroadcastStream::new(from_topic_rx),
370            _guard,
371        }
372    }
373
374    /// Returns the topic of the stream.
375    pub fn topic(&self) -> Topic {
376        self.topic
377    }
378}
379
380impl Stream for GossipSubscription {
381    type Item = Result<Vec<u8>, BroadcastStreamRecvError>;
382
383    fn poll_next(
384        mut self: std::pin::Pin<&mut Self>,
385        cx: &mut std::task::Context<'_>,
386    ) -> std::task::Poll<Option<Self::Item>> {
387        self.from_topic_rx.poll_next_unpin(cx)
388    }
389}
390
391/// Helper maintaining a counter of references using the same topic.
392///
393/// Check if we can unsubscribe from topic if all handles and subscriptions have been dropped for
394/// it. The gossip overlay will be left then for this topic.
395#[derive(Debug)]
396struct TopicDropGuard {
397    topic: Topic,
398    counter: Arc<AtomicUsize>,
399    actor_ref: ActorRef<ToGossipManager>,
400    ignore_drop: bool,
401}
402
403/// Initial value the reference counter starts with.
404///
405/// This is set to a non-zero value since the first reference exists already when creating the
406/// gossip stream.
407const INITIAL_COUNTER: usize = 1;
408
409impl TopicDropGuard {
410    fn new(topic: Topic, actor_ref: ActorRef<ToGossipManager>) -> Self {
411        trace!(
412            topic = topic.fmt_short(),
413            counter = INITIAL_COUNTER,
414            actor_id = %actor_ref.get_id(),
415            "new topic drop guard"
416        );
417
418        Self {
419            topic,
420            counter: Arc::new(AtomicUsize::new(INITIAL_COUNTER)),
421            actor_ref,
422            ignore_drop: false,
423        }
424    }
425
426    /// Returns current number of references to this topic.
427    fn counter(&self) -> usize {
428        self.counter.load(std::sync::atomic::Ordering::SeqCst)
429    }
430
431    /// Returns true if there's still one or more references for this topic used.
432    fn has_subscriptions(&self) -> bool {
433        self.counter() >= INITIAL_COUNTER
434    }
435
436    /// Clone guard, but don't increment reference counter.
437    ///
438    /// This is useful if we need to keep it around somewhere for further use without affecting the
439    /// drop logic.
440    fn clone_without_increment(&self) -> Self {
441        Self {
442            topic: self.topic,
443            counter: self.counter.clone(),
444            actor_ref: self.actor_ref.clone(),
445            ignore_drop: true,
446        }
447    }
448}
449
450impl Clone for TopicDropGuard {
451    fn clone(&self) -> Self {
452        let value = self
453            .counter
454            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
455
456        trace!(
457            topic = self.topic.fmt_short(),
458            counter = value + 1,
459            actor_id = %self.actor_ref.get_id(),
460            "clone topic drop guard +1"
461        );
462
463        Self {
464            topic: self.topic,
465            counter: self.counter.clone(),
466            actor_ref: self.actor_ref.clone(),
467            ignore_drop: false,
468        }
469    }
470}
471
472impl Drop for TopicDropGuard {
473    fn drop(&mut self) {
474        // This instance is not used to count references, we drop it without taking any action.
475        if self.ignore_drop {
476            return;
477        }
478
479        // Check if we can unsubscribe from topic if all handles and subscriptions have been
480        // dropped for it.
481        let previous_counter = self
482            .counter
483            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
484
485        trace!(
486            topic = self.topic.fmt_short(),
487            counter = previous_counter - 1,
488            actor_id = %self.actor_ref.get_id(),
489            "drop topic drop guard -1"
490        );
491
492        // If the previous value is equal the initial value, the last instance of the guard was
493        // dropped and the counter has no references to the topic anymore.
494        let no_references_left = previous_counter == INITIAL_COUNTER;
495
496        if no_references_left {
497            trace!(
498                topic = self.topic.fmt_short(),
499                actor_id = %self.actor_ref.get_id(),
500                "send unsubscribe message"
501            );
502
503            // Ignore this error, it could be that the actor has already stopped.
504            let _ = self
505                .actor_ref
506                .send_message(ToGossipManager::Unsubscribe(self.topic));
507        }
508    }
509}
510
#[cfg(test)]
mod tests {
    use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};

    use crate::gossip::GossipConfig;
    use crate::gossip::actors::GossipManager;
    use crate::{AddressBook, Endpoint};

    use super::TopicDropGuard;

    #[tokio::test]
    async fn topic_drop_guard() {
        // A bit cumbersome, but that's the only way right now we get this actor ref from ractor.
        let (actor_ref, _) = {
            let address_book = AddressBook::builder().spawn().await.unwrap();
            let endpoint = Endpoint::builder(address_book.clone())
                .spawn()
                .await
                .unwrap();
            let thread_pool = ThreadLocalActorSpawner::new();
            let args = (GossipConfig::default(), address_book, endpoint);
            GossipManager::spawn(None, args, thread_pool).await.unwrap()
        };

        let guard_1 = TopicDropGuard::new([1; 32].into(), actor_ref);
        assert_eq!(guard_1.counter(), 1);
        assert!(guard_1.has_subscriptions());

        let guard_2 = guard_1.clone();
        assert_eq!(guard_1.counter(), 2);
        let guard_3 = guard_1.clone();
        assert_eq!(guard_1.counter(), 3);

        drop(guard_3);
        assert_eq!(guard_1.counter(), 2);

        // Uncounted copies neither increment on creation nor decrement on drop.
        let uncounted = guard_1.clone_without_increment();
        assert_eq!(guard_1.counter(), 2);
        drop(uncounted);
        assert_eq!(guard_1.counter(), 2);

        drop(guard_2);
        assert_eq!(guard_1.counter(), 1);
        assert!(guard_1.has_subscriptions());

        // Once the last counted guard is gone, an uncounted observer (like the copy stored next
        // to the senders in `Gossip`) sees that no references are left.
        let observer = guard_1.clone_without_increment();
        drop(guard_1);
        assert_eq!(observer.counter(), 0);
        assert!(!observer.has_subscriptions());
    }
}