// avassa-client 0.12.0
//
// Library for integrating with the Avassa APIs.
// Documentation
use crate::Result;
use futures_util::{stream::StreamExt, SinkExt};
use tokio_tungstenite::tungstenite::Message as WSMessage;

/// A query expressed as an arbitrary JSON value.
///
/// [`QueryStream::new`] sets the `"op"` key of this value to `"query-topics"`
/// before serialising it and sending it over the WebSocket.
pub type Query = serde_json::Value;

/// Stream for query results
///
/// Wraps the WebSocket connection on which a query has been submitted; results
/// are read one message at a time with [`QueryStream::recv`].
// NOTE(review): `pin_project` is applied but no field is marked `#[pin]`; it is
// presumably kept for the commented-out `Stream` implementation further down.
#[pin_project::pin_project]
pub struct QueryStream {
    /// WebSocket connection the query was sent on and results are read from.
    ws: super::WebSocketStream,
}

impl QueryStream {
    pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
        let ws_uri = avassa_client
            .websocket_url
            .join("volga")?
            .to_string()
            .parse()?;
        let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
            .with_header(
                "Authorization",
                format!("Bearer {}", avassa_client.bearer_token().await),
            );
        let tls = avassa_client.open_tls_stream().await?;
        let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;

        query["op"] = "query-topics".into();
        let json = serde_json::to_string_pretty(&query)?;
        tracing::debug!("{}", json);

        ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
            .await?;

        Ok(Self { ws })
    }

    /// Try to read one message
    /// # Panics
    /// If the message is not a binary message or a ping
    pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
        loop {
            return match self.ws.next().await {
                Some(Ok(val)) => match val {
                    WSMessage::Binary(v) => Ok(Some(v)),
                    WSMessage::Ping(v) => {
                        tracing::trace!("Received ping: {:?}", v);
                        self.ws.send(WSMessage::Pong(v)).await?;
                        continue;
                    }
                    x => {
                        panic!("Unexpected message type: {x:?}");
                    }
                },
                Some(Err(e)) => Err(e.into()),
                None => Ok(None),
            };
        }
    }
}

// NOTE: the `Stream` implementation below is currently disabled (commented out).
// impl Stream for QueryStream {
//     type Item = crate::Result<String>;
//
//     fn poll_next(
//         self: std::pin::Pin<&mut Self>,
//         cx: &mut core::task::Context<'_>,
//     ) -> core::task::Poll<Option<Self::Item>> {
//         let mut this = self.project();
//
//         match core::pin::Pin::new(&mut this.ws).poll_next(cx) {
//             core::task::Poll::Ready(val) => {
//                 let res: Option<Self::Item> = match val {
//                     Some(Ok(WSMessage::Binary(m))) => Some(Ok(String::from_utf8_lossy(&m).into())),
//                     Some(Ok(msg)) => Some(Err(crate::Error::Volga(Some(format!(
//                         "Unexpected message ({msg:?})",
//                     ))))),
//                     Some(Err(e)) => Some(Err(e.into())),
//                     None => None,
//                 };
//
//                 core::task::Poll::Ready(res)
//             }
//             core::task::Poll::Pending => core::task::Poll::Pending,
//         }
//     }
// }

// Placeholder for unit tests; the module is currently empty.
#[cfg(test)]
mod test {}