avassa_client/volga/
query_topic.rs

1use crate::Result;
2use futures_util::{stream::StreamExt, SinkExt};
3use tokio_tungstenite::tungstenite::Message as WSMessage;
4
5pub type Query = serde_json::Value;
6
7/// Stream for query results
8#[pin_project::pin_project]
9pub struct QueryStream {
10    ws: super::WebSocketStream,
11}
12
13impl QueryStream {
14    pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
15        let ws_uri = avassa_client
16            .websocket_url
17            .join("volga")?
18            .to_string()
19            .parse()?;
20        let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
21            .with_header(
22                "Authorization",
23                format!("Bearer {}", avassa_client.bearer_token().await),
24            );
25        let tls = avassa_client.open_tls_stream().await?;
26        let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
27
28        query["op"] = "query-topics".into();
29        let json = serde_json::to_string_pretty(&query)?;
30        tracing::debug!("{}", json);
31
32        ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
33            .await?;
34
35        Ok(Self { ws })
36    }
37
38    /// Try to read one message
39    /// # Panics
40    /// If the message is not a binary message or a ping
41    pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
42        loop {
43            return match self.ws.next().await {
44                Some(Ok(val)) => match val {
45                    WSMessage::Binary(v) => Ok(Some(v)),
46                    WSMessage::Ping(v) => {
47                        tracing::trace!("Received ping: {:?}", v);
48                        self.ws.send(WSMessage::Pong(v)).await?;
49                        continue;
50                    }
51                    x => {
52                        panic!("Unexpected message type: {x:?}");
53                    }
54                },
55                Some(Err(e)) => Err(e.into()),
56                None => Ok(None),
57            };
58        }
59    }
60}
61
62// impl Stream for QueryStream {
63//     type Item = crate::Result<String>;
64//
65//     fn poll_next(
66//         self: std::pin::Pin<&mut Self>,
67//         cx: &mut core::task::Context<'_>,
68//     ) -> core::task::Poll<Option<Self::Item>> {
69//         let mut this = self.project();
70//
71//         match core::pin::Pin::new(&mut this.ws).poll_next(cx) {
72//             core::task::Poll::Ready(val) => {
73//                 let res: Option<Self::Item> = match val {
74//                     Some(Ok(WSMessage::Binary(m))) => Some(Ok(String::from_utf8_lossy(&m).into())),
75//                     Some(Ok(msg)) => Some(Err(crate::Error::Volga(Some(format!(
76//                         "Unexpected message ({msg:?})",
77//                     ))))),
78//                     Some(Err(e)) => Some(Err(e.into())),
79//                     None => None,
80//                 };
81//
82//                 core::task::Poll::Ready(res)
83//             }
84//             core::task::Poll::Pending => core::task::Poll::Pending,
85//         }
86//     }
87// }
88
89#[cfg(test)]
90mod test {}