simploxide_core/
dispatcher.rs

1//! Event dispatcher task.
2
3use std::{sync::Arc, task::Poll};
4
5use crate::{WsIn, router::ResponseRouter};
6use futures::{Stream, StreamExt};
7use tokio::sync::mpsc;
8use tokio_tungstenite::tungstenite::Message;
9use tokio_util::sync::CancellationToken;
10
11use super::{Event, RequestId, Result};
12
13type EventSender = mpsc::UnboundedSender<Result<Event>>;
14pub type EventReceiver = mpsc::UnboundedReceiver<Result<Event>>;
15
16pub fn init(ws_in: WsIn, router: ResponseRouter, token: CancellationToken) -> EventQueue {
17    let (events_tx, receiver) = mpsc::unbounded_channel::<Result<Event>>();
18    tokio::spawn(event_dispatcher_task(ws_in, events_tx, router, token));
19
20    EventQueue { receiver }
21}
22
23/// An event queue buffers events if you're not actively processing them so it's recommended to
24/// drop it as soon as it no longer needed.
25pub struct EventQueue {
26    receiver: EventReceiver,
27}
28
29impl EventQueue {
30    /// Can return a SimpleX event or a [`tokio_tungstenite::tungstenite::Error`] if a connection is dropped due to a
31    /// web socket failure. SimpleX events can themselves represent SimpleX errors but recognizing
32    /// and handling them them is a task of the upstream code.
33    pub async fn next_event(&mut self) -> Option<Result<Event>> {
34        self.receiver.recv().await
35    }
36
37    /// Get the underlying tokio unbounded receiver that enables more complicated use cases.
38    pub fn into_receiver(self) -> EventReceiver {
39        self.receiver
40    }
41}
42
43async fn event_dispatcher_task(
44    mut ws_in: WsIn,
45    mut event_queue: EventSender,
46    router: ResponseRouter,
47    token: CancellationToken,
48) {
49    loop {
50        tokio::select! {
51            ev = ws_in.next() => {
52                match ev {
53                    Some(Ok(msg)) => {
54                        process_raw_event(Some(&router), &mut event_queue, msg);
55                    }
56                    Some(Err(e)) => {
57                        let e = Arc::new(e);
58                        let _ = event_queue.send(Err(Arc::clone(&e)));
59                        router.shutdown(e);
60
61                        break;
62                    }
63                    None => unreachable!("Must receive an error before connection drops")
64
65                }
66            }
67            // Can get cancelled only after router task completion.
68            _ = token.cancelled() => {
69                // Processing buffered events
70                let mut ws_in = Closed(ws_in);
71                while let Some(ev) = ws_in.next().await {
72                    match ev {
73                        Ok(msg) => {
74                            process_raw_event(None, &mut event_queue, msg);
75                        }
76                        Err(e) => {
77                            let _ = event_queue.send(Err(Arc::new(e)));
78                            break;
79                        }
80                    }
81                }
82
83                break;
84            }
85        }
86    }
87
88    log::debug!("Dispatcher task finished");
89}
90
91/// Parse the top level JSON and either route event to the `event_queue` or deliver a response by
92/// `corrId` via the `router`.
93///
94/// TODO: `Option<&Router>` was added to reuse code in a branch that handles the interruption
95/// event. In this case all buffered events can only be sent to the `event_queue`. This could be
96/// refactored to look less hacky.
97fn process_raw_event(router: Option<&ResponseRouter>, event_queue: &mut EventSender, msg: Message) {
98    let mut json: serde_json::Value = match msg {
99        Message::Text(txt) => serde_json::from_str(&txt).expect("Server sends a valid JSON"),
100        unexpected => {
101            log::warn!("Ignoring event in unexpecetd format: {unexpected:#?}");
102            return;
103        }
104    };
105
106    let corr_id = json["corrId"].take();
107
108    if !corr_id.is_null() {
109        let id: RequestId = corr_id.as_str().unwrap().parse().unwrap();
110        let response = json["resp"].take();
111        assert!(!response.is_null(), "Server sends a valid resp field");
112
113        if let Some(router) = router {
114            router.deliver(id, response);
115        } else {
116            log::warn!("Dropping response: {response}\nBecause router task already finished");
117        }
118    } else {
119        let event = json["resp"].take();
120        // The client may choose to drop the event queue to stop buffering events. This is an
121        // expected behavior so errors are ignored.
122        if event.is_null() {
123            let _ = event_queue.send(Ok(json));
124        } else {
125            let _ = event_queue.send(Ok(event));
126        }
127    }
128}
129
130/// A helper that allows to process buffered items. Returns `None` when internal stream buffer
131/// becomes empty.
132struct Closed<S>(S);
133
134impl<S> Stream for Closed<S>
135where
136    S: Stream + Unpin,
137{
138    type Item = S::Item;
139
140    fn poll_next(
141        mut self: std::pin::Pin<&mut Self>,
142        cx: &mut std::task::Context<'_>,
143    ) -> Poll<Option<Self::Item>> {
144        match self.0.poll_next_unpin(cx) {
145            Poll::Ready(v) => Poll::Ready(v),
146            Poll::Pending => Poll::Ready(None),
147        }
148    }
149}