p2panda_sync/manager/
event_stream.rs1use std::fmt::Debug;
4use std::hash::Hash as StdHash;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use futures::channel::mpsc;
9use futures::stream::SelectAll;
10use futures::{SinkExt, Stream, StreamExt};
11use p2panda_core::Extensions;
12use tokio_stream::wrappers::BroadcastStream;
13use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
14use tracing::debug;
15
16use crate::manager::{SessionStream, SessionTopicMap, ToTopicSync};
17use crate::protocols::TopicLogSyncEvent;
18use crate::{FromSync, ToSync};
19
20pub(crate) trait StreamDebug<Item>: Stream<Item = Item> + Send + Debug + 'static {}
21
22impl<T, Item> StreamDebug<Item> for T where T: Stream<Item = Item> + Send + Debug + 'static {}
23
24#[allow(clippy::type_complexity)]
25pub(crate) struct ManagerEventStreamState<T, E>
26where
27 T: Clone + Eq + StdHash + Send + 'static,
28 E: Extensions + Send + 'static,
29{
30 pub(crate) manager_rx: mpsc::Receiver<SessionStream<T, E>>,
31 pub(crate) session_rx_set:
32 SelectAll<Pin<Box<dyn StreamDebug<Option<FromSync<TopicLogSyncEvent<E>>>>>>>,
33 pub(crate) session_topic_map: SessionTopicMap<T, mpsc::Sender<ToTopicSync<E>>>,
34}
35
36type FutureOutput<T, E> = (
37 ManagerEventStreamState<T, E>,
38 Option<FromSync<TopicLogSyncEvent<E>>>,
39);
40
41#[allow(clippy::type_complexity)]
47pub struct ManagerEventStream<T, E>
48where
49 T: Clone + Eq + StdHash + Send + 'static,
50 E: Extensions + Send + 'static,
51{
52 pub(crate) state: Option<ManagerEventStreamState<T, E>>,
54
55 pub(crate) pending: Option<Pin<Box<dyn Future<Output = FutureOutput<T, E>> + Send>>>,
57}
58
59impl<T, E> ManagerEventStream<T, E>
60where
61 T: Clone + Debug + Eq + StdHash + Send + 'static,
62 E: Extensions + Send + 'static,
63{
64 async fn next_event(
65 mut state: ManagerEventStreamState<T, E>,
66 ) -> (
67 ManagerEventStreamState<T, E>,
68 Option<FromSync<TopicLogSyncEvent<E>>>,
69 ) {
70 loop {
71 tokio::select!(
72 biased;
73 item = state.manager_rx.next() => {
74 let Some(manager_event) = item else {
75 debug!("manager event stream closed");
76 return (state, None)
77 };
78 debug!("manager event received: {manager_event:?}");
79 let session_id = manager_event.session_id;
80 state.session_topic_map.insert_with_topic(session_id, manager_event.topic, manager_event.live_tx);
81
82 let stream = BroadcastStream::new(manager_event.event_rx);
83
84 let stream =
85 Box::pin(stream.map(Box::new(
86 move |event: Result<TopicLogSyncEvent<E>, BroadcastStreamRecvError>| {
87 event.ok().map(|event| FromSync {
88 session_id,
89 remote: manager_event.remote,
90 event,
91 })
92 },
93 )));
94 state.session_rx_set.push(stream);
95 }
96 Some(Some(from_sync)) = state.session_rx_set.next() => {
97 debug!("from sync event received: {from_sync:?}");
98 let session_id = from_sync.session_id();
99 let event = from_sync.event();
100
101 let operation = match event {
102 TopicLogSyncEvent::Operation(operation) => Some(*operation.clone()),
103 _ => return (state, Some(from_sync)),
104 };
105
106 if let Some(operation) = operation {
107 let Some(topic) = state.session_topic_map.topic(session_id) else {
108 debug!("session {session_id} not found");
109 state.session_topic_map.drop(session_id);
110 continue;
111 };
112 let keys = state.session_topic_map.sessions(topic);
113 let mut dropped = vec![];
114
115 for id in keys {
116 if id == session_id {
117 continue;
118 }
119
120 let Some(tx) = state.session_topic_map.sender_mut(id) else {
121 debug!("session {id} channel unexpectedly closed");
122 state.session_topic_map.drop(session_id);
123 continue;
124 };
125
126 let result = tx.send(ToSync::Payload(operation.clone())).await;
127
128 if result.is_err() {
129 debug!("failed sending message on session channel");
130 dropped.push(id);
131 }
132 }
133
134 for id in dropped {
135 state.session_topic_map.drop(id);
136 }
137 }
138
139 return (state, Some(from_sync))
140 }
141 )
142 }
143 }
144}
145
146impl<T, E> Unpin for ManagerEventStream<T, E>
147where
148 T: Clone + Debug + Eq + StdHash + Send + 'static,
149 E: Extensions + Send + 'static,
150{
151}
152
153impl<T, E> Stream for ManagerEventStream<T, E>
154where
155 T: Clone + Debug + Eq + StdHash + Send + 'static,
156 E: Extensions + Send + 'static,
157{
158 type Item = FromSync<TopicLogSyncEvent<E>>;
159
160 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 if self.pending.is_none() {
162 let fut = Box::pin(ManagerEventStream::next_event(
163 self.state.take().expect("state is not None"),
164 ));
165 self.pending = Some(fut);
166 }
167
168 let fut = self.pending.as_mut().unwrap();
169 match fut.as_mut().poll(cx) {
170 Poll::Pending => Poll::Pending,
171 Poll::Ready((state, item)) => {
172 self.pending = None;
173 self.state.replace(state);
174 Poll::Ready(item)
175 }
176 }
177 }
178}