avassa_client/volga/
mod.rs1use crate::{Error, Result};
5use bytes::Bytes;
6use futures_util::{SinkExt, StreamExt};
7use serde::{Deserialize, Serialize};
8
9pub mod consumer;
10pub mod query_topic;
11pub mod producer;
12
13type WebSocketStream =
14 tokio_tungstenite::WebSocketStream<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
15
16#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
18pub enum Persistence {
19 #[serde(rename = "disk")]
21 Disk,
22 #[serde(rename = "ram")]
24 RAM,
25}
26
27#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
29pub enum Format {
30 #[serde(rename = "json")]
32 JSON,
33 #[serde(rename = "string")]
35 String,
36}
37
38impl Default for Format {
39 fn default() -> Self {
40 Self::JSON
41 }
42}
43
44#[derive(Clone, Copy, Debug)]
46pub enum OnNoExists {
48 Create(CreateOptions),
49 Wait,
50 Fail,
51}
52
53impl Serialize for OnNoExists {
54 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
55 where
56 S: serde::Serializer,
57 {
58 use serde::ser::SerializeMap;
59 let mut map = serializer.serialize_map(None)?;
60 map.serialize_entry(
61 "on-no-exists",
62 match self {
63 Self::Wait => "wait",
64 Self::Fail => "fail",
65 Self::Create(_) => "create",
66 },
67 )?;
68 if let Self::Create(create_opts) = self {
69 map.serialize_entry("create-options", create_opts)?;
70 }
71
72 map.end()
73 }
74}
75
76#[derive(Clone, Copy, Debug, Serialize, Eq, PartialEq)]
78#[serde(rename_all = "kebab-case")]
79pub enum Location {
80 Local,
81 ChildSite,
82 Parent,
83}
84
85#[derive(Clone, Copy, Debug, Serialize)]
87#[serde(rename_all = "kebab-case")]
88pub struct CreateOptions {
89 pub replication_factor: u32,
91 pub persistence: Persistence,
93
94 pub num_chunks: usize,
96
97 pub format: Format,
99
100 pub ephemeral: bool,
102}
103
104impl Default for CreateOptions {
105 fn default() -> Self {
106 Self {
107 replication_factor: 1,
108 persistence: Persistence::Disk,
109 format: Format::default(),
110 num_chunks: 100,
111 ephemeral: false,
112 }
113 }
114}
115
116#[tracing::instrument(skip(ws))]
117async fn get_binary_response(ws: &mut WebSocketStream) -> Result<Bytes> {
118 loop {
119 let resp = ws
120 .next()
121 .await
122 .ok_or_else(|| Error::Volga(Some("Expected websocket message".to_string())))??;
123
124 match resp {
125 tokio_tungstenite::tungstenite::Message::Ping(data) => {
126 tracing::trace!("Received ping");
127 let msg = tokio_tungstenite::tungstenite::Message::Pong(data);
128 ws.send(msg).await?;
129 }
130 tokio_tungstenite::tungstenite::Message::Pong(_) => (),
131 tokio_tungstenite::tungstenite::Message::Binary(m) => return Ok(m),
132 tokio_tungstenite::tungstenite::Message::Close(_) => {
133 return Err(Error::Volga(Some("closed".to_string())));
134 }
135 msg => {
136 return Err(Error::Volga(Some(format!(
137 "Unexpected message type: '{msg}'",
138 ))))
139 }
140 }
141 }
142}
143
144async fn get_ok_volga_response(ws: &mut WebSocketStream) -> Result<()> {
145 let msg = get_binary_response(ws).await?;
146 let resp: VolgaResponse = serde_json::from_slice(&msg)?;
147 tracing::trace!("volga response {:?}", resp);
148 match resp.result {
149 VolgaResult::Ok => Ok(()),
150 VolgaResult::Error => {
151 let err_msg = serde_json::to_string(&resp.errors)
152 .unwrap_or_else(|e| format!("Failed to decode volga error: {e}"));
153 Err(Error::Volga(Some(err_msg)))
154 }
155 }
156}
157
158#[derive(Debug, Deserialize)]
159enum VolgaResult {
160 #[serde(rename = "ok")]
161 Ok,
162 #[serde(rename = "error")]
163 Error,
164}
165
166#[derive(Debug, Deserialize)]
167struct VolgaResponse {
168 result: VolgaResult,
169 #[serde(default)]
170 errors: Vec<serde_json::Value>,
171}
172
#[cfg(test)]
mod test {
    use super::{CreateOptions, OnNoExists};

    /// Serializes `value` to a JSON string, panicking if serialization fails.
    fn to_json(value: &OnNoExists) -> String {
        serde_json::to_string(value).unwrap()
    }

    /// Checks the JSON produced for every `OnNoExists` variant.
    #[test]
    fn on_no_exists() {
        assert_eq!(to_json(&OnNoExists::Wait), r#"{"on-no-exists":"wait"}"#);

        assert_eq!(to_json(&OnNoExists::Fail), r#"{"on-no-exists":"fail"}"#);

        assert_eq!(
            to_json(&OnNoExists::Create(CreateOptions::default())),
            r#"{"on-no-exists":"create","create-options":{"replication-factor":1,"persistence":"disk","num-chunks":100,"format":"json","ephemeral":false}}"#
        );
    }
}