p2panda_net/gossip/api.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicIsize;

use futures_util::{Stream, StreamExt};
use p2panda_discovery::address_book::NodeInfo as _;
use ractor::{ActorRef, call};
use thiserror::Error;
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

use crate::address_book::{AddressBook, AddressBookError};
use crate::gossip::actors::ToGossipManager;
use crate::gossip::builder::Builder;
use crate::gossip::events::GossipEvent;
use crate::iroh_endpoint::Endpoint;
use crate::{NodeId, TopicId};

/// Mapping of topic to the associated sender channels for getting messages into and out of the
/// gossip overlay.
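///
/// Each entry holds the `mpsc` sender for publishing messages into the overlay, the `broadcast`
/// sender used to create subscriptions, and the drop guard tracking how many handles and
/// subscriptions are alive for the topic.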
type GossipSenders = HashMap<
    TopicId,
    (
        mpsc::Sender<Vec<u8>>,
        broadcast::Sender<Vec<u8>>,
        TopicDropGuard,
    ),
>;

/// Gossip protocol to broadcast ephemeral messages to all online nodes interested in the same
/// topic.
///
/// ## Example
///
/// ```rust
/// # use std::error::Error;
/// #
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn Error>> {
/// # use futures_util::StreamExt;
/// # use p2panda_net::{AddressBook, Discovery, Endpoint, MdnsDiscovery, Gossip};
/// # let address_book = AddressBook::builder().spawn().await?;
/// # let endpoint = Endpoint::builder(address_book.clone())
/// #     .spawn()
/// #     .await?;
/// #
/// // Gossip uses the address book to watch for nodes interested in the same topic.
/// let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
///
/// // Join overlay with given topic.
/// let handle = gossip.stream([1; 32]).await?;
///
/// // Publish a message.
/// handle.publish(b"Hello, Panda!").await?;
///
/// // Subscribe to messages.
/// let mut rx = handle.subscribe();
///
/// tokio::spawn(async move {
///     while let Some(Ok(_bytes)) = rx.next().await {
///         // ..
///     }
/// });
/// #
/// # Ok(())
/// # }
/// ```
///
/// ## Ephemeral Messaging
///
/// These unreliable "ephemeral" streams are intended for relatively short-lived messages without
/// persistence or catch-up of past state, for example for "Awareness" or "Presence" features. In
/// most cases, messages will only be received if they were published after the subscription was
/// created.
///
/// Use [`LogSync`](crate::LogSync) if you need guaranteed eventual consistency, that is, to
/// receive messages even after having been offline.
///
/// ## Self-Healing Overlay
///
/// Gossip-based broadcast overlays rely on membership protocols like [HyParView], which do not
/// heal from network fragmentation caused, for example, by bootstrap nodes going offline.
///
/// `p2panda-net` uses its additional, confidential topic discovery layer in
/// [`Discovery`](crate::Discovery) to automatically heal these partitions. Whenever possible, it
/// gives every node a chance to connect to every other participating node, thereby
/// decentralising the entry points into the network.
///
/// [HyParView]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
///
/// ## Topic Discovery
///
/// For gossip to function correctly we need to inform it about discovered nodes that are
/// interested in the same topic. Check out the [`Discovery`](crate::Discovery) module for more
/// information.
#[derive(Clone)]
pub struct Gossip {
    my_node_id: NodeId,
    address_book: AddressBook,
    inner: Arc<RwLock<Inner>>,
    senders: Arc<RwLock<GossipSenders>>,
}

struct Inner {
    actor_ref: ActorRef<ToGossipManager>,
}

impl Gossip {
    pub(crate) fn new(
        actor_ref: ActorRef<ToGossipManager>,
        my_node_id: NodeId,
        address_book: AddressBook,
    ) -> Self {
        Self {
            my_node_id,
            address_book,
            inner: Arc::new(RwLock::new(Inner { actor_ref })),
            senders: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub fn builder(address_book: AddressBook, endpoint: Endpoint) -> Builder {
        Builder::new(address_book, endpoint)
    }
    /// Join the gossip overlay for this topic and return a handle for publishing messages to it
    /// and receiving messages from the network.
    ///
    /// Calling this again with the same topic while a previous handle is still alive returns a
    /// handle attached to the same underlying gossip session.
    pub async fn stream(&self, topic: TopicId) -> Result<GossipHandle, GossipError> {
        // Check if there's already a handle for this topic and clone it.
        //
        // If this handle exists but the topic counter is zero we know that all previous handles
        // have been dropped and we didn't clean up yet. In this case we'll ignore the existing
        // entry in "senders" and continue to create a new gossip session, overwriting the "dead"
        // entries.
        if let Some((to_gossip_tx, from_gossip_tx, guard)) = self.senders.read().await.get(&topic)
            && guard.counter.load(std::sync::atomic::Ordering::SeqCst) > 0
        {
            return Ok(GossipHandle::new(
                topic,
                to_gossip_tx.clone(),
                from_gossip_tx.clone(),
                guard.clone(),
            ));
        }

        // If there's no active handle for this topic we join the overlay from scratch.
        let inner = self.inner.read().await;

        // This guard counts the number of active handles and subscriptions for this topic. This
        // lets us determine when we can leave the overlay.
        let guard = TopicDropGuard {
            topic,
            // The original guard is moved into the first handle without touching the counter,
            // while the clone stored in "senders" increments it to 1. The net effect is that the
            // counter always equals the number of live handles and subscriptions for this topic.
            counter: Arc::new(AtomicIsize::new(0)),
            actor_ref: inner.actor_ref.clone(),
        };

        let node_ids = {
            let node_infos = self.address_book.node_infos_by_topics([topic]).await?;
            node_infos
                .iter()
                .filter_map(|info| {
                    // Remove ourselves from the list.
                    let node_id = info.id();
                    if node_id != self.my_node_id {
                        Some(node_id)
                    } else {
                        None
                    }
                })
                .collect()
        };

        // Register a new session with the gossip actor.
        let (to_gossip_tx, from_gossip_tx) =
            call!(inner.actor_ref, ToGossipManager::Subscribe, topic, node_ids)
                .map_err(Box::new)?;

        // Store the gossip senders.
        //
        // `from_gossip_tx` is used to create a broadcast receiver when the user calls
        // `subscribe()` on `GossipHandle`.
        let mut senders = self.senders.write().await;
        senders.insert(
            topic,
            (to_gossip_tx.clone(), from_gossip_tx.clone(), guard.clone()),
        );

        Ok(GossipHandle::new(
            topic,
            to_gossip_tx,
            from_gossip_tx,
            guard,
        ))
    }

    /// Subscribe to system events.
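    ///
    /// A minimal sketch of consuming the event stream (setup mirrors the example on [`Gossip`]):
    ///
    /// ```rust
    /// # use std::error::Error;
    /// #
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn Error>> {
    /// # use p2panda_net::{AddressBook, Endpoint, Gossip};
    /// # let address_book = AddressBook::builder().spawn().await?;
    /// # let endpoint = Endpoint::builder(address_book.clone()).spawn().await?;
    /// let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
    /// let mut events = gossip.events().await?;
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = events.recv().await {
    ///         // React to gossip system events, e.g. for logging or metrics.
    ///     }
    /// });
    /// # Ok(())
    /// # }
    /// ```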
    pub async fn events(&self) -> Result<broadcast::Receiver<GossipEvent>, GossipError> {
        let inner = self.inner.read().await;
        let result = call!(inner.actor_ref, ToGossipManager::Events).map_err(Box::new)?;
        Ok(result)
    }
}

impl Drop for Inner {
    fn drop(&mut self) {
        // Stop the actor after all references (Gossip, GossipHandle, GossipSubscription) have
        // dropped.
        self.actor_ref.stop(None);
    }
}

#[derive(Debug, Error)]
pub enum GossipError {
    /// Spawning the internal actor failed.
    #[error(transparent)]
    ActorSpawn(#[from] ractor::SpawnErr),

    /// Messaging with the internal actor via RPC failed.
    #[error(transparent)]
    ActorRpc(#[from] Box<ractor::RactorErr<ToGossipManager>>),

    /// Interacting with the address book failed.
    #[error(transparent)]
    AddressBook(#[from] AddressBookError),
}

/// Handle for publishing ephemeral messages into the gossip overlay and receiving messages from
/// the network for a specific topic.
#[derive(Clone)]
pub struct GossipHandle {
    topic: TopicId,
    to_topic_tx: mpsc::Sender<Vec<u8>>,
    from_gossip_tx: broadcast::Sender<Vec<u8>>,
    _guard: TopicDropGuard,
}

impl GossipHandle {
    fn new(
        topic: TopicId,
        to_topic_tx: mpsc::Sender<Vec<u8>>,
        from_gossip_tx: broadcast::Sender<Vec<u8>>,
        _guard: TopicDropGuard,
    ) -> Self {
        Self {
            topic,
            to_topic_tx,
            from_gossip_tx,
            _guard,
        }
    }

    /// Publishes a message to the stream.
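    ///
    /// A minimal sketch (setup mirrors the example on [`Gossip`]):
    ///
    /// ```rust
    /// # use std::error::Error;
    /// #
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn Error>> {
    /// # use p2panda_net::{AddressBook, Endpoint, Gossip};
    /// # let address_book = AddressBook::builder().spawn().await?;
    /// # let endpoint = Endpoint::builder(address_book.clone()).spawn().await?;
    /// # let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
    /// let handle = gossip.stream([1; 32]).await?;
    ///
    /// // Anything implementing `Into<Vec<u8>>` can be published.
    /// handle.publish(b"Hello, Panda!").await?;
    /// handle.publish(vec![0u8, 1, 2]).await?;
    /// # Ok(())
    /// # }
    /// ```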
    pub async fn publish(
        &self,
        bytes: impl Into<Vec<u8>>,
    ) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
        self.to_topic_tx.send(bytes.into()).await
    }

    /// Subscribes to the stream.
    ///
    /// The returned [`GossipSubscription`] provides a means of receiving messages from the
    /// stream, as shown in the example on [`GossipSubscription`]. Every subscription receives
    /// its own copy of each incoming message.
    pub fn subscribe(&self) -> GossipSubscription {
        GossipSubscription::new(
            self.topic,
            self.from_gossip_tx.subscribe(),
            self._guard.clone(),
        )
    }

    /// Returns the topic of the stream.
    pub fn topic(&self) -> TopicId {
        self.topic
    }
}

/// A handle to an ephemeral messaging stream subscription.
///
/// It implements [`Stream`] and yields the messages received from the gossip overlay.
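///
/// A minimal sketch of draining a subscription, including handling of receivers which lagged
/// behind (setup mirrors the example on [`Gossip`]):
///
/// ```rust
/// # use std::error::Error;
/// #
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn Error>> {
/// # use futures_util::StreamExt;
/// # use p2panda_net::{AddressBook, Endpoint, Gossip};
/// # use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
/// # let address_book = AddressBook::builder().spawn().await?;
/// # let endpoint = Endpoint::builder(address_book.clone()).spawn().await?;
/// # let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
/// let handle = gossip.stream([1; 32]).await?;
/// let mut rx = handle.subscribe();
///
/// tokio::spawn(async move {
///     loop {
///         match rx.next().await {
///             Some(Ok(_bytes)) => {
///                 // Handle an incoming gossip message.
///             }
///             // The subscription fell behind and missed some messages.
///             Some(Err(BroadcastStreamRecvError::Lagged(_missed))) => continue,
///             // The stream has ended.
///             None => break,
///         }
///     }
/// });
/// # Ok(())
/// # }
/// ```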
pub struct GossipSubscription {
    topic: TopicId,
    from_topic_rx: BroadcastStream<Vec<u8>>,
    _guard: TopicDropGuard,
}

impl GossipSubscription {
    /// Wraps a broadcast receiver into a subscription handle.
    fn new(
        topic: TopicId,
        from_topic_rx: broadcast::Receiver<Vec<u8>>,
        _guard: TopicDropGuard,
    ) -> Self {
        Self {
            topic,
            from_topic_rx: BroadcastStream::new(from_topic_rx),
            _guard,
        }
    }

    /// Returns the topic of the stream.
    pub fn topic(&self) -> TopicId {
        self.topic
    }
}

impl Stream for GossipSubscription {
    type Item = Result<Vec<u8>, BroadcastStreamRecvError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.from_topic_rx.poll_next_unpin(cx)
    }
}

/// Helper maintaining a counter of objects using the same topic.
///
/// Used to check whether we can unsubscribe from a topic once all handles and subscriptions for
/// it have been dropped.
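///
/// Invariant: the counter equals the number of live [`GossipHandle`]s and
/// [`GossipSubscription`]s for the topic; cloning increments it, dropping decrements it, and
/// dropping the last user-facing guard triggers the unsubscribe. See [`Gossip::stream`] for how
/// the counter is initialised.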
struct TopicDropGuard {
    topic: TopicId,
    counter: Arc<AtomicIsize>,
    actor_ref: ActorRef<ToGossipManager>,
}

impl Clone for TopicDropGuard {
    fn clone(&self) -> Self {
        self.counter
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

        Self {
            topic: self.topic,
            counter: self.counter.clone(),
            actor_ref: self.actor_ref.clone(),
        }
    }
}

impl Drop for TopicDropGuard {
    fn drop(&mut self) {
        let actor_ref = self.actor_ref.clone();

        // Check if we can unsubscribe from the topic because all handles and subscriptions have
        // been dropped for it.
        let previous_counter = self
            .counter
            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);

        // If this is 1 the last handle or subscription was dropped and the counter is now at
        // zero. Values below 1 can occur when the guard stored in "senders" is itself dropped
        // during cleanup.
        if previous_counter <= 1 {
            // Ignore this error, it could be that the actor has already stopped.
            let _ = actor_ref.send_message(ToGossipManager::Unsubscribe(self.topic));
        }
    }
}