avassa_client/volga/query_topic.rs
1use crate::Result;
2use futures_util::{stream::StreamExt, SinkExt};
3use tokio_tungstenite::tungstenite::Message as WSMessage;
4
5pub type Query = serde_json::Value;
6
7/// Stream for query results
8#[pin_project::pin_project]
9pub struct QueryStream {
10 ws: super::WebSocketStream,
11}
12
13impl QueryStream {
14 pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
15 let ws_uri = avassa_client
16 .websocket_url
17 .join("volga")?
18 .to_string()
19 .parse()?;
20 let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
21 .with_header(
22 "Authorization",
23 format!("Bearer {}", avassa_client.bearer_token().await),
24 );
25 let tls = avassa_client.open_tls_stream().await?;
26 let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
27
28 query["op"] = "query-topics".into();
29 let json = serde_json::to_string_pretty(&query)?;
30 tracing::debug!("{}", json);
31
32 ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
33 .await?;
34
35 Ok(Self { ws })
36 }
37
38 /// Try to read one message
39 /// # Panics
40 /// If the message is not a binary message or a ping
41 pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
42 loop {
43 return match self.ws.next().await {
44 Some(Ok(val)) => match val {
45 WSMessage::Binary(v) => Ok(Some(v)),
46 WSMessage::Ping(v) => {
47 tracing::trace!("Received ping: {:?}", v);
48 self.ws.send(WSMessage::Pong(v)).await?;
49 continue;
50 }
51 x => {
52 panic!("Unexpected message type: {x:?}");
53 }
54 },
55 Some(Err(e)) => Err(e.into()),
56 None => Ok(None),
57 };
58 }
59 }
60}
61
62// impl Stream for QueryStream {
63// type Item = crate::Result<String>;
64//
65// fn poll_next(
66// self: std::pin::Pin<&mut Self>,
67// cx: &mut core::task::Context<'_>,
68// ) -> core::task::Poll<Option<Self::Item>> {
69// let mut this = self.project();
70//
71// match core::pin::Pin::new(&mut this.ws).poll_next(cx) {
72// core::task::Poll::Ready(val) => {
73// let res: Option<Self::Item> = match val {
74// Some(Ok(WSMessage::Binary(m))) => Some(Ok(String::from_utf8_lossy(&m).into())),
75// Some(Ok(msg)) => Some(Err(crate::Error::Volga(Some(format!(
76// "Unexpected message ({msg:?})",
77// ))))),
78// Some(Err(e)) => Some(Err(e.into())),
79// None => None,
80// };
81//
82// core::task::Poll::Ready(res)
83// }
84// core::task::Poll::Pending => core::task::Poll::Pending,
85// }
86// }
87// }
88
89#[cfg(test)]
90mod test {}