avassa_client/volga/
query_topic.rs1use crate::Result;
2use futures_util::{stream::StreamExt as _, SinkExt as _};
3use tokio_tungstenite::tungstenite::Message as WSMessage;
4
5pub type Query = serde_json::Value;
6
7#[pin_project::pin_project]
9pub struct QueryStream {
10 ws: super::WebSocketStream,
11}
12
13impl QueryStream {
14 #[expect(clippy::indexing_slicing, reason="query[op] is ok")]
15 #[expect(clippy::single_call_fn, reason="Don't want to inline")]
16 pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
17 let ws_uri = avassa_client
18 .get_websocket_url()
19 .join("volga")?
20 .to_string()
21 .parse()?;
22 let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
23 .with_header(
24 "Authorization",
25 format!("Bearer {}", avassa_client.bearer_token().await),
26 );
27 let tls = avassa_client.open_tls_stream().await?;
28 let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
29
30 query["op"] = "query-topics".into();
31 let json = serde_json::to_string_pretty(&query)?;
32 tracing::debug!("{}", json);
33
34 ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
35 .await?;
36
37 Ok(Self { ws })
38 }
39
40 pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
46 loop {
47 return match self.ws.next().await {
48 Some(Ok(val)) => match val {
49 WSMessage::Binary(bin) => Ok(Some(bin)),
50 WSMessage::Ping(ping) => {
51 tracing::trace!("Received ping: {:?}", ping);
52 self.ws.send(WSMessage::Pong(ping)).await?;
53 continue;
54 }
55 unknown => Err(crate::Error::general(&format!(
56 "Unexpected websocket message type: {unknown:?}"
57 ))),
58 },
59 Some(Err(err)) => Err(err.into()),
60 None => Ok(None),
61 };
62 }
63 }
64}
65
66#[cfg(test)]
67mod test {}