use crate::ws::{error::SharedWsError, response::SharedWsResponseTrait};
use std::{collections::HashMap, fmt::Debug, hash::Hash};
use tokio::sync::mpsc;
#[async_trait::async_trait]
pub trait SharedWsStreamTrait<
WsStreamId: Hash + Eq + Clone + Debug + Send,
Response: SharedWsResponseTrait<WsStreamId> + Send + Debug,
>
{
fn get_stream_tx_map(
&mut self,
) -> &mut HashMap<WsStreamId, mpsc::Sender<Result<Response, SharedWsError>>>;
async fn recv_stream_resp(&mut self, text: &str) -> Option<Result<(), SharedWsError>> {
match Response::try_parse(text) {
None => None,
Some(result) => {
let (id, msg) = match result {
Ok(resp) => {
let id = resp.get_id().clone();
(id, Ok(resp))
}
Err((id, err)) => {
tracing::error!(?id, ?err, "failed to parse ws stream response");
(id, Err(err))
}
};
let tx_map = self.get_stream_tx_map();
Some(match tx_map.get(&id) {
None => {
tracing::error!(?id, ?msg, "missing ws stream resp channel");
Err(SharedWsError::InvalidIdError(format!("{id:?}")))
}
Some(tx) => tx
.send(msg)
.await
.inspect_err(|err| {
tracing::error!(?err, "failed to send to stream resp channel")
})
.map_err(|err| SharedWsError::ChannelClosedError(err.to_string())),
})
}
}
}
}