Skip to main content

avassa_client/volga/
consumer.rs

1use super::WebSocketStream;
2use crate::Result;
3use chrono::{DateTime, Utc};
4use futures_util::SinkExt;
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use tokio_tungstenite::tungstenite::Message as WSMessage;
8
9const N_IN_AUTO_MORE: u64 = 5;
10
11/// Volga stream mode
12#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
13pub enum Mode {
14    /// Consumer names has to be unique
15    #[serde(rename = "exclusive")]
16    Exclusive,
17    /// Messages are sent to consumers, with the same name, in a round-robin fashon.
18    #[serde(rename = "shared")]
19    Shared,
20    /// Act as a backup/standby consumer
21    #[serde(rename = "standby")]
22    Standby,
23}
24
25/// [`Consumer`] options
26#[derive(Clone, Copy, Debug)]
27pub struct Options<'a> {
28    /// Starting position
29    pub position: Position<'a>,
30
31    /// If set, the client will automatically request more items
32    pub auto_more: bool,
33
34    /// Volga stream mode
35    pub mode: Mode,
36
37    /// Optional create options
38    pub on_no_exists: super::OnNoExists,
39}
40
41#[derive(Clone, Debug, Serialize)]
42#[serde(rename_all = "kebab-case")]
43struct OpenConsumer<'a> {
44    op: &'a str,
45    location: super::Location,
46    #[serde(skip_serializing_if = "Option::is_none")]
47    child_site: Option<&'a str>,
48    topic: &'a str,
49    name: &'a str,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    position: Option<&'a str>,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    position_sequence_number: Option<u64>,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    position_timestamp: Option<chrono::DateTime<chrono::Local>>,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    position_since: Option<&'a str>,
58    #[serde(flatten)]
59    on_no_exists: super::OnNoExists,
60    mode: Mode,
61}
62
63impl Default for Options<'_> {
64    fn default() -> Self {
65        Self {
66            on_no_exists: super::OnNoExists::Wait,
67            position: Position::default(),
68            auto_more: true,
69            mode: Mode::Exclusive,
70        }
71    }
72}
73
74/// Volga Consumer starting position
75#[derive(Clone, Copy, Default, Debug, Serialize)]
76#[non_exhaustive]
77pub enum Position<'a> {
78    /// Get all messages from the beginning
79    #[serde(rename = "beginning")]
80    Beginning,
81    /// Start consuming from the end, i.e only get new messages
82    #[serde(rename = "end")]
83    End,
84    /// Get all unread messages
85    #[serde(rename = "unread")]
86    #[default]
87    Unread,
88    /// Start consuming from a sequence number
89    #[serde(skip)]
90    SequenceNumber(u64),
91
92    /// Start consuming from a timestamp
93    #[serde(skip)]
94    TimeStamp(chrono::DateTime<chrono::Local>),
95
96    #[serde(skip)]
97    Since(&'a str),
98}
99
100/// [`Consumer`] builder
101pub struct Builder<'a> {
102    avassa_client: &'a crate::Client,
103    location: super::Location,
104    child_site: Option<&'a str>,
105    topic: &'a str,
106    ws_url: url::Url,
107    name: &'a str,
108    options: Options<'a>,
109}
110
111/// Created from the Avassa Client.
112impl<'a> Builder<'a> {
113    /// Create a Volga Consumer Builder
114    pub(crate) fn new(
115        avassa_client: &'a crate::Client,
116        name: &'a str,
117        topic: &'a str,
118    ) -> Result<Self> {
119        let ws_url = avassa_client.websocket_url.join("volga")?;
120
121        Ok(Self {
122            avassa_client,
123            location: super::Location::Local,
124            child_site: None,
125            topic,
126            ws_url,
127            name,
128            options: crate::volga::consumer::Options::default(),
129        })
130    }
131
132    /// Create a Volga NAT Consumer Builder
133    pub(crate) fn new_child(
134        avassa_client: &'a crate::Client,
135        name: &'a str,
136        topic: &'a str,
137        site: &'a str,
138    ) -> Result<Self> {
139        let ws_url = avassa_client.websocket_url.join("volga")?;
140
141        Ok(Self {
142            avassa_client,
143            location: super::Location::ChildSite,
144            child_site: Some(site),
145            topic,
146            ws_url,
147            name,
148            options: crate::volga::consumer::Options::default(),
149        })
150    }
151
152    /// Create a Volga parent Consumer Builder
153    pub(crate) fn new_parent(
154        avassa_client: &'a crate::Client,
155        name: &'a str,
156        topic: &'a str,
157    ) -> Result<Self> {
158        let ws_url = avassa_client.websocket_url.join("volga")?;
159
160        Ok(Self {
161            avassa_client,
162            location: super::Location::Parent,
163            child_site: None,
164            topic,
165            ws_url,
166            name,
167            options: crate::volga::consumer::Options::default(),
168        })
169    }
170
171    /// Set Volga `Options`
172    #[must_use]
173    pub fn set_options(self, options: Options<'a>) -> Self {
174        Self { options, ..self }
175    }
176
177    /// Connect and create a `Consumer`
178    pub async fn connect(self) -> Result<Consumer> {
179        let ws_uri = self.ws_url.to_string().parse()?;
180        let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
181            .with_header(
182                "Authorization",
183                format!("Bearer {}", self.avassa_client.bearer_token().await),
184            );
185
186        let tls = self.avassa_client.open_tls_stream().await?;
187        let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
188        let cmd = OpenConsumer {
189            op: "open-consumer",
190            location: self.location,
191            child_site: self.child_site,
192            topic: self.topic,
193            name: self.name,
194            mode: self.options.mode,
195            position: match self.options.position {
196                Position::SequenceNumber(_seqno) => None,
197                Position::TimeStamp(_ts) => None,
198                Position::Since(_) => None,
199                Position::Beginning => Some("beginning"),
200                Position::End => Some("end"),
201                Position::Unread => Some("unread"),
202            },
203            position_sequence_number: match self.options.position {
204                Position::SequenceNumber(seqno) => Some(seqno),
205                _ => None,
206            },
207            position_timestamp: match self.options.position {
208                Position::TimeStamp(ts) => Some(ts),
209                _ => None,
210            },
211            position_since: match self.options.position {
212                Position::Since(s) => Some(s),
213                _ => None,
214            },
215            on_no_exists: self.options.on_no_exists,
216        };
217
218        tracing::debug!("{:#?}", serde_json::to_string_pretty(&cmd));
219
220        ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
221            .await?;
222
223        super::get_ok_volga_response(&mut ws).await?;
224
225        tracing::debug!("Successfully connected consumer to topic {}", self.topic);
226        let mut consumer = Consumer {
227            ws: Some(ws),
228            auto_more: self.options.auto_more,
229            last_seq_no: 0,
230            last_remain: N_IN_AUTO_MORE,
231        };
232
233        if consumer.auto_more {
234            consumer.more(N_IN_AUTO_MORE).await?;
235        }
236
237        Ok(consumer)
238    }
239}
240
241/// Metadata on the Volga message received in `Consumer::consume`
242#[derive(Debug, Serialize, Deserialize, Clone)]
243#[serde(rename_all = "kebab-case")]
244pub struct MessageMetadata<T> {
245    /// Timestamp
246    pub time: DateTime<Utc>,
247
248    /// Milliseconds since epoch
249    pub mtime: u64,
250
251    /// Sequence number
252    pub seqno: u64,
253
254    /// The number of remaining message the client has indicated it can handle,
255    /// see the [Consumer more](struct.Consumer.html) function.
256    pub remain: u64,
257
258    /// The message payload
259    pub payload: T,
260
261    /// Name of the producer
262    pub producer_name: Option<String>,
263}
264
265/// Volga Consumer
266pub struct Consumer {
267    // An option so we can take it and drop it.
268    ws: Option<WebSocketStream>,
269    auto_more: bool,
270    last_seq_no: u64,
271    // Last indicated remain when consuming, stored in case consume is cancelled.
272    last_remain: u64,
273}
274
275impl Consumer {
276    /// Indicate the client is ready for n more messages. If `auto_more` is set in the
277    /// options, this is automatically handled.
278    #[tracing::instrument(skip(self), level = "debug")]
279    pub async fn more(&mut self, n: u64) -> Result<()> {
280        let cmd = json!( {
281            "op": "more",
282            "n": n,
283        });
284
285        tracing::trace!("{}", cmd);
286        if let Some(ws) = self.ws.as_mut() {
287            ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
288                .await?;
289
290            self.last_remain = n;
291        }
292
293        Ok(())
294    }
295
296    /// Wait for the next message from Volga
297    #[tracing::instrument(skip(self), level = "trace")]
298    pub async fn consume<T: serde::de::DeserializeOwned + std::fmt::Debug>(
299        &mut self,
300    ) -> Result<MessageMetadata<T>> {
301        // Do we need to indicate we need more messages?
302        if self.last_remain == 0 && self.auto_more {
303            self.more(N_IN_AUTO_MORE).await?;
304        }
305
306        if let Some(ws) = self.ws.as_mut() {
307            let msg = super::get_binary_response(ws).await?;
308            tracing::trace!("message: {}", String::from_utf8_lossy(&msg));
309
310            let resp: MessageMetadata<T> = serde_json::from_slice(&msg)?;
311            self.last_seq_no = resp.seqno;
312            tracing::trace!("Metadata: {:?}", resp);
313
314            self.last_remain = resp.remain;
315            Ok(resp)
316        } else {
317            Err(crate::Error::Volga(Some(
318                "No web socket available".to_string(),
319            )))
320        }
321    }
322
323    /// Ack a received message
324    pub async fn ack(&mut self, seqno: u64) -> Result<()> {
325        if let Some(ws) = self.ws.as_mut() {
326            tracing::trace!("ack: {}", seqno);
327            let cmd = serde_json::json!({
328                "op": "ack",
329                "seqno": seqno,
330            });
331
332            ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
333                .await?;
334        }
335        Ok(())
336    }
337
338    /// returns the last received sequence number
339    #[must_use]
340    pub const fn last_seq_no(&self) -> u64 {
341        self.last_seq_no
342    }
343}
344
345impl Drop for Consumer {
346    fn drop(&mut self) {
347        if let Some(mut ws) = self.ws.take() {
348            tokio::spawn(async move {
349                let _res = ws.close(None).await;
350            });
351        }
352    }
353}