use super::WebSocketStream;
use crate::Result;
use chrono::{DateTime, Utc};
use futures_util::SinkExt;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio_tungstenite::tungstenite::Message as WSMessage;
const N_IN_AUTO_MORE: u64 = 5;
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub enum Mode {
#[serde(rename = "exclusive")]
Exclusive,
#[serde(rename = "shared")]
Shared,
#[serde(rename = "standby")]
Standby,
}
#[derive(Clone, Copy, Debug)]
pub struct Options<'a> {
pub position: Position<'a>,
pub auto_more: bool,
pub mode: Mode,
pub on_no_exists: super::OnNoExists,
}
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct OpenConsumer<'a> {
op: &'a str,
location: super::Location,
#[serde(skip_serializing_if = "Option::is_none")]
child_site: Option<&'a str>,
topic: &'a str,
name: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
position: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
position_sequence_number: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
position_timestamp: Option<chrono::DateTime<chrono::Local>>,
#[serde(skip_serializing_if = "Option::is_none")]
position_since: Option<&'a str>,
#[serde(flatten)]
on_no_exists: super::OnNoExists,
mode: Mode,
}
impl Default for Options<'_> {
fn default() -> Self {
Self {
on_no_exists: super::OnNoExists::Wait,
position: Position::default(),
auto_more: true,
mode: Mode::Exclusive,
}
}
}
#[derive(Clone, Copy, Default, Debug, Serialize)]
#[non_exhaustive]
pub enum Position<'a> {
#[serde(rename = "beginning")]
Beginning,
#[serde(rename = "end")]
End,
#[serde(rename = "unread")]
#[default]
Unread,
#[serde(skip)]
SequenceNumber(u64),
#[serde(skip)]
TimeStamp(chrono::DateTime<chrono::Local>),
#[serde(skip)]
Since(&'a str),
}
pub struct Builder<'a> {
avassa_client: &'a crate::Client,
location: super::Location,
child_site: Option<&'a str>,
topic: &'a str,
ws_url: url::Url,
name: &'a str,
options: Options<'a>,
}
impl<'a> Builder<'a> {
pub(crate) fn new(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
location: super::Location::Local,
child_site: None,
topic,
ws_url,
name,
options: crate::volga::consumer::Options::default(),
})
}
pub(crate) fn new_child(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
site: &'a str,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
location: super::Location::ChildSite,
child_site: Some(site),
topic,
ws_url,
name,
options: crate::volga::consumer::Options::default(),
})
}
pub(crate) fn new_parent(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
location: super::Location::Parent,
child_site: None,
topic,
ws_url,
name,
options: crate::volga::consumer::Options::default(),
})
}
#[must_use]
pub fn set_options(self, options: Options<'a>) -> Self {
Self { options, ..self }
}
pub async fn connect(self) -> Result<Consumer> {
let ws_uri = self.ws_url.to_string().parse()?;
let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
.with_header(
"Authorization",
format!("Bearer {}", self.avassa_client.bearer_token().await),
);
let tls = self.avassa_client.open_tls_stream().await?;
let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
let cmd = OpenConsumer {
op: "open-consumer",
location: self.location,
child_site: self.child_site,
topic: self.topic,
name: self.name,
mode: self.options.mode,
position: match self.options.position {
Position::SequenceNumber(_seqno) => None,
Position::TimeStamp(_ts) => None,
Position::Since(_) => None,
Position::Beginning => Some("beginning"),
Position::End => Some("end"),
Position::Unread => Some("unread"),
},
position_sequence_number: match self.options.position {
Position::SequenceNumber(seqno) => Some(seqno),
_ => None,
},
position_timestamp: match self.options.position {
Position::TimeStamp(ts) => Some(ts),
_ => None,
},
position_since: match self.options.position {
Position::Since(s) => Some(s),
_ => None,
},
on_no_exists: self.options.on_no_exists,
};
tracing::debug!("{:#?}", serde_json::to_string_pretty(&cmd));
ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
.await?;
super::get_ok_volga_response(&mut ws).await?;
tracing::debug!("Successfully connected consumer to topic {}", self.topic);
let mut consumer = Consumer {
ws: Some(ws),
auto_more: self.options.auto_more,
last_seq_no: 0,
last_remain: N_IN_AUTO_MORE,
};
if consumer.auto_more {
consumer.more(N_IN_AUTO_MORE).await?;
}
Ok(consumer)
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MessageMetadata<T> {
pub time: DateTime<Utc>,
pub mtime: u64,
pub seqno: u64,
pub remain: u64,
pub payload: T,
pub producer_name: Option<String>,
}
pub struct Consumer {
ws: Option<WebSocketStream>,
auto_more: bool,
last_seq_no: u64,
last_remain: u64,
}
impl Consumer {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn more(&mut self, n: u64) -> Result<()> {
let cmd = json!( {
"op": "more",
"n": n,
});
tracing::trace!("{}", cmd);
if let Some(ws) = self.ws.as_mut() {
ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
.await?;
self.last_remain = n;
}
Ok(())
}
#[tracing::instrument(skip(self), level = "trace")]
pub async fn consume<T: serde::de::DeserializeOwned + std::fmt::Debug>(
&mut self,
) -> Result<MessageMetadata<T>> {
if self.last_remain == 0 && self.auto_more {
self.more(N_IN_AUTO_MORE).await?;
}
if let Some(ws) = self.ws.as_mut() {
let msg = super::get_binary_response(ws).await?;
tracing::trace!("message: {}", String::from_utf8_lossy(&msg));
let resp: MessageMetadata<T> = serde_json::from_slice(&msg)?;
self.last_seq_no = resp.seqno;
tracing::trace!("Metadata: {:?}", resp);
self.last_remain = resp.remain;
Ok(resp)
} else {
Err(crate::Error::Volga(Some(
"No web socket available".to_string(),
)))
}
}
pub async fn ack(&mut self, seqno: u64) -> Result<()> {
if let Some(ws) = self.ws.as_mut() {
tracing::trace!("ack: {}", seqno);
let cmd = serde_json::json!({
"op": "ack",
"seqno": seqno,
});
ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
.await?;
}
Ok(())
}
#[must_use]
pub const fn last_seq_no(&self) -> u64 {
self.last_seq_no
}
}
impl Drop for Consumer {
fn drop(&mut self) {
if let Some(mut ws) = self.ws.take() {
tokio::spawn(async move {
let _res = ws.close(None).await;
});
}
}
}