use super::WebSocketStream;
use crate::Result;
use chrono::{DateTime, Utc};
use futures_util::SinkExt;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio_tungstenite::tungstenite::{client::IntoClientRequest, Message as WSMessage};
const N_IN_AUTO_MORE: u64 = 5;
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub enum Mode {
#[serde(rename = "exclusive")]
Exclusive,
#[serde(rename = "shared")]
Shared,
#[serde(rename = "standby")]
Standby,
}
#[derive(Clone, Copy, Debug)]
pub struct Options {
pub position: Position,
pub auto_more: bool,
pub mode: Mode,
pub on_no_exists: super::OnNoExists,
}
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct OpenConsumer<'a> {
op: &'a str,
location: super::Location,
#[serde(skip_serializing_if = "Option::is_none")]
child_site: Option<&'a str>,
topic: &'a str,
name: &'a str,
position: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
position_sequence_number: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
position_timestamp: Option<chrono::DateTime<chrono::Local>>,
#[serde(flatten)]
on_no_exists: super::OnNoExists,
mode: Mode,
}
impl Default for Options {
fn default() -> Self {
Self {
on_no_exists: super::OnNoExists::Wait,
position: Position::default(),
auto_more: true,
mode: Mode::Exclusive,
}
}
}
#[derive(Clone, Copy, Debug, Serialize)]
pub enum Position {
#[serde(rename = "beginning")]
Beginning,
#[serde(rename = "end")]
End,
#[serde(rename = "unread")]
Unread,
#[serde(skip)]
SequenceNumber(u64),
#[serde(skip)]
TimeStamp(chrono::DateTime<chrono::Local>),
}
impl Default for Position {
fn default() -> Self {
Self::End
}
}
pub struct Builder<'a> {
avassa_client: &'a crate::Client,
location: super::Location,
child_site: Option<&'a str>,
topic: &'a str,
ws_url: url::Url,
name: &'a str,
options: Options,
}
impl<'a> Builder<'a> {
pub(crate) fn new(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
location: super::Location::Local,
child_site: None,
topic,
ws_url,
name,
options: crate::volga::consumer::Options::default(),
})
}
pub(crate) fn new_child(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
site: &'a str,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
location: super::Location::ChildSite,
child_site: Some(site),
topic,
ws_url,
name,
options: crate::volga::consumer::Options::default(),
})
}
#[must_use]
pub fn set_options(self, options: Options) -> Self {
Self { options, ..self }
}
pub async fn connect(self) -> Result<Consumer> {
let mut request = self.ws_url.into_client_request()?;
request.headers_mut().insert(
reqwest::header::AUTHORIZATION,
reqwest::header::HeaderValue::from_str(&format!(
"Bearer {}",
self.avassa_client.bearer_token().await
))
.map_err(|_e| {
crate::Error::General("Failed to set Authorization header".to_string())
})?,
);
let tls = self.avassa_client.open_tls_stream().await?;
let (mut ws, _) = tokio_tungstenite::client_async(request, tls).await?;
let cmd = OpenConsumer {
op: "open-consumer",
location: self.location,
child_site: self.child_site,
topic: self.topic,
name: self.name,
mode: self.options.mode,
position: match self.options.position {
Position::SequenceNumber(_seqno) => "seqno",
Position::TimeStamp(_ts) => "timestamp",
Position::Beginning => "beginning",
Position::End => "end",
Position::Unread => "unread",
},
position_sequence_number: match self.options.position {
Position::SequenceNumber(seqno) => Some(seqno),
_ => None,
},
position_timestamp: match self.options.position {
Position::TimeStamp(ts) => Some(ts),
_ => None,
},
on_no_exists: self.options.on_no_exists,
};
tracing::debug!("{:#?}", serde_json::to_string_pretty(&cmd));
ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?))
.await?;
super::get_ok_volga_response(&mut ws).await?;
tracing::debug!("Successfully connected consumer to topic {}", self.topic);
let mut consumer = Consumer {
ws: Some(ws),
options: self.options,
last_seq_no: 0,
last_remain: N_IN_AUTO_MORE,
};
if consumer.options.auto_more {
consumer.more(N_IN_AUTO_MORE).await?;
}
Ok(consumer)
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MessageMetadata<T> {
pub time: DateTime<Utc>,
pub mtime: u64,
pub seqno: u64,
pub remain: u64,
pub payload: T,
pub producer_name: Option<String>,
}
pub struct Consumer {
ws: Option<WebSocketStream>,
options: Options,
last_seq_no: u64,
last_remain: u64,
}
impl Consumer {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn more(&mut self, n: u64) -> Result<()> {
let cmd = json!( {
"op": "more",
"n": n,
});
tracing::trace!("{}", cmd);
if let Some(ws) = self.ws.as_mut() {
ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?))
.await?;
self.last_remain = n;
}
Ok(())
}
#[tracing::instrument(skip(self), level = "trace")]
pub async fn consume<T: serde::de::DeserializeOwned + std::fmt::Debug>(
&mut self,
) -> Result<MessageMetadata<T>> {
if self.last_remain == 0 && self.options.auto_more {
self.more(N_IN_AUTO_MORE).await?;
}
if let Some(ws) = self.ws.as_mut() {
let msg = super::get_binary_response(ws).await?;
tracing::trace!("message: {}", String::from_utf8_lossy(&msg));
let resp: MessageMetadata<T> = serde_json::from_slice(&msg)?;
self.last_seq_no = resp.seqno;
tracing::trace!("Metadata: {:?}", resp);
self.last_remain = resp.remain;
Ok(resp)
} else {
Err(crate::Error::Volga(Some(
"No web socket available".to_string(),
)))
}
}
pub async fn ack(&mut self, seqno: u64) -> Result<()> {
if let Some(ws) = self.ws.as_mut() {
tracing::trace!("ack: {}", seqno);
let cmd = serde_json::json!({
"op": "ack",
"seqno": seqno,
});
ws.send(WSMessage::Binary(serde_json::to_vec(&cmd)?))
.await?;
}
Ok(())
}
#[must_use]
pub const fn last_seq_no(&self) -> u64 {
self.last_seq_no
}
}
impl Drop for Consumer {
fn drop(&mut self) {
if let Some(mut ws) = self.ws.take() {
tokio::spawn(async move {
let _res = ws.close(None).await;
});
}
}
}