use super::context::Context;
use crate::header::{IntoHeaderName, IntoHeaderValue};
use crate::subject::ToSubject;
use crate::{error, header, message, Error, HeaderValue};
use crate::{subject::Subject, HeaderMap};
use bytes::Bytes;
use futures_util::future::TryFutureExt;
use futures_util::StreamExt;
use std::fmt::Display;
use std::{mem, time::Duration};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
#[derive(Debug, Clone)]
pub struct StreamMessage {
pub subject: Subject,
pub sequence: u64,
pub headers: HeaderMap,
pub payload: Bytes,
pub time: OffsetDateTime,
}
pub struct OutboundMessage {
pub subject: Subject,
pub payload: Bytes,
pub headers: Option<HeaderMap>,
}
impl OutboundMessage {
pub fn new(subject: Subject, payload: Bytes, headers: Option<HeaderMap>) -> Self {
Self {
subject,
payload,
headers,
}
}
}
impl From<OutboundMessage> for message::OutboundMessage {
fn from(message: OutboundMessage) -> Self {
message::OutboundMessage {
subject: message.subject,
payload: message.payload,
headers: message.headers,
reply: None,
}
}
}
#[derive(Default, Clone, Debug)]
pub struct PublishMessage {
pub(crate) payload: Bytes,
pub(crate) headers: Option<header::HeaderMap>,
}
impl PublishMessage {
pub fn build() -> Self {
Default::default()
}
pub fn payload(mut self, payload: Bytes) -> Self {
self.payload = payload;
self
}
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.headers = Some(headers);
self
}
pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
self.headers
.get_or_insert(header::HeaderMap::new())
.insert(name, value);
self
}
pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
self.header(header::NATS_MESSAGE_ID, id.as_ref())
}
pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
self.header(
header::NATS_EXPECTED_LAST_MESSAGE_ID,
last_message_id.as_ref(),
)
}
pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
self.header(
header::NATS_EXPECTED_LAST_SEQUENCE,
HeaderValue::from(last_sequence),
)
}
pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
self.header(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
HeaderValue::from(subject_sequence),
)
}
pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
self.header(
header::NATS_EXPECTED_STREAM,
HeaderValue::from(stream.as_ref()),
)
}
#[cfg(feature = "server_2_11")]
pub fn ttl(self, ttl: Duration) -> Self {
self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
}
pub fn outbound_message<S: ToSubject>(self, subject: S) -> OutboundMessage {
OutboundMessage {
subject: subject.to_subject(),
payload: self.payload,
headers: self.headers,
}
}
}
#[derive(Clone, Debug)]
pub struct Message {
pub message: crate::Message,
pub context: Context,
}
impl TryFrom<crate::Message> for StreamMessage {
type Error = StreamMessageError;
fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
let headers = message.headers.ok_or_else(|| {
StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
})?;
let sequence = headers
.get_last(header::NATS_SEQUENCE)
.ok_or_else(|| {
StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
})
.and_then(|seq| {
seq.as_str().parse().map_err(|err| {
StreamMessageError::with_source(
StreamMessageErrorKind::ParseError,
format!("could not parse sequence header: {err}"),
)
})
})?;
let time = headers
.get_last(header::NATS_TIME_STAMP)
.ok_or_else(|| {
StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
})
.and_then(|time| {
OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
StreamMessageError::with_source(
StreamMessageErrorKind::ParseError,
format!("could not parse timestamp header: {err}"),
)
})
})?;
let subject = headers
.get_last(header::NATS_SUBJECT)
.ok_or_else(|| {
StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
})?
.as_str()
.into();
Ok(StreamMessage {
subject,
sequence,
headers,
payload: message.payload,
time,
})
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum StreamMessageErrorKind {
MissingHeader,
ParseError,
}
pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
impl Display for StreamMessageErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
StreamMessageErrorKind::ParseError => write!(f, "parse error"),
}
}
}
impl std::ops::Deref for Message {
type Target = crate::Message;
fn deref(&self) -> &Self::Target {
&self.message
}
}
impl From<Message> for crate::Message {
fn from(source: Message) -> crate::Message {
source.message
}
}
impl Message {
pub fn split(mut self) -> (crate::Message, Acker) {
let reply = mem::take(&mut self.message.reply);
(
self.message,
Acker {
context: self.context,
reply,
},
)
}
pub async fn ack(&self) -> Result<(), Error> {
if let Some(ref reply) = self.reply {
self.context
.client
.publish(reply.clone(), "".into())
.map_err(Error::from)
.await
} else {
Err(Box::new(std::io::Error::other(
"No reply subject, not a JetStream message",
)))
}
}
pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
if let Some(ref reply) = self.reply {
self.context
.client
.publish(reply.to_owned(), kind.into())
.map_err(Error::from)
.await
} else {
Err(Box::new(std::io::Error::other(
"No reply subject, not a JetStream message",
)))
}
}
pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
if let Some(ref reply) = self.reply {
let inbox = self.context.client.new_inbox();
let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
self.context
.client
.publish_with_reply(reply.clone(), inbox, ack_kind.into())
.await?;
match tokio::time::timeout(self.context.timeout, subscription.next())
.await
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::TimedOut,
"double ack response timed out",
)
})? {
Some(_) => Ok(()),
None => Err(Box::new(std::io::Error::other("subscription dropped"))),
}
} else {
Err(Box::new(std::io::Error::other(
"No reply subject, not a JetStream message",
)))
}
}
pub async fn double_ack(&self) -> Result<(), Error> {
self.double_ack_with(AckKind::Ack).await
}
#[allow(clippy::mixed_read_write_in_expression)]
pub fn info(&self) -> Result<Info<'_>, Error> {
const PREFIX: &str = "$JS.ACK.";
const SKIP: usize = PREFIX.len();
let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
})?;
if !reply.starts_with(PREFIX) {
return Err(Box::new(std::io::Error::other(
"did not found proper prefix",
)));
}
reply = &reply[SKIP..];
let mut split = reply.split('.');
let mut tokens: [Option<&str>; 10] = [None; 10];
let mut n_tokens = 0;
for each_token in &mut tokens {
if let Some(token) = split.next() {
*each_token = Some(token);
n_tokens += 1;
}
}
let mut token_index = 0;
macro_rules! try_parse {
() => {
match str::parse(try_parse!(str)) {
Ok(parsed) => parsed,
Err(e) => {
return Err(Box::new(e));
}
}
};
(str) => {
if let Some(next) = tokens[token_index].take() {
#[allow(unused)]
{
token_index += 1;
}
next
} else {
return Err(Box::new(std::io::Error::other("too few tokens")));
}
};
}
if n_tokens >= 9 {
Ok(Info {
domain: {
let domain: &str = try_parse!(str);
if domain == "_" {
None
} else {
Some(domain)
}
},
acc_hash: Some(try_parse!(str)),
stream: try_parse!(str),
consumer: try_parse!(str),
delivered: try_parse!(),
stream_sequence: try_parse!(),
consumer_sequence: try_parse!(),
published: {
let nanos: i128 = try_parse!();
OffsetDateTime::from_unix_timestamp_nanos(nanos)?
},
pending: try_parse!(),
token: if n_tokens >= 9 {
Some(try_parse!(str))
} else {
None
},
})
} else if n_tokens == 7 {
Ok(Info {
domain: None,
acc_hash: None,
stream: try_parse!(str),
consumer: try_parse!(str),
delivered: try_parse!(),
stream_sequence: try_parse!(),
consumer_sequence: try_parse!(),
published: {
let nanos: i128 = try_parse!();
OffsetDateTime::from_unix_timestamp_nanos(nanos)?
},
pending: try_parse!(),
token: None,
})
} else {
Err(Box::new(std::io::Error::other("bad token number")))
}
}
}
pub struct Acker {
context: Context,
reply: Option<Subject>,
}
impl Acker {
pub fn new(context: Context, reply: Option<Subject>) -> Self {
Self { context, reply }
}
pub async fn ack(&self) -> Result<(), Error> {
if let Some(ref reply) = self.reply {
self.context
.client
.publish(reply.to_owned(), "".into())
.map_err(Error::from)
.await
} else {
Err(Box::new(std::io::Error::other(
"No reply subject, not a JetStream message",
)))
}
}
pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
if let Some(ref reply) = self.reply {
self.context
.client
.publish(reply.to_owned(), kind.into())
.map_err(Error::from)
.await
} else {
Err(Box::new(std::io::Error::other(
"No reply subject, not a JetStream message",
)))
}
}
pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
if let Some(ref reply) = self.reply {
let inbox = self.context.client.new_inbox();
let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
self.context
.client
.publish_with_reply(reply.to_owned(), inbox, ack_kind.into())
.await?;
match tokio::time::timeout(self.context.timeout, subscription.next())
.await
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::TimedOut,
"double ack response timed out",
)
})? {
Some(_) => Ok(()),
None => Err(Box::new(std::io::Error::other("subscription dropped"))),
}
} else {
Err(Box::new(std::io::Error::other(
"No reply subject, not a JetStream message",
)))
}
}
pub async fn double_ack(&self) -> Result<(), Error> {
self.double_ack_with(AckKind::Ack).await
}
}
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
Ack,
Nak(Option<Duration>),
Progress,
Next,
Term,
}
impl From<AckKind> for Bytes {
fn from(kind: AckKind) -> Self {
use AckKind::*;
match kind {
Ack => Bytes::from_static(b"+ACK"),
Nak(maybe_duration) => match maybe_duration {
None => Bytes::from_static(b"-NAK"),
Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
},
Progress => Bytes::from_static(b"+WPI"),
Next => Bytes::from_static(b"+NXT"),
Term => Bytes::from_static(b"+TERM"),
}
}
}
#[derive(Debug, Clone)]
pub struct Info<'a> {
pub domain: Option<&'a str>,
pub acc_hash: Option<&'a str>,
pub stream: &'a str,
pub consumer: &'a str,
pub stream_sequence: u64,
pub consumer_sequence: u64,
pub delivered: i64,
pub pending: u64,
pub published: time::OffsetDateTime,
pub token: Option<&'a str>,
}