simploxide_core/
dispatcher.rs1use std::{sync::Arc, task::Poll};
4
5use crate::{WsIn, router::ResponseRouter};
6use futures::{Stream, StreamExt};
7use tokio::sync::mpsc;
8use tokio_tungstenite::tungstenite::Message;
9use tokio_util::sync::CancellationToken;
10
11use super::{Event, RequestId, Result};
12
13type EventSender = mpsc::UnboundedSender<Result<Event>>;
14type EventReceiver = mpsc::UnboundedReceiver<Result<Event>>;
15
16pub fn init(ws_in: WsIn, router: ResponseRouter, token: CancellationToken) -> EventQueue {
17 let (events_tx, receiver) = mpsc::unbounded_channel::<Result<Event>>();
18 tokio::spawn(event_dispatcher_task(ws_in, events_tx, router, token));
19
20 EventQueue { receiver }
21}
22
23pub struct EventQueue {
26 receiver: EventReceiver,
27}
28
29impl EventQueue {
30 pub async fn next_event(&mut self) -> Option<Result<Event>> {
34 self.receiver.recv().await
35 }
36
37 pub fn into_receiver(self) -> EventReceiver {
39 self.receiver
40 }
41}
42
43async fn event_dispatcher_task(
44 mut ws_in: WsIn,
45 mut event_queue: EventSender,
46 router: ResponseRouter,
47 token: CancellationToken,
48) {
49 loop {
50 tokio::select! {
51 ev = ws_in.next() => {
52 match ev {
53 Some(Ok(msg)) => {
54 process_raw_event(Some(&router), &mut event_queue, msg);
55 }
56 Some(Err(e)) => {
57 let e = Arc::new(e);
58 let _ = event_queue.send(Err(Arc::clone(&e)));
59 router.shutdown(e);
60
61 break;
62 }
63 None => unreachable!("Must receive an error before connection drops")
64
65 }
66 }
67 _ = token.cancelled() => {
69 let mut ws_in = Closed(ws_in);
71 while let Some(ev) = ws_in.next().await {
72 match ev {
73 Ok(msg) => {
74 process_raw_event(None, &mut event_queue, msg);
75 }
76 Err(e) => {
77 let _ = event_queue.send(Err(Arc::new(e)));
78 break;
79 }
80 }
81 }
82
83 break;
84 }
85 }
86 }
87
88 log::debug!("Dispatcher task finished");
89}
90
91fn process_raw_event(router: Option<&ResponseRouter>, event_queue: &mut EventSender, msg: Message) {
98 let mut json: serde_json::Value = match msg {
99 Message::Text(txt) => serde_json::from_str(&txt).expect("Server sends a valid JSON"),
100 unexpected => {
101 log::warn!("Ignoring event in unexpecetd format: {unexpected:#?}");
102 return;
103 }
104 };
105
106 let corr_id = json["corrId"].take();
107
108 if !corr_id.is_null() {
109 let id: RequestId = corr_id.as_str().unwrap().parse().unwrap();
110 let response = json["resp"].take();
111 assert!(!response.is_null(), "Server sends a valid resp field");
112
113 if let Some(router) = router {
114 router.deliver(id, response);
115 } else {
116 log::warn!("Dropping response: {response}\nBecause router task already finished");
117 }
118 } else {
119 let event = json["resp"].take();
120 if event.is_null() {
123 let _ = event_queue.send(Ok(json));
124 } else {
125 let _ = event_queue.send(Ok(event));
126 }
127 }
128}
129
/// Stream adaptor used while shutting down: it yields the items the wrapped
/// stream has ready right now and reports end-of-stream instead of waiting
/// for more.
struct Closed<S>(S);
133
134impl<S> Stream for Closed<S>
135where
136 S: Stream + Unpin,
137{
138 type Item = S::Item;
139
140 fn poll_next(
141 mut self: std::pin::Pin<&mut Self>,
142 cx: &mut std::task::Context<'_>,
143 ) -> Poll<Option<Self::Item>> {
144 match self.0.poll_next_unpin(cx) {
145 Poll::Ready(v) => Poll::Ready(v),
146 Poll::Pending => Poll::Ready(None),
147 }
148 }
149}