p2panda_net/gossip/api.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::AtomicIsize;
6
7use futures_util::{Stream, StreamExt};
8use p2panda_discovery::address_book::NodeInfo as _;
9use ractor::{ActorRef, call};
10use thiserror::Error;
11use tokio::sync::{RwLock, broadcast, mpsc};
12use tokio_stream::wrappers::BroadcastStream;
13use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
14
15use crate::address_book::{AddressBook, AddressBookError};
16use crate::gossip::actors::ToGossipManager;
17use crate::gossip::builder::Builder;
18use crate::gossip::events::GossipEvent;
19use crate::iroh_endpoint::Endpoint;
20use crate::{NodeId, TopicId};
21
22/// Mapping of topic to the associated sender channels for getting messages into and out of the
23/// gossip overlay.
24type GossipSenders = HashMap<
25 TopicId,
26 (
27 mpsc::Sender<Vec<u8>>,
28 broadcast::Sender<Vec<u8>>,
29 TopicDropGuard,
30 ),
31>;
32
33/// Gossip protocol to broadcast ephemeral messages to all online nodes interested in the same
34/// topic.
35///
36/// ## Example
37///
38/// ```rust
39/// # use std::error::Error;
40/// #
41/// # #[tokio::main]
42/// # async fn main() -> Result<(), Box<dyn Error>> {
43/// # use futures_util::StreamExt;
44/// # use p2panda_net::{AddressBook, Discovery, Endpoint, MdnsDiscovery, Gossip};
45/// # let address_book = AddressBook::builder().spawn().await?;
46/// # let endpoint = Endpoint::builder(address_book.clone())
47/// # .spawn()
48/// # .await?;
49/// #
50/// // Gossip uses the address book to watch for nodes interested in the same topic.
51/// let gossip = Gossip::builder(address_book, endpoint).spawn().await?;
52///
53/// // Join overlay with given topic.
54/// let handle = gossip.stream([1; 32]).await?;
55///
56/// // Publish a message.
57/// handle.publish(b"Hello, Panda!").await?;
58///
59/// // Subscribe to messages.
60/// let mut rx = handle.subscribe();
61///
62/// tokio::spawn(async move {
63/// while let Some(Ok(_bytes)) = rx.next().await {
64/// // ..
65/// }
66/// });
67/// #
68/// # Ok(())
69/// # }
70/// ```
71///
72/// ## Ephemeral Messaging
73///
74/// These unreliable “ephemeral” streams are intended to be used for relatively short-lived
75/// messages without persistence and catch-up of past state, for example for "Awareness" or
76/// "Presence" features. In most cases, messages will only be received if they were published after
77/// the subscription was created.
78///
/// Use [`LogSync`](crate::LogSync) if you need guaranteed eventual consistency, that is, if you
/// also want to receive the messages which were published while you were offline.
81///
82///
83/// ## Self-healing overlay
84///
85/// Gossip-based broadcast overlays rely on membership protocols like [HyParView] which do not heal
86/// from network fragmentation caused, for example, by bootstrap nodes going offline.
87///
/// `p2panda-net` uses its additional, confidential topic discovery layer in
89/// [`Discovery`](crate::Discovery) to automatically heal these partitions. Whenever possible, it
90/// allows nodes a higher chance to connect to every participating node, thereby decentralising the
91/// entrance points into the network.
92///
93/// [HyParView]: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
94///
95/// ## Topic Discovery
96///
97/// For gossip to function correctly we need to inform it about discovered nodes who are interested
98/// in the same topic. Check out the [`Discovery`](crate::Discovery) module for more information.
99#[derive(Clone)]
100pub struct Gossip {
101 my_node_id: NodeId,
102 address_book: AddressBook,
103 inner: Arc<RwLock<Inner>>,
104 senders: Arc<RwLock<GossipSenders>>,
105}
106
107struct Inner {
108 actor_ref: ActorRef<ToGossipManager>,
109}
110
111impl Gossip {
112 pub(crate) fn new(
113 actor_ref: ActorRef<ToGossipManager>,
114 my_node_id: NodeId,
115 address_book: AddressBook,
116 ) -> Self {
117 Self {
118 my_node_id,
119 address_book,
120 inner: Arc::new(RwLock::new(Inner { actor_ref })),
121 senders: Arc::new(RwLock::new(HashMap::new())),
122 }
123 }
124
125 pub fn builder(address_book: AddressBook, endpoint: Endpoint) -> Builder {
126 Builder::new(address_book, endpoint)
127 }
128
129 /// Join gossip overlay for this topic and return a handle to publish messages to it or receive
130 /// messages from the network.
131 pub async fn stream(&self, topic: TopicId) -> Result<GossipHandle, GossipError> {
132 // Check if there's already a handle for this topic and clone it.
133 //
134 // If this handle exists but the topic counter is zero we know that all previous handles
135 // have been dropped and we didn't clean up yet. In this case we'll ignore the existing
136 // entry in "senders" and continue to create a new gossip session, overwriting the "dead"
137 // entries.
138 if let Some((to_gossip_tx, from_gossip_tx, guard)) = self.senders.read().await.get(&topic)
139 && guard.counter.load(std::sync::atomic::Ordering::SeqCst) > 0
140 {
141 return Ok(GossipHandle::new(
142 topic,
143 to_gossip_tx.clone(),
144 from_gossip_tx.clone(),
145 guard.clone(),
146 ));
147 }
148
149 // If there's no active handle for this topic we join the overlay from scratch.
150 let inner = self.inner.read().await;
151
152 // This guard counts the number of active handles and subscriptions for this topic. Like
153 // this we can determine if we can leave the overlay.
154 let guard = TopicDropGuard {
155 topic,
156 // Since the counter increments by 1 on each clone and we don't want to count cloning
157 // the guard into "senders", we start at -1 here.
158 counter: Arc::new(AtomicIsize::new(-1)),
159 actor_ref: inner.actor_ref.clone(),
160 };
161
162 let node_ids = {
163 let node_infos = self.address_book.node_infos_by_topics([topic]).await?;
164 node_infos
165 .iter()
166 .filter_map(|info| {
167 // Remove ourselves from list.
168 let node_id = info.id();
169 if node_id != self.my_node_id {
170 Some(node_id)
171 } else {
172 None
173 }
174 })
175 .collect()
176 };
177
178 // Register a new session with the gossip actor.
179 let (to_gossip_tx, from_gossip_tx) =
180 call!(inner.actor_ref, ToGossipManager::Subscribe, topic, node_ids)
181 .map_err(Box::new)?;
182
183 // Store the gossip senders.
184 //
185 // `from_gossip_tx` is used to create a broadcast receiver when the user calls
186 // `subscribe()` on `GossipHandle`.
187 let mut senders = self.senders.write().await;
188 senders.insert(
189 topic,
190 (to_gossip_tx.clone(), from_gossip_tx.clone(), guard.clone()),
191 );
192
193 Ok(GossipHandle::new(
194 topic,
195 to_gossip_tx,
196 from_gossip_tx,
197 guard,
198 ))
199 }
200
201 /// Subscribe to system events.
202 pub async fn events(&self) -> Result<broadcast::Receiver<GossipEvent>, GossipError> {
203 let inner = self.inner.read().await;
204 let result = call!(inner.actor_ref, ToGossipManager::Events).map_err(Box::new)?;
205 Ok(result)
206 }
207}
208
209impl Drop for Inner {
210 fn drop(&mut self) {
211 // Stop actor after all references (Gossip, GossipHandle, GossipSubscription) have dropped.
212 self.actor_ref.stop(None);
213 }
214}
215
216#[derive(Debug, Error)]
217pub enum GossipError {
218 /// Spawning the internal actor failed.
219 #[error(transparent)]
220 ActorSpawn(#[from] ractor::SpawnErr),
221
222 /// Messaging with internal actor via RPC failed.
223 #[error(transparent)]
224 ActorRpc(#[from] Box<ractor::RactorErr<ToGossipManager>>),
225
226 #[error(transparent)]
227 AddressBook(#[from] AddressBookError),
228}
229
230/// Handle for publishing ephemeral messages into the gossip overlay and receiving from the
231/// network for a specific topic.
232#[derive(Clone)]
233pub struct GossipHandle {
234 topic: TopicId,
235 to_topic_tx: mpsc::Sender<Vec<u8>>,
236 from_gossip_tx: broadcast::Sender<Vec<u8>>,
237 _guard: TopicDropGuard,
238}
239
240impl GossipHandle {
241 fn new(
242 topic: TopicId,
243 to_topic_tx: mpsc::Sender<Vec<u8>>,
244 from_gossip_tx: broadcast::Sender<Vec<u8>>,
245 _guard: TopicDropGuard,
246 ) -> Self {
247 Self {
248 topic,
249 to_topic_tx,
250 from_gossip_tx,
251 _guard,
252 }
253 }
254
255 /// Publishes a message to the stream.
256 pub async fn publish(
257 &self,
258 bytes: impl Into<Vec<u8>>,
259 ) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
260 self.to_topic_tx.send(bytes.into()).await
261 }
262
263 /// Subscribes to the stream.
264 ///
265 /// The returned [`GossipSubscription`] provides a means of receiving messages from the
266 /// stream.
267 pub fn subscribe(&self) -> GossipSubscription {
268 GossipSubscription::new(
269 self.topic,
270 self.from_gossip_tx.subscribe(),
271 self._guard.clone(),
272 )
273 }
274
275 /// Returns the topic of the stream.
276 pub fn topic(&self) -> TopicId {
277 self.topic
278 }
279}
280
281/// A handle to an ephemeral messaging stream subscription.
282///
283/// The stream can be used to receive messages from the stream.
284pub struct GossipSubscription {
285 topic: TopicId,
286 from_topic_rx: BroadcastStream<Vec<u8>>,
287 _guard: TopicDropGuard,
288}
289
290impl GossipSubscription {
291 /// Returns a handle to an ephemeral messaging stream subscriber.
292 fn new(
293 topic: TopicId,
294 from_topic_rx: broadcast::Receiver<Vec<u8>>,
295 _guard: TopicDropGuard,
296 ) -> Self {
297 Self {
298 topic,
299 from_topic_rx: BroadcastStream::new(from_topic_rx),
300 _guard,
301 }
302 }
303
304 /// Returns the topic of the stream.
305 pub fn topic(&self) -> TopicId {
306 self.topic
307 }
308}
309
310impl Stream for GossipSubscription {
311 type Item = Result<Vec<u8>, BroadcastStreamRecvError>;
312
313 fn poll_next(
314 mut self: std::pin::Pin<&mut Self>,
315 cx: &mut std::task::Context<'_>,
316 ) -> std::task::Poll<Option<Self::Item>> {
317 self.from_topic_rx.poll_next_unpin(cx)
318 }
319}
320
321/// Helper maintaining a counter of objects using the same topic.
322///
323/// Check if we can unsubscribe from topic if all handles and subscriptions have been dropped for
324/// it.
325struct TopicDropGuard {
326 topic: TopicId,
327 counter: Arc<AtomicIsize>,
328 actor_ref: ActorRef<ToGossipManager>,
329}
330
331impl Clone for TopicDropGuard {
332 fn clone(&self) -> Self {
333 self.counter
334 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
335
336 Self {
337 topic: self.topic,
338 counter: self.counter.clone(),
339 actor_ref: self.actor_ref.clone(),
340 }
341 }
342}
343
344impl Drop for TopicDropGuard {
345 fn drop(&mut self) {
346 let actor_ref = self.actor_ref.clone();
347
348 // Check if we can unsubscribe from topic if all handles and subscriptions have been
349 // dropped for it.
350 let previous_counter = self
351 .counter
352 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
353
354 // If this is 1 the last instance of the guard was dropped and the counter is now at zero.
355 if previous_counter <= 1 {
356 // Ignore this error, it could be that the actor has already stopped.
357 let _ = actor_ref.send_message(ToGossipManager::Unsubscribe(self.topic));
358 }
359 }
360}