avassa-client 0.14.0

Library for integrating with the Avassa APIs
Documentation
//!
//! Library for producing and consuming Volga messages.
//!
use crate::{Error, Result};
use bytes::Bytes;
use futures_util::{SinkExt as _, StreamExt as _};
use serde::{Deserialize, Serialize};

pub mod consumer;
pub mod producer;
pub mod query_topic;

/// WebSocket stream type shared by the Volga helpers in this module:
/// a `tokio_tungstenite` WebSocket layered on a `tokio_rustls` client TLS
/// stream, which in turn wraps a tokio TCP socket.
pub(crate) type WebSocketStream =
    tokio_tungstenite::WebSocketStream<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;

/// Volga stream persistence.
///
/// On the wire each variant is its lowercase name: `"disk"` or `"ram"`.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum Persistence {
    /// Persist messages to disk.
    Disk,
    /// Store messages in RAM.
    RAM,
}

/// Format of the data on the volga topic.
///
/// On the wire each variant is its lowercase name: `"json"` or `"string"`.
/// [`Format::JSON`] is the [`Default`].
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum Format {
    /// JSON format.
    #[default]
    JSON,
    /// String encoded.
    String,
}

/// Behavior when topic doesn't exist.
///
/// The hand-written `Serialize` impl for this type emits a map with an
/// `"on-no-exists"` entry naming the variant (`"create"`, `"wait"` or
/// `"fail"`); the `Create` variant additionally emits a `"create-options"`
/// entry holding its payload.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum OnNoExists {
    /// Sent as `"create"` together with the carried [`CreateOptions`];
    /// presumably asks for the topic to be created with those options.
    Create(CreateOptions),
    /// Sent as `"wait"`; presumably waits for the topic to appear
    /// (the exact semantics are decided by the remote side — confirm).
    Wait,
    /// Sent as `"fail"`; presumably makes the request fail when the topic
    /// is missing (the exact semantics are decided by the remote side — confirm).
    Fail,
}

impl Serialize for OnNoExists {
    /// Serializes the value as a map.
    ///
    /// The map always starts with an `"on-no-exists"` entry whose value is
    /// `"create"`, `"wait"` or `"fail"`. For [`OnNoExists::Create`] a second
    /// entry, `"create-options"`, carries the serialized [`CreateOptions`].
    fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeMap as _;

        // Resolve both the tag and the optional payload in one pass so the
        // two entries cannot drift apart.
        let (mode, create_opts) = match *self {
            Self::Create(ref opts) => ("create", Some(opts)),
            Self::Wait => ("wait", None),
            Self::Fail => ("fail", None),
        };

        // The entry count is not announced up front (length hint `None`).
        let mut entries = serializer.serialize_map(None)?;
        entries.serialize_entry("on-no-exists", mode)?;
        if let Some(opts) = create_opts {
            entries.serialize_entry("create-options", opts)?;
        }
        entries.end()
    }
}

/// Local or NAT connection.
///
/// Variants are serialized in kebab-case (see the `rename_all` attribute).
#[derive(Clone, Copy, Debug, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub enum Location {
    /// Serialized as `"local"`.
    Local,
    /// Serialized as `"child-site"`.
    ChildSite,
    /// Serialized as `"parent"`.
    Parent,
}

/// Volga options for consumers and producers.
///
/// This is the payload of [`OnNoExists::Create`]. Fields are private; start
/// from [`CreateOptions::default`] and adjust individual values with the
/// builder-style methods of the same names. Field names are serialized in
/// kebab-case (e.g. `replication-factor`).
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub struct CreateOptions {
    /// Number of replicas in the cluster.
    replication_factor: u32,
    /// Volga stream persistence.
    persistence: Persistence,

    /// Format of the data on the topic.
    format: Format,

    /// Whether the topic is deleted after all producers and consumers disconnect.
    ephemeral: bool,
}

impl Default for CreateOptions {
    /// Baseline options: a single replica, disk persistence, the default
    /// [`Format`], and a topic that is not ephemeral.
    fn default() -> Self {
        let persistence = Persistence::Disk;
        let format = Format::default();

        Self {
            ephemeral: false,
            format,
            persistence,
            replication_factor: 1,
        }
    }
}

impl CreateOptions {
    /// Number of replicas in the cluster.
    ///
    /// Returns the options with only this value replaced.
    #[must_use]
    pub fn replication_factor(mut self, replication_factor: u32) -> Self {
        self.replication_factor = replication_factor;
        self
    }

    /// Volga stream persistence.
    ///
    /// Returns the options with only this value replaced.
    #[must_use]
    pub fn persistence(mut self, persistence: Persistence) -> Self {
        self.persistence = persistence;
        self
    }

    /// Volga format.
    ///
    /// Returns the options with only this value replaced.
    #[must_use]
    pub fn format(mut self, format: Format) -> Self {
        self.format = format;
        self
    }

    /// Delete topic after all producers and consumers disconnect.
    ///
    /// Returns the options with only this value replaced.
    #[must_use]
    pub fn ephemeral(mut self, ephemeral: bool) -> Self {
        self.ephemeral = ephemeral;
        self
    }
}


#[tracing::instrument(skip(ws))]
pub(crate) async fn get_binary_response(ws: &mut WebSocketStream) -> Result<Bytes> {
    loop {
        let resp = ws
            .next()
            .await
            .ok_or_else(|| Error::Volga(Some("Expected websocket message".to_owned())))??;

        match resp {
            tokio_tungstenite::tungstenite::Message::Ping(data) => {
                tracing::trace!("Received ping");
                let msg = tokio_tungstenite::tungstenite::Message::Pong(data);
                ws.send(msg).await?;
            }
            tokio_tungstenite::tungstenite::Message::Pong(_) => (),
            tokio_tungstenite::tungstenite::Message::Binary(bin) => return Ok(bin),
            tokio_tungstenite::tungstenite::Message::Close(cf) => {
                if let Some(cf) = cf {
                    return Err(Error::Volga(Some(format!("closed: {cf}"))));
                }
                return Err(Error::Volga(Some("closed".to_owned())));
            }
            msg => {
                return Err(Error::Volga(Some(format!(
                    "Unexpected message type: '{msg}'",
                ))))
            }
        }
    }
}

async fn get_ok_volga_response(ws: &mut WebSocketStream) -> Result<()> {
    let msg = get_binary_response(ws).await?;
    let resp: VolgaResponse = serde_json::from_slice(&msg)?;
    tracing::trace!("volga response {:?}", resp);
    match resp.result {
        VolgaResult::Ok => Ok(()),
        VolgaResult::Error => {
            let err_msg = serde_json::to_string(&resp.errors)
                .unwrap_or_else(|err| format!("Failed to decode volga error: {err}"));
            Err(Error::Volga(Some(err_msg)))
        }
    }
}

/// Outcome reported in a Volga response.
///
/// Deserialized from the lowercase variant name: `"ok"` or `"error"`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum VolgaResult {
    /// The request succeeded.
    Ok,
    /// The request failed.
    Error,
}

/// Response message decoded from JSON by `get_ok_volga_response`.
#[derive(Debug, Deserialize)]
struct VolgaResponse {
    /// Whether the request was reported as `ok` or `error`.
    result: VolgaResult,
    /// Error details as arbitrary JSON values; an absent field
    /// deserializes to an empty list.
    #[serde(default)]
    errors: Vec<serde_json::Value>,
}

#[cfg(test)]
mod test {
    use super::{CreateOptions, OnNoExists};

    /// Checks the exact JSON produced for every `OnNoExists` variant.
    #[test]
    fn on_no_exists() {
        let to_json = |mode: OnNoExists| serde_json::to_string(&mode).unwrap();

        assert_eq!(to_json(OnNoExists::Wait), r#"{"on-no-exists":"wait"}"#);
        assert_eq!(to_json(OnNoExists::Fail), r#"{"on-no-exists":"fail"}"#);

        // `Create` must also carry the options, with kebab-case keys.
        let create = to_json(OnNoExists::Create(CreateOptions::default()));
        assert_eq!(
            create,
            r#"{"on-no-exists":"create","create-options":{"replication-factor":1,"persistence":"disk","format":"json","ephemeral":false}}"#
        );
    }
}