pub mod pull;
pub mod push;
#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;
#[cfg(feature = "server_2_11")]
use time::OffsetDateTime;
use super::context::{ConsumerInfoError, RequestError};
use super::response::Response;
use super::stream::ClusterInfo;
use super::Context;
use crate::error::Error;
use crate::jetstream::consumer;
pub trait IntoConsumerConfig {
fn into_consumer_config(self) -> Config;
}
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Consumer<T: IntoConsumerConfig> {
pub(crate) context: Context,
pub(crate) config: T,
pub(crate) info: Info,
}
impl<T: IntoConsumerConfig> Consumer<T> {
pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
Self {
config,
info,
context,
}
}
}
impl<T: IntoConsumerConfig> Consumer<T> {
pub async fn info(&mut self) -> Result<&consumer::Info, ConsumerInfoError> {
let info = self.get_info().await?;
self.info = info;
Ok(&self.info)
}
pub async fn get_info(&self) -> Result<consumer::Info, ConsumerInfoError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
match self.context.request(subject, &json!({})).await? {
Response::Err { error } => Err(error.into()),
Response::Ok(info) => Ok(info),
}
}
pub fn cached_info(&self) -> &consumer::Info {
&self.info
}
}
pub trait FromConsumer {
fn try_from_consumer_config(
config: crate::jetstream::consumer::Config,
) -> Result<Self, crate::Error>
where
Self: Sized;
}
pub type PullConsumer = Consumer<self::pull::Config>;
pub type PushConsumer = Consumer<self::push::Config>;
pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
pub stream_name: String,
pub name: String,
#[serde(with = "rfc3339")]
pub created: time::OffsetDateTime,
pub config: Config,
pub delivered: SequenceInfo,
pub ack_floor: SequenceInfo,
pub num_ack_pending: usize,
pub num_redelivered: usize,
pub num_waiting: usize,
pub num_pending: u64,
#[serde(skip_serializing_if = "is_default")]
pub cluster: Option<ClusterInfo>,
#[serde(default, skip_serializing_if = "is_default")]
pub push_bound: bool,
#[cfg(feature = "server_2_11")]
#[serde(default)]
pub paused: bool,
#[cfg(feature = "server_2_11")]
#[serde(default, with = "serde_nanos")]
pub pause_remaining: Option<Duration>,
}
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct SequenceInfo {
#[serde(rename = "consumer_seq")]
pub consumer_sequence: u64,
#[serde(rename = "stream_seq")]
pub stream_sequence: u64,
#[serde(
default,
with = "rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
pub last_active: Option<time::OffsetDateTime>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_subject: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_group: Option<String>,
#[serde(flatten)]
pub deliver_policy: DeliverPolicy,
#[serde(default)]
pub ack_policy: AckPolicy,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub ack_wait: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub max_deliver: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: String,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subjects: Vec<String>,
#[serde(default)]
pub replay_policy: ReplayPolicy,
#[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
#[serde(
rename = "sample_freq",
with = "sample_freq_deser",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
#[serde(default, skip_serializing_if = "is_default")]
pub max_waiting: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub max_ack_pending: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub headers_only: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub flow_control: bool,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub idle_heartbeat: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub max_batch: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub max_bytes: i64,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub max_expires: Duration,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub inactive_threshold: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub num_replicas: usize,
#[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
pub memory_storage: bool,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub metadata: HashMap<String, String>,
#[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
pub backoff: Vec<Duration>,
#[cfg(feature = "server_2_11")]
#[serde(default, skip_serializing_if = "is_default")]
pub priority_policy: PriorityPolicy,
#[cfg(feature = "server_2_11")]
#[serde(default, skip_serializing_if = "is_default")]
pub priority_groups: Vec<String>,
#[cfg(feature = "server_2_11")]
#[serde(
default,
with = "rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
pub pause_until: Option<OffsetDateTime>,
}
#[cfg(feature = "server_2_11")]
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum PriorityPolicy {
#[serde(rename = "overflow")]
Overflow,
#[serde(rename = "pinned_client")]
PinnedClient,
#[cfg(feature = "server_2_12")]
#[serde(rename = "prioritized")]
Prioritized,
#[serde(rename = "none")]
#[default]
None,
}
impl From<&Config> for Config {
fn from(cc: &Config) -> Config {
cc.clone()
}
}
impl From<&str> for Config {
fn from(s: &str) -> Config {
Config {
durable_name: Some(s.to_string()),
..Default::default()
}
}
}
impl IntoConsumerConfig for Config {
fn into_consumer_config(self) -> Config {
self
}
}
impl IntoConsumerConfig for &Config {
fn into_consumer_config(self) -> Config {
self.clone()
}
}
impl FromConsumer for Config {
fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
where
Self: Sized,
{
Ok(config)
}
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[serde(tag = "deliver_policy")]
pub enum DeliverPolicy {
#[default]
#[serde(rename = "all")]
All,
#[serde(rename = "last")]
Last,
#[serde(rename = "new")]
New,
#[serde(rename = "by_start_sequence")]
ByStartSequence {
#[serde(rename = "opt_start_seq")]
start_sequence: u64,
},
#[serde(rename = "by_start_time")]
ByStartTime {
#[serde(rename = "opt_start_time", with = "rfc3339")]
start_time: time::OffsetDateTime,
},
#[serde(rename = "last_per_subject")]
LastPerSubject,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AckPolicy {
#[default]
#[serde(rename = "explicit")]
Explicit = 2,
#[serde(rename = "none")]
None = 0,
#[serde(rename = "all")]
All = 1,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReplayPolicy {
#[default]
#[serde(rename = "instant")]
Instant = 0,
#[serde(rename = "original")]
Original = 1,
}
fn is_default<T: Default + Eq>(t: &T) -> bool {
t == &T::default()
}
pub(crate) mod sample_freq_deser {
pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
D: serde::Deserializer<'de>,
{
let s = <String as serde::Deserialize>::deserialize(deserializer)?;
let mut spliterator = s.split('%');
match (spliterator.next(), spliterator.next()) {
(Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
(Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
_ => Err(serde::de::Error::custom(format!(
"Malformed sample frequency: {s}"
))),
}
}
pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: std::fmt::Display,
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamErrorKind {
TimedOut,
Other,
}
impl std::fmt::Display for StreamErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TimedOut => write!(f, "timed out"),
Self::Other => write!(f, "failed"),
}
}
}
pub type StreamError = Error<StreamErrorKind>;
fn backoff(attempt: u32, _: &impl std::error::Error) -> Duration {
if attempt < 5 {
Duration::from_millis(500 * attempt as u64)
} else {
Duration::from_secs(10)
}
}