use std::sync::Arc;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;
use crate::error::Error;
use crate::models::directory::{DirectoryNotification, SubscriptionRequest};
#[derive(Clone, Debug)]
pub struct SubscriptionSender {
tx: mpsc::Sender<SubscriptionRequest>,
}
impl SubscriptionSender {
pub async fn send(&self, request: SubscriptionRequest) -> Result<(), Error> {
self.tx
.send(request)
.await
.map_err(|_| Error::Transport("WebSocket connection closed".into()))
}
}
pub struct DirectoryWsClient;
impl DirectoryWsClient {
pub async fn connect(
url: Url,
) -> Result<
(
SubscriptionSender,
mpsc::Receiver<Result<DirectoryNotification, Error>>,
),
Error,
> {
let (ws_stream, _) = connect_async(url.as_str())
.await
.map_err(|e| Error::Transport(e.to_string()))?;
let (mut ws_sink, mut ws_source) = ws_stream.split();
let (sub_tx, mut sub_rx) = mpsc::channel::<SubscriptionRequest>(32);
let (notif_tx, notif_rx) = mpsc::channel::<Result<DirectoryNotification, Error>>(64);
let notif_tx = Arc::new(notif_tx);
tokio::spawn(async move {
while let Some(req) = sub_rx.recv().await {
match serde_json::to_string(&req) {
Ok(json) => {
if ws_sink.send(Message::Text(json.into())).await.is_err() {
break;
}
}
Err(_) => continue,
}
}
let _ = ws_sink.close().await;
});
let notif_tx2 = Arc::clone(¬if_tx);
tokio::spawn(async move {
while let Some(msg) = ws_source.next().await {
match msg {
Ok(Message::Text(text)) => {
let r = serde_json::from_str::<DirectoryNotification>(&text)
.map_err(Error::Json);
if notif_tx2.send(r).await.is_err() {
break;
}
}
Ok(Message::Close(_)) | Err(_) => break,
_ => {} }
}
});
Ok((SubscriptionSender { tx: sub_tx }, notif_rx))
}
}