//! Query streams over the Volga websocket endpoint (`avassa_client/volga/query_topic.rs`).

1use crate::Result;
2use futures_util::{stream::StreamExt as _, SinkExt as _};
3use tokio_tungstenite::tungstenite::Message as WSMessage;
4
5pub type Query = serde_json::Value;
6
7/// Stream for query results
8#[pin_project::pin_project]
9pub struct QueryStream {
10    ws: super::WebSocketStream,
11}
12
13impl QueryStream {
14    #[expect(clippy::indexing_slicing, reason="query[op] is ok")]
15    #[expect(clippy::single_call_fn, reason="Don't want to inline")]
16    pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
17        let ws_uri = avassa_client
18            .get_websocket_url()
19            .join("volga")?
20            .to_string()
21            .parse()?;
22        let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
23            .with_header(
24                "Authorization",
25                format!("Bearer {}", avassa_client.bearer_token().await),
26            );
27        let tls = avassa_client.open_tls_stream().await?;
28        let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
29
30        query["op"] = "query-topics".into();
31        let json = serde_json::to_string_pretty(&query)?;
32        tracing::debug!("{}", json);
33
34        ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
35            .await?;
36
37        Ok(Self { ws })
38    }
39
40    /// Try to read one message.
41    /// # Panics
42    /// If the message is not a binary message or a ping.
43    /// # Errors
44    /// `crate::error::Error`
45    pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
46        loop {
47            return match self.ws.next().await {
48                Some(Ok(val)) => match val {
49                    WSMessage::Binary(bin) => Ok(Some(bin)),
50                    WSMessage::Ping(ping) => {
51                        tracing::trace!("Received ping: {:?}", ping);
52                        self.ws.send(WSMessage::Pong(ping)).await?;
53                        continue;
54                    }
55                    unknown => Err(crate::Error::general(&format!(
56                        "Unexpected websocket message type: {unknown:?}"
57                    ))),
58                },
59                Some(Err(err)) => Err(err.into()),
60                None => Ok(None),
61            };
62        }
63    }
64}
65
#[cfg(test)]
mod test {
    // No unit tests are defined for this module yet.
}