use std::time::Duration;
use crate::header::HeaderMap;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::io::{self, ErrorKind};
use time::serde::rfc3339;
pub type DateTime = time::OffsetDateTime;
#[derive(Serialize)]
pub(crate) struct StreamMessageGetRequest {
#[serde(default, skip_serializing_if = "is_default")]
pub seq: Option<u64>,
#[serde(default, rename = "last_by_subj", skip_serializing_if = "is_default")]
pub last_by_subject: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawStreamMessage {
#[serde(rename = "subject")]
pub subject: String,
#[serde(rename = "seq")]
pub sequence: u64,
#[serde(default, rename = "data")]
pub data: String,
#[serde(default, rename = "hdrs")]
pub headers: Option<String>,
#[serde(rename = "time", with = "rfc3339")]
pub time: DateTime,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct StreamMessageGetResponse {
#[serde(rename = "type")]
pub kind: String,
#[serde(rename = "message")]
pub message: RawStreamMessage,
}
#[derive(Debug, Clone)]
pub struct StreamMessage {
pub subject: String,
pub sequence: u64,
pub headers: Option<HeaderMap>,
pub data: Vec<u8>,
pub time: DateTime,
}
impl TryFrom<RawStreamMessage> for StreamMessage {
type Error = std::io::Error;
fn try_from(raw_message: RawStreamMessage) -> Result<StreamMessage, Self::Error> {
let maybe_headers = if let Some(raw_headers) = raw_message.headers {
let decoded_headers = match base64::decode(raw_headers) {
Ok(data) => data,
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
};
let headers = HeaderMap::try_from(decoded_headers.as_slice())?;
Some(headers)
} else {
None
};
let decoded_data = match base64::decode(&raw_message.data) {
Ok(data) => data,
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
};
Ok(StreamMessage {
subject: raw_message.subject,
sequence: raw_message.sequence,
headers: maybe_headers,
data: decoded_data,
time: raw_message.time,
})
}
}
#[derive(Serialize)]
pub(crate) struct DeleteRequest {
pub seq: u64,
}
#[derive(Deserialize)]
pub(crate) struct DeleteResponse {
pub success: bool,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct CreateConsumerRequest {
pub stream_name: String,
pub config: ConsumerConfig,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum ConsumerOwnership {
Yes,
No,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ConsumerConfig {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_subject: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_group: Option<String>,
pub deliver_policy: DeliverPolicy,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub opt_start_seq: Option<u64>,
#[serde(default, skip_serializing_if = "is_default", with = "rfc3339::option")]
pub opt_start_time: Option<DateTime>,
pub ack_policy: AckPolicy,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub ack_wait: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub max_deliver: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: String,
pub replay_policy: ReplayPolicy,
#[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
#[serde(
rename = "sample_freq",
with = "from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
#[serde(default, skip_serializing_if = "is_default")]
pub max_waiting: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub max_ack_pending: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub headers_only: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub flow_control: bool,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub idle_heartbeat: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub max_batch: i64,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub max_expires: Duration,
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub inactive_threshold: Duration,
}
mod from_str {
pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
D: serde::Deserializer<'de>,
{
let s = <String as serde::Deserialize>::deserialize(deserializer)?;
T::from_str(&s).map_err(serde::de::Error::custom)
}
pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: std::fmt::Display,
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}
}
pub(crate) enum ConsumerKind {
Pull,
}
impl ConsumerConfig {
pub(crate) fn validate_for(&self, kind: &ConsumerKind) -> io::Result<()> {
match kind {
ConsumerKind::Pull => {
if self.deliver_subject.is_some() {
return Err(io::Error::new(
ErrorKind::Other,
"pull subscription cannot bind to Push Consumer",
));
}
if let AckPolicy::None = self.ack_policy {
return Err(io::Error::new(
io::ErrorKind::Other,
"pull subscription cannot have Ack Policy set to None",
));
}
}
}
Ok(())
}
}
impl From<&ConsumerConfig> for ConsumerConfig {
fn from(cc: &ConsumerConfig) -> ConsumerConfig {
cc.clone()
}
}
impl From<&str> for ConsumerConfig {
fn from(s: &str) -> ConsumerConfig {
ConsumerConfig {
durable_name: Some(s.to_string()),
..Default::default()
}
}
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct StreamConfig {
pub name: String,
pub max_bytes: i64,
pub max_msgs: i64,
pub max_msgs_per_subject: i64,
pub discard: DiscardPolicy,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub subjects: Vec<String>,
pub retention: RetentionPolicy,
pub max_consumers: i32,
#[serde(with = "serde_nanos")]
pub max_age: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub max_msg_size: i32,
pub storage: StorageType,
pub num_replicas: usize,
#[serde(default, skip_serializing_if = "is_default")]
pub no_ack: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub duplicate_window: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub template_owner: String,
#[serde(default, skip_serializing_if = "is_default")]
pub sealed: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub description: Option<String>,
#[serde(
default,
rename = "allow_rollup_hdrs",
skip_serializing_if = "is_default"
)]
pub allow_rollup: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub deny_delete: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub deny_purge: bool,
}
fn is_default<T: Default + Eq>(t: &T) -> bool {
t == &T::default()
}
impl From<&StreamConfig> for StreamConfig {
fn from(sc: &StreamConfig) -> StreamConfig {
sc.clone()
}
}
impl From<&str> for StreamConfig {
fn from(s: &str) -> StreamConfig {
StreamConfig {
name: s.to_string(),
..Default::default()
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamInfo {
pub config: StreamConfig,
#[serde(with = "rfc3339")]
pub created: DateTime,
pub state: StreamState,
#[serde(default)]
pub cluster: ClusterInfo,
}
#[derive(Debug, Clone)]
pub struct JetStreamMessageInfo<'a> {
pub domain: Option<&'a str>,
pub acc_hash: Option<&'a str>,
pub stream: &'a str,
pub consumer: &'a str,
pub stream_seq: u64,
pub consumer_seq: u64,
pub delivered: i64,
pub pending: u64,
pub published: DateTime,
pub token: Option<&'a str>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct StreamState {
pub messages: u64,
pub bytes: u64,
pub first_seq: u64,
#[serde(with = "rfc3339")]
pub first_ts: DateTime,
pub last_seq: u64,
#[serde(with = "rfc3339")]
pub last_ts: DateTime,
pub consumer_count: usize,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DeliverPolicy {
#[default]
#[serde(rename = "all")]
All = 0,
#[serde(rename = "last")]
Last = 1,
#[serde(rename = "new")]
New = 2,
#[serde(rename = "by_start_sequence")]
ByStartSeq = 3,
#[serde(rename = "by_start_time")]
ByStartTime = 4,
#[serde(rename = "last_per_subject")]
LastPerSubject = 5,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AckPolicy {
#[default]
#[serde(rename = "explicit")]
Explicit = 2,
#[serde(rename = "none")]
None = 0,
#[serde(rename = "all")]
All = 1,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReplayPolicy {
#[default]
#[serde(rename = "instant")]
Instant = 0,
#[serde(rename = "original")]
Original = 1,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct PurgeRequest {
#[serde(default, rename = "seq", skip_serializing_if = "is_default")]
pub sequence: Option<u64>,
#[serde(default, rename = "filter", skip_serializing_if = "is_default")]
pub filter: Option<String>,
#[serde(default, rename = "filter", skip_serializing_if = "is_default")]
pub keep: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
pub success: bool,
pub purged: u64,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
#[default]
#[serde(rename = "limits")]
Limits = 0,
#[serde(rename = "interest")]
Interest = 1,
#[serde(rename = "workqueue")]
WorkQueue = 2,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
#[default]
#[serde(rename = "old")]
Old = 0,
#[serde(rename = "new")]
New = 1,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
#[default]
#[serde(rename = "file")]
File = 0,
#[serde(rename = "memory")]
Memory = 1,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct AccountLimits {
pub max_memory: i64,
pub max_storage: i64,
pub max_streams: i64,
pub max_consumers: i64,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub(crate) struct AccountStats {
pub memory: u64,
pub storage: u64,
pub streams: usize,
pub limits: AccountLimits,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct PublishAck {
pub stream: String,
#[serde(rename = "seq")]
pub sequence: u64,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub domain: String,
#[serde(default, skip_serializing_if = "is_default")]
pub duplicate: bool,
}
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
Ack,
Nak,
Progress,
Next,
Term,
}
impl AsRef<[u8]> for AckKind {
fn as_ref(&self) -> &[u8] {
use AckKind::*;
match self {
Ack => b"+ACK",
Nak => b"-NAK",
Progress => b"+WPI",
Next => b"+NXT",
Term => b"+TERM",
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ConsumerInfo {
pub stream_name: String,
pub name: String,
#[serde(with = "rfc3339")]
pub created: DateTime,
pub config: ConsumerConfig,
pub delivered: SequencePair,
pub ack_floor: SequencePair,
pub num_ack_pending: usize,
pub num_redelivered: usize,
pub num_waiting: usize,
pub num_pending: u64,
#[serde(default)]
pub cluster: ClusterInfo,
#[serde(default)]
pub push_bound: bool,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub leader: Option<String>,
#[serde(default)]
pub replicas: Vec<PeerInfo>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
pub name: String,
pub current: bool,
#[serde(with = "serde_nanos")]
pub active: Duration,
#[serde(default)]
pub offline: bool,
pub lag: Option<u64>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct SequencePair {
pub consumer_seq: u64,
pub stream_seq: u64,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct BatchOptions {
pub batch: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expires: Option<usize>,
#[serde(default, skip_serializing_if = "is_default")]
pub no_wait: bool,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub(crate) struct StreamNamesRequest {
#[serde(default, skip_serializing_if = "is_default")]
pub subject: String,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub(crate) struct StreamNamesResponse {
#[serde(default, skip_serializing_if = "is_default")]
pub streams: Option<Vec<String>>,
}
#[derive(Debug, Default, Clone)]
pub struct PullSubscribeOptions {
pub(crate) stream_name: Option<String>,
pub(crate) durable_name: Option<String>,
pub(crate) bind_only: bool,
pub(crate) consumer_config: Option<ConsumerConfig>,
}
impl PullSubscribeOptions {
pub fn new() -> PullSubscribeOptions {
Default::default()
}
pub fn bind_stream(mut self, stream_name: String) -> Self {
self.stream_name = Some(stream_name);
self.bind_only = true;
self
}
pub fn consumer_config(mut self, consumer_config: ConsumerConfig) -> Self {
self.consumer_config = Some(consumer_config);
self
}
pub fn durable_name(mut self, consumer_name: String) -> Self {
self.durable_name = Some(consumer_name);
self
}
}
#[derive(Debug, Default, Clone)]
pub struct SubscribeOptions {
pub(crate) bind_only: bool,
pub(crate) stream_name: Option<String>,
pub(crate) consumer_name: Option<String>,
pub(crate) ordered: bool,
pub(crate) ack_policy: Option<AckPolicy>,
pub(crate) ack_wait: Option<Duration>,
pub(crate) replay_policy: Option<ReplayPolicy>,
pub(crate) deliver_policy: Option<DeliverPolicy>,
pub(crate) deliver_subject: Option<String>,
pub(crate) description: Option<String>,
pub(crate) durable_name: Option<String>,
pub(crate) sample_frequency: Option<u8>,
pub(crate) idle_heartbeat: Option<Duration>,
pub(crate) max_ack_pending: Option<i64>,
pub(crate) max_deliver: Option<i64>,
pub(crate) max_waiting: Option<i64>,
pub(crate) opt_start_seq: Option<u64>,
pub(crate) opt_start_time: Option<DateTime>,
pub(crate) flow_control: Option<bool>,
pub(crate) rate_limit: Option<u64>,
pub(crate) headers_only: Option<bool>,
}
impl SubscribeOptions {
pub fn new() -> Self {
Self::default()
}
pub fn bind(stream_name: String, consumer_name: String) -> Self {
Self {
stream_name: Some(stream_name),
consumer_name: Some(consumer_name),
bind_only: true,
..Default::default()
}
}
pub fn ordered() -> Self {
Self {
ordered: true,
..Self::default()
}
}
pub fn bind_stream(stream_name: String) -> Self {
Self {
stream_name: Some(stream_name),
..Default::default()
}
}
pub fn description(mut self, description: String) -> Self {
self.description = Some(description);
self
}
pub fn durable_name(mut self, consumer: String) -> Self {
self.durable_name = Some(consumer);
self
}
pub fn deliver_all(mut self) -> Self {
self.deliver_policy = Some(DeliverPolicy::All);
self
}
pub fn deliver_last(mut self) -> Self {
self.deliver_policy = Some(DeliverPolicy::Last);
self
}
pub fn deliver_last_per_subject(mut self) -> Self {
self.deliver_policy = Some(DeliverPolicy::LastPerSubject);
self
}
pub fn deliver_new(mut self) -> Self {
self.deliver_policy = Some(DeliverPolicy::New);
self
}
pub fn deliver_by_start_sequence(mut self, seq: u64) -> Self {
self.deliver_policy = Some(DeliverPolicy::ByStartSeq);
self.opt_start_seq = Some(seq);
self
}
pub fn deliver_by_start_time(mut self, time: DateTime) -> Self {
self.deliver_policy = Some(DeliverPolicy::ByStartTime);
self.opt_start_time = Some(time);
self
}
pub fn ack_none(mut self) -> Self {
self.ack_policy = Some(AckPolicy::None);
self
}
pub fn ack_all(mut self) -> Self {
self.ack_policy = Some(AckPolicy::All);
self
}
pub fn ack_explicit(mut self) -> Self {
self.ack_policy = Some(AckPolicy::Explicit);
self
}
pub fn max_deliver(mut self, n: i64) -> Self {
self.max_deliver = Some(n);
self
}
pub fn max_ack_pending(mut self, n: i64) -> Self {
self.max_ack_pending = Some(n);
self
}
pub fn replay_original(mut self) -> Self {
self.replay_policy = Some(ReplayPolicy::Original);
self
}
pub fn replay_instant(mut self) -> Self {
self.replay_policy = Some(ReplayPolicy::Instant);
self
}
pub fn rate_limit(mut self, n: u64) -> Self {
self.rate_limit = Some(n);
self
}
pub fn deliver_subject(mut self, subject: String) -> Self {
self.deliver_subject = Some(subject);
self
}
pub fn headers_only(mut self) -> Self {
self.headers_only = Some(true);
self
}
pub fn enable_flow_control(mut self) -> Self {
self.flow_control = Some(true);
self
}
#[allow(clippy::cast_possible_truncation)]
pub fn idle_heartbeat(mut self, interval: Duration) -> Self {
self.idle_heartbeat = Some(interval);
self
}
}
#[derive(Debug, Default, Clone)]
pub struct PublishOptions {
pub timeout: Option<Duration>,
pub id: Option<String>,
pub expected_last_msg_id: Option<String>,
pub expected_stream: Option<String>,
pub expected_last_sequence: Option<u64>,
pub expected_last_subject_sequence: Option<u64>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct AccountInfo {
pub(crate) r#type: String,
pub memory: i64,
pub storage: i64,
pub streams: i64,
pub consumers: i64,
pub api: ApiStats,
pub limits: AccountLimits,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)]
pub struct ApiStats {
pub total: u64,
pub errors: u64,
}