Skip to main content

avassa_client/
volga.rs

1//!
2//! Library for producing and consuming Volga messages.
3//!
4use crate::{Error, Result};
5use bytes::Bytes;
6use futures_util::{SinkExt as _, StreamExt as _};
7use serde::{Deserialize, Serialize};
8
9pub mod consumer;
10pub mod producer;
11pub mod query_topic;
12
13pub(crate) type WebSocketStream =
14    tokio_tungstenite::WebSocketStream<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
15
16/// Volga stream persistence.
17#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
18#[non_exhaustive]
19pub enum Persistence {
20    /// Persist messages to disk.
21    #[serde(rename = "disk")]
22    Disk,
23    /// Store messages in RAM.
24    #[serde(rename = "ram")]
25    RAM,
26}
27
28/// Format of the data on the volga topic.
29#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
30#[non_exhaustive]
31pub enum Format {
32    /// JSON format.
33    #[serde(rename = "json")]
34    #[default]
35    JSON,
36    /// String encoded.
37    #[serde(rename = "string")]
38    String,
39}
40
41/// Behavior when topic doesn't exist.
42#[derive(Clone, Copy, Debug)]
43#[non_exhaustive]
44pub enum OnNoExists {
45    Create(CreateOptions),
46    Wait,
47    Fail,
48}
49
50impl Serialize for OnNoExists {
51    fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
52    where
53        S: serde::Serializer,
54    {
55        use serde::ser::SerializeMap as _;
56        let mut map = serializer.serialize_map(None)?;
57        map.serialize_entry(
58            "on-no-exists",
59            match *self {
60                Self::Wait => "wait",
61                Self::Fail => "fail",
62                Self::Create(_) => "create",
63            },
64        )?;
65        if let &Self::Create(create_opts) = self {
66            map.serialize_entry("create-options", &create_opts)?;
67        }
68
69        map.end()
70    }
71}
72
73/// Local or NAT connection.
74#[derive(Clone, Copy, Debug, Serialize, Eq, PartialEq)]
75#[serde(rename_all = "kebab-case")]
76#[non_exhaustive]
77pub enum Location {
78    Local,
79    ChildSite,
80    Parent,
81}
82
83/// Volga options for consumers and producers.
84#[derive(Clone, Copy, Debug, Serialize)]
85#[serde(rename_all = "kebab-case")]
86#[non_exhaustive]
87pub struct CreateOptions {
88    replication_factor: u32,
89    persistence: Persistence,
90
91    format: Format,
92
93    ephemeral: bool,
94}
95
96impl Default for CreateOptions {
97    fn default() -> Self {
98        Self {
99            replication_factor: 1,
100            persistence: Persistence::Disk,
101            format: Format::default(),
102            ephemeral: false,
103        }
104    }
105}
106
107impl CreateOptions {
108    /// Number of replicas in the cluster.
109    #[must_use]
110    pub fn replication_factor(self, replication_factor: u32) -> Self {
111        Self {replication_factor, .. self}
112    }
113
114    /// Volga stream persistence.
115    #[must_use]
116    pub fn persistence(self, persistence: Persistence) -> Self {
117        Self {persistence, ..self}
118    }
119
120    /// Volga format.
121    #[must_use]
122    pub fn format(self, format: Format) -> Self {
123        Self {format, ..self}
124    }
125
126    /// Delete topic after call producers and consumers disconnect.
127    #[must_use]
128    pub fn ephemeral(self, ephemeral: bool) -> Self {
129        Self {ephemeral, ..self}
130    }
131}
132
133
134#[tracing::instrument(skip(ws))]
135pub(crate) async fn get_binary_response(ws: &mut WebSocketStream) -> Result<Bytes> {
136    loop {
137        let resp = ws
138            .next()
139            .await
140            .ok_or_else(|| Error::Volga(Some("Expected websocket message".to_owned())))??;
141
142        match resp {
143            tokio_tungstenite::tungstenite::Message::Ping(data) => {
144                tracing::trace!("Received ping");
145                let msg = tokio_tungstenite::tungstenite::Message::Pong(data);
146                ws.send(msg).await?;
147            }
148            tokio_tungstenite::tungstenite::Message::Pong(_) => (),
149            tokio_tungstenite::tungstenite::Message::Binary(bin) => return Ok(bin),
150            tokio_tungstenite::tungstenite::Message::Close(cf) => {
151                if let Some(cf) = cf {
152                    return Err(Error::Volga(Some(format!("closed: {cf}"))));
153                }
154                return Err(Error::Volga(Some("closed".to_owned())));
155            }
156            msg => {
157                return Err(Error::Volga(Some(format!(
158                    "Unexpected message type: '{msg}'",
159                ))))
160            }
161        }
162    }
163}
164
165async fn get_ok_volga_response(ws: &mut WebSocketStream) -> Result<()> {
166    let msg = get_binary_response(ws).await?;
167    let resp: VolgaResponse = serde_json::from_slice(&msg)?;
168    tracing::trace!("volga response {:?}", resp);
169    match resp.result {
170        VolgaResult::Ok => Ok(()),
171        VolgaResult::Error => {
172            let err_msg = serde_json::to_string(&resp.errors)
173                .unwrap_or_else(|err| format!("Failed to decode volga error: {err}"));
174            Err(Error::Volga(Some(err_msg)))
175        }
176    }
177}
178
179#[derive(Debug, Deserialize)]
180enum VolgaResult {
181    #[serde(rename = "ok")]
182    Ok,
183    #[serde(rename = "error")]
184    Error,
185}
186
187#[derive(Debug, Deserialize)]
188struct VolgaResponse {
189    result: VolgaResult,
190    #[serde(default)]
191    errors: Vec<serde_json::Value>,
192}
193
194#[cfg(test)]
195mod test {
196    #[test]
197    fn on_no_exists() {
198        let wait = serde_json::to_string(&super::OnNoExists::Wait).unwrap();
199        assert_eq!(&wait, r#"{"on-no-exists":"wait"}"#);
200
201        let fail = serde_json::to_string(&super::OnNoExists::Fail).unwrap();
202        assert_eq!(&fail, r#"{"on-no-exists":"fail"}"#);
203
204        let create = serde_json::to_string(&super::OnNoExists::Create(
205            crate::volga::CreateOptions::default(),
206        ))
207        .unwrap();
208        assert_eq!(
209            &create,
210            r#"{"on-no-exists":"create","create-options":{"replication-factor":1,"persistence":"disk","format":"json","ephemeral":false}}"#
211        );
212    }
213}