use crate::Result;
use futures_util::{stream::StreamExt, SinkExt};
use tokio_tungstenite::tungstenite::Message as WSMessage;
/// A query payload, represented as a free-form JSON value.
pub type Query = serde_json::Value;
/// Handle to a `query-topics` request issued over a WebSocket connection.
///
/// Created with `QueryStream::new`; results are read with `QueryStream::recv`.
#[pin_project::pin_project]
pub struct QueryStream {
// WebSocket connection the query was sent on and from which replies are read.
ws: super::WebSocketStream,
}
impl QueryStream {
/// Opens a WebSocket connection to the `volga` endpoint and submits `query` on it.
///
/// The query's `"op"` field is set to `"query-topics"` (overwriting any existing value)
/// and the query is then sent as a single binary message containing its JSON encoding.
/// A `null` query is treated as an empty object.
///
/// # Errors
///
/// Returns an error if `query` is neither a JSON object nor `null`, if the endpoint URL
/// cannot be built, if opening the TLS stream or the WebSocket handshake fails, or if the
/// query cannot be serialized or sent.
pub(crate) async fn new(avassa_client: &crate::Client, query: Query) -> Result<Self> {
    // Validate the query shape before doing any network I/O. Assigning `query["op"]` on a
    // value that is neither an object nor null would panic, so extract the map explicitly.
    let mut request: serde_json::Map<String, Query> = match query {
        serde_json::Value::Object(fields) => fields,
        serde_json::Value::Null => serde_json::Map::new(),
        // Deserializing a non-object value as a map always fails; this turns the invalid
        // input into a descriptive `serde_json::Error` instead of a panic.
        other => serde_json::from_value(other)?,
    };
    request.insert("op".to_owned(), "query-topics".into());
    let query = serde_json::Value::Object(request);

    let ws_uri = avassa_client
        .websocket_url
        .join("volga")?
        .to_string()
        .parse()?;
    let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
        .with_header(
            "Authorization",
            format!("Bearer {}", avassa_client.bearer_token().await),
        );
    let tls = avassa_client.open_tls_stream().await?;
    let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;

    // `{:#}` pretty-prints the JSON value; the formatting only happens when the debug
    // event is actually enabled, so nothing is serialized just for logging otherwise.
    tracing::debug!("{:#}", query);
    ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
        .await?;
    Ok(Self { ws })
}
/// Receives the next payload from the query stream.
///
/// Returns `Ok(Some(bytes))` for each data message and `Ok(None)` once the connection is
/// finished, either because the peer sent a Close message or because the stream ended.
/// Ping messages are answered with a Pong and are not surfaced to the caller; other
/// control messages are ignored.
///
/// # Errors
///
/// Returns an error if reading from the WebSocket fails or if a Pong reply cannot be sent.
pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
    while let Some(msg) = self.ws.next().await {
        match msg? {
            WSMessage::Binary(payload) => return Ok(Some(payload)),
            // Hand text payloads to the caller as raw bytes rather than aborting.
            WSMessage::Text(text) => return Ok(Some(bytes::Bytes::from(text))),
            WSMessage::Ping(payload) => {
                tracing::trace!("Received ping: {:?}", payload);
                self.ws.send(WSMessage::Pong(payload)).await?;
            }
            // A Close message is an orderly shutdown, so report end-of-stream
            // instead of panicking.
            WSMessage::Close(frame) => {
                tracing::debug!("Received close: {:?}", frame);
                return Ok(None);
            }
            // Unsolicited Pong (or a raw frame) carries no query data; keep reading.
            other => tracing::trace!("Ignoring message: {:?}", other),
        }
    }
    Ok(None)
}
}
// Placeholder for unit tests; the module is currently empty.
#[cfg(test)]
mod test {}