use std::{
fmt::Debug,
future::IntoFuture,
io::{self, ErrorKind},
pin::Pin,
str::FromStr,
time::Duration,
};
use crate::{header::HeaderName, HeaderMap, HeaderValue};
use crate::{Error, StatusCode};
use bytes::Bytes;
use futures::Future;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};
use super::{
consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
response::Response,
Context, Message,
};
#[derive(Debug, Clone)]
pub struct Stream {
pub(crate) info: Info,
pub(crate) context: Context,
}
impl Stream {
pub async fn info(&mut self) -> Result<&Info, Error> {
let subject = format!("STREAM.INFO.{}", self.info.config.name);
match self.context.request(subject, &json!({})).await? {
Response::Ok::<Info>(info) => {
self.info = info;
Ok(&self.info)
}
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while getting stream info: {}, {}, {}",
error.code, error.status, error.description
),
))),
}
}
pub fn cached_info(&self) -> &Info {
&self.info
}
pub async fn direct_get_next_for_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: Option<u64>,
) -> Result<Message, Error> {
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload;
if let Some(sequence) = sequence {
payload = json!({
"seq": sequence,
"next_by_subj": subject.as_ref(),
});
} else {
payload = json!({
"next_by_subj": subject.as_ref(),
});
}
let response = self
.context
.client
.request(
request_subject,
serde_json::to_vec(&payload).map(Bytes::from)?,
)
.await
.map(|message| Message {
message,
context: self.context.clone(),
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{status} {description}"),
)));
}
}
Ok(response)
}
pub async fn direct_get_first_for_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<Message, Error> {
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"next_by_subj": subject.as_ref(),
});
let response = self
.context
.client
.request(
request_subject,
serde_json::to_vec(&payload).map(Bytes::from)?,
)
.await
.map(|message| Message {
message,
context: self.context.clone(),
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{status} {description}"),
)));
}
}
Ok(response)
}
pub async fn direct_get(&self, sequence: u64) -> Result<Message, Error> {
let subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let payload = json!({
"seq": sequence,
});
let response = self
.context
.client
.request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
.await
.map(|message| Message {
context: self.context.clone(),
message,
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{status} {description}"),
)));
}
}
Ok(response)
}
pub async fn direct_get_last_for_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<Message, Error> {
let subject = format!(
"{}.DIRECT.GET.{}.{}",
&self.context.prefix,
&self.info.config.name,
subject.as_ref()
);
let response = self
.context
.client
.request(subject, "".into())
.await
.map(|message| Message {
context: self.context.clone(),
message,
})?;
if let Some(status) = response.status {
if let Some(ref description) = response.description {
match status {
StatusCode::NOT_FOUND => {
return Err(Box::from(std::io::Error::new(
ErrorKind::NotFound,
"message not found in stream",
)))
}
StatusCode::TIMEOUT => {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
"empty or invalid request",
)))
}
other => {
return Err(Box::from(std::io::Error::new(
ErrorKind::Other,
format!("{other}: {description}"),
)))
}
}
}
}
Ok(response)
}
pub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, Error> {
let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
let payload = json!({
"seq": sequence,
});
let response: Response<GetRawMessage> = self.context.request(subject, &payload).await?;
match response {
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while getting message: {}, {}",
error.code, error.description
),
))),
Response::Ok(value) => Ok(value.message),
}
}
pub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str,
) -> Result<RawMessage, Error> {
let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
let payload = json!({
"last_by_subj": stream_subject,
});
let response: Response<GetRawMessage> = self.context.request(subject, &payload).await?;
match response {
Response::Err { error } => Err(Box::new(std::io::Error::new(ErrorKind::Other, error))),
Response::Ok(value) => Ok(value.message),
}
}
pub async fn delete_message(&self, sequence: u64) -> Result<bool, Error> {
let subject = format!("STREAM.MSG.DELETE.{}", &self.info.config.name);
let payload = json!({
"seq": sequence,
});
let response: Response<DeleteStatus> = self.context.request(subject, &payload).await?;
match response {
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while deleting message: {}, {}",
error.code, error.status
),
))),
Response::Ok(value) => Ok(value.success),
}
}
pub fn purge(&self) -> Purge<No, No> {
Purge::build(self.clone())
}
#[deprecated(
since = "0.25.0",
note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
)]
pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, Error>
where
T: Into<String>,
{
self.purge().filter(subject).await
}
pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, Error> {
let config = config.into_consumer_config();
let subject = {
if self.context.client.is_server_compatible(2, 9, 0) {
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| {
format!(
"CONSUMER.CREATE.{}.{}{}",
self.info.config.name, name, filter
)
})
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", self.info.config.name))
} else if config.name.is_some() {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"can't use consumer name with server below version 2.9",
)));
} else if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
self.info.config.name, durable_name
)
} else {
format!("CONSUMER.CREATE.{}", self.info.config.name)
}
};
match self
.context
.request(
subject,
&json!({"stream_name": self.info.config.name.clone(), "config": config}),
)
.await?
{
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while creating stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
FromConsumer::try_from_consumer_config(info.clone().config)?,
info,
self.context.clone(),
)),
}
}
pub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<consumer::Info, Error> {
let name = name.as_ref();
let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
match self.context.request(subject, &json!({})).await? {
Response::Ok(info) => Ok(info),
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while getting consumer info: {}, {}, {}",
error.code, error.status, error.description
),
))),
}
}
pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
) -> Result<Consumer<T>, Error> {
let info = self.consumer_info(name).await?;
Ok(Consumer::new(
T::try_from_consumer_config(info.config.clone())?,
info,
self.context.clone(),
))
}
pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T,
) -> Result<Consumer<T>, Error> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
match self.context.request(subject, &json!({})).await? {
Response::Err { error } if error.status == 404 => self.create_consumer(config).await,
Response::Err { error } => Err(Box::new(io::Error::new(
ErrorKind::Other,
format!(
"nats: error while getting or creating stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
T::try_from_consumer_config(info.config.clone())?,
info,
self.context.clone(),
)),
}
}
pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, Error> {
let subject = format!("CONSUMER.DELETE.{}.{}", self.info.config.name, name);
match self.context.request(subject, &json!({})).await? {
Response::Ok(delete_status) => Ok(delete_status),
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while deleting consumer: {}, {}, {}",
error.code, error.status, error.description
),
))),
}
}
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
pub name: String,
pub max_bytes: i64,
#[serde(rename = "max_msgs")]
pub max_messages: i64,
#[serde(rename = "max_msgs_per_subject")]
pub max_messages_per_subject: i64,
pub discard: DiscardPolicy,
#[serde(default, skip_serializing_if = "is_default")]
pub discard_new_per_subject: bool,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub subjects: Vec<String>,
pub retention: RetentionPolicy,
pub max_consumers: i32,
#[serde(with = "serde_nanos")]
pub max_age: Duration,
#[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
pub max_message_size: i32,
pub storage: StorageType,
pub num_replicas: usize,
#[serde(default, skip_serializing_if = "is_default")]
pub no_ack: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub duplicate_window: i64,
#[serde(default, skip_serializing_if = "is_default")]
pub template_owner: String,
#[serde(default, skip_serializing_if = "is_default")]
pub sealed: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub description: Option<String>,
#[serde(
default,
rename = "allow_rollup_hdrs",
skip_serializing_if = "is_default"
)]
pub allow_rollup: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub deny_delete: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub deny_purge: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub republish: Option<Republish>,
#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,
#[serde(default, skip_serializing_if = "is_default")]
pub mirror_direct: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mirror: Option<Source>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sources: Option<Vec<Source>>,
}
impl From<&Config> for Config {
fn from(sc: &Config) -> Config {
sc.clone()
}
}
impl From<&str> for Config {
fn from(s: &str) -> Config {
Config {
name: s.to_string(),
..Default::default()
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
#[serde(rename = "src")]
pub source: String,
#[serde(rename = "dest")]
pub destination: String,
#[serde(default)]
pub headers_only: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
#[serde(rename = "old")]
Old = 0,
#[serde(rename = "new")]
New = 1,
}
impl Default for DiscardPolicy {
fn default() -> DiscardPolicy {
DiscardPolicy::Old
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
#[serde(rename = "limits")]
Limits = 0,
#[serde(rename = "interest")]
Interest = 1,
#[serde(rename = "workqueue")]
WorkQueue = 2,
}
impl Default for RetentionPolicy {
fn default() -> RetentionPolicy {
RetentionPolicy::Limits
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
#[serde(rename = "file")]
File = 0,
#[serde(rename = "memory")]
Memory = 1,
}
impl Default for StorageType {
fn default() -> StorageType {
StorageType::File
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Info {
pub config: Config,
#[serde(with = "rfc3339")]
pub created: time::OffsetDateTime,
pub state: State,
#[serde(default)]
pub cluster: Option<ClusterInfo>,
}
#[derive(Deserialize)]
pub struct DeleteStatus {
pub success: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct State {
pub messages: u64,
pub bytes: u64,
#[serde(rename = "first_seq")]
pub first_sequence: u64,
#[serde(with = "rfc3339", rename = "first_ts")]
pub first_timestamp: time::OffsetDateTime,
#[serde(rename = "last_seq")]
pub last_sequence: u64,
#[serde(with = "rfc3339", rename = "last_ts")]
pub last_timestamp: time::OffsetDateTime,
pub consumer_count: usize,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
#[serde(rename = "subject")]
pub subject: String,
#[serde(rename = "seq")]
pub sequence: u64,
#[serde(default, rename = "data")]
pub payload: String,
#[serde(default, rename = "hdrs")]
pub headers: Option<String>,
#[serde(rename = "time", with = "rfc3339")]
pub time: time::OffsetDateTime,
}
impl TryFrom<RawMessage> for crate::Message {
type Error = Error;
fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
let decoded_payload = base64::decode(value.payload)
.map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
let decoded_headers = value
.headers
.map(base64::decode)
.map_or(Ok(None), |v| v.map(Some))?;
let length = decoded_headers
.as_ref()
.map_or_else(|| 0, |headers| headers.len())
+ decoded_payload.len()
+ value.subject.len();
let (headers, status, description) =
decoded_headers.map_or_else(|| Ok((None, None, None)), |h| parse_headers(&h))?;
Ok(crate::Message {
subject: value.subject,
reply: None,
payload: decoded_payload.into(),
headers,
status,
description,
length,
})
}
}
fn is_continuation(c: char) -> bool {
c == ' ' || c == '\t'
}
const HEADER_LINE: &str = "NATS/1.0";
const HEADER_LINE_LEN: usize = HEADER_LINE.len();
#[allow(clippy::type_complexity)]
fn parse_headers(
buf: &[u8],
) -> Result<(Option<HeaderMap>, Option<StatusCode>, Option<String>), Error> {
let mut headers = HeaderMap::new();
let mut maybe_status: Option<StatusCode> = None;
let mut maybe_description: Option<String> = None;
let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
line.lines().peekable()
} else {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"invalid header",
)));
};
if let Some(line) = lines.next() {
if !line.starts_with(HEADER_LINE) {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"version lie does not start with NATS/1.0",
)));
}
if let Some(slice) = line.get(HEADER_LINE_LEN..).map(|s| s.trim()) {
match slice.split_once(' ') {
Some((status, description)) => {
if !status.is_empty() {
maybe_status = Some(status.trim().parse()?);
}
if !description.is_empty() {
maybe_description = Some(description.trim().to_string());
}
}
None => {
if !slice.is_empty() {
maybe_status = Some(slice.trim().parse()?);
}
}
}
}
} else {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"expected header information not found",
)));
};
while let Some(line) = lines.next() {
if line.is_empty() {
continue;
}
if let Some((k, v)) = line.split_once(':').to_owned() {
let mut s = String::from(v.trim());
while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
s.push(' ');
s.push_str(v.trim());
}
headers.insert(
HeaderName::from_str(k)?,
HeaderValue::from_str(&s)
.map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
);
} else {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"malformed header line",
)));
}
}
if headers.is_empty() {
Ok((None, maybe_status, maybe_description))
} else {
Ok((Some(headers), maybe_status, maybe_description))
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
pub(crate) message: RawMessage,
}
fn is_default<T: Default + Eq>(t: &T) -> bool {
t == &T::default()
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub leader: Option<String>,
#[serde(default)]
pub replicas: Vec<PeerInfo>,
}
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
pub name: String,
pub current: bool,
#[serde(with = "serde_nanos")]
pub active: Duration,
#[serde(default)]
pub offline: bool,
pub lag: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
pub success: bool,
pub purged: u64,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct PurgeRequest {
#[serde(default, rename = "seq", skip_serializing_if = "is_default")]
pub sequence: Option<u64>,
#[serde(default, skip_serializing_if = "is_default")]
pub filter: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub keep: Option<u64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
pub name: String,
#[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
pub start_sequence: Option<u64>,
#[serde(
default,
rename = "opt_start_time",
skip_serializing_if = "is_default",
with = "rfc3339::option"
)]
pub start_time: Option<OffsetDateTime>,
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub external: Option<External>,
#[serde(default, skip_serializing_if = "is_default")]
pub domain: Option<String>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
#[serde(rename = "api")]
pub api_prefix: String,
#[serde(rename = "deliver", skip_serializing_if = "is_default")]
pub delivery_prefix: Option<String>,
}
use std::marker::PhantomData;
#[derive(Debug, Default)]
pub struct Yes;
#[derive(Debug, Default)]
pub struct No;
pub trait ToAssign: Debug {}
impl ToAssign for Yes {}
impl ToAssign for No {}
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
stream: Stream,
inner: PurgeRequest,
sequence_set: PhantomData<SEQUENCE>,
keep_set: PhantomData<KEEP>,
}
impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
self.inner.filter = Some(filter.into());
self
}
}
impl Purge<No, No> {
pub(crate) fn build(stream: Stream) -> Purge<No, No> {
Purge {
stream,
inner: Default::default(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
}
}
}
impl<KEEP> Purge<No, KEEP>
where
KEEP: ToAssign,
{
pub fn keep(self, keep: u64) -> Purge<No, Yes> {
Purge {
stream: self.stream,
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
keep: Some(keep),
..self.inner
},
}
}
}
impl<SEQUENCE> Purge<SEQUENCE, No>
where
SEQUENCE: ToAssign,
{
pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
Purge {
stream: self.stream,
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
sequence: Some(sequence),
..self.inner
},
}
}
}
impl<S, K> IntoFuture for Purge<S, K>
where
S: ToAssign + std::marker::Send,
K: ToAssign + std::marker::Send,
{
type Output = Result<PurgeResponse, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<PurgeResponse, Error>> + Send>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(async move {
let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name);
let response: Response<PurgeResponse> = self
.stream
.context
.request(request_subject, &self.inner)
.await?;
match response {
Response::Err { error } => Err(Box::from(io::Error::new(
ErrorKind::Other,
format!(
"error while purging stream: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(response) => Ok(response),
}
}))
}
}