p2panda_net/sync/
handle.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use futures_util::{Stream, StreamExt};
4use p2panda_sync::FromSync;
5use ractor::{ActorRef, call};
6use thiserror::Error;
7use tokio::sync::broadcast;
8use tokio_stream::wrappers::BroadcastStream;
9use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
10
11use crate::TopicId;
12use crate::sync::actors::{ToSyncManager, ToTopicManager};
13
14/// Handle to a sync stream.
15///
16/// The stream can be used to publish messages or to request a subscription.
17pub struct SyncHandle<M, E>
18where
19    M: Clone + Send + 'static,
20    E: Clone + Send + 'static,
21{
22    topic: TopicId,
23    manager_ref: ActorRef<ToSyncManager<M, E>>,
24    topic_manager_ref: ActorRef<ToTopicManager<M>>,
25}
26
27impl<M, E> SyncHandle<M, E>
28where
29    M: Clone + Send + 'static,
30    E: Clone + Send + 'static,
31{
32    pub(crate) fn new(
33        topic: TopicId,
34        manager_ref: ActorRef<ToSyncManager<M, E>>,
35        topic_manager_ref: ActorRef<ToTopicManager<M>>,
36    ) -> Self {
37        Self {
38            topic,
39            manager_ref,
40            topic_manager_ref,
41        }
42    }
43
44    /// Publishes a message to the stream.
45    pub async fn publish(&self, data: M) -> Result<(), SyncHandleError<M, E>> {
46        // This would likely be a critical failure for this stream handle, since we are unable to
47        // send messages to the sync manager.
48        self.topic_manager_ref
49            .send_message(ToTopicManager::Publish(data))
50            .map_err(Box::new)?;
51        Ok(())
52    }
53
54    /// Subscribes to the stream.
55    ///
56    /// The returned `SyncSubscription` provides a means of receiving messages from
57    /// the stream.
58    pub async fn subscribe(&self) -> Result<SyncSubscription<E>, SyncHandleError<M, E>> {
59        if let Some(stream) =
60            call!(self.manager_ref, ToSyncManager::Subscribe, self.topic).map_err(Box::new)?
61        {
62            Ok(SyncSubscription::<E>::new(self.topic, stream))
63        } else {
64            Err(SyncHandleError::StreamNotFound)
65        }
66    }
67
68    /// Returns the topic of the stream.
69    pub fn topic(&self) -> TopicId {
70        self.topic
71    }
72
73    /// Manually starts sync session with given node.
74    ///
75    /// If there's no transport information for this node this action will fail.
76    // TODO: Consider making this public, for this we would need to decide if we want to receive
77    // the sync session events and status directly as a stream from the return type?
78    #[cfg(test)]
79    pub fn initiate_session(&self, node_id: crate::NodeId) {
80        self.manager_ref
81            .send_message(ToSyncManager::InitiateSync(self.topic, node_id))
82            .unwrap();
83    }
84}
85
86impl<M, E> Drop for SyncHandle<M, E>
87where
88    M: Clone + Send + 'static,
89    E: Clone + Send + 'static,
90{
91    fn drop(&mut self) {
92        // Ignore error here as the actor might already be dropped.
93        let _ = self
94            .manager_ref
95            .send_message(ToSyncManager::Close(self.topic));
96    }
97}
98
99/// Handle to a sync subscription.
100///
101/// The stream can be used to receive messages from the stream.
102pub struct SyncSubscription<E> {
103    topic: TopicId,
104    // Messages sent directly from the topic manager.
105    from_sync_rx: BroadcastStream<FromSync<E>>,
106}
107
108impl<E> SyncSubscription<E>
109where
110    E: Clone + Send + 'static,
111{
112    pub(crate) fn new(topic: TopicId, from_sync_rx: broadcast::Receiver<FromSync<E>>) -> Self {
113        Self {
114            topic,
115            from_sync_rx: BroadcastStream::new(from_sync_rx),
116        }
117    }
118
119    /// Returns the topic of the stream.
120    pub fn topic(&self) -> TopicId {
121        self.topic
122    }
123}
124
125impl<E> Stream for SyncSubscription<E>
126where
127    E: Clone + Send + 'static,
128{
129    type Item = Result<FromSync<E>, BroadcastStreamRecvError>;
130
131    fn poll_next(
132        mut self: std::pin::Pin<&mut Self>,
133        cx: &mut std::task::Context<'_>,
134    ) -> std::task::Poll<Option<Self::Item>> {
135        self.from_sync_rx.poll_next_unpin(cx)
136    }
137}
138
139#[derive(Debug, Error)]
140pub enum SyncHandleError<M, E> {
141    /// Messaging with internal actor via RPC failed.
142    #[error(transparent)]
143    ActorRpc(#[from] Box<ractor::RactorErr<ToSyncManager<M, E>>>),
144
145    #[error(transparent)]
146    Publish(#[from] Box<ractor::MessagingErr<ToTopicManager<M>>>),
147
148    #[error("no stream exists for the given topic")]
149    StreamNotFound,
150}