use std::{
collections::{self, HashMap},
fmt::{self, Debug, Display},
future::IntoFuture,
io::{self},
pin::Pin,
str::FromStr,
task::Poll,
time::Duration,
};
use crate::{
error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
};
use base64::engine::general_purpose::STANDARD;
use base64::engine::Engine;
use bytes::Bytes;
use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};
use super::{
consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
context::{
ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
StreamsErrorKind,
},
errors::ErrorCode,
is_valid_name,
message::{StreamMessage, StreamMessageError},
response::Response,
Context,
};
pub type InfoError = RequestError;
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
NotFound,
InvalidSubject,
TimedOut,
Request,
ErrorResponse(StatusCode, String),
Other,
}
impl Display for DirectGetErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidSubject => write!(f, "invalid subject"),
Self::NotFound => write!(f, "message not found"),
Self::ErrorResponse(status, description) => {
write!(f, "unable to get message: {status} {description}")
}
Self::Other => write!(f, "error getting message"),
Self::TimedOut => write!(f, "timed out"),
Self::Request => write!(f, "request failed"),
}
}
}
pub type DirectGetError = Error<DirectGetErrorKind>;
impl From<crate::RequestError> for DirectGetError {
fn from(err: crate::RequestError) -> Self {
match err.kind() {
crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
crate::RequestErrorKind::NoResponders => {
DirectGetError::new(DirectGetErrorKind::ErrorResponse(
StatusCode::NO_RESPONDERS,
"no responders".to_string(),
))
}
crate::RequestErrorKind::InvalidSubject | crate::RequestErrorKind::Other => {
DirectGetError::with_source(DirectGetErrorKind::Other, err)
}
}
}
}
impl From<serde_json::Error> for DirectGetError {
fn from(err: serde_json::Error) -> Self {
DirectGetError::with_source(DirectGetErrorKind::Other, err)
}
}
impl From<StreamMessageError> for DirectGetError {
fn from(err: StreamMessageError) -> Self {
DirectGetError::with_source(DirectGetErrorKind::Other, err)
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
Request,
TimedOut,
JetStream(super::errors::Error),
}
impl Display for DeleteMessageErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Request => write!(f, "request failed"),
Self::TimedOut => write!(f, "timed out"),
Self::JetStream(err) => write!(f, "JetStream error: {err}"),
}
}
}
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
pub(crate) info: T,
pub(crate) context: Context,
pub(crate) name: String,
}
impl Stream<Info> {
pub async fn info(&mut self) -> Result<&Info, InfoError> {
let subject = format!("STREAM.INFO.{}", self.info.config.name);
match self.context.request(subject, &json!({})).await? {
Response::Ok::<Info>(info) => {
self.info = info;
Ok(&self.info)
}
Response::Err { error } => Err(error.into()),
}
}
pub fn cached_info(&self) -> &Info {
&self.info
}
}
impl<I> Stream<I> {
pub async fn get_info(&self) -> Result<Info, InfoError> {
let subject = format!("STREAM.INFO.{}", self.name);
match self.context.request(subject, &json!({})).await? {
Response::Ok::<Info>(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
}
pub async fn info_with_subjects<F: AsRef<str>>(
&self,
subjects_filter: F,
) -> Result<InfoWithSubjects, InfoError> {
let subjects_filter = subjects_filter.as_ref().to_string();
let info = stream_info_with_details(
self.context.clone(),
self.name.clone(),
0,
false,
subjects_filter.clone(),
)
.await?;
Ok(InfoWithSubjects::new(
self.context.clone(),
info,
subjects_filter,
))
}
pub fn info_builder(&self) -> StreamInfoBuilder {
StreamInfoBuilder::new(self.context.clone(), self.name.clone())
}
pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
DirectGetBuilder::new(self.context.clone(), self.name.clone())
}
pub async fn direct_get_next_for_subject<T: Into<String>>(
&self,
subject: T,
sequence: Option<u64>,
) -> Result<StreamMessage, DirectGetError> {
let subject_str = subject.into();
if !is_valid_subject(&subject_str) {
return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
}
let mut builder = self.direct_get_builder().next_by_subject(subject_str);
if let Some(seq) = sequence {
builder = builder.sequence(seq);
}
builder.send().await
}
pub async fn direct_get_first_for_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<StreamMessage, DirectGetError> {
let subject_str = subject.into();
if !is_valid_subject(&subject_str) {
return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
}
self.direct_get_builder()
.next_by_subject(subject_str)
.send()
.await
}
pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
self.direct_get_builder().sequence(sequence).send().await
}
pub async fn direct_get_last_for_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<StreamMessage, DirectGetError> {
self.direct_get_builder()
.last_by_subject(subject)
.send()
.await
}
pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
RawMessageBuilder::new(self.context.clone(), self.name.clone())
}
pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
self.raw_message_builder().sequence(sequence).send().await
}
pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: u64,
) -> Result<StreamMessage, RawMessageError> {
self.raw_message_builder()
.sequence(sequence)
.next_by_subject(subject.as_ref().to_string())
.send()
.await
}
pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<StreamMessage, RawMessageError> {
self.raw_message_builder()
.next_by_subject(subject.as_ref().to_string())
.send()
.await
}
pub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str,
) -> Result<StreamMessage, LastRawMessageError> {
self.raw_message_builder()
.last_by_subject(stream_subject.to_string())
.send()
.await
}
pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
let payload = json!({
"seq": sequence,
});
let response: Response<DeleteStatus> = self
.context
.request(subject, &payload)
.map_err(|err| match err.kind() {
RequestErrorKind::TimedOut => {
DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
}
_ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
})
.await?;
match response {
Response::Err { error } => Err(DeleteMessageError::new(
DeleteMessageErrorKind::JetStream(error),
)),
Response::Ok(value) => Ok(value.success),
}
}
pub fn purge(&self) -> Purge<No, No> {
Purge::build(self)
}
#[deprecated(
since = "0.25.0",
note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
)]
pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
where
T: Into<String>,
{
self.purge().filter(subject).await
}
pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerError> {
self.context
.create_consumer_on_stream(config, self.name.clone())
.await
}
#[cfg(feature = "server_2_10")]
pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerUpdateError> {
self.context
.update_consumer_on_stream(config, self.name.clone())
.await
}
#[cfg(feature = "server_2_10")]
pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
self.context
.create_consumer_strict_on_stream(config, self.name.clone())
.await
}
pub async fn consumer_info<T: AsRef<str>>(
&self,
name: T,
) -> Result<consumer::Info, ConsumerInfoError> {
let name = name.as_ref();
if !is_valid_name(name) {
return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
}
let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
match self.context.request(subject, &json!({})).await? {
Response::Ok(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
}
pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
) -> Result<Consumer<T>, crate::Error> {
let info = self.consumer_info(name).await?;
Ok(Consumer::new(
T::try_from_consumer_config(info.config.clone())?,
info,
self.context.clone(),
))
}
pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T,
) -> Result<Consumer<T>, ConsumerError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
match self.context.request(subject, &json!({})).await? {
Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
Response::Err { error } => Err(error.into()),
Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
T::try_from_consumer_config(info.config.clone()).map_err(|err| {
ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
})?,
info,
self.context.clone(),
)),
}
}
pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
match self.context.request(subject, &json!({})).await? {
Response::Ok(delete_status) => Ok(delete_status),
Response::Err { error } => Err(error.into()),
}
}
#[cfg(feature = "server_2_11")]
pub async fn pause_consumer(
&self,
name: &str,
pause_until: OffsetDateTime,
) -> Result<PauseResponse, ConsumerError> {
self.request_pause_consumer(name, Some(pause_until)).await
}
#[cfg(feature = "server_2_11")]
pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
self.request_pause_consumer(name, None).await
}
#[cfg(feature = "server_2_11")]
async fn request_pause_consumer(
&self,
name: &str,
pause_until: Option<OffsetDateTime>,
) -> Result<PauseResponse, ConsumerError> {
let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
let payload = &PauseResumeConsumerRequest { pause_until };
match self.context.request(subject, payload).await? {
Response::Ok::<PauseResponse>(resp) => Ok(resp),
Response::Err { error } => Err(error.into()),
}
}
pub fn consumer_names(&self) -> ConsumerNames {
ConsumerNames {
context: self.context.clone(),
stream: self.name.clone(),
offset: 0,
page_request: None,
consumers: Vec::new(),
done: false,
}
}
pub fn consumers(&self) -> Consumers {
Consumers {
context: self.context.clone(),
stream: self.name.clone(),
offset: 0,
page_request: None,
consumers: Vec::new(),
done: false,
}
}
}
pub struct StreamInfoBuilder {
pub(crate) context: Context,
pub(crate) name: String,
pub(crate) deleted: bool,
pub(crate) subject: String,
}
impl StreamInfoBuilder {
fn new(context: Context, name: String) -> Self {
Self {
context,
name,
deleted: false,
subject: "".to_string(),
}
}
pub fn with_deleted(mut self, deleted: bool) -> Self {
self.deleted = deleted;
self
}
pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
self.subject = subject.into();
self
}
pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
let info = stream_info_with_details(
self.context.clone(),
self.name.clone(),
0,
self.deleted,
self.subject.clone(),
)
.await?;
Ok(InfoWithSubjects::new(self.context, info, self.subject))
}
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
pub name: String,
#[serde(default)]
pub max_bytes: i64,
#[serde(default, rename = "max_msgs")]
pub max_messages: i64,
#[serde(default, rename = "max_msgs_per_subject")]
pub max_messages_per_subject: i64,
pub discard: DiscardPolicy,
#[serde(default, skip_serializing_if = "is_default")]
pub discard_new_per_subject: bool,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub subjects: Vec<String>,
pub retention: RetentionPolicy,
#[serde(default)]
pub max_consumers: i32,
#[serde(default, with = "serde_nanos")]
pub max_age: Duration,
#[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
pub max_message_size: i32,
pub storage: StorageType,
pub num_replicas: usize,
#[serde(default, skip_serializing_if = "is_default")]
pub no_ack: bool,
#[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
pub duplicate_window: Duration,
#[serde(default, skip_serializing_if = "is_default")]
pub template_owner: String,
#[serde(default, skip_serializing_if = "is_default")]
pub sealed: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub description: Option<String>,
#[serde(
default,
rename = "allow_rollup_hdrs",
skip_serializing_if = "is_default"
)]
pub allow_rollup: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub deny_delete: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub deny_purge: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub republish: Option<Republish>,
#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub mirror_direct: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mirror: Option<Source>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sources: Option<Vec<Source>>,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub metadata: HashMap<String, String>,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subject_transform: Option<SubjectTransform>,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub compression: Option<Compression>,
#[cfg(feature = "server_2_10")]
#[serde(default, deserialize_with = "default_consumer_limits_as_none")]
pub consumer_limits: Option<ConsumerLimits>,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
pub first_sequence: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub placement: Option<Placement>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub persist_mode: Option<PersistenceMode>,
#[cfg(feature = "server_2_11")]
#[serde(
default,
with = "rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
pub pause_until: Option<OffsetDateTime>,
#[cfg(feature = "server_2_11")]
#[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
pub allow_message_ttl: bool,
#[cfg(feature = "server_2_11")]
#[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
pub subject_delete_marker_ttl: Option<Duration>,
#[cfg(feature = "server_2_12")]
#[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
pub allow_atomic_publish: bool,
#[cfg(feature = "server_2_12")]
#[serde(
default,
skip_serializing_if = "is_default",
rename = "allow_msg_schedules"
)]
pub allow_message_schedules: bool,
#[cfg(feature = "server_2_12")]
#[serde(
default,
skip_serializing_if = "is_default",
rename = "allow_msg_counter"
)]
pub allow_message_counter: bool,
}
impl From<&Config> for Config {
fn from(sc: &Config) -> Config {
sc.clone()
}
}
impl From<&str> for Config {
fn from(s: &str) -> Config {
Config {
name: s.to_string(),
..Default::default()
}
}
}
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
D: Deserializer<'de>,
{
let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
if let Some(cl) = consumer_limits {
if cl == ConsumerLimits::default() {
Ok(None)
} else {
Ok(Some(cl))
}
} else {
Ok(None)
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
#[serde(default, with = "serde_nanos")]
pub inactive_threshold: std::time::Duration,
#[serde(default)]
pub max_ack_pending: i64,
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
#[serde(rename = "s2")]
S2,
#[serde(rename = "none")]
None,
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
#[serde(rename = "src")]
pub source: String,
#[serde(rename = "dest")]
pub destination: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
#[serde(rename = "src")]
pub source: String,
#[serde(rename = "dest")]
pub destination: String,
#[serde(default)]
pub headers_only: bool,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
#[serde(default, skip_serializing_if = "is_default")]
pub cluster: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub tags: Vec<String>,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
#[default]
#[serde(rename = "old")]
Old = 0,
#[serde(rename = "new")]
New = 1,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
#[default]
#[serde(rename = "limits")]
Limits = 0,
#[serde(rename = "interest")]
Interest = 1,
#[serde(rename = "workqueue")]
WorkQueue = 2,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
#[default]
#[serde(rename = "file")]
File = 0,
#[serde(rename = "memory")]
Memory = 1,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PersistenceMode {
#[default]
#[serde(rename = "default")]
Default = 0,
#[serde(rename = "async")]
Async = 1,
}
async fn stream_info_with_details(
context: Context,
stream: String,
offset: usize,
deleted_details: bool,
subjects_filter: String,
) -> Result<Info, InfoError> {
let subject = format!("STREAM.INFO.{stream}");
let payload = StreamInfoRequest {
offset,
deleted_details,
subjects_filter,
};
let response: Response<Info> = context.request(subject, &payload).await?;
match response {
Response::Ok(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
}
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
offset: usize,
deleted_details: bool,
subjects_filter: String,
}
pub struct InfoWithSubjects {
stream: String,
context: Context,
pub info: Info,
offset: usize,
subjects: collections::hash_map::IntoIter<String, usize>,
info_request: Option<InfoRequest>,
subjects_filter: String,
pages_done: bool,
}
impl InfoWithSubjects {
pub fn new(context: Context, mut info: Info, subject: String) -> Self {
let subjects = info.state.subjects.take().unwrap_or_default();
let name = info.config.name.clone();
InfoWithSubjects {
context,
info,
pages_done: subjects.is_empty(),
offset: subjects.len(),
subjects: subjects.into_iter(),
subjects_filter: subject,
stream: name,
info_request: None,
}
}
}
impl futures_util::Stream for InfoWithSubjects {
type Item = Result<(String, usize), InfoError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.subjects.next() {
Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
None => {
if self.pages_done {
return Poll::Ready(None);
}
let stream = self.stream.clone();
let context = self.context.clone();
let subjects_filter = self.subjects_filter.clone();
let offset = self.offset;
match self
.info_request
.get_or_insert_with(|| {
Box::pin(stream_info_with_details(
context,
stream,
offset,
false,
subjects_filter,
))
})
.poll_unpin(cx)
{
Poll::Ready(resp) => match resp {
Ok(info) => {
let subjects = info.state.subjects.clone();
self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
self.info_request = None;
let subjects = subjects.unwrap_or_default();
self.subjects = info.state.subjects.unwrap_or_default().into_iter();
let total = info.paged_info.map(|info| info.total).unwrap_or(0);
if total <= self.offset || subjects.is_empty() {
self.pages_done = true;
}
match self.subjects.next() {
Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
None => Poll::Ready(None),
}
}
Err(err) => Poll::Ready(Some(Err(err))),
},
Poll::Pending => Poll::Pending,
}
}
}
}
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
pub config: Config,
#[serde(with = "rfc3339")]
pub created: time::OffsetDateTime,
pub state: State,
pub cluster: Option<ClusterInfo>,
#[serde(default)]
pub mirror: Option<SourceInfo>,
#[serde(default)]
pub sources: Vec<SourceInfo>,
#[serde(flatten)]
paged_info: Option<PagedInfo>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
offset: usize,
total: usize,
limit: usize,
}
#[derive(Deserialize)]
pub struct DeleteStatus {
pub success: bool,
}
#[cfg(feature = "server_2_11")]
#[derive(Deserialize)]
pub struct PauseResponse {
pub paused: bool,
#[serde(with = "rfc3339")]
pub pause_until: OffsetDateTime,
#[serde(default, with = "serde_nanos")]
pub pause_remaining: Option<Duration>,
}
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
#[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
pause_until: Option<OffsetDateTime>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
pub messages: u64,
pub bytes: u64,
#[serde(rename = "first_seq")]
pub first_sequence: u64,
#[serde(with = "rfc3339", rename = "first_ts")]
pub first_timestamp: time::OffsetDateTime,
#[serde(rename = "last_seq")]
pub last_sequence: u64,
#[serde(with = "rfc3339", rename = "last_ts")]
pub last_timestamp: time::OffsetDateTime,
pub consumer_count: usize,
#[serde(default, rename = "num_subjects")]
pub subjects_count: u64,
#[serde(default, rename = "num_deleted")]
pub deleted_count: Option<u64>,
#[serde(default)]
pub deleted: Option<Vec<u64>>,
pub(crate) subjects: Option<HashMap<String, usize>>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
#[serde(rename = "subject")]
pub subject: String,
#[serde(rename = "seq")]
pub sequence: u64,
#[serde(default, rename = "data")]
pub payload: String,
#[serde(default, rename = "hdrs")]
pub headers: Option<String>,
#[serde(rename = "time", with = "rfc3339")]
pub time: time::OffsetDateTime,
}
impl TryFrom<RawMessage> for StreamMessage {
type Error = crate::Error;
fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
let decoded_payload = STANDARD
.decode(value.payload)
.map_err(|err| Box::new(std::io::Error::other(err)))?;
let decoded_headers = value
.headers
.map(|header| STANDARD.decode(header))
.map_or(Ok(None), |v| v.map(Some))?;
let (headers, _, _) = decoded_headers
.map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
Ok(StreamMessage {
subject: value.subject.into(),
payload: decoded_payload.into(),
headers,
sequence: value.sequence,
time: value.time,
})
}
}
fn is_continuation(c: char) -> bool {
c == ' ' || c == '\t'
}
const HEADER_LINE: &str = "NATS/1.0";
#[allow(clippy::type_complexity)]
fn parse_headers(
buf: &[u8],
) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
let mut headers = HeaderMap::new();
let mut maybe_status: Option<StatusCode> = None;
let mut maybe_description: Option<String> = None;
let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
line.lines().peekable()
} else {
return Err(Box::new(std::io::Error::other("invalid header")));
};
if let Some(line) = lines.next() {
let line = line
.strip_prefix(HEADER_LINE)
.ok_or_else(|| {
Box::new(std::io::Error::other(
"version line does not start with NATS/1.0",
))
})?
.trim();
match line.split_once(' ') {
Some((status, description)) => {
if !status.is_empty() {
maybe_status = Some(status.parse()?);
}
if !description.is_empty() {
maybe_description = Some(description.trim().to_string());
}
}
None => {
if !line.is_empty() {
maybe_status = Some(line.parse()?);
}
}
}
} else {
return Err(Box::new(std::io::Error::other(
"expected header information not found",
)));
};
while let Some(line) = lines.next() {
if line.is_empty() {
continue;
}
if let Some((k, v)) = line.split_once(':').to_owned() {
let mut s = String::from(v.trim());
while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
s.push(' ');
s.push_str(v.trim());
}
headers.insert(
HeaderName::from_str(k)?,
HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
);
} else {
return Err(Box::new(std::io::Error::other("malformed header line")));
}
}
if headers.is_empty() {
Ok((HeaderMap::new(), maybe_status, maybe_description))
} else {
Ok((headers, maybe_status, maybe_description))
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
pub(crate) message: RawMessage,
}
fn is_default<T: Default + Eq>(t: &T) -> bool {
t == &T::default()
}
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub raft_group: Option<String>,
#[serde(default)]
pub leader: Option<String>,
#[serde(default, with = "rfc3339::option")]
pub leader_since: Option<OffsetDateTime>,
#[cfg(feature = "server_2_12")]
#[serde(default)]
pub system_account: bool,
#[cfg(feature = "server_2_12")]
#[serde(default)]
pub traffic_account: Option<String>,
#[serde(default)]
pub replicas: Vec<PeerInfo>,
}
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
pub name: String,
pub current: bool,
#[serde(with = "serde_nanos")]
pub active: Duration,
#[serde(default)]
pub offline: bool,
pub lag: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
pub name: String,
pub lag: u64,
#[serde(deserialize_with = "negative_duration_as_none")]
pub active: Option<std::time::Duration>,
#[serde(default)]
pub filter_subject: Option<String>,
#[serde(default)]
pub subject_transform_dest: Option<String>,
#[serde(default)]
pub subject_transforms: Vec<SubjectTransform>,
}
fn negative_duration_as_none<'de, D>(
deserializer: D,
) -> Result<Option<std::time::Duration>, D::Error>
where
D: Deserializer<'de>,
{
let n = i64::deserialize(deserializer)?;
if n.is_negative() {
Ok(None)
} else {
Ok(Some(std::time::Duration::from_nanos(n as u64)))
}
}
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
pub success: bool,
pub purged: u64,
}
#[derive(Default, Debug, Serialize, Clone)]
pub struct PurgeRequest {
#[serde(default, rename = "seq", skip_serializing_if = "is_default")]
pub sequence: Option<u64>,
#[serde(default, skip_serializing_if = "is_default")]
pub filter: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub keep: Option<u64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
pub name: String,
#[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
pub start_sequence: Option<u64>,
#[serde(
default,
rename = "opt_start_time",
skip_serializing_if = "is_default",
with = "rfc3339::option"
)]
pub start_time: Option<OffsetDateTime>,
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub external: Option<External>,
#[serde(default, skip_serializing_if = "is_default")]
pub domain: Option<String>,
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub subject_transforms: Vec<SubjectTransform>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
#[serde(rename = "api")]
pub api_prefix: String,
#[serde(rename = "deliver", skip_serializing_if = "is_default")]
pub delivery_prefix: Option<String>,
}
use std::marker::PhantomData;
#[derive(Debug, Default)]
pub struct Yes;
#[derive(Debug, Default)]
pub struct No;
pub trait ToAssign: Debug {}
impl ToAssign for Yes {}
impl ToAssign for No {}
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
inner: PurgeRequest,
sequence_set: PhantomData<SEQUENCE>,
keep_set: PhantomData<KEEP>,
context: Context,
stream_name: String,
}
impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
self.inner.filter = Some(filter.into());
self
}
}
impl Purge<No, No> {
pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
Purge {
context: stream.context.clone(),
stream_name: stream.name.clone(),
inner: Default::default(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
}
}
}
impl<KEEP> Purge<No, KEEP>
where
KEEP: ToAssign,
{
pub fn keep(self, keep: u64) -> Purge<No, Yes> {
Purge {
context: self.context.clone(),
stream_name: self.stream_name.clone(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
keep: Some(keep),
..self.inner
},
}
}
}
impl<SEQUENCE> Purge<SEQUENCE, No>
where
SEQUENCE: ToAssign,
{
pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
Purge {
context: self.context.clone(),
stream_name: self.stream_name.clone(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
sequence: Some(sequence),
..self.inner
},
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum PurgeErrorKind {
Request,
TimedOut,
JetStream(super::errors::Error),
}
impl Display for PurgeErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Request => write!(f, "request failed"),
Self::TimedOut => write!(f, "timed out"),
Self::JetStream(err) => write!(f, "JetStream error: {err}"),
}
}
}
pub type PurgeError = Error<PurgeErrorKind>;
impl<S, K> IntoFuture for Purge<S, K>
where
S: ToAssign + std::marker::Send,
K: ToAssign + std::marker::Send,
{
type Output = Result<PurgeResponse, PurgeError>;
type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(async move {
let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
let response: Response<PurgeResponse> = self
.context
.request(request_subject, &self.inner)
.map_err(|err| match err.kind() {
RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
_ => PurgeError::with_source(PurgeErrorKind::Request, err),
})
.await?;
match response {
Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
Response::Ok(response) => Ok(response),
}
}))
}
}
#[derive(Deserialize, Debug)]
struct ConsumerPage {
total: usize,
consumers: Option<Vec<String>>,
}
#[derive(Deserialize, Debug)]
struct ConsumerInfoPage {
total: usize,
consumers: Option<Vec<super::consumer::Info>>,
}
type ConsumerNamesErrorKind = StreamsErrorKind;
type ConsumerNamesError = StreamsError;
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
pub struct ConsumerNames {
context: Context,
stream: String,
offset: usize,
page_request: Option<PageRequest>,
consumers: Vec<String>,
done: bool,
}
impl futures_util::Stream for ConsumerNames {
type Item = Result<String, ConsumerNamesError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.page_request.as_mut() {
Some(page) => match page.try_poll_unpin(cx) {
std::task::Poll::Ready(page) => {
self.page_request = None;
let page = page.map_err(|err| {
ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
})?;
if let Some(consumers) = page.consumers {
self.offset += consumers.len();
self.consumers = consumers;
if self.offset >= page.total {
self.done = true;
}
match self.consumers.pop() {
Some(stream) => Poll::Ready(Some(Ok(stream))),
None => Poll::Ready(None),
}
} else {
Poll::Ready(None)
}
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
None => {
if let Some(stream) = self.consumers.pop() {
Poll::Ready(Some(Ok(stream)))
} else {
if self.done {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
let stream = self.stream.clone();
self.page_request = Some(Box::pin(async move {
match context
.request(
format!("CONSUMER.NAMES.{stream}"),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => Err(RequestError::with_source(
super::context::RequestErrorKind::Other,
error,
)),
Response::Ok(page) => Ok(page),
}
}));
self.poll_next(cx)
}
}
}
}
}
pub type ConsumersErrorKind = StreamsErrorKind;
pub type ConsumersError = StreamsError;
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
pub struct Consumers {
context: Context,
stream: String,
offset: usize,
page_request: Option<PageInfoRequest>,
consumers: Vec<super::consumer::Info>,
done: bool,
}
impl futures_util::Stream for Consumers {
type Item = Result<super::consumer::Info, ConsumersError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.page_request.as_mut() {
Some(page) => match page.try_poll_unpin(cx) {
std::task::Poll::Ready(page) => {
self.page_request = None;
let page = page.map_err(|err| {
ConsumersError::with_source(ConsumersErrorKind::Other, err)
})?;
if let Some(consumers) = page.consumers {
self.offset += consumers.len();
self.consumers = consumers;
if self.offset >= page.total {
self.done = true;
}
match self.consumers.pop() {
Some(consumer) => Poll::Ready(Some(Ok(consumer))),
None => Poll::Ready(None),
}
} else {
Poll::Ready(None)
}
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
None => {
if let Some(stream) = self.consumers.pop() {
Poll::Ready(Some(Ok(stream)))
} else {
if self.done {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
let stream = self.stream.clone();
self.page_request = Some(Box::pin(async move {
match context
.request(
format!("CONSUMER.LIST.{stream}"),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => Err(RequestError::with_source(
super::context::RequestErrorKind::Other,
error,
)),
Response::Ok(page) => Ok(page),
}
}));
self.poll_next(cx)
}
}
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
NoMessageFound,
InvalidSubject,
JetStream(super::errors::Error),
Other,
}
impl Display for LastRawMessageErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NoMessageFound => write!(f, "no message found"),
Self::InvalidSubject => write!(f, "invalid subject"),
Self::Other => write!(f, "failed to get last raw message"),
Self::JetStream(err) => write!(f, "JetStream error: {err}"),
}
}
}
pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
pub type RawMessageErrorKind = LastRawMessageErrorKind;
pub type RawMessageError = LastRawMessageError;
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerErrorKind {
TimedOut,
Request,
InvalidConsumerType,
InvalidName,
JetStream(super::errors::Error),
Other,
}
impl Display for ConsumerErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TimedOut => write!(f, "timed out"),
Self::Request => write!(f, "request failed"),
Self::JetStream(err) => write!(f, "JetStream error: {err}"),
Self::Other => write!(f, "consumer error"),
Self::InvalidConsumerType => write!(f, "invalid consumer type"),
Self::InvalidName => write!(f, "invalid consumer name"),
}
}
}
pub type ConsumerError = Error<ConsumerErrorKind>;
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerCreateStrictErrorKind {
TimedOut,
Request,
InvalidConsumerType,
InvalidName,
AlreadyExists,
JetStream(super::errors::Error),
Other,
}
impl Display for ConsumerCreateStrictErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TimedOut => write!(f, "timed out"),
Self::Request => write!(f, "request failed"),
Self::JetStream(err) => write!(f, "JetStream error: {err}"),
Self::Other => write!(f, "consumer error"),
Self::InvalidConsumerType => write!(f, "invalid consumer type"),
Self::InvalidName => write!(f, "invalid consumer name"),
Self::AlreadyExists => write!(f, "consumer already exists"),
}
}
}
pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerUpdateErrorKind {
TimedOut,
Request,
InvalidConsumerType,
InvalidName,
DoesNotExist,
JetStream(super::errors::Error),
Other,
}
impl Display for ConsumerUpdateErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TimedOut => write!(f, "timed out"),
Self::Request => write!(f, "request failed"),
Self::JetStream(err) => write!(f, "JetStream error: {err}"),
Self::Other => write!(f, "consumer error"),
Self::InvalidConsumerType => write!(f, "invalid consumer type"),
Self::InvalidName => write!(f, "invalid consumer name"),
Self::DoesNotExist => write!(f, "consumer does not exist"),
}
}
}
pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
impl From<super::errors::Error> for ConsumerError {
fn from(err: super::errors::Error) -> Self {
ConsumerError::new(ConsumerErrorKind::JetStream(err))
}
}
impl From<super::errors::Error> for ConsumerCreateStrictError {
fn from(err: super::errors::Error) -> Self {
if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
} else {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
}
}
}
impl From<super::errors::Error> for ConsumerUpdateError {
fn from(err: super::errors::Error) -> Self {
if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
} else {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
}
}
}
impl From<ConsumerError> for ConsumerUpdateError {
fn from(err: ConsumerError) -> Self {
match err.kind() {
ConsumerErrorKind::JetStream(err) => {
if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
} else {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
}
}
ConsumerErrorKind::Request => {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
}
ConsumerErrorKind::TimedOut => {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
}
ConsumerErrorKind::InvalidConsumerType => {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
}
ConsumerErrorKind::InvalidName => {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
}
ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
}
}
}
impl From<ConsumerError> for ConsumerCreateStrictError {
fn from(err: ConsumerError) -> Self {
match err.kind() {
ConsumerErrorKind::JetStream(err) => {
if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
} else {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
}
}
ConsumerErrorKind::Request => {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
}
ConsumerErrorKind::TimedOut => {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
}
ConsumerErrorKind::InvalidConsumerType => {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
}
ConsumerErrorKind::InvalidName => {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
}
ConsumerErrorKind::Other => {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
}
}
}
}
impl From<super::context::RequestError> for ConsumerError {
fn from(err: super::context::RequestError) -> Self {
match err.kind() {
RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
_ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
}
}
}
impl From<super::context::RequestError> for ConsumerUpdateError {
fn from(err: super::context::RequestError) -> Self {
match err.kind() {
RequestErrorKind::TimedOut => {
ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
}
_ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
}
}
}
impl From<super::context::RequestError> for ConsumerCreateStrictError {
fn from(err: super::context::RequestError) -> Self {
match err.kind() {
RequestErrorKind::TimedOut => {
ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
}
_ => {
ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
}
}
}
}
#[derive(Debug, Serialize, Default)]
pub struct DirectGetRequest {
#[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
sequence: Option<u64>,
#[serde(rename = "last_by_subj", skip_serializing)]
last_by_subject: Option<String>,
#[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
next_by_subject: Option<String>,
}
pub struct WithHeaders;
pub struct WithoutHeaders;
trait DirectGetResponse: Sized {
fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
}
impl DirectGetResponse for StreamMessage {
fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
StreamMessage::try_from(message).map_err(Into::into)
}
}
impl DirectGetResponse for StreamValue {
fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
Ok(StreamValue {
data: message.payload,
})
}
}
pub struct DirectGetBuilder<T = WithHeaders> {
context: Context,
stream_name: String,
request: DirectGetRequest,
_phantom: std::marker::PhantomData<T>,
}
impl DirectGetBuilder<WithHeaders> {
fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
DirectGetBuilder {
context,
stream_name,
request: DirectGetRequest::default(),
_phantom: std::marker::PhantomData,
}
}
}
impl<T> DirectGetBuilder<T> {
async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
let payload = if self.request.last_by_subject.is_some() {
Bytes::new()
} else {
serde_json::to_vec(&self.request).map(Bytes::from)?
};
let request_subject = if let Some(ref subject) = self.request.last_by_subject {
format!(
"{}.DIRECT.GET.{}.{}",
&self.context.prefix, &self.stream_name, subject
)
} else {
format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
};
let response = self
.context
.client
.request(request_subject, payload)
.await?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
match status {
StatusCode::NOT_FOUND => {
return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
}
StatusCode::TIMEOUT => {
return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
}
_ => {
return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
status,
description.to_string(),
)));
}
}
}
}
R::from_message(response)
}
pub fn sequence(mut self, seq: u64) -> Self {
self.request.sequence = Some(seq);
self
}
pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
self.request.last_by_subject = Some(subject.into());
self
}
pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
self.request.next_by_subject = Some(subject.into());
self
}
}
impl DirectGetBuilder<WithHeaders> {
pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
self.send_internal::<StreamMessage>().await
}
}
impl DirectGetBuilder<WithoutHeaders> {
pub async fn send(self) -> Result<StreamValue, DirectGetError> {
self.send_internal::<StreamValue>().await
}
}
pub struct StreamValue {
pub data: Bytes,
}
#[derive(Debug, Serialize, Default)]
pub struct RawMessageRequest {
#[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
sequence: Option<u64>,
#[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
last_by_subject: Option<String>,
#[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
next_by_subject: Option<String>,
}
trait RawMessageResponse: Sized {
fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
}
impl RawMessageResponse for StreamMessage {
fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
StreamMessage::try_from(message)
.map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
}
}
impl RawMessageResponse for StreamValue {
fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
RawMessageError::with_source(
RawMessageErrorKind::Other,
Box::new(std::io::Error::other(err)),
)
})?;
Ok(StreamValue {
data: decoded_payload.into(),
})
}
}
pub struct RawMessageBuilder<T = WithHeaders> {
context: Context,
stream_name: String,
request: RawMessageRequest,
_phantom: std::marker::PhantomData<T>,
}
impl RawMessageBuilder<WithHeaders> {
fn new(context: Context, stream_name: String) -> Self {
RawMessageBuilder {
context,
stream_name,
request: RawMessageRequest::default(),
_phantom: std::marker::PhantomData,
}
}
}
impl<T> RawMessageBuilder<T> {
async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
.into_iter()
.flatten()
{
if !is_valid_subject(subject) {
return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
}
}
let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
let response: Response<GetRawMessage> = self
.context
.request(subject, &self.request)
.map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
.await?;
match response {
Response::Err { error } => {
if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
} else {
Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
}
}
Response::Ok(value) => R::from_raw_message(value.message),
}
}
pub fn sequence(mut self, seq: u64) -> Self {
self.request.sequence = Some(seq);
self
}
pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
self.request.last_by_subject = Some(subject.into());
self
}
pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
self.request.next_by_subject = Some(subject.into());
self
}
}
impl RawMessageBuilder<WithHeaders> {
pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
self.send_internal::<StreamMessage>().await
}
}
impl RawMessageBuilder<WithoutHeaders> {
pub async fn send(self) -> Result<StreamValue, RawMessageError> {
self.send_internal::<StreamValue>().await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn consumer_limits_de() {
let config = Config {
..Default::default()
};
let roundtrip: Config = {
let ser = serde_json::to_string(&config).unwrap();
serde_json::from_str(&ser).unwrap()
};
assert_eq!(config, roundtrip);
}
}