Skip to main content

avassa_client/volga/
mod.rs

1//!
2//! Library for producing and consuming Volga messages.
3//!
4use crate::{Error, Result};
5use bytes::Bytes;
6use futures_util::{SinkExt, StreamExt};
7use serde::{Deserialize, Serialize};
8
9pub mod consumer;
10pub mod query_topic;
11pub mod producer;
12
13type WebSocketStream =
14    tokio_tungstenite::WebSocketStream<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
15
16/// Volga stream persistence
17#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
18pub enum Persistence {
19    /// Persist messages to disk
20    #[serde(rename = "disk")]
21    Disk,
22    /// Store messages in RAM
23    #[serde(rename = "ram")]
24    RAM,
25}
26
27/// Format of the data on the volga topic
28#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
29pub enum Format {
30    /// JSON format
31    #[serde(rename = "json")]
32    JSON,
33    /// String encoded
34    #[serde(rename = "string")]
35    String,
36}
37
38impl Default for Format {
39    fn default() -> Self {
40        Self::JSON
41    }
42}
43
44/// Behavior when topic doesn't exist
45#[derive(Clone, Copy, Debug)]
46// #[serde(rename_all = "kebab-case")]
47pub enum OnNoExists {
48    Create(CreateOptions),
49    Wait,
50    Fail,
51}
52
53impl Serialize for OnNoExists {
54    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
55    where
56        S: serde::Serializer,
57    {
58        use serde::ser::SerializeMap;
59        let mut map = serializer.serialize_map(None)?;
60        map.serialize_entry(
61            "on-no-exists",
62            match self {
63                Self::Wait => "wait",
64                Self::Fail => "fail",
65                Self::Create(_) => "create",
66            },
67        )?;
68        if let Self::Create(create_opts) = self {
69            map.serialize_entry("create-options", create_opts)?;
70        }
71
72        map.end()
73    }
74}
75
76/// Local or NAT connection
77#[derive(Clone, Copy, Debug, Serialize, Eq, PartialEq)]
78#[serde(rename_all = "kebab-case")]
79pub enum Location {
80    Local,
81    ChildSite,
82    Parent,
83}
84
85/// Volga options for consumers and producers
86#[derive(Clone, Copy, Debug, Serialize)]
87#[serde(rename_all = "kebab-case")]
88pub struct CreateOptions {
89    /// Number of replicas in the cluster
90    pub replication_factor: u32,
91    /// Volga stream persistence
92    pub persistence: Persistence,
93
94    /// number of 1Mb chunks
95    pub num_chunks: usize,
96
97    /// Volga format
98    pub format: Format,
99
100    /// Delete topic after call producers and consumers disconnect
101    pub ephemeral: bool,
102}
103
104impl Default for CreateOptions {
105    fn default() -> Self {
106        Self {
107            replication_factor: 1,
108            persistence: Persistence::Disk,
109            format: Format::default(),
110            num_chunks: 100,
111            ephemeral: false,
112        }
113    }
114}
115
116#[tracing::instrument(skip(ws))]
117async fn get_binary_response(ws: &mut WebSocketStream) -> Result<Bytes> {
118    loop {
119        let resp = ws
120            .next()
121            .await
122            .ok_or_else(|| Error::Volga(Some("Expected websocket message".to_string())))??;
123
124        match resp {
125            tokio_tungstenite::tungstenite::Message::Ping(data) => {
126                tracing::trace!("Received ping");
127                let msg = tokio_tungstenite::tungstenite::Message::Pong(data);
128                ws.send(msg).await?;
129            }
130            tokio_tungstenite::tungstenite::Message::Pong(_) => (),
131            tokio_tungstenite::tungstenite::Message::Binary(m) => return Ok(m),
132            tokio_tungstenite::tungstenite::Message::Close(_) => {
133                return Err(Error::Volga(Some("closed".to_string())));
134            }
135            msg => {
136                return Err(Error::Volga(Some(format!(
137                    "Unexpected message type: '{msg}'",
138                ))))
139            }
140        }
141    }
142}
143
144async fn get_ok_volga_response(ws: &mut WebSocketStream) -> Result<()> {
145    let msg = get_binary_response(ws).await?;
146    let resp: VolgaResponse = serde_json::from_slice(&msg)?;
147    tracing::trace!("volga response {:?}", resp);
148    match resp.result {
149        VolgaResult::Ok => Ok(()),
150        VolgaResult::Error => {
151            let err_msg = serde_json::to_string(&resp.errors)
152                .unwrap_or_else(|e| format!("Failed to decode volga error: {e}"));
153            Err(Error::Volga(Some(err_msg)))
154        }
155    }
156}
157
158#[derive(Debug, Deserialize)]
159enum VolgaResult {
160    #[serde(rename = "ok")]
161    Ok,
162    #[serde(rename = "error")]
163    Error,
164}
165
166#[derive(Debug, Deserialize)]
167struct VolgaResponse {
168    result: VolgaResult,
169    #[serde(default)]
170    errors: Vec<serde_json::Value>,
171}
172
173#[cfg(test)]
174mod test {
175    #[test]
176    fn on_no_exists() {
177        let wait = serde_json::to_string(&super::OnNoExists::Wait).unwrap();
178        assert_eq!(&wait, r#"{"on-no-exists":"wait"}"#);
179
180        let fail = serde_json::to_string(&super::OnNoExists::Fail).unwrap();
181        assert_eq!(&fail, r#"{"on-no-exists":"fail"}"#);
182
183        let create = serde_json::to_string(&super::OnNoExists::Create(
184            crate::volga::CreateOptions::default(),
185        ))
186        .unwrap();
187        assert_eq!(
188            &create,
189            r#"{"on-no-exists":"create","create-options":{"replication-factor":1,"persistence":"disk","num-chunks":100,"format":"json","ephemeral":false}}"#
190        );
191    }
192}