xapi_shared/ws/
stream.rs

1use crate::ws::{error::SharedWsError, response::SharedWsResponseTrait};
2use std::{collections::HashMap, fmt::Debug, hash::Hash};
3use tokio::sync::mpsc;
4
5#[async_trait::async_trait]
6pub trait SharedWsStreamTrait<
7    WsStreamId: Hash + Eq + Clone + Debug + Send,
8    Response: SharedWsResponseTrait<WsStreamId> + Send + Debug,
9>
10{
11    fn get_stream_tx_map(
12        &mut self,
13    ) -> &mut HashMap<WsStreamId, mpsc::Sender<Result<Response, SharedWsError>>>;
14
15    async fn recv_stream_resp(&mut self, text: &str) -> Option<Result<(), SharedWsError>> {
16        match Response::try_parse(text) {
17            None => None,
18            Some(result) => {
19                let (id, msg) = match result {
20                    Ok(resp) => {
21                        let id = resp.get_id().clone();
22                        (id, Ok(resp))
23                    }
24                    Err((id, err)) => {
25                        tracing::error!(?id, ?err, "failed to parse ws stream response");
26                        (id, Err(err))
27                    }
28                };
29
30                let tx_map = self.get_stream_tx_map();
31
32                Some(match tx_map.get(&id) {
33                    None => {
34                        tracing::error!(?id, ?msg, "missing ws stream resp channel");
35                        Err(SharedWsError::InvalidIdError(format!("{id:?}")))
36                    }
37                    Some(tx) => tx
38                        .send(msg)
39                        .await
40                        .inspect_err(|err| {
41                            tracing::error!(?err, "failed to send to stream resp channel")
42                        })
43                        .map_err(|err| SharedWsError::ChannelClosedError(err.to_string())),
44                })
45            }
46        }
47    }
48}