use std::sync::Arc;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;
use crate::error::Error;
use crate::types::directory::{DirectoryNotification, SubscriptionRequest};
/// Cloneable handle used to submit [`SubscriptionRequest`]s to an open
/// WebSocket session.
///
/// Instances are handed out by `DirectoryWsClient::connect`; every clone
/// feeds the same underlying channel.
#[derive(Clone, Debug)]
pub struct SubscriptionSender {
/// Sending half of the bounded channel that carries subscription requests.
tx: mpsc::Sender<SubscriptionRequest>,
}
impl SubscriptionSender {
pub async fn send(&self, request: SubscriptionRequest) -> Result<(), Error> {
self.tx
.send(request)
.await
.map_err(|_| Error::Transport("WebSocket connection closed".into()))
}
}
/// Stateless unit type whose associated function `connect` opens a
/// directory-notification WebSocket session.
pub struct DirectoryWsClient;
impl DirectoryWsClient {
pub async fn connect(
url: Url,
) -> Result<(SubscriptionSender, mpsc::Receiver<Result<DirectoryNotification, Error>>), Error>
{
let (ws_stream, _http_resp) = connect_async(url.as_str())
.await
.map_err(|e| Error::Transport(e.to_string()))?;
let (mut ws_sink, mut ws_source) = ws_stream.split();
let (sub_tx, mut sub_rx) = mpsc::channel::<SubscriptionRequest>(32);
let (notif_tx, notif_rx) = mpsc::channel::<Result<DirectoryNotification, Error>>(64);
let notif_tx = Arc::new(notif_tx);
tokio::spawn(async move {
while let Some(req) = sub_rx.recv().await {
let json = match serde_json::to_string(&req) {
Ok(s) => s,
Err(e) => {
tracing_log(format!("energy-api: JSON encode error: {e}"));
continue;
}
};
if ws_sink.send(Message::Text(json.into())).await.is_err() {
break;
}
}
let _ = ws_sink.close().await;
});
let notif_tx_clone = Arc::clone(¬if_tx);
tokio::spawn(async move {
while let Some(msg) = ws_source.next().await {
match msg {
Ok(Message::Text(text)) => {
let result = serde_json::from_str::<DirectoryNotification>(&text)
.map_err(Error::Json);
if notif_tx_clone.send(result).await.is_err() {
break;
}
}
Ok(Message::Close(_)) | Err(_) => break,
_ => {}
}
}
});
Ok((SubscriptionSender { tx: sub_tx }, notif_rx))
}
}
/// Placeholder logging hook.
///
/// The message is accepted and immediately discarded; nothing is written
/// anywhere.
#[inline]
fn tracing_log(msg: String) {
    drop(msg);
}