1use crate::ws::{error::SharedWsError, response::SharedWsResponseTrait};
2use std::{collections::HashMap, fmt::Debug, hash::Hash};
3use tokio::sync::mpsc;
4
5#[async_trait::async_trait]
6pub trait SharedWsStreamTrait<
7 WsStreamId: Hash + Eq + Clone + Debug + Send,
8 Response: SharedWsResponseTrait<WsStreamId> + Send + Debug,
9>
10{
11 fn get_stream_tx_map(
12 &mut self,
13 ) -> &mut HashMap<WsStreamId, mpsc::Sender<Result<Response, SharedWsError>>>;
14
15 async fn recv_stream_resp(&mut self, text: &str) -> Option<Result<(), SharedWsError>> {
16 match Response::try_parse(text) {
17 None => None,
18 Some(result) => {
19 let (id, msg) = match result {
20 Ok(resp) => {
21 let id = resp.get_id().clone();
22 (id, Ok(resp))
23 }
24 Err((id, err)) => {
25 tracing::error!(?id, ?err, "failed to parse ws stream response");
26 (id, Err(err))
27 }
28 };
29
30 let tx_map = self.get_stream_tx_map();
31
32 Some(match tx_map.get(&id) {
33 None => {
34 tracing::error!(?id, ?msg, "missing ws stream resp channel");
35 Err(SharedWsError::InvalidIdError(format!("{id:?}")))
36 }
37 Some(tx) => tx
38 .send(msg)
39 .await
40 .inspect_err(|err| {
41 tracing::error!(?err, "failed to send to stream resp channel")
42 })
43 .map_err(|err| SharedWsError::ChannelClosedError(err.to_string())),
44 })
45 }
46 }
47 }
48}