pub mod pull;
pub mod push;
#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;
use super::context::RequestError;
use super::stream::ClusterInfo;
use super::Context;
use crate::error::Error;
use crate::jetstream::consumer;
/// Conversion of a consumer-specific configuration value into the general [`Config`].
///
/// This trait is the bound placed on the configuration type parameter of [`Consumer`].
pub trait IntoConsumerConfig {
/// Consumes `self` and returns the corresponding general [`Config`].
fn into_consumer_config(self) -> Config;
}
/// Handle that bundles a consumer's configuration, its last stored [`Info`], and the
/// [`Context`] used to send requests on its behalf.
// NOTE(review): the reason `dead_code` is allowed is not visible in this file — confirm it is still needed.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Consumer<T: IntoConsumerConfig> {
/// Context through which requests for this consumer are issued.
pub(crate) context: Context,
/// Configuration value this handle was constructed with.
pub(crate) config: T,
/// Most recently stored consumer info; overwritten by [`Consumer::info`].
pub(crate) info: Info,
}
impl<T: IntoConsumerConfig> Consumer<T> {
pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
Self {
config,
info,
context,
}
}
}
impl<T: IntoConsumerConfig> Consumer<T> {
pub async fn info(&mut self) -> Result<&consumer::Info, RequestError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
let info = self.context.request(subject, &json!({})).await?;
self.info = info;
Ok(&self.info)
}
async fn fetch_info(&self) -> Result<consumer::Info, RequestError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
self.context.request(subject, &json!({})).await
}
pub fn cached_info(&self) -> &consumer::Info {
&self.info
}
}
/// Fallible construction of a configuration type from the general [`Config`].
pub trait FromConsumer {
/// Attempts to build `Self` from `config`.
///
/// # Errors
///
/// Implementations return `crate::Error` when `config` cannot be represented as `Self`.
fn try_from_consumer_config(
config: crate::jetstream::consumer::Config,
) -> Result<Self, crate::Error>
where
Self: Sized;
}
/// [`Consumer`] parameterized with [`pull::Config`](self::pull::Config).
pub type PullConsumer = Consumer<self::pull::Config>;
/// [`Consumer`] parameterized with [`push::Config`](self::push::Config).
pub type PushConsumer = Consumer<self::push::Config>;
/// [`Consumer`] parameterized with [`pull::OrderedConfig`](self::pull::OrderedConfig).
pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
/// [`Consumer`] parameterized with [`push::OrderedConfig`](self::push::OrderedConfig).
pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
/// Consumer information as deserialized from the response to a `CONSUMER.INFO` request.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
/// Name of the stream; used to build the `CONSUMER.INFO` request subject.
pub stream_name: String,
/// Name of the consumer; used to build the `CONSUMER.INFO` request subject.
pub name: String,
/// Creation timestamp, parsed from an RFC 3339 string.
#[serde(with = "rfc3339")]
pub created: time::OffsetDateTime,
/// Configuration reported alongside this info.
pub config: Config,
/// Sequence pair reported under the `delivered` key.
pub delivered: SequenceInfo,
/// Sequence pair reported under the `ack_floor` key.
pub ack_floor: SequenceInfo,
/// Value of the `num_ack_pending` key.
pub num_ack_pending: usize,
/// Value of the `num_redelivered` key.
pub num_redelivered: usize,
/// Value of the `num_waiting` key.
pub num_waiting: usize,
/// Value of the `num_pending` key.
pub num_pending: u64,
/// Cluster information, if present.
// NOTE(review): only `Deserialize` is derived on this struct, so `skip_serializing_if`
// appears to have no effect here — confirm before relying on it.
#[serde(skip_serializing_if = "is_default")]
pub cluster: Option<ClusterInfo>,
/// Value of the `push_bound` key; `false` when the key is absent.
#[serde(default, skip_serializing_if = "is_default")]
pub push_bound: bool,
}
/// Pair of consumer and stream sequence numbers with an optional activity timestamp.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct SequenceInfo {
/// Consumer sequence; wire name `consumer_seq`.
#[serde(rename = "consumer_seq")]
pub consumer_sequence: u64,
/// Stream sequence; wire name `stream_seq`.
#[serde(rename = "stream_seq")]
pub stream_sequence: u64,
/// Optional RFC 3339 timestamp; `None` when the key is absent.
#[serde(
default,
with = "rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
pub last_active: Option<time::OffsetDateTime>,
}
/// General consumer configuration, serializable and deserializable with serde.
///
/// Apart from `deliver_policy`, `ack_policy` and `replay_policy`, every field is left out
/// of the serialized output while it holds its default value (`None` for the optional
/// strings) and falls back to that default when the key is absent on input.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
/// Optional value of the `deliver_subject` key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_subject: Option<String>,
/// Optional value of the `durable_name` key; this is the field set by `From<&str>`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// Optional value of the `name` key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Optional value of the `description` key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Optional value of the `deliver_group` key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_group: Option<String>,
/// Delivery policy. Flattened, so its `deliver_policy` tag and any variant fields
/// appear directly among this struct's keys.
#[serde(flatten)]
pub deliver_policy: DeliverPolicy,
/// Acknowledgement policy; always serialized.
pub ack_policy: AckPolicy,
/// Value of the `ack_wait` key, (de)serialized through the `serde_nanos` module.
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub ack_wait: Duration,
/// Value of the `max_deliver` key.
#[serde(default, skip_serializing_if = "is_default")]
pub max_deliver: i64,
/// Value of the `filter_subject` key; the empty string counts as unset.
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: String,
/// Value of the `filter_subjects` key. Only compiled with the `server_2_10` feature.
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subjects: Vec<String>,
/// Replay policy; always serialized.
pub replay_policy: ReplayPolicy,
/// Value of the `rate_limit` key.
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// Sampling frequency; wire name `sample_freq`. Carried as a string on the wire and
/// converted by the `sample_freq_deser` module.
#[serde(
rename = "sample_freq",
with = "sample_freq_deser",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// Value of the `max_waiting` key.
#[serde(default, skip_serializing_if = "is_default")]
pub max_waiting: i64,
/// Value of the `max_ack_pending` key.
#[serde(default, skip_serializing_if = "is_default")]
pub max_ack_pending: i64,
/// Value of the `headers_only` key.
#[serde(default, skip_serializing_if = "is_default")]
pub headers_only: bool,
/// Value of the `flow_control` key.
#[serde(default, skip_serializing_if = "is_default")]
pub flow_control: bool,
/// Value of the `idle_heartbeat` key, (de)serialized through the `serde_nanos` module.
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub idle_heartbeat: Duration,
/// Value of the `max_batch` key.
#[serde(default, skip_serializing_if = "is_default")]
pub max_batch: i64,
/// Value of the `max_bytes` key.
#[serde(default, skip_serializing_if = "is_default")]
pub max_bytes: i64,
/// Value of the `max_expires` key, (de)serialized through the `serde_nanos` module.
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub max_expires: Duration,
/// Value of the `inactive_threshold` key, (de)serialized through the `serde_nanos` module.
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub inactive_threshold: Duration,
/// Value of the `num_replicas` key.
#[serde(default, skip_serializing_if = "is_default")]
pub num_replicas: usize,
/// Memory-storage flag; wire name `mem_storage`.
#[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
pub memory_storage: bool,
/// String-to-string metadata map. Only compiled with the `server_2_10` feature.
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub metadata: HashMap<String, String>,
/// List of durations under the `backoff` key, (de)serialized through the `serde_nanos` module.
#[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
pub backoff: Vec<Duration>,
}
impl From<&Config> for Config {
    /// Produces an owned configuration by cloning the borrowed one.
    fn from(cc: &Config) -> Config {
        Clone::clone(cc)
    }
}
impl From<&str> for Config {
fn from(s: &str) -> Config {
Config {
durable_name: Some(s.to_string()),
..Default::default()
}
}
}
/// The general configuration converts into itself.
impl IntoConsumerConfig for Config {
/// Returns `self` unchanged.
fn into_consumer_config(self) -> Config {
self
}
}
impl IntoConsumerConfig for &Config {
fn into_consumer_config(self) -> Config {
self.clone()
}
}
impl FromConsumer for Config {
fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
where
Self: Sized,
{
Ok(config)
}
}
/// Delivery policy, internally tagged on the `deliver_policy` key.
///
/// Variants that carry data contribute an extra key next to the tag.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[serde(tag = "deliver_policy")]
pub enum DeliverPolicy {
/// Default variant; wire value `all`.
#[default]
#[serde(rename = "all")]
All,
/// Wire value `last`.
#[serde(rename = "last")]
Last,
/// Wire value `new`.
#[serde(rename = "new")]
New,
/// Wire value `by_start_sequence`.
#[serde(rename = "by_start_sequence")]
ByStartSequence {
/// Sequence number carried under the `opt_start_seq` key.
#[serde(rename = "opt_start_seq")]
start_sequence: u64,
},
/// Wire value `by_start_time`.
#[serde(rename = "by_start_time")]
ByStartTime {
/// RFC 3339 timestamp carried under the `opt_start_time` key.
#[serde(rename = "opt_start_time", with = "rfc3339")]
start_time: time::OffsetDateTime,
},
/// Wire value `last_per_subject`.
#[serde(rename = "last_per_subject")]
LastPerSubject,
}
/// Acknowledgement policy; each variant is (de)serialized as its lowercase name.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AckPolicy {
/// Default variant; wire value `explicit`, discriminant 2.
#[default]
#[serde(rename = "explicit")]
Explicit = 2,
/// Wire value `none`, discriminant 0.
#[serde(rename = "none")]
None = 0,
/// Wire value `all`, discriminant 1.
#[serde(rename = "all")]
All = 1,
}
/// Replay policy; each variant is (de)serialized as its lowercase name.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReplayPolicy {
/// Default variant; wire value `instant`, discriminant 0.
#[default]
#[serde(rename = "instant")]
Instant = 0,
/// Wire value `original`, discriminant 1.
#[serde(rename = "original")]
Original = 1,
}
/// Reports whether `t` is equal to the [`Default`] value of its type.
///
/// Named by the `skip_serializing_if = "is_default"` attributes in this module.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
pub(crate) mod sample_freq_deser {
pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
D: serde::Deserializer<'de>,
{
let s = <String as serde::Deserialize>::deserialize(deserializer)?;
let mut spliterator = s.split('%');
match (spliterator.next(), spliterator.next()) {
(Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
(Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
_ => Err(serde::de::Error::custom(format!(
"Malformed sample frequency: {s}"
))),
}
}
pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: std::fmt::Display,
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}
}
/// Kinds of stream error distinguished by this module.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamErrorKind {
    /// Rendered by `Display` as `timed out`.
    TimedOut,
    /// Rendered by `Display` as `failed`.
    Other,
}

impl std::fmt::Display for StreamErrorKind {
    /// Writes the short, fixed description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match *self {
            StreamErrorKind::TimedOut => "timed out",
            StreamErrorKind::Other => "failed",
        };
        f.write_str(text)
    }
}
/// Error type whose kind is a [`StreamErrorKind`].
pub type StreamError = Error<StreamErrorKind>;