1use crate::{Error, Result};
5use bytes::Bytes;
6use futures_util::{SinkExt as _, StreamExt as _};
7use serde::{Deserialize, Serialize};
8
9pub mod consumer;
10pub mod producer;
11pub mod query_topic;
12
/// Crate-internal shorthand for the websocket stream type the helpers in this module
/// operate on: a `tokio_tungstenite` websocket layered over a `tokio_rustls` client TLS
/// stream, which in turn wraps a tokio TCP socket.
pub(crate) type WebSocketStream =
    tokio_tungstenite::WebSocketStream<tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;
15
16#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
18#[non_exhaustive]
19pub enum Persistence {
20 #[serde(rename = "disk")]
22 Disk,
23 #[serde(rename = "ram")]
25 RAM,
26}
27
28#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
30#[non_exhaustive]
31pub enum Format {
32 #[serde(rename = "json")]
34 #[default]
35 JSON,
36 #[serde(rename = "string")]
38 String,
39}
40
/// Policy serialized under the `on-no-exists` key (see the manual [`Serialize`] impl).
///
/// NOTE(review): judging by the name, this selects what happens when the requested
/// resource does not exist yet — confirm against the service documentation.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum OnNoExists {
    /// Serialized as `"create"`, accompanied by a `create-options` entry holding the
    /// wrapped [`CreateOptions`].
    Create(CreateOptions),
    /// Serialized as `"wait"`.
    Wait,
    /// Serialized as `"fail"`.
    Fail,
}
49
50impl Serialize for OnNoExists {
51 fn serialize<S>(&self, serializer: S) -> core::result::Result<S::Ok, S::Error>
52 where
53 S: serde::Serializer,
54 {
55 use serde::ser::SerializeMap as _;
56 let mut map = serializer.serialize_map(None)?;
57 map.serialize_entry(
58 "on-no-exists",
59 match *self {
60 Self::Wait => "wait",
61 Self::Fail => "fail",
62 Self::Create(_) => "create",
63 },
64 )?;
65 if let &Self::Create(create_opts) = self {
66 map.serialize_entry("create-options", &create_opts)?;
67 }
68
69 map.end()
70 }
71}
72
/// Location selector, serialized in kebab-case.
///
/// NOTE(review): the meaning of each location relative to the connected site is not
/// visible in this file — confirm against the service documentation.
#[derive(Clone, Copy, Debug, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub enum Location {
    /// Serialized as `"local"`.
    Local,
    /// Serialized as `"child-site"`.
    ChildSite,
    /// Serialized as `"parent"`.
    Parent,
}
82
/// Options attached to [`OnNoExists::Create`] and serialized under `create-options`.
///
/// Field names are emitted in kebab-case (e.g. `replication-factor`), in declaration
/// order. Fields are private; build a value with [`Default`] and the builder-style
/// methods of the same names.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub struct CreateOptions {
    /// Serialized as `replication-factor`; defaults to `1`.
    replication_factor: u32,
    /// Serialized as `persistence`; defaults to [`Persistence::Disk`].
    persistence: Persistence,

    /// Serialized as `format`; defaults to [`Format::default`].
    format: Format,

    /// Serialized as `ephemeral`; defaults to `false`.
    ephemeral: bool,
}
95
96impl Default for CreateOptions {
97 fn default() -> Self {
98 Self {
99 replication_factor: 1,
100 persistence: Persistence::Disk,
101 format: Format::default(),
102 ephemeral: false,
103 }
104 }
105}
106
107impl CreateOptions {
108 #[must_use]
110 pub fn replication_factor(self, replication_factor: u32) -> Self {
111 Self {replication_factor, .. self}
112 }
113
114 #[must_use]
116 pub fn persistence(self, persistence: Persistence) -> Self {
117 Self {persistence, ..self}
118 }
119
120 #[must_use]
122 pub fn format(self, format: Format) -> Self {
123 Self {format, ..self}
124 }
125
126 #[must_use]
128 pub fn ephemeral(self, ephemeral: bool) -> Self {
129 Self {ephemeral, ..self}
130 }
131}
132
133
134#[tracing::instrument(skip(ws))]
135pub(crate) async fn get_binary_response(ws: &mut WebSocketStream) -> Result<Bytes> {
136 loop {
137 let resp = ws
138 .next()
139 .await
140 .ok_or_else(|| Error::Volga(Some("Expected websocket message".to_owned())))??;
141
142 match resp {
143 tokio_tungstenite::tungstenite::Message::Ping(data) => {
144 tracing::trace!("Received ping");
145 let msg = tokio_tungstenite::tungstenite::Message::Pong(data);
146 ws.send(msg).await?;
147 }
148 tokio_tungstenite::tungstenite::Message::Pong(_) => (),
149 tokio_tungstenite::tungstenite::Message::Binary(bin) => return Ok(bin),
150 tokio_tungstenite::tungstenite::Message::Close(cf) => {
151 if let Some(cf) = cf {
152 return Err(Error::Volga(Some(format!("closed: {cf}"))));
153 }
154 return Err(Error::Volga(Some("closed".to_owned())));
155 }
156 msg => {
157 return Err(Error::Volga(Some(format!(
158 "Unexpected message type: '{msg}'",
159 ))))
160 }
161 }
162 }
163}
164
165async fn get_ok_volga_response(ws: &mut WebSocketStream) -> Result<()> {
166 let msg = get_binary_response(ws).await?;
167 let resp: VolgaResponse = serde_json::from_slice(&msg)?;
168 tracing::trace!("volga response {:?}", resp);
169 match resp.result {
170 VolgaResult::Ok => Ok(()),
171 VolgaResult::Error => {
172 let err_msg = serde_json::to_string(&resp.errors)
173 .unwrap_or_else(|err| format!("Failed to decode volga error: {err}"));
174 Err(Error::Volga(Some(err_msg)))
175 }
176 }
177}
178
179#[derive(Debug, Deserialize)]
180enum VolgaResult {
181 #[serde(rename = "ok")]
182 Ok,
183 #[serde(rename = "error")]
184 Error,
185}
186
/// Response payload parsed by `get_ok_volga_response`.
#[derive(Debug, Deserialize)]
struct VolgaResponse {
    /// Whether the response reports success or failure.
    result: VolgaResult,
    /// Error entries as arbitrary JSON values; an absent field deserializes to an
    /// empty list.
    #[serde(default)]
    errors: Vec<serde_json::Value>,
}
193
#[cfg(test)]
mod test {
    use super::{CreateOptions, OnNoExists};

    /// Renders `value` as compact JSON text, panicking if serialization fails.
    fn to_json(value: &OnNoExists) -> String {
        serde_json::to_string(value).unwrap()
    }

    /// Pins the exact JSON emitted for every `OnNoExists` variant.
    #[test]
    fn on_no_exists() {
        assert_eq!(to_json(&OnNoExists::Wait), r#"{"on-no-exists":"wait"}"#);
        assert_eq!(to_json(&OnNoExists::Fail), r#"{"on-no-exists":"fail"}"#);

        let create = to_json(&OnNoExists::Create(CreateOptions::default()));
        assert_eq!(
            create,
            r#"{"on-no-exists":"create","create-options":{"replication-factor":1,"persistence":"disk","format":"json","ephemeral":false}}"#
        );
    }
}