pub mod bucket;
use std::{
fmt::{self, Display},
str::FromStr,
task::Poll,
time::Duration,
};
use crate::HeaderValue;
use bytes::Bytes;
use futures_util::StreamExt;
use regex::Regex;
use std::sync::LazyLock;
use time::OffsetDateTime;
use tracing::debug;
use crate::error::Error;
use crate::header;
use self::bucket::Status;
use super::{
consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
context::{PublishError, PublishErrorKind},
message::StreamMessage,
stream::{
self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
Source, StorageType, Stream,
},
};
fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
if let Some(op) = message.headers.get(KV_OPERATION) {
Operation::from_str(op.as_str())
.map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
} else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
match reason.as_str() {
"MaxAge" | "Purge" => Ok(Operation::Purge),
"Remove" => Ok(Operation::Delete),
_ => Err(EntryError::with_source(
EntryErrorKind::Other,
"invalid marker reason",
)),
}
} else {
Err(EntryError::with_source(
EntryErrorKind::Other,
"missing operation",
))
}
}
fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
let headers = match message.headers.as_ref() {
Some(headers) => headers,
None => return Ok(Operation::Put),
};
if let Some(op) = headers.get(KV_OPERATION) {
Operation::from_str(op.as_str())
.map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
} else if let Some(reason) = headers.get(header::NATS_MARKER_REASON) {
match reason.as_str() {
"MaxAge" | "Purge" => Ok(Operation::Purge),
"Remove" => Ok(Operation::Delete),
_ => Err(EntryError::with_source(
EntryErrorKind::Other,
"invalid marker reason",
)),
}
} else {
Ok(Operation::Put)
}
}
static VALID_BUCKET_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
static VALID_KEY_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
pub(crate) const MAX_HISTORY: i64 = 64;
const ALL_KEYS: &str = ">";
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
VALID_BUCKET_RE.is_match(bucket_name)
}
pub(crate) fn is_valid_key(key: &str) -> bool {
if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
return false;
}
VALID_KEY_RE.is_match(key)
}
#[derive(Debug, Clone, Default)]
pub struct Config {
pub bucket: String,
pub description: String,
pub max_value_size: i32,
pub history: i64,
pub max_age: std::time::Duration,
pub max_bytes: i64,
pub storage: StorageType,
pub num_replicas: usize,
pub republish: Option<Republish>,
pub mirror: Option<Source>,
pub sources: Option<Vec<Source>>,
pub mirror_direct: bool,
#[cfg(feature = "server_2_10")]
pub compression: bool,
pub placement: Option<stream::Placement>,
#[cfg(feature = "server_2_11")]
pub limit_markers: Option<Duration>,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
Put,
Delete,
Purge,
}
impl FromStr for Operation {
type Err = ParseOperationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
KV_OPERATION_DELETE => Ok(Operation::Delete),
KV_OPERATION_PURGE => Ok(Operation::Purge),
KV_OPERATION_PUT => Ok(Operation::Put),
_ => Err(ParseOperationError),
}
}
}
#[derive(Debug, Clone)]
pub struct ParseOperationError;
impl fmt::Display for ParseOperationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
}
}
impl std::error::Error for ParseOperationError {}
#[derive(Debug, Clone)]
pub struct Store {
pub name: String,
pub stream_name: String,
pub prefix: String,
pub put_prefix: Option<String>,
pub use_jetstream_prefix: bool,
pub stream: Stream,
}
impl Store {
pub async fn status(&self) -> Result<Status, StatusError> {
let info = self.stream.info.clone();
Ok(Status {
info,
bucket: self.name.to_string(),
})
}
pub async fn create<T: AsRef<str>>(
&self,
key: T,
value: bytes::Bytes,
) -> Result<u64, CreateError> {
self.create_maybe_ttl(key, value, None).await
}
pub async fn create_with_ttl<T: AsRef<str>>(
&self,
key: T,
value: bytes::Bytes,
ttl: Duration,
) -> Result<u64, CreateError> {
self.create_maybe_ttl(key, value, Some(ttl)).await
}
async fn create_maybe_ttl<T: AsRef<str>>(
&self,
key: T,
value: bytes::Bytes,
ttl: Option<Duration>,
) -> Result<u64, CreateError> {
let update_err = match self
.update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
.await
{
Ok(revision) => return Ok(revision),
Err(err) => err,
};
match self.entry(key.as_ref()).await? {
Some(Entry {
operation: Operation::Delete | Operation::Purge,
revision,
..
}) => {
let revision = self.update(key, value, revision).await?;
Ok(revision)
}
Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
None => Err(update_err.into()),
}
}
pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
if !is_valid_key(key.as_ref()) {
return Err(PutError::new(PutErrorKind::InvalidKey));
}
let mut subject = String::new();
if self.use_jetstream_prefix {
subject.push_str(&self.stream.context.prefix);
subject.push('.');
}
subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
subject.push_str(key.as_ref());
let publish_ack = self
.stream
.context
.publish(subject, value)
.await
.map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
let ack = publish_ack
.await
.map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
Ok(ack.sequence)
}
async fn entry_maybe_revision<T: Into<String>>(
&self,
key: T,
revision: Option<u64>,
) -> Result<Option<Entry>, EntryError> {
let key: String = key.into();
if !is_valid_key(key.as_ref()) {
return Err(EntryError::new(EntryErrorKind::InvalidKey));
}
let subject = format!("{}{}", self.prefix.as_str(), &key);
let result: Option<(StreamMessage, Operation)> = {
if self.stream.info.config.allow_direct {
let message = match revision {
Some(revision) => {
let message = self.stream.direct_get(revision).await;
if let Ok(message) = message.as_ref() {
if message.subject.as_str() != subject {
println!("subject mismatch {}", message.subject);
return Ok(None);
}
}
message
}
None => {
self.stream
.direct_get_last_for_subject(subject.as_str())
.await
}
};
match message {
Ok(message) => {
let operation =
kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
Some((message, operation))
}
Err(err) => {
if err.kind() == DirectGetErrorKind::NotFound {
None
} else {
return Err(err.into());
}
}
}
} else {
let raw_message = match revision {
Some(revision) => {
let message = self.stream.get_raw_message(revision).await;
if let Ok(message) = message.as_ref() {
if message.subject.as_str() != subject {
return Ok(None);
}
}
message
}
None => {
self.stream
.get_last_raw_message_by_subject(subject.as_str())
.await
}
};
match raw_message {
Ok(raw_message) => {
let operation = kv_operation_from_stream_message(&raw_message)
.unwrap_or(Operation::Put);
Some((raw_message, operation))
}
Err(err) => match err.kind() {
crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
return Err(EntryError::new(EntryErrorKind::InvalidKey))
}
crate::jetstream::stream::LastRawMessageErrorKind::Other => {
return Err(EntryError::with_source(EntryErrorKind::Other, err))
}
crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
return Err(EntryError::with_source(EntryErrorKind::Other, err))
}
},
}
}
};
match result {
Some((message, operation)) => {
let entry = Entry {
bucket: self.name.clone(),
key,
value: message.payload,
revision: message.sequence,
created: message.time,
operation,
delta: 0,
seen_current: false,
};
Ok(Some(entry))
}
None => Ok(None),
}
}
pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
self.entry_maybe_revision(key, None).await
}
pub async fn entry_for_revision<T: Into<String>>(
&self,
key: T,
revision: u64,
) -> Result<Option<Entry>, EntryError> {
self.entry_maybe_revision(key, Some(revision)).await
}
pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(key, DeliverPolicy::New)
.await
}
#[cfg(feature = "server_2_10")]
pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
where
T: AsRef<str>,
K: IntoIterator<Item = T>,
{
self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
.await
}
pub async fn watch_from_revision<T: AsRef<str>>(
&self,
key: T,
revision: u64,
) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(
key,
DeliverPolicy::ByStartSequence {
start_sequence: revision,
},
)
.await
}
pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
.await
}
#[cfg(feature = "server_2_10")]
pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
&self,
keys: K,
) -> Result<Watch, WatchError> {
self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
.await
}
#[cfg(feature = "server_2_10")]
async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
&self,
keys: K,
deliver_policy: DeliverPolicy,
) -> Result<Watch, WatchError> {
let subjects = keys
.into_iter()
.map(|key| {
let key = key.as_ref();
format!("{}{}", self.prefix.as_str(), key)
})
.collect::<Vec<_>>();
debug!("initial consumer creation");
let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv watch consumer".to_string()),
filter_subjects: subjects,
replay_policy: super::consumer::ReplayPolicy::Instant,
deliver_policy,
..Default::default()
})
.await
.map_err(|err| match err.kind() {
crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
}
_ => WatchError::with_source(WatchErrorKind::Other, err),
})?;
let seen_current = consumer.cached_info().num_pending == 0;
Ok(Watch {
subscription: consumer.messages().await.map_err(|err| match err.kind() {
crate::jetstream::consumer::StreamErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
}
crate::jetstream::consumer::StreamErrorKind::Other => {
WatchError::with_source(WatchErrorKind::Other, err)
}
})?,
prefix: self.prefix.clone(),
bucket: self.name.clone(),
seen_current,
})
}
async fn watch_with_deliver_policy<T: AsRef<str>>(
&self,
key: T,
deliver_policy: DeliverPolicy,
) -> Result<Watch, WatchError> {
let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
debug!("initial consumer creation");
let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv watch consumer".to_string()),
filter_subject: subject,
replay_policy: super::consumer::ReplayPolicy::Instant,
deliver_policy,
..Default::default()
})
.await
.map_err(|err| match err.kind() {
crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
}
_ => WatchError::with_source(WatchErrorKind::Other, err),
})?;
let seen_current = consumer.cached_info().num_pending == 0;
Ok(Watch {
subscription: consumer.messages().await.map_err(|err| match err.kind() {
crate::jetstream::consumer::StreamErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
}
crate::jetstream::consumer::StreamErrorKind::Other => {
WatchError::with_source(WatchErrorKind::Other, err)
}
})?,
prefix: self.prefix.clone(),
bucket: self.name.clone(),
seen_current,
})
}
pub async fn watch_all(&self) -> Result<Watch, WatchError> {
self.watch(ALL_KEYS).await
}
pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
self.watch_from_revision(ALL_KEYS, revision).await
}
pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
match self.entry(key).await {
Ok(Some(entry)) => match entry.operation {
Operation::Put => Ok(Some(entry.value)),
_ => Ok(None),
},
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}
pub async fn update<T: AsRef<str>>(
&self,
key: T,
value: Bytes,
revision: u64,
) -> Result<u64, UpdateError> {
self.update_maybe_ttl(key, value, revision, None).await
}
async fn update_maybe_ttl<T: AsRef<str>>(
&self,
key: T,
value: Bytes,
revision: u64,
ttl: Option<Duration>,
) -> Result<u64, UpdateError> {
if !is_valid_key(key.as_ref()) {
return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
}
let mut subject = String::new();
if self.use_jetstream_prefix {
subject.push_str(&self.stream.context.prefix);
subject.push('.');
}
subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
subject.push_str(key.as_ref());
let mut headers = crate::HeaderMap::default();
headers.insert(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
HeaderValue::from(revision),
);
if let Some(ttl) = ttl {
headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
}
self.stream
.context
.publish_with_headers(subject, headers, value)
.await?
.await
.map_err(|err| err.into())
.map(|publish_ack| publish_ack.sequence)
}
pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
self.delete_expect_revision(key, None).await
}
pub async fn delete_expect_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
) -> Result<(), DeleteError> {
if !is_valid_key(key.as_ref()) {
return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
}
let mut subject = String::new();
if self.use_jetstream_prefix {
subject.push_str(&self.stream.context.prefix);
subject.push('.');
}
subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
subject.push_str(key.as_ref());
let mut headers = crate::HeaderMap::default();
headers.insert(
KV_OPERATION,
KV_OPERATION_DELETE
.parse::<HeaderValue>()
.map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
);
if let Some(revision) = revison {
headers.insert(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
HeaderValue::from(revision),
);
}
self.stream
.context
.publish_with_headers(subject, headers, "".into())
.await?
.await?;
Ok(())
}
pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
self.purge_expect_revision(key, None).await
}
pub async fn purge_with_ttl<T: AsRef<str>>(
&self,
key: T,
ttl: Duration,
) -> Result<(), PurgeError> {
self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
.await
}
pub async fn purge_expect_revision<T: AsRef<str>>(
&self,
key: T,
revision: Option<u64>,
) -> Result<(), PurgeError> {
self.purge_expect_revision_maybe_ttl(key, revision, None)
.await
}
pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
&self,
key: T,
revision: u64,
ttl: Duration,
) -> Result<(), PurgeError> {
self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
.await
}
async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
&self,
key: T,
revision: Option<u64>,
ttl: Option<Duration>,
) -> Result<(), PurgeError> {
if !is_valid_key(key.as_ref()) {
return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
}
let mut subject = String::new();
if self.use_jetstream_prefix {
subject.push_str(&self.stream.context.prefix);
subject.push('.');
}
subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
subject.push_str(key.as_ref());
let mut headers = crate::HeaderMap::default();
headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
if let Some(ttl) = ttl {
headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
}
if let Some(revision) = revision {
headers.insert(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
HeaderValue::from(revision),
);
}
self.stream
.context
.publish_with_headers(subject, headers, "".into())
.await?
.await?;
Ok(())
}
pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
if !is_valid_key(key.as_ref()) {
return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
}
let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv history consumer".to_string()),
filter_subject: subject,
replay_policy: super::consumer::ReplayPolicy::Instant,
..Default::default()
})
.await?;
Ok(History {
subscription: consumer.messages().await?,
done: false,
prefix: self.prefix.clone(),
bucket: self.name.clone(),
})
}
pub async fn keys(&self) -> Result<Keys, HistoryError> {
let subject = format!("{}>", self.prefix.as_str());
let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv history consumer".to_string()),
filter_subject: subject,
headers_only: true,
replay_policy: super::consumer::ReplayPolicy::Instant,
deliver_policy: DeliverPolicy::LastPerSubject,
..Default::default()
})
.await?;
let entries = History {
done: consumer.info.num_pending == 0,
subscription: consumer.messages().await?,
prefix: self.prefix.clone(),
bucket: self.name.clone(),
};
Ok(Keys { inner: entries })
}
}
pub struct Watch {
seen_current: bool,
subscription: super::consumer::push::Ordered,
prefix: String,
bucket: String,
}
impl futures_util::Stream for Watch {
type Item = Result<Entry, WatcherError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
None => Poll::Ready(None),
Some(message) => {
let message = message?;
let info = message.info().map_err(|err| {
WatcherError::with_source(
WatcherErrorKind::Other,
format!("failed to parse message metadata: {err}"),
)
})?;
let operation =
kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
let key = message
.subject
.strip_prefix(&self.prefix)
.map(|s| s.to_string())
.unwrap();
if !self.seen_current && info.pending == 0 {
self.seen_current = true;
}
Poll::Ready(Some(Ok(Entry {
bucket: self.bucket.clone(),
key,
value: message.payload.clone(),
revision: info.stream_sequence,
created: info.published,
delta: info.pending,
operation,
seen_current: self.seen_current,
})))
}
},
std::task::Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
pub struct History {
subscription: super::consumer::push::Ordered,
done: bool,
prefix: String,
bucket: String,
}
impl futures_util::Stream for History {
type Item = Result<Entry, WatcherError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
match self.subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
None => Poll::Ready(None),
Some(message) => {
let message = message?;
let info = message.info().map_err(|err| {
WatcherError::with_source(
WatcherErrorKind::Other,
format!("failed to parse message metadata: {err}"),
)
})?;
if info.pending == 0 {
self.done = true;
}
let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
let key = message
.subject
.strip_prefix(&self.prefix)
.map(|s| s.to_string())
.unwrap();
Poll::Ready(Some(Ok(Entry {
bucket: self.bucket.clone(),
key,
value: message.payload.clone(),
revision: info.stream_sequence,
created: info.published,
delta: info.pending,
operation,
seen_current: self.done,
})))
}
},
std::task::Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
pub struct Keys {
inner: History,
}
impl futures_util::Stream for Keys {
type Item = Result<String, WatcherError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
match self.inner.poll_next_unpin(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(res)) => match res {
Ok(entry) => {
if matches!(entry.operation, Operation::Purge | Operation::Delete) {
continue;
} else {
return Poll::Ready(Some(Ok(entry.key)));
}
}
Err(e) => return Poll::Ready(Some(Err(e))),
},
Poll::Pending => return Poll::Pending,
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
pub bucket: String,
pub key: String,
pub value: Bytes,
pub revision: u64,
pub delta: u64,
pub created: OffsetDateTime,
pub operation: Operation,
pub seen_current: bool,
}
#[derive(Clone, Debug, PartialEq)]
pub enum StatusErrorKind {
JetStream(crate::jetstream::Error),
TimedOut,
}
impl Display for StatusErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::JetStream(err) => write!(f, "jetstream request failed: {err}"),
Self::TimedOut => write!(f, "timed out"),
}
}
}
pub type StatusError = Error<StatusErrorKind>;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
AlreadyExists,
InvalidKey,
Publish,
Ack,
Other,
}
impl From<UpdateError> for CreateError {
fn from(error: UpdateError) -> Self {
match error.kind() {
UpdateErrorKind::InvalidKey => {
CreateError::with_source(CreateErrorKind::InvalidKey, error)
}
UpdateErrorKind::TimedOut => CreateError::with_source(CreateErrorKind::Publish, error),
UpdateErrorKind::WrongLastRevision => {
CreateError::with_source(CreateErrorKind::AlreadyExists, error)
}
UpdateErrorKind::Other => CreateError::with_source(CreateErrorKind::Other, error),
}
}
}
impl From<PutError> for CreateError {
fn from(error: PutError) -> Self {
match error.kind() {
PutErrorKind::InvalidKey => {
CreateError::with_source(CreateErrorKind::InvalidKey, error)
}
PutErrorKind::Publish => CreateError::with_source(CreateErrorKind::Publish, error),
PutErrorKind::Ack => CreateError::with_source(CreateErrorKind::Ack, error),
}
}
}
impl From<EntryError> for CreateError {
fn from(error: EntryError) -> Self {
match error.kind() {
EntryErrorKind::InvalidKey => {
CreateError::with_source(CreateErrorKind::InvalidKey, error)
}
EntryErrorKind::TimedOut => CreateError::with_source(CreateErrorKind::Publish, error),
EntryErrorKind::Other => CreateError::with_source(CreateErrorKind::Other, error),
}
}
}
impl Display for CreateErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AlreadyExists => write!(f, "key already exists"),
Self::Publish => write!(f, "failed to create key in store"),
Self::Ack => write!(f, "ack error"),
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
Self::Other => write!(f, "other error"),
}
}
}
pub type CreateError = Error<CreateErrorKind>;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
InvalidKey,
Publish,
Ack,
}
impl Display for PutErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Publish => write!(f, "failed to put key into store"),
Self::Ack => write!(f, "ack error"),
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
}
}
}
pub type PutError = Error<PutErrorKind>;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EntryErrorKind {
InvalidKey,
TimedOut,
Other,
}
impl Display for EntryErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
Self::TimedOut => write!(f, "timed out"),
Self::Other => write!(f, "failed getting entry"),
}
}
}
pub type EntryError = Error<EntryErrorKind>;
crate::from_with_timeout!(
EntryError,
EntryErrorKind,
DirectGetError,
DirectGetErrorKind
);
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
InvalidKey,
TimedOut,
ConsumerCreate,
Other,
}
impl Display for WatchErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
Self::Other => write!(f, "watch failed"),
Self::TimedOut => write!(f, "timed out"),
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
}
}
}
pub type WatchError = Error<WatchErrorKind>;
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateErrorKind {
InvalidKey,
TimedOut,
WrongLastRevision,
Other,
}
impl Display for UpdateErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
Self::TimedOut => write!(f, "timed out"),
Self::WrongLastRevision => write!(f, "wrong last revision"),
Self::Other => write!(f, "failed getting entry"),
}
}
}
pub type UpdateError = Error<UpdateErrorKind>;
impl From<PublishError> for UpdateError {
fn from(err: PublishError) -> Self {
match err.kind() {
PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
PublishErrorKind::WrongLastSequence => {
Self::with_source(UpdateErrorKind::WrongLastRevision, err)
}
_ => Self::with_source(UpdateErrorKind::Other, err),
}
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
Consumer,
Other,
}
impl Display for WatcherErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Consumer => write!(f, "watcher consumer error"),
Self::Other => write!(f, "watcher error"),
}
}
}
pub type WatcherError = Error<WatcherErrorKind>;
impl From<OrderedError> for WatcherError {
fn from(err: OrderedError) -> Self {
WatcherError::with_source(WatcherErrorKind::Consumer, err)
}
}
pub type DeleteError = UpdateError;
pub type DeleteErrorKind = UpdateErrorKind;
pub type PurgeError = UpdateError;
pub type PurgeErrorKind = UpdateErrorKind;
pub type HistoryError = WatchError;
pub type HistoryErrorKind = WatchErrorKind;