use crate::Result;
use futures_util::{stream::StreamExt as _, SinkExt as _};
use tokio_tungstenite::tungstenite::Message as WSMessage;
/// JSON value describing a query.
///
/// This is a plain alias for [`serde_json::Value`]; `QueryStream::new` sets
/// its `"op"` member before serialising it onto the websocket.
pub type Query = serde_json::Value;
/// Handle for reading the replies to a query that was submitted over a websocket.
///
/// Replies are pulled one at a time with `recv`.
#[pin_project::pin_project]
pub struct QueryStream {
/// Websocket the query was sent on and from which replies are read.
ws: super::WebSocketStream,
}
impl QueryStream {
#[expect(clippy::indexing_slicing, reason="query[op] is ok")]
#[expect(clippy::single_call_fn, reason="Don't want to inline")]
pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
let ws_uri = avassa_client
.get_websocket_url()
.join("volga")?
.to_string()
.parse()?;
let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
.with_header(
"Authorization",
format!("Bearer {}", avassa_client.bearer_token().await),
);
let tls = avassa_client.open_tls_stream().await?;
let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
query["op"] = "query-topics".into();
let json = serde_json::to_string_pretty(&query)?;
tracing::debug!("{}", json);
ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
.await?;
Ok(Self { ws })
}
pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
loop {
return match self.ws.next().await {
Some(Ok(val)) => match val {
WSMessage::Binary(bin) => Ok(Some(bin)),
WSMessage::Ping(ping) => {
tracing::trace!("Received ping: {:?}", ping);
self.ws.send(WSMessage::Pong(ping)).await?;
continue;
}
unknown => Err(crate::Error::general(&format!(
"Unexpected websocket message type: {unknown:?}"
))),
},
Some(Err(err)) => Err(err.into()),
None => Ok(None),
};
}
}
}
// Placeholder for unit tests; no tests are defined yet.
#[cfg(test)]
mod test {}