avassa-client 0.14.0

Library for integrating with the Avassa APIs
Documentation
use crate::Result;
use futures_util::{stream::StreamExt as _, SinkExt as _};
use tokio_tungstenite::tungstenite::Message as WSMessage;

pub type Query = serde_json::Value;

/// Stream for query results
#[pin_project::pin_project]
pub struct QueryStream {
    ws: super::WebSocketStream,
}

impl QueryStream {
    #[expect(clippy::indexing_slicing, reason="query[op] is ok")]
    #[expect(clippy::single_call_fn, reason="Don't want to inline")]
    pub(crate) async fn new(avassa_client: &crate::Client, mut query: Query) -> Result<Self> {
        let ws_uri = avassa_client
            .get_websocket_url()
            .join("volga")?
            .to_string()
            .parse()?;
        let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
            .with_header(
                "Authorization",
                format!("Bearer {}", avassa_client.bearer_token().await),
            );
        let tls = avassa_client.open_tls_stream().await?;
        let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;

        query["op"] = "query-topics".into();
        let json = serde_json::to_string_pretty(&query)?;
        tracing::debug!("{}", json);

        ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
            .await?;

        Ok(Self { ws })
    }

    /// Try to read one message.
    /// # Panics
    /// If the message is not a binary message or a ping.
    /// # Errors
    /// `crate::error::Error`
    pub async fn recv(&mut self) -> Result<Option<bytes::Bytes>> {
        loop {
            return match self.ws.next().await {
                Some(Ok(val)) => match val {
                    WSMessage::Binary(bin) => Ok(Some(bin)),
                    WSMessage::Ping(ping) => {
                        tracing::trace!("Received ping: {:?}", ping);
                        self.ws.send(WSMessage::Pong(ping)).await?;
                        continue;
                    }
                    unknown => Err(crate::Error::general(&format!(
                        "Unexpected websocket message type: {unknown:?}"
                    ))),
                },
                Some(Err(err)) => Err(err.into()),
                None => Ok(None),
            };
        }
    }
}

#[cfg(test)]
mod test {}