p2panda_sync/manager/
event_stream.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::fmt::Debug;
4use std::hash::Hash as StdHash;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use futures::channel::mpsc;
9use futures::stream::SelectAll;
10use futures::{SinkExt, Stream, StreamExt};
11use p2panda_core::Extensions;
12use tokio_stream::wrappers::BroadcastStream;
13use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
14use tracing::debug;
15
16use crate::manager::{SessionStream, SessionTopicMap, ToTopicSync};
17use crate::protocols::TopicLogSyncEvent;
18use crate::{FromSync, ToSync};
19
20pub(crate) trait StreamDebug<Item>: Stream<Item = Item> + Send + Debug + 'static {}
21
22impl<T, Item> StreamDebug<Item> for T where T: Stream<Item = Item> + Send + Debug + 'static {}
23
24#[allow(clippy::type_complexity)]
25pub(crate) struct ManagerEventStreamState<T, E>
26where
27    T: Clone + Eq + StdHash + Send + 'static,
28    E: Extensions + Send + 'static,
29{
30    pub(crate) manager_rx: mpsc::Receiver<SessionStream<T, E>>,
31    pub(crate) session_rx_set:
32        SelectAll<Pin<Box<dyn StreamDebug<Option<FromSync<TopicLogSyncEvent<E>>>>>>>,
33    pub(crate) session_topic_map: SessionTopicMap<T, mpsc::Sender<ToTopicSync<E>>>,
34}
35
36type FutureOutput<T, E> = (
37    ManagerEventStreamState<T, E>,
38    Option<FromSync<TopicLogSyncEvent<E>>>,
39);
40
41/// Event stream for a manager returned from SyncManager::subscribe().
42///
43/// Calling `next_event` on the manager event stream returns the next event in the event queue
44/// (combined events of all running sync sessions). If the event contains an operation then it
45/// will be forwarded on to any concurrently running sync sessions.
46#[allow(clippy::type_complexity)]
47pub struct ManagerEventStream<T, E>
48where
49    T: Clone + Eq + StdHash + Send + 'static,
50    E: Extensions + Send + 'static,
51{
52    /// Stream state.
53    pub(crate) state: Option<ManagerEventStreamState<T, E>>,
54
55    /// The current future being polled.
56    pub(crate) pending: Option<Pin<Box<dyn Future<Output = FutureOutput<T, E>> + Send>>>,
57}
58
59impl<T, E> ManagerEventStream<T, E>
60where
61    T: Clone + Debug + Eq + StdHash + Send + 'static,
62    E: Extensions + Send + 'static,
63{
64    async fn next_event(
65        mut state: ManagerEventStreamState<T, E>,
66    ) -> (
67        ManagerEventStreamState<T, E>,
68        Option<FromSync<TopicLogSyncEvent<E>>>,
69    ) {
70        loop {
71            tokio::select!(
72                biased;
73                item = state.manager_rx.next() => {
74                    let Some(manager_event) = item else {
75                        debug!("manager event stream closed");
76                        return (state, None)
77                    };
78                    debug!("manager event received: {manager_event:?}");
79                    let session_id = manager_event.session_id;
80                    state.session_topic_map.insert_with_topic(session_id, manager_event.topic, manager_event.live_tx);
81
82                    let stream = BroadcastStream::new(manager_event.event_rx);
83
84                    let stream =
85                        Box::pin(stream.map(Box::new(
86                            move |event: Result<TopicLogSyncEvent<E>, BroadcastStreamRecvError>| {
87                                event.ok().map(|event| FromSync {
88                                    session_id,
89                                    remote: manager_event.remote,
90                                    event,
91                                })
92                            },
93                        )));
94                    state.session_rx_set.push(stream);
95                }
96                Some(Some(from_sync)) = state.session_rx_set.next() => {
97                    debug!("from sync event received: {from_sync:?}");
98                    let session_id = from_sync.session_id();
99                    let event = from_sync.event();
100
101                    let operation = match event {
102                        TopicLogSyncEvent::Operation(operation) => Some(*operation.clone()),
103                        _ => return (state, Some(from_sync)),
104                    };
105
106                    if let Some(operation) = operation {
107                        let Some(topic) = state.session_topic_map.topic(session_id) else {
108                            debug!("session {session_id} not found");
109                            state.session_topic_map.drop(session_id);
110                            continue;
111                        };
112                        let keys = state.session_topic_map.sessions(topic);
113                        let mut dropped = vec![];
114
115                        for id in keys {
116                            if id == session_id {
117                                continue;
118                            }
119
120                            let Some(tx) = state.session_topic_map.sender_mut(id) else {
121                                debug!("session {id} channel unexpectedly closed");
122                                state.session_topic_map.drop(session_id);
123                                continue;
124                            };
125
126                            let result = tx.send(ToSync::Payload(operation.clone())).await;
127
128                            if result.is_err() {
129                                debug!("failed sending message on session channel");
130                                dropped.push(id);
131                            }
132                        }
133
134                        for id in dropped {
135                            state.session_topic_map.drop(id);
136                        }
137                    }
138
139                    return (state, Some(from_sync))
140                }
141            )
142        }
143    }
144}
145
146impl<T, E> Unpin for ManagerEventStream<T, E>
147where
148    T: Clone + Debug + Eq + StdHash + Send + 'static,
149    E: Extensions + Send + 'static,
150{
151}
152
153impl<T, E> Stream for ManagerEventStream<T, E>
154where
155    T: Clone + Debug + Eq + StdHash + Send + 'static,
156    E: Extensions + Send + 'static,
157{
158    type Item = FromSync<TopicLogSyncEvent<E>>;
159
160    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161        if self.pending.is_none() {
162            let fut = Box::pin(ManagerEventStream::next_event(
163                self.state.take().expect("state is not None"),
164            ));
165            self.pending = Some(fut);
166        }
167
168        let fut = self.pending.as_mut().unwrap();
169        match fut.as_mut().poll(cx) {
170            Poll::Pending => Poll::Pending,
171            Poll::Ready((state, item)) => {
172                self.pending = None;
173                self.state.replace(state);
174                Poll::Ready(item)
175            }
176        }
177    }
178}