use std::fmt;
use std::{fmt::Display, time::Duration};
use bytes::Bytes;
use resources::Response;
use serde::{Deserialize, Serialize};
use serde_json::json;
use watermelon_proto::StatusCode;
use watermelon_proto::{Subject, error::SubjectValidateError};
pub use self::commands::{ConsumerBatch, ConsumerStream, ConsumerStreamError, Consumers, Streams};
pub use self::resources::{
AckPolicy, Compression, Consumer, ConsumerConfig, ConsumerDurability, ConsumerSpecificConfig,
ConsumerStorage, DeliverPolicy, DiscardPolicy, ReplayPolicy, RetentionPolicy, Storage, Stream,
StreamConfig, StreamState,
};
use crate::client::ClientRequest;
use crate::core::Client;
use super::{ClientClosedError, ResponseError};
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
mod commands;
mod resources;
#[derive(Debug, Clone)]
pub struct JetstreamClient {
client: Client,
prefix: Subject,
request_timeout: Duration,
}
#[derive(Debug, Deserialize, thiserror::Error)]
#[error("jetstream error status={status} code={code} description={description}")]
pub struct JetstreamApiError {
#[serde(rename = "code")]
status: StatusCode,
#[serde(rename = "err_code")]
code: JetstreamErrorCode,
description: String,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct JetstreamErrorCode(u16);
#[derive(Debug, thiserror::Error)]
pub enum JetstreamError {
#[error("invalid subject")]
Subject(#[source] SubjectValidateError),
#[error("client closed")]
ClientClosed(#[source] ClientClosedError),
#[error("client request failure")]
ResponseError(#[source] ResponseError),
#[error("JSON deserialization")]
Json(#[source] serde_json::Error),
#[error("bad response code")]
Api(#[source] JetstreamApiError),
}
impl JetstreamClient {
#[must_use]
pub fn new(client: Client) -> Self {
Self::new_with_prefix(client, Subject::from_static("$JS.API"))
}
pub fn new_with_domain(
client: Client,
domain: impl Display,
) -> Result<Self, SubjectValidateError> {
let prefix = format!("$JS.{domain}.API").try_into()?;
Ok(Self::new_with_prefix(client, prefix))
}
#[must_use]
pub fn new_with_prefix(client: Client, prefix: Subject) -> Self {
Self {
client,
prefix,
request_timeout: DEFAULT_REQUEST_TIMEOUT,
}
}
pub async fn create_stream(&self, config: &StreamConfig) -> Result<Stream, JetstreamError> {
let subject = format!("{}.STREAM.CREATE.{}", self.prefix, config.name)
.try_into()
.map_err(JetstreamError::Subject)?;
let payload = serde_json::to_vec(config).map_err(JetstreamError::Json)?;
let resp = self
.make_request(subject)
.payload(payload.into())
.await
.map_err(JetstreamError::ClientClosed)?;
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
let json = serde_json::from_slice::<Response<Stream>>(&resp.base.payload)
.map_err(JetstreamError::Json)?;
match json {
Response::Response(stream) => Ok(stream),
Response::Error { error } => Err(JetstreamError::Api(error)),
}
}
pub fn streams(&self) -> Streams {
Streams::new(self.clone())
}
pub async fn stream(&self, name: impl Display) -> Result<Option<Stream>, JetstreamError> {
let subject = format!("{}.STREAM.INFO.{}", self.prefix, name)
.try_into()
.map_err(JetstreamError::Subject)?;
let resp = self
.make_request(subject)
.payload(Bytes::new())
.await
.map_err(JetstreamError::ClientClosed)?;
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
let json = serde_json::from_slice::<Response<Stream>>(&resp.base.payload)
.map_err(JetstreamError::Json)?;
match json {
Response::Response(stream) => Ok(Some(stream)),
Response::Error { error } if error.code == JetstreamErrorCode::STREAM_NOT_FOUND => {
Ok(None)
}
Response::Error { error } => Err(JetstreamError::Api(error)),
}
}
pub async fn create_consumer(
&self,
stream_name: &str,
config: &ConsumerConfig,
) -> Result<Consumer, JetstreamError> {
let mut subject = format!(
"{}.CONSUMER.CREATE.{}.{}",
self.prefix, stream_name, config.name
);
if let [filter_subject] = &*config.filter_subjects {
subject.push('.');
subject.push_str(filter_subject);
}
let subject = subject.try_into().map_err(JetstreamError::Subject)?;
let payload = serde_json::to_vec(&json!({
"stream_name": stream_name,
"config": config,
"action": "create",
"pedantic": true,
}))
.map_err(JetstreamError::Json)?;
let resp = self
.make_request(subject)
.payload(payload.into())
.await
.map_err(JetstreamError::ClientClosed)?;
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
let json = serde_json::from_slice::<Response<Consumer>>(&resp.base.payload)
.map_err(JetstreamError::Json)?;
match json {
Response::Response(consumer) => Ok(consumer),
Response::Error { error } => Err(JetstreamError::Api(error)),
}
}
pub fn consumers(&self, stream_name: impl Display) -> Consumers {
Consumers::new(self.clone(), stream_name)
}
pub async fn consumer(
&self,
stream_name: impl Display,
consumer_name: impl Display,
) -> Result<Option<Consumer>, JetstreamError> {
let subject = format!(
"{}.CONSUMER.INFO.{}.{}",
self.prefix, stream_name, consumer_name
)
.try_into()
.map_err(JetstreamError::Subject)?;
let resp = self
.make_request(subject)
.payload(Bytes::new())
.await
.map_err(JetstreamError::ClientClosed)?;
let resp = resp.await.map_err(JetstreamError::ResponseError)?;
let json = serde_json::from_slice::<Response<Consumer>>(&resp.base.payload)
.map_err(JetstreamError::Json)?;
match json {
Response::Response(stream) => Ok(Some(stream)),
Response::Error { error } if error.code == JetstreamErrorCode::CONSUMER_NOT_FOUND => {
Ok(None)
}
Response::Error { error } => Err(JetstreamError::Api(error)),
}
}
pub async fn consumer_batch(
&self,
consumer: &Consumer,
expires: Duration,
max_msgs: usize,
) -> Result<ConsumerBatch, JetstreamError> {
ConsumerBatch::new(consumer, self.clone(), expires, max_msgs).await
}
pub fn consumer_stream(
&self,
consumer: Consumer,
expires: Duration,
max_msgs: usize,
) -> ConsumerStream {
ConsumerStream::new(consumer, self.clone(), expires, max_msgs)
}
pub(crate) fn subject_for_request(&self, endpoint: &Subject) -> Subject {
Subject::from_dangerous_value(format!("{}.{}", self.prefix, endpoint).into())
}
fn make_request(&self, subject: Subject) -> ClientRequest<'_> {
self.client
.request(subject)
.response_timeout(self.request_timeout)
}
#[must_use]
pub fn client(&self) -> &Client {
&self.client
}
#[must_use]
pub fn prefix(&self) -> &Subject {
&self.prefix
}
}
impl JetstreamErrorCode {
pub const NOT_ENABLED: Self = Self(10076);
pub const NOT_ENABLED_FOR_ACCOUNT: Self = Self(10039);
pub const BAD_REQUEST: Self = Self(10003);
pub const STREAM_NOT_FOUND: Self = Self(10059);
pub const STREAM_NAME_IN_USE: Self = Self(10058);
pub const STREAM_MESSAGE_NOT_FOUND: Self = Self(10037);
pub const STREAM_WRONG_LAST_SEQUENCE: Self = Self(10071);
pub const COULD_NOT_CREATE_CONSUMER: Self = Self(10012);
pub const CONSUMER_NOT_FOUND: Self = Self(10014);
pub const CONSUMER_NAME_IN_USE: Self = Self(10148);
pub const CONSUMER_DUPLICATE_FILTER_SUBJECTS: Self = Self(10136);
pub const CONSUMER_OVERLAPPING_FILTER_SUBJECTS: Self = Self(10138);
pub const CONSUMER_FILTER_SUBJECTS_IS_EMPTY: Self = Self(10139);
}
impl Display for JetstreamErrorCode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Display::fmt(&self.0, f)
}
}
impl From<u16> for JetstreamErrorCode {
fn from(value: u16) -> Self {
Self(value)
}
}
impl From<JetstreamErrorCode> for u16 {
fn from(value: JetstreamErrorCode) -> Self {
value.0
}
}