p2panda_net/sync/
handle.rs1use futures_util::{Stream, StreamExt};
4use p2panda_sync::FromSync;
5use ractor::{ActorRef, call};
6use thiserror::Error;
7use tokio::sync::broadcast;
8use tokio_stream::wrappers::BroadcastStream;
9use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
10
11use crate::TopicId;
12use crate::sync::actors::{ToSyncManager, ToTopicManager};
13
14pub struct SyncHandle<M, E>
18where
19 M: Clone + Send + 'static,
20 E: Clone + Send + 'static,
21{
22 topic: TopicId,
23 manager_ref: ActorRef<ToSyncManager<M, E>>,
24 topic_manager_ref: ActorRef<ToTopicManager<M>>,
25}
26
27impl<M, E> SyncHandle<M, E>
28where
29 M: Clone + Send + 'static,
30 E: Clone + Send + 'static,
31{
32 pub(crate) fn new(
33 topic: TopicId,
34 manager_ref: ActorRef<ToSyncManager<M, E>>,
35 topic_manager_ref: ActorRef<ToTopicManager<M>>,
36 ) -> Self {
37 Self {
38 topic,
39 manager_ref,
40 topic_manager_ref,
41 }
42 }
43
44 pub async fn publish(&self, data: M) -> Result<(), SyncHandleError<M, E>> {
46 self.topic_manager_ref
49 .send_message(ToTopicManager::Publish(data))
50 .map_err(Box::new)?;
51 Ok(())
52 }
53
54 pub async fn subscribe(&self) -> Result<SyncSubscription<E>, SyncHandleError<M, E>> {
59 if let Some(stream) =
60 call!(self.manager_ref, ToSyncManager::Subscribe, self.topic).map_err(Box::new)?
61 {
62 Ok(SyncSubscription::<E>::new(self.topic, stream))
63 } else {
64 Err(SyncHandleError::StreamNotFound)
65 }
66 }
67
68 pub fn topic(&self) -> TopicId {
70 self.topic
71 }
72
73 #[cfg(test)]
79 pub fn initiate_session(&self, node_id: crate::NodeId) {
80 self.manager_ref
81 .send_message(ToSyncManager::InitiateSync(self.topic, node_id))
82 .unwrap();
83 }
84}
85
86impl<M, E> Drop for SyncHandle<M, E>
87where
88 M: Clone + Send + 'static,
89 E: Clone + Send + 'static,
90{
91 fn drop(&mut self) {
92 let _ = self
94 .manager_ref
95 .send_message(ToSyncManager::Close(self.topic));
96 }
97}
98
99pub struct SyncSubscription<E> {
103 topic: TopicId,
104 from_sync_rx: BroadcastStream<FromSync<E>>,
106}
107
108impl<E> SyncSubscription<E>
109where
110 E: Clone + Send + 'static,
111{
112 pub(crate) fn new(topic: TopicId, from_sync_rx: broadcast::Receiver<FromSync<E>>) -> Self {
113 Self {
114 topic,
115 from_sync_rx: BroadcastStream::new(from_sync_rx),
116 }
117 }
118
119 pub fn topic(&self) -> TopicId {
121 self.topic
122 }
123}
124
125impl<E> Stream for SyncSubscription<E>
126where
127 E: Clone + Send + 'static,
128{
129 type Item = Result<FromSync<E>, BroadcastStreamRecvError>;
130
131 fn poll_next(
132 mut self: std::pin::Pin<&mut Self>,
133 cx: &mut std::task::Context<'_>,
134 ) -> std::task::Poll<Option<Self::Item>> {
135 self.from_sync_rx.poll_next_unpin(cx)
136 }
137}
138
139#[derive(Debug, Error)]
140pub enum SyncHandleError<M, E> {
141 #[error(transparent)]
143 ActorRpc(#[from] Box<ractor::RactorErr<ToSyncManager<M, E>>>),
144
145 #[error(transparent)]
146 Publish(#[from] Box<ractor::MessagingErr<ToTopicManager<M>>>),
147
148 #[error("no stream exists for the given topic")]
149 StreamNotFound,
150}