usdpl_front/
client_handler.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use futures::{SinkExt, StreamExt, future::{select, Either}};
4use gloo_net::websocket::{futures::WebSocket, Message};
5use nrpc::{ClientHandler, ServiceError, ServiceClientStream, _helpers::async_trait, _helpers::bytes};
6use wasm_bindgen_futures::spawn_local;
7
/// Process-wide counter of websocket calls made so far.
/// Each call takes the next value and embeds it in the `usdpl-ws-<id>` host name of its URL.
static LAST_ID: AtomicU64 = AtomicU64::new(0);

/// Websocket client.
/// In most cases, this shouldn't be used directly, but generated code will use this.
#[derive(Debug)]
pub struct WebSocketHandler {
    /// Port number placed in the websocket URL of every call.
    port: u16,
}
15
16#[inline]
17fn ws_is_alive(ws_state: &gloo_net::websocket::State) -> bool {
18    match ws_state {
19        gloo_net::websocket::State::Connecting | gloo_net::websocket::State::Open => true,
20        gloo_net::websocket::State::Closing | gloo_net::websocket::State::Closed => false,
21    }
22}
23
24async fn send_recv_ws<'a>(mut tx: futures_channel::mpsc::Sender<Result<bytes::Bytes, String>>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) {
25    let ws = match WebSocket::open_with_protocol(&url, "usdpl-nrpc").map_err(|e| e.to_string()) {
26        Ok(x) => x,
27        Err(e) => {
28            log::error!("ws open error: {}", e);
29            tx.send(Err(e.to_string())).await.unwrap_or(());
30            return;
31        }
32    };
33
34
35    log::debug!("ws opened successfully with url `{}`", url);
36
37    let (mut input_done, mut output_done) = (false, false);
38    let mut last_ws_state = ws.state();
39    log::debug!("ws with url `{}` initial state: {:?}", url, last_ws_state);
40    let (mut ws_sink, mut ws_stream) = ws.split();
41    let (mut left, mut right) = (input.next(), ws_stream.next());
42    while ws_is_alive(&last_ws_state) {
43        if !input_done && !output_done {
44            log::debug!("Input and output streams are both alive");
45            match select(left, right).await {
46                Either::Left((next, outstanding)) => {
47                   log::debug!("Got message to send over websocket");
48                    if let Some(next) = next {
49                        match next {
50                            Ok(next) => {
51                                if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
52                                    tx.send(Err(e.to_string())).await.unwrap_or(());
53                                }
54                            },
55                            Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
56                        }
57                    } else {
58                        input_done = true;
59                    }
60                    right = outstanding;
61                    left = input.next();
62                },
63                Either::Right((response, outstanding)) => {
64                    log::debug!("Received message from websocket");
65                    if let Some(next) = response {
66                        match next {
67                            Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
68                            Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
69                            Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
70                        }
71                    } else {
72                        output_done = true;
73                    }
74                    left = outstanding;
75                    let ws = ws_stream.reunite(ws_sink).unwrap();
76                    last_ws_state = ws.state();
77                    (ws_sink, ws_stream) = ws.split();
78                    right = ws_stream.next();
79                }
80            }
81        } else if input_done {
82            log::debug!("Input stream is complete");
83            if let Some(next) = right.await {
84                log::debug!("Received message from websocket");
85                match next {
86                    Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
87                    Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
88                    Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
89                }
90            } else {
91                output_done = true;
92            }
93            //left = outstanding;
94            let ws = ws_stream.reunite(ws_sink).unwrap();
95            last_ws_state = ws.state();
96            (ws_sink, ws_stream) = ws.split();
97            right = ws_stream.next();
98        } else {
99            // output_done is true
100            log::debug!("Output stream is complete");
101            if let Some(next) = left.await {
102                log::debug!("Got message to send over websocket");
103                match next {
104                    Ok(next) => {
105                        if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
106                            tx.send(Err(e.to_string())).await.unwrap_or(());
107                        }
108                    },
109                    Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
110                }
111            } else {
112                input_done = true;
113            }
114            //right = outstanding;
115            let ws = ws_stream.reunite(ws_sink).unwrap();
116            last_ws_state = ws.state();
117            (ws_sink, ws_stream) = ws.split();
118            left = input.next();
119            right = ws_stream.next(); // this should always resolve to None (but compiler is unhappy without this)
120        }
121    }
122
123    log::debug!("ws with url `{}` has closed", url);
124}
125
/// Error type wrapping a plain message string.
/// Its `Display` output is the message prefixed with `Error message: `.
#[derive(Debug)]
struct ErrorStr(String);

impl std::fmt::Display for ErrorStr {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        out.write_str("Error message: ")?;
        out.write_str(message)
    }
}

impl std::error::Error for ErrorStr {}
136
137const CHANNEL_BOUND: usize = 4;
138
139impl WebSocketHandler {
140    /// Instantiate the web socket client for connecting on the specified port
141    pub fn new(port: u16) -> Self {
142        Self { port }
143    }
144}
145
146#[async_trait::async_trait(?Send)]
147impl ClientHandler<'static> for WebSocketHandler {
148    async fn call<'a: 'static>(
149        &self,
150        package: &str,
151        service: &str,
152        method: &str,
153        input: ServiceClientStream<'a, bytes::Bytes>,
154    ) -> Result<ServiceClientStream<'a, bytes::Bytes>, ServiceError> {
155        let id = LAST_ID.fetch_add(1, Ordering::SeqCst);
156        let url = format!(
157            "ws://usdpl-ws-{}.localhost:{}/{}.{}/{}",
158            id, self.port, package, service, method,
159        );
160        log::debug!("doing send/receive on ws url `{}`", url);
161        let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_BOUND);
162        spawn_local(send_recv_ws(tx, url, input));
163
164        Ok(Box::new(rx.map(|buf_result: Result<bytes::Bytes, String>| buf_result
165            .map(|buf| bytes::Bytes::from(buf))
166            .map_err(|e| ServiceError::Method(Box::new(ErrorStr(e)))))))
167    }
168}