use super::WebSocketStream;
use crate::Result;
use futures_util::SinkExt;
use serde::Serialize;
use serde_json::json;
use tokio_tungstenite::tungstenite::Message as WSMessage;
/// Request body of the `open-producer` operation sent over the Volga websocket.
///
/// Keys are emitted in kebab-case, so `child_site` is serialized as
/// `child-site`. The declaration order of the fields is the order of the keys
/// in the serialized object.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
struct OpenProducer<'a> {
    /// Operation name; the constructors in this file always use `"open-producer"`.
    op: &'a str,
    /// Which site the producer is opened against.
    location: super::Location,
    /// Name of the child site; left out of the serialized object when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    child_site: Option<&'a str>,
    /// Topic to produce to.
    topic: &'a str,
    /// Name of the producer.
    name: &'a str,
    /// Flattened: the keys of `OnNoExists` appear directly in this object
    /// instead of under an `on-no-exists` key.
    #[serde(flatten)]
    on_no_exists: super::OnNoExists,
}
/// Holds everything required to open a Volga producer.
///
/// Created through one of the `new*` constructors and consumed by `connect`.
pub struct Builder<'a> {
    /// Client that supplies the bearer token and the TLS stream.
    avassa_client: &'a crate::Client,
    /// Websocket endpoint the producer connects to.
    ws_url: url::Url,
    /// The `open-producer` request sent right after the websocket handshake.
    open_producer: OpenProducer<'a>,
}
impl<'a> Builder<'a> {
pub fn new(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
on_no_exists: super::OnNoExists,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
ws_url,
open_producer: OpenProducer {
op: "open-producer",
location: super::Location::Local,
child_site: None,
topic,
name,
on_no_exists,
},
})
}
pub(crate) fn new_child(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
udc: &'a str,
on_no_exists: super::OnNoExists,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
ws_url,
open_producer: OpenProducer {
op: "open-producer",
location: super::Location::ChildSite,
child_site: Some(udc),
topic,
name,
on_no_exists,
},
})
}
pub(crate) fn new_parent(
avassa_client: &'a crate::Client,
name: &'a str,
topic: &'a str,
on_no_exists: super::OnNoExists,
) -> Result<Self> {
let ws_url = avassa_client.websocket_url.join("volga")?;
Ok(Self {
avassa_client,
ws_url,
open_producer: OpenProducer {
op: "open-producer",
location: super::Location::Parent,
child_site: None,
topic,
name,
on_no_exists,
},
})
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn connect(self) -> Result<Producer> {
let ws_uri = self.ws_url.to_string().parse()?;
let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
.with_header(
"Authorization",
format!("Bearer {}", self.avassa_client.bearer_token().await),
);
let tls = self.avassa_client.open_tls_stream().await?;
let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
tracing::debug!("{}", serde_json::to_string(&self.open_producer)?);
ws.send(WSMessage::Binary(
serde_json::to_vec(&self.open_producer)?.into(),
))
.await?;
tracing::debug!("Waiting for ok");
super::get_ok_volga_response(&mut ws).await?;
tracing::debug!(
"Successfully connected producer to topic {}",
self.open_producer.topic
);
Ok(Producer { ws })
}
}
/// A connected Volga producer, obtained from `Builder::connect`.
pub struct Producer {
    /// Websocket on which `produce` commands are sent and responses are read.
    ws: WebSocketStream,
}
impl Producer {
    /// Sends `content` as the payload of a `produce` command and waits for the
    /// ok response.
    ///
    /// # Errors
    ///
    /// Fails when `content` cannot be serialized to JSON, when the websocket
    /// send fails, or when `get_ok_volga_response` reports an error.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn produce<T: serde::Serialize + std::fmt::Debug>(
        &mut self,
        content: &T,
    ) -> Result<()> {
        // `json!` unwraps the serialization result of interpolated expressions,
        // so passing `content` directly would panic on a failing `Serialize`
        // impl (or a map with non-string keys). Convert it fallibly first;
        // interpolating an already-built `Value` cannot fail.
        let payload = serde_json::to_value(content)?;
        let cmd = json!({
            "op": "produce",
            "payload": payload,
        });
        tracing::debug!("Cmd: {}", cmd);
        self.ws
            .send(WSMessage::Binary(serde_json::to_vec(&cmd)?.into()))
            .await?;
        tracing::debug!("Waiting for ok");
        super::get_ok_volga_response(&mut self.ws).await?;
        Ok(())
    }
}
/// Direction selector; each variant serializes as its lowercase name
/// (`"down"`, `"up"`, `"both"`, `"stitch"`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum InfraDirection {
    Down,
    Up,
    Both,
    Stitch,
}