1use super::WebSocketStream;
2use crate::Result;
3use chrono::{DateTime, Utc};
4use futures_util::SinkExt;
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use tokio_tungstenite::tungstenite::Message as WSMessage;
8
/// Number of messages requested by each automatic `more` request, i.e. the
/// one issued right after connecting and the ones issued by
/// `Consumer::consume` when `auto_more` is enabled.
const N_IN_AUTO_MORE: u64 = 5;
10
11#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
13pub enum Mode {
14 #[serde(rename = "exclusive")]
16 Exclusive,
17 #[serde(rename = "shared")]
19 Shared,
20 #[serde(rename = "standby")]
22 Standby,
23}
24
25#[derive(Clone, Copy, Debug)]
27pub struct Options<'a> {
28 pub position: Position<'a>,
30
31 pub auto_more: bool,
33
34 pub mode: Mode,
36
37 pub on_no_exists: super::OnNoExists,
39}
40
41#[derive(Clone, Debug, Serialize)]
42#[serde(rename_all = "kebab-case")]
43struct OpenConsumer<'a> {
44 op: &'a str,
45 location: super::Location,
46 #[serde(skip_serializing_if = "Option::is_none")]
47 child_site: Option<&'a str>,
48 topic: &'a str,
49 name: &'a str,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 position: Option<&'a str>,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 position_sequence_number: Option<u64>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 position_timestamp: Option<chrono::DateTime<chrono::Local>>,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 position_since: Option<&'a str>,
58 #[serde(flatten)]
59 on_no_exists: super::OnNoExists,
60 mode: Mode,
61}
62
63impl Default for Options<'_> {
64 fn default() -> Self {
65 Self {
66 on_no_exists: super::OnNoExists::Wait,
67 position: Position::default(),
68 auto_more: true,
69 mode: Mode::Exclusive,
70 }
71 }
72}
73
74#[derive(Clone, Copy, Default, Debug, Serialize)]
76#[non_exhaustive]
77pub enum Position<'a> {
78 #[serde(rename = "beginning")]
80 Beginning,
81 #[serde(rename = "end")]
83 End,
84 #[serde(rename = "unread")]
86 #[default]
87 Unread,
88 #[serde(skip)]
90 SequenceNumber(u64),
91
92 #[serde(skip)]
94 TimeStamp(chrono::DateTime<chrono::Local>),
95
96 #[serde(skip)]
97 Since(&'a str),
98}
99
100pub struct Builder<'a> {
102 avassa_client: &'a crate::Client,
103 location: super::Location,
104 child_site: Option<&'a str>,
105 topic: &'a str,
106 ws_url: url::Url,
107 name: &'a str,
108 options: Options<'a>,
109}
110
111impl<'a> Builder<'a> {
113 pub(crate) fn new(
115 avassa_client: &'a crate::Client,
116 name: &'a str,
117 topic: &'a str,
118 ) -> Result<Self> {
119 let ws_url = avassa_client.websocket_url.join("volga")?;
120
121 Ok(Self {
122 avassa_client,
123 location: super::Location::Local,
124 child_site: None,
125 topic,
126 ws_url,
127 name,
128 options: crate::volga::consumer::Options::default(),
129 })
130 }
131
132 pub(crate) fn new_child(
134 avassa_client: &'a crate::Client,
135 name: &'a str,
136 topic: &'a str,
137 site: &'a str,
138 ) -> Result<Self> {
139 let ws_url = avassa_client.websocket_url.join("volga")?;
140
141 Ok(Self {
142 avassa_client,
143 location: super::Location::ChildSite,
144 child_site: Some(site),
145 topic,
146 ws_url,
147 name,
148 options: crate::volga::consumer::Options::default(),
149 })
150 }
151
152 pub(crate) fn new_parent(
154 avassa_client: &'a crate::Client,
155 name: &'a str,
156 topic: &'a str,
157 ) -> Result<Self> {
158 let ws_url = avassa_client.websocket_url.join("volga")?;
159
160 Ok(Self {
161 avassa_client,
162 location: super::Location::Parent,
163 child_site: None,
164 topic,
165 ws_url,
166 name,
167 options: crate::volga::consumer::Options::default(),
168 })
169 }
170
171 #[must_use]
173 pub fn set_options(self, options: Options<'a>) -> Self {
174 Self { options, ..self }
175 }
176
177 pub async fn connect(self) -> Result<Consumer> {
179 let ws_uri = self.ws_url.to_string().parse()?;
180 let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
181 .with_header(
182 "Authorization",
183 format!("Bearer {}", self.avassa_client.bearer_token().await),
184 );
185
186 let tls = self.avassa_client.open_tls_stream().await?;
187 let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
188 let cmd = OpenConsumer {
189 op: "open-consumer",
190 location: self.location,
191 child_site: self.child_site,
192 topic: self.topic,
193 name: self.name,
194 mode: self.options.mode,
195 position: match self.options.position {
196 Position::SequenceNumber(_seqno) => None,
197 Position::TimeStamp(_ts) => None,
198 Position::Since(_) => None,
199 Position::Beginning => Some("beginning"),
200 Position::End => Some("end"),
201 Position::Unread => Some("unread"),
202 },
203 position_sequence_number: match self.options.position {
204 Position::SequenceNumber(seqno) => Some(seqno),
205 _ => None,
206 },
207 position_timestamp: match self.options.position {
208 Position::TimeStamp(ts) => Some(ts),
209 _ => None,
210 },
211 position_since: match self.options.position {
212 Position::Since(s) => Some(s),
213 _ => None,
214 },
215 on_no_exists: self.options.on_no_exists,
216 };
217
218 tracing::debug!("{:#?}", serde_json::to_string_pretty(&cmd));
219
220 ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
221 .await?;
222
223 super::get_ok_volga_response(&mut ws).await?;
224
225 tracing::debug!("Successfully connected consumer to topic {}", self.topic);
226 let mut consumer = Consumer {
227 ws: Some(ws),
228 auto_more: self.options.auto_more,
229 last_seq_no: 0,
230 last_remain: N_IN_AUTO_MORE,
231 };
232
233 if consumer.auto_more {
234 consumer.more(N_IN_AUTO_MORE).await?;
235 }
236
237 Ok(consumer)
238 }
239}
240
241#[derive(Debug, Serialize, Deserialize, Clone)]
243#[serde(rename_all = "kebab-case")]
244pub struct MessageMetadata<T> {
245 pub time: DateTime<Utc>,
247
248 pub mtime: u64,
250
251 pub seqno: u64,
253
254 pub remain: u64,
257
258 pub payload: T,
260
261 pub producer_name: Option<String>,
263}
264
265pub struct Consumer {
267 ws: Option<WebSocketStream>,
269 auto_more: bool,
270 last_seq_no: u64,
271 last_remain: u64,
273}
274
275impl Consumer {
276 #[tracing::instrument(skip(self), level = "debug")]
279 pub async fn more(&mut self, n: u64) -> Result<()> {
280 let cmd = json!( {
281 "op": "more",
282 "n": n,
283 });
284
285 tracing::trace!("{}", cmd);
286 if let Some(ws) = self.ws.as_mut() {
287 ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
288 .await?;
289
290 self.last_remain = n;
291 }
292
293 Ok(())
294 }
295
296 #[tracing::instrument(skip(self), level = "trace")]
298 pub async fn consume<T: serde::de::DeserializeOwned + std::fmt::Debug>(
299 &mut self,
300 ) -> Result<MessageMetadata<T>> {
301 if self.last_remain == 0 && self.auto_more {
303 self.more(N_IN_AUTO_MORE).await?;
304 }
305
306 if let Some(ws) = self.ws.as_mut() {
307 let msg = super::get_binary_response(ws).await?;
308 tracing::trace!("message: {}", String::from_utf8_lossy(&msg));
309
310 let resp: MessageMetadata<T> = serde_json::from_slice(&msg)?;
311 self.last_seq_no = resp.seqno;
312 tracing::trace!("Metadata: {:?}", resp);
313
314 self.last_remain = resp.remain;
315 Ok(resp)
316 } else {
317 Err(crate::Error::Volga(Some(
318 "No web socket available".to_string(),
319 )))
320 }
321 }
322
323 pub async fn ack(&mut self, seqno: u64) -> Result<()> {
325 if let Some(ws) = self.ws.as_mut() {
326 tracing::trace!("ack: {}", seqno);
327 let cmd = serde_json::json!({
328 "op": "ack",
329 "seqno": seqno,
330 });
331
332 ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
333 .await?;
334 }
335 Ok(())
336 }
337
338 #[must_use]
340 pub const fn last_seq_no(&self) -> u64 {
341 self.last_seq_no
342 }
343}
344
345impl Drop for Consumer {
346 fn drop(&mut self) {
347 if let Some(mut ws) = self.ws.take() {
348 tokio::spawn(async move {
349 let _res = ws.close(None).await;
350 });
351 }
352 }
353}