use super::nats::{Message, NatsClient, NatsError};
use crate::cx::Cx;
use crate::time::{timeout_at, wall_now};
use crate::tracing_compat::warn;
use crate::types::Time;
use std::fmt;
use std::fmt::Write as _;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
#[derive(Debug)]
pub enum JsError {
Nats(NatsError),
Api {
code: u32,
description: String,
},
StreamNotFound(String),
ConsumerNotFound {
stream: String,
consumer: String,
},
NotAcked,
AlreadyAcknowledged,
InvalidConfig(String),
ParseError(String),
}
impl fmt::Display for JsError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Nats(e) => write!(f, "JetStream NATS error: {e}"),
Self::Api { code, description } => {
write!(f, "JetStream API error {code}: {description}")
}
Self::StreamNotFound(name) => write!(f, "JetStream stream not found: {name}"),
Self::ConsumerNotFound { stream, consumer } => {
write!(f, "JetStream consumer not found: {stream}/{consumer}")
}
Self::NotAcked => write!(f, "JetStream message not acknowledged"),
Self::AlreadyAcknowledged => {
write!(
f,
"JetStream message already acknowledged/nacked/terminated"
)
}
Self::InvalidConfig(msg) => write!(f, "JetStream invalid config: {msg}"),
Self::ParseError(msg) => write!(f, "JetStream parse error: {msg}"),
}
}
}
impl std::error::Error for JsError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Nats(e) => Some(e),
_ => None,
}
}
}
impl From<NatsError> for JsError {
fn from(err: NatsError) -> Self {
Self::Nats(err)
}
}
impl JsError {
#[must_use]
pub fn is_transient(&self) -> bool {
match self {
Self::Nats(e) => e.is_transient(),
Self::Api { code, .. } => matches!(code, 503 | 408),
Self::NotAcked => true,
_ => false,
}
}
#[must_use]
pub fn is_connection_error(&self) -> bool {
matches!(self, Self::Nats(e) if e.is_connection_error())
}
#[must_use]
pub fn is_capacity_error(&self) -> bool {
matches!(self, Self::Api { code: 429, .. })
}
#[must_use]
pub fn is_timeout(&self) -> bool {
match self {
Self::Nats(e) => e.is_timeout(),
Self::Api { code: 408, .. } | Self::NotAcked => true,
_ => false,
}
}
#[must_use]
pub fn is_retryable(&self) -> bool {
self.is_transient()
}
}
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub name: String,
pub subjects: Vec<String>,
pub retention: RetentionPolicy,
pub storage: StorageType,
pub max_msgs: Option<i64>,
pub max_bytes: Option<i64>,
pub max_age: Option<Duration>,
pub max_msg_size: Option<i32>,
pub discard: DiscardPolicy,
pub replicas: u32,
pub duplicate_window: Option<Duration>,
}
impl StreamConfig {
#[must_use]
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
subjects: Vec::new(),
retention: RetentionPolicy::Limits,
storage: StorageType::File,
max_msgs: None,
max_bytes: None,
max_age: None,
max_msg_size: None,
discard: DiscardPolicy::Old,
replicas: 1,
duplicate_window: None,
}
}
#[must_use]
pub fn subjects(mut self, subjects: &[&str]) -> Self {
self.subjects = subjects.iter().map(|s| (*s).to_string()).collect();
self
}
#[must_use]
pub fn retention(mut self, policy: RetentionPolicy) -> Self {
self.retention = policy;
self
}
#[must_use]
pub fn storage(mut self, storage: StorageType) -> Self {
self.storage = storage;
self
}
#[must_use]
pub fn max_messages(mut self, max: i64) -> Self {
self.max_msgs = Some(max);
self
}
#[must_use]
pub fn max_bytes(mut self, max: i64) -> Self {
self.max_bytes = Some(max);
self
}
#[must_use]
pub fn max_age(mut self, age: Duration) -> Self {
self.max_age = Some(age);
self
}
#[must_use]
pub fn replicas(mut self, count: u32) -> Self {
self.replicas = count;
self
}
#[must_use]
pub fn duplicate_window(mut self, window: Duration) -> Self {
self.duplicate_window = Some(window);
self
}
fn to_json(&self) -> String {
let mut json = String::from("{");
write!(&mut json, "\"name\":\"{}\"", json_escape(&self.name)).expect("write to String");
if !self.subjects.is_empty() {
json.push_str(",\"subjects\":[");
for (i, s) in self.subjects.iter().enumerate() {
if i > 0 {
json.push(',');
}
write!(&mut json, "\"{}\"", json_escape(s)).expect("write to String");
}
json.push(']');
}
write!(&mut json, ",\"retention\":\"{}\"", self.retention.as_str())
.expect("write to String");
write!(&mut json, ",\"storage\":\"{}\"", self.storage.as_str()).expect("write to String");
write!(&mut json, ",\"discard\":\"{}\"", self.discard.as_str()).expect("write to String");
write!(&mut json, ",\"num_replicas\":{}", self.replicas).expect("write to String");
if let Some(max) = self.max_msgs {
write!(&mut json, ",\"max_msgs\":{max}").expect("write to String");
}
if let Some(max) = self.max_bytes {
write!(&mut json, ",\"max_bytes\":{max}").expect("write to String");
}
if let Some(age) = self.max_age {
write!(&mut json, ",\"max_age\":{}", age.as_nanos()).expect("write to String");
}
if let Some(size) = self.max_msg_size {
write!(&mut json, ",\"max_msg_size\":{size}").expect("write to String");
}
if let Some(window) = self.duplicate_window {
write!(&mut json, ",\"duplicate_window\":{}", window.as_nanos())
.expect("write to String");
}
json.push('}');
json
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RetentionPolicy {
#[default]
Limits,
Interest,
WorkQueue,
}
impl RetentionPolicy {
fn as_str(self) -> &'static str {
match self {
Self::Limits => "limits",
Self::Interest => "interest",
Self::WorkQueue => "workqueue",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageType {
#[default]
File,
Memory,
}
impl StorageType {
fn as_str(self) -> &'static str {
match self {
Self::File => "file",
Self::Memory => "memory",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DiscardPolicy {
#[default]
Old,
New,
}
impl DiscardPolicy {
fn as_str(self) -> &'static str {
match self {
Self::Old => "old",
Self::New => "new",
}
}
}
#[derive(Debug, Clone)]
pub struct StreamInfo {
pub config: StreamConfig,
pub state: StreamState,
}
#[derive(Debug, Clone, Default)]
pub struct StreamState {
pub messages: u64,
pub bytes: u64,
pub first_seq: u64,
pub last_seq: u64,
pub consumer_count: u32,
}
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
pub name: Option<String>,
pub durable_name: Option<String>,
pub deliver_policy: DeliverPolicy,
pub ack_policy: AckPolicy,
pub ack_wait: Duration,
pub max_deliver: i64,
pub filter_subject: Option<String>,
pub max_ack_pending: i64,
}
impl ConsumerConfig {
#[must_use]
pub fn new(name: impl Into<String>) -> Self {
Self {
name: Some(name.into()),
durable_name: None,
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
ack_wait: Duration::from_secs(30),
max_deliver: -1,
filter_subject: None,
max_ack_pending: 1000,
}
}
#[must_use]
pub fn ephemeral() -> Self {
Self {
name: None,
durable_name: None,
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
ack_wait: Duration::from_secs(30),
max_deliver: -1,
filter_subject: None,
max_ack_pending: 1000,
}
}
#[must_use]
pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
self.deliver_policy = policy;
self
}
#[must_use]
pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
self.ack_policy = policy;
self
}
#[must_use]
pub fn ack_wait(mut self, wait: Duration) -> Self {
self.ack_wait = wait;
self
}
#[must_use]
pub fn max_deliver(mut self, max: i64) -> Self {
self.max_deliver = max;
self
}
#[must_use]
pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
self.filter_subject = Some(subject.into());
self
}
fn to_json(&self) -> String {
let mut json = String::from("{");
let mut parts = Vec::new();
if let Some(ref name) = self.name {
parts.push(format!("\"name\":\"{}\"", json_escape(name)));
}
if let Some(ref durable) = self.durable_name {
parts.push(format!("\"durable_name\":\"{}\"", json_escape(durable)));
}
parts.push(format!(
"\"deliver_policy\":\"{}\"",
self.deliver_policy.as_str()
));
if let DeliverPolicy::ByStartSequence(seq) = self.deliver_policy {
parts.push(format!("\"opt_start_seq\":{seq}"));
}
parts.push(format!("\"ack_policy\":\"{}\"", self.ack_policy.as_str()));
parts.push(format!("\"ack_wait\":{}", self.ack_wait.as_nanos()));
parts.push(format!("\"max_deliver\":{}", self.max_deliver));
parts.push(format!("\"max_ack_pending\":{}", self.max_ack_pending));
if let Some(ref filter) = self.filter_subject {
parts.push(format!("\"filter_subject\":\"{}\"", json_escape(filter)));
}
json.push_str(&parts.join(","));
json.push('}');
json
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeliverPolicy {
#[default]
All,
New,
ByStartSequence(u64),
Last,
LastPerSubject,
}
impl DeliverPolicy {
fn as_str(self) -> &'static str {
match self {
Self::All => "all",
Self::New => "new",
Self::ByStartSequence(_) => "by_start_sequence",
Self::Last => "last",
Self::LastPerSubject => "last_per_subject",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AckPolicy {
#[default]
Explicit,
None,
All,
}
impl AckPolicy {
fn as_str(self) -> &'static str {
match self {
Self::Explicit => "explicit",
Self::None => "none",
Self::All => "all",
}
}
}
#[derive(Debug, Clone)]
pub struct PubAck {
pub stream: String,
pub seq: u64,
pub duplicate: bool,
}
pub struct JsMessage {
pub subject: String,
pub payload: Vec<u8>,
pub sequence: u64,
pub delivered: u32,
reply_subject: String,
acked: AtomicBool,
}
impl fmt::Debug for JsMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JsMessage")
.field("subject", &self.subject)
.field("sequence", &self.sequence)
.field("delivered", &self.delivered)
.field("payload_len", &self.payload.len())
.field("reply_subject", &self.reply_subject)
.field("acked", &self.acked.load(Ordering::Relaxed))
.finish()
}
}
impl JsMessage {
pub fn is_acked(&self) -> bool {
self.acked.load(Ordering::Acquire)
}
}
impl Drop for JsMessage {
fn drop(&mut self) {
if !self.acked.load(Ordering::Acquire) {
warn!(
subject = %self.subject,
sequence = self.sequence,
"JetStream message dropped without ack/nack - will be redelivered"
);
}
}
}
pub struct JetStreamContext {
client: NatsClient,
prefix: String,
}
impl fmt::Debug for JetStreamContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JetStreamContext")
.field("prefix", &self.prefix)
.finish_non_exhaustive()
}
}
impl JetStreamContext {
pub fn new(client: NatsClient) -> Self {
Self {
client,
prefix: "$JS.API".to_string(),
}
}
pub fn with_prefix(client: NatsClient, prefix: impl Into<String>) -> Self {
Self {
client,
prefix: prefix.into(),
}
}
pub async fn create_stream(
&mut self,
cx: &Cx,
config: StreamConfig,
) -> Result<StreamInfo, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.STREAM.CREATE.{}", self.prefix, config.name);
let payload = config.to_json();
let response = self
.client
.request(cx, &subject, payload.as_bytes())
.await?;
Self::parse_stream_info(&response.payload)
}
pub async fn get_stream(&mut self, cx: &Cx, name: &str) -> Result<StreamInfo, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.STREAM.INFO.{}", self.prefix, name);
let response = self.client.request(cx, &subject, b"").await?;
Self::parse_stream_info(&response.payload)
}
pub async fn delete_stream(&mut self, cx: &Cx, name: &str) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.STREAM.DELETE.{}", self.prefix, name);
let response = self.client.request(cx, &subject, b"").await?;
let response_str = String::from_utf8_lossy(&response.payload);
if response_str.contains("\"error\":{\"code\":") {
return Err(Self::parse_api_error(&response_str));
}
Ok(())
}
pub async fn publish(
&mut self,
cx: &Cx,
subject: &str,
payload: &[u8],
) -> Result<PubAck, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let response = self.client.request(cx, subject, payload).await?;
Self::parse_pub_ack(&response.payload)
}
pub fn publish_with_id(
&mut self,
cx: &Cx,
subject: &str,
msg_id: &str,
payload: &[u8],
) -> Result<PubAck, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
Err(JsError::InvalidConfig(format!(
"publish_with_id requires NATS headers (Nats-Msg-Id); subject={subject} msg_id={msg_id} payload_len={}",
payload.len()
)))
}
pub async fn create_consumer(
&mut self,
cx: &Cx,
stream: &str,
config: ConsumerConfig,
) -> Result<Consumer, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let consumer_name = config.name.clone().unwrap_or_default();
let subject = if consumer_name.is_empty() {
format!("{}.CONSUMER.CREATE.{}", self.prefix, stream)
} else {
format!(
"{}.CONSUMER.CREATE.{}.{}",
self.prefix, stream, consumer_name
)
};
let payload = format!(
"{{\"stream_name\":\"{}\",\"config\":{}}}",
json_escape(stream),
config.to_json()
);
let response = self
.client
.request(cx, &subject, payload.as_bytes())
.await?;
let response_str = String::from_utf8_lossy(&response.payload);
if response_str.contains("\"error\":{\"code\":") {
return Err(Self::parse_api_error(&response_str));
}
let name = extract_json_string_simple(&response_str, "name")
.unwrap_or_else(|| consumer_name.clone());
Ok(Consumer {
stream: stream.to_string(),
name,
prefix: self.prefix.clone(),
})
}
pub async fn get_consumer(
&mut self,
cx: &Cx,
stream: &str,
consumer: &str,
) -> Result<Consumer, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.CONSUMER.INFO.{}.{}", self.prefix, stream, consumer);
let response = self.client.request(cx, &subject, b"").await?;
let response_str = String::from_utf8_lossy(&response.payload);
if response_str.contains("\"error\":{\"code\":") {
return Err(Self::parse_api_error(&response_str));
}
Ok(Consumer {
stream: stream.to_string(),
name: consumer.to_string(),
prefix: self.prefix.clone(),
})
}
pub async fn delete_consumer(
&mut self,
cx: &Cx,
stream: &str,
consumer: &str,
) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.CONSUMER.DELETE.{}.{}", self.prefix, stream, consumer);
let response = self.client.request(cx, &subject, b"").await?;
let response_str = String::from_utf8_lossy(&response.payload);
if response_str.contains("\"error\":{\"code\":") {
return Err(Self::parse_api_error(&response_str));
}
Ok(())
}
pub fn client(&mut self) -> &mut NatsClient {
&mut self.client
}
fn parse_stream_info(payload: &[u8]) -> Result<StreamInfo, JsError> {
let json = String::from_utf8_lossy(payload);
if json.contains("\"error\":{\"code\":") {
return Err(Self::parse_api_error(&json));
}
let name = extract_json_string_simple(&json, "name")
.ok_or_else(|| JsError::ParseError("missing stream name".to_string()))?;
let state = StreamState {
messages: extract_json_u64(&json, "messages").unwrap_or(0),
bytes: extract_json_u64(&json, "bytes").unwrap_or(0),
first_seq: extract_json_u64(&json, "first_seq").unwrap_or(0),
last_seq: extract_json_u64(&json, "last_seq").unwrap_or(0),
consumer_count: extract_json_u64(&json, "consumer_count")
.unwrap_or(0)
.min(u64::from(u32::MAX)) as u32,
};
Ok(StreamInfo {
config: StreamConfig::new(name),
state,
})
}
fn parse_pub_ack(payload: &[u8]) -> Result<PubAck, JsError> {
let json = String::from_utf8_lossy(payload);
if json.contains("\"error\":{\"code\":") {
return Err(Self::parse_api_error(&json));
}
let stream = extract_json_string_simple(&json, "stream")
.ok_or_else(|| JsError::ParseError("missing stream in PubAck".to_string()))?;
let seq = extract_json_u64(&json, "seq")
.ok_or_else(|| JsError::ParseError("missing seq in PubAck".to_string()))?;
let duplicate = json.contains("\"duplicate\":true");
Ok(PubAck {
stream,
seq,
duplicate,
})
}
fn parse_api_error(json: &str) -> JsError {
let code = extract_json_u64(json, "code").unwrap_or(0) as u32;
let err_code = extract_json_u64(json, "err_code").unwrap_or(0) as u32;
let description = extract_json_string_simple(json, "description")
.unwrap_or_else(|| "unknown error".to_string());
if err_code == 10059 {
return JsError::StreamNotFound(description);
}
JsError::Api { code, description }
}
}
pub struct Consumer {
stream: String,
name: String,
prefix: String,
}
impl fmt::Debug for Consumer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Consumer")
.field("stream", &self.stream)
.field("name", &self.name)
.field("prefix", &self.prefix)
.finish()
}
}
impl Consumer {
pub const DEFAULT_PULL_TIMEOUT: Duration = Duration::from_secs(30);
const CLIENT_TIMEOUT_SLACK: Duration = Duration::from_millis(100);
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn stream(&self) -> &str {
&self.stream
}
pub async fn pull(
&self,
client: &mut NatsClient,
cx: &Cx,
batch: usize,
) -> Result<Vec<JsMessage>, JsError> {
self.pull_with_timeout(client, cx, batch, Self::DEFAULT_PULL_TIMEOUT)
.await
}
pub async fn pull_with_timeout(
&self,
client: &mut NatsClient,
cx: &Cx,
batch: usize,
pull_timeout: Duration,
) -> Result<Vec<JsMessage>, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!(
"{}.CONSUMER.MSG.NEXT.{}.{}",
self.prefix, self.stream, self.name
);
let expires = if pull_timeout.is_zero() {
0_i64
} else {
let nanos = pull_timeout.as_nanos();
let max = i64::MAX as u128;
let clamped = if nanos > max { max } else { nanos };
clamped as i64
};
let request = format!("{{\"batch\":{batch},\"expires\":{expires}}}");
let mut sub = client
.subscribe(cx, &format!("_INBOX.{}", random_id(cx)))
.await?;
let sid = sub.sid();
if let Err(err) = client
.publish_request(cx, &subject, sub.subject(), request.as_bytes())
.await
{
let _ = client.unsubscribe(cx, sid).await;
return Err(err.into());
}
let mut messages = Vec::with_capacity(batch);
let now = cx
.timer_driver()
.map_or_else(wall_now, |driver| driver.now());
let client_deadline =
compute_client_deadline(now, pull_timeout, Self::CLIENT_TIMEOUT_SLACK);
let mut result: Result<(), JsError> = Ok(());
let mut received = 0usize;
loop {
if received >= batch {
break;
}
let item = if let Some(deadline) = client_deadline {
let next = std::pin::pin!(sub.next(cx));
timeout_at(deadline, next).await
} else {
Ok(sub.next(cx).await)
};
match item {
Ok(Ok(Some(msg))) => {
if let Some(js_msg) = Self::parse_js_message(msg) {
messages.push(js_msg);
received += 1;
}
}
Ok(Ok(None)) | Err(_) => break, Ok(Err(e)) => {
result = Err(e.into());
break;
}
}
}
#[allow(unused_variables)] if let Err(err) = client.unsubscribe(cx, sid).await {
warn!(
subject = %sub.subject(),
sid,
error = ?err,
"JetStream pull unsubscribe failed"
);
#[cfg(not(feature = "tracing-integration"))]
let _ = &err;
}
result.map(|()| messages)
}
fn parse_js_message(msg: Message) -> Option<JsMessage> {
let reply = msg.reply_to?;
if !reply.starts_with("$JS.ACK.") {
return None;
}
let parts: Vec<&str> = reply.split('.').collect();
if parts.len() < 9 {
return None;
}
let delivered: u32 = parts[parts.len() - 5].parse().ok()?;
let sequence: u64 = parts[parts.len() - 4].parse().ok()?;
Some(JsMessage {
subject: msg.subject,
payload: msg.payload,
sequence,
delivered,
reply_subject: reply,
acked: AtomicBool::new(false),
})
}
}
impl JsMessage {
pub async fn ack(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
if self.acked.swap(true, Ordering::AcqRel) {
return Err(JsError::AlreadyAcknowledged);
}
client.publish(cx, &self.reply_subject, b"+ACK").await?;
Ok(())
}
pub async fn nack(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
if self.acked.swap(true, Ordering::AcqRel) {
return Err(JsError::AlreadyAcknowledged);
}
client.publish(cx, &self.reply_subject, b"-NAK").await?;
Ok(())
}
pub async fn in_progress(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
client.publish(cx, &self.reply_subject, b"+WPI").await?;
Ok(())
}
pub async fn term(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
if self.acked.swap(true, Ordering::AcqRel) {
return Err(JsError::AlreadyAcknowledged);
}
client.publish(cx, &self.reply_subject, b"+TERM").await?;
Ok(())
}
}
fn json_escape(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for ch in s.chars() {
match ch {
'"' => out.push_str("\\\""),
'\\' => out.push_str("\\\\"),
'\n' => out.push_str("\\n"),
'\r' => out.push_str("\\r"),
'\t' => out.push_str("\\t"),
c if c.is_control() => {
write!(&mut out, "\\u{:04x}", c as u32).expect("write to String");
}
c => out.push(c),
}
}
out
}
fn extract_json_string_simple(json: &str, key: &str) -> Option<String> {
let pattern = format!("\"{key}\":\"");
let start = json.find(&pattern)? + pattern.len();
let slice = &json[start..];
let mut chars = slice.char_indices();
let mut result = String::new();
loop {
match chars.next()? {
(_, '"') => return Some(result),
(_, '\\') => {
let (_, esc) = chars.next()?;
match esc {
'b' => result.push('\x08'),
'f' => result.push('\x0C'),
'n' => result.push('\n'),
'r' => result.push('\r'),
't' => result.push('\t'),
'u' => {
let mut hex = String::with_capacity(4);
for _ in 0..4 {
let (_, h) = chars.next()?;
hex.push(h);
}
if let Ok(val) = u32::from_str_radix(&hex, 16) {
if let Some(c) = std::char::from_u32(val) {
result.push(c);
} else {
result.push(std::char::REPLACEMENT_CHARACTER);
}
} else {
result.push(std::char::REPLACEMENT_CHARACTER);
}
}
_ => result.push(esc),
}
}
(_, c) => result.push(c),
}
}
}
fn extract_json_u64(json: &str, key: &str) -> Option<u64> {
let pattern = format!("\"{key}\":");
let start = json.find(&pattern)? + pattern.len();
let rest = json[start..].trim_start();
let end = rest
.find(|c: char| !c.is_ascii_digit())
.unwrap_or(rest.len());
rest[..end].parse().ok()
}
#[cfg(test)]
fn base64_encode(data: &[u8]) -> String {
const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut result = String::new();
for chunk in data.chunks(3) {
let n = match chunk.len() {
1 => (u32::from(chunk[0]) << 16, 2),
2 => ((u32::from(chunk[0]) << 16) | (u32::from(chunk[1]) << 8), 3),
3 => (
(u32::from(chunk[0]) << 16) | (u32::from(chunk[1]) << 8) | u32::from(chunk[2]),
4,
),
_ => continue,
};
for i in 0..n.1 {
let idx = ((n.0 >> (18 - 6 * i)) & 0x3F) as usize;
result.push(ALPHABET[idx] as char);
}
}
let padding = (3 - data.len() % 3) % 3;
for _ in 0..padding {
result.push('=');
}
result
}
fn random_id(cx: &Cx) -> String {
format!("{:016x}", cx.random_u64())
}
fn duration_to_nanos_saturating(duration: Duration) -> u64 {
duration.as_nanos().min(u128::from(u64::MAX)) as u64
}
fn compute_client_deadline(now: Time, pull_timeout: Duration, slack: Duration) -> Option<Time> {
if pull_timeout.is_zero() {
None
} else {
let timeout_dur = pull_timeout.saturating_add(slack);
Some(now.saturating_add_nanos(duration_to_nanos_saturating(timeout_dur)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn scrub_js_ack_reply_subject(reply: &str) -> String {
let mut parts: Vec<String> = reply.split('.').map(ToString::to_string).collect();
if parts.len() >= 9 {
let len = parts.len();
parts[len - 4] = "[STREAM_SEQ]".to_string();
parts[len - 3] = "[CONSUMER_SEQ]".to_string();
parts[len - 2] = "[TIMESTAMP]".to_string();
parts[len - 1] = "[PENDING]".to_string();
}
parts.join(".")
}
fn jetstream_ack_snapshot(
subject: &str,
payload: &[u8],
reply_subject: &str,
ack_payload: &str,
) -> serde_json::Value {
let msg = Message {
subject: subject.to_string(),
sid: 7,
payload: payload.to_vec(),
reply_to: Some(reply_subject.to_string()),
};
let js_msg = Consumer::parse_js_message(msg).expect("valid JetStream reply subject");
json!({
"subject": js_msg.subject,
"payload_utf8": String::from_utf8_lossy(&js_msg.payload),
"delivered": js_msg.delivered,
"sequence": "[STREAM_SEQ]",
"reply_subject": scrub_js_ack_reply_subject(&js_msg.reply_subject),
"ack": {
"payload": ack_payload,
"terminal": matches!(ack_payload, "+ACK" | "-NAK" | "+TERM"),
}
})
}
#[test]
fn test_stream_config_to_json() {
let config = StreamConfig::new("TEST")
.subjects(&["test.>"])
.max_messages(1000)
.replicas(1);
let json = config.to_json();
assert!(json.contains("\"name\":\"TEST\""));
assert!(json.contains("\"subjects\":[\"test.>\"]"));
assert!(json.contains("\"max_msgs\":1000"));
}
#[test]
fn test_consumer_config_to_json() {
let config = ConsumerConfig::new("my-consumer")
.ack_policy(AckPolicy::Explicit)
.filter_subject("orders.>");
let json = config.to_json();
assert!(json.contains("\"name\":\"my-consumer\""));
assert!(json.contains("\"ack_policy\":\"explicit\""));
assert!(json.contains("\"filter_subject\":\"orders.>\""));
}
#[test]
fn test_ephemeral_consumer_config_to_json() {
let config = ConsumerConfig::ephemeral();
let json = config.to_json();
assert!(json.starts_with("{\"deliver_policy\""));
assert!(!json.contains("{,"));
assert!(json.contains("\"deliver_policy\":\"all\""));
assert!(json.contains("\"ack_policy\":\"explicit\""));
}
#[test]
fn test_retention_policy_str() {
assert_eq!(RetentionPolicy::Limits.as_str(), "limits");
assert_eq!(RetentionPolicy::Interest.as_str(), "interest");
assert_eq!(RetentionPolicy::WorkQueue.as_str(), "workqueue");
}
#[test]
fn test_storage_type_str() {
assert_eq!(StorageType::File.as_str(), "file");
assert_eq!(StorageType::Memory.as_str(), "memory");
}
#[test]
fn test_ack_policy_str() {
assert_eq!(AckPolicy::Explicit.as_str(), "explicit");
assert_eq!(AckPolicy::None.as_str(), "none");
assert_eq!(AckPolicy::All.as_str(), "all");
}
#[test]
fn test_deliver_policy_str() {
assert_eq!(DeliverPolicy::All.as_str(), "all");
assert_eq!(DeliverPolicy::New.as_str(), "new");
assert_eq!(DeliverPolicy::Last.as_str(), "last");
}
#[test]
fn test_base64_encode() {
assert_eq!(base64_encode(b""), "");
assert_eq!(base64_encode(b"f"), "Zg==");
assert_eq!(base64_encode(b"fo"), "Zm8=");
assert_eq!(base64_encode(b"foo"), "Zm9v");
assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
assert_eq!(base64_encode(b"hello"), "aGVsbG8=");
}
#[test]
fn test_extract_json_u64() {
let json = r#"{"seq":12345,"messages":100}"#;
assert_eq!(extract_json_u64(json, "seq"), Some(12345));
assert_eq!(extract_json_u64(json, "messages"), Some(100));
assert_eq!(extract_json_u64(json, "missing"), None);
}
#[test]
fn test_js_error_display() {
assert_eq!(
format!("{}", JsError::StreamNotFound("TEST".to_string())),
"JetStream stream not found: TEST"
);
assert_eq!(
format!(
"{}",
JsError::Api {
code: 10059,
description: "not found".to_string()
}
),
"JetStream API error 10059: not found"
);
assert_eq!(
format!("{}", JsError::NotAcked),
"JetStream message not acknowledged"
);
}
#[test]
fn test_duration_to_nanos_saturating_max_duration() {
assert_eq!(duration_to_nanos_saturating(Duration::MAX), u64::MAX);
}
#[test]
fn test_compute_client_deadline_saturates_for_large_timeout() {
let now = Time::from_nanos(1);
let deadline = compute_client_deadline(now, Duration::MAX, Consumer::CLIENT_TIMEOUT_SLACK);
assert_eq!(deadline, Some(Time::MAX));
}
#[test]
fn js_error_display_all_variants() {
let nats_err = JsError::Nats(NatsError::Io(std::io::Error::other("e")));
assert!(nats_err.to_string().contains("NATS error"));
let api_err = JsError::Api {
code: 404,
description: "not here".into(),
};
assert!(api_err.to_string().contains("404"));
assert!(api_err.to_string().contains("not here"));
let stream_err = JsError::StreamNotFound("ORDERS".into());
assert!(stream_err.to_string().contains("ORDERS"));
let consumer_err = JsError::ConsumerNotFound {
stream: "S".into(),
consumer: "C".into(),
};
assert!(consumer_err.to_string().contains("S/C"));
let not_acked = JsError::NotAcked;
assert!(not_acked.to_string().contains("not acknowledged"));
let invalid = JsError::InvalidConfig("bad".into());
assert!(invalid.to_string().contains("invalid config"));
let parse = JsError::ParseError("json".into());
assert!(parse.to_string().contains("parse error"));
}
#[test]
fn js_error_debug() {
let err = JsError::NotAcked;
let dbg = format!("{err:?}");
assert!(dbg.contains("NotAcked"));
}
#[test]
fn js_error_source_nats() {
let err = JsError::Nats(NatsError::Io(std::io::Error::other("x")));
assert!(std::error::Error::source(&err).is_some());
}
#[test]
fn js_error_source_none_for_others() {
let err = JsError::NotAcked;
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn js_error_from_nats_error() {
let nats = NatsError::Io(std::io::Error::other("z"));
let err: JsError = JsError::from(nats);
assert!(matches!(err, JsError::Nats(_)));
}
#[test]
fn retention_policy_default_debug_copy_eq() {
assert_eq!(RetentionPolicy::default(), RetentionPolicy::Limits);
let p = RetentionPolicy::Interest;
let dbg = format!("{p:?}");
assert!(dbg.contains("Interest"));
let copy = p;
assert_eq!(p, copy);
assert_ne!(p, RetentionPolicy::WorkQueue);
}
#[test]
fn storage_type_default_debug_copy_eq() {
assert_eq!(StorageType::default(), StorageType::File);
let s = StorageType::Memory;
let dbg = format!("{s:?}");
assert!(dbg.contains("Memory"));
let copy = s;
assert_eq!(s, copy);
assert_ne!(s, StorageType::File);
}
#[test]
fn discard_policy_default_debug_copy_eq() {
assert_eq!(DiscardPolicy::default(), DiscardPolicy::Old);
let d = DiscardPolicy::New;
let dbg = format!("{d:?}");
assert!(dbg.contains("New"));
let copy = d;
assert_eq!(d, copy);
}
#[test]
fn deliver_policy_default_debug_copy_eq() {
assert_eq!(DeliverPolicy::default(), DeliverPolicy::All);
let d = DeliverPolicy::Last;
let dbg = format!("{d:?}");
assert!(dbg.contains("Last"));
let copy = d;
assert_eq!(d, copy);
assert_ne!(d, DeliverPolicy::New);
}
#[test]
fn deliver_policy_by_start_sequence() {
let d = DeliverPolicy::ByStartSequence(42);
assert_eq!(d, DeliverPolicy::ByStartSequence(42));
assert_ne!(d, DeliverPolicy::ByStartSequence(99));
}
#[test]
fn ack_policy_default_debug_copy_eq() {
assert_eq!(AckPolicy::default(), AckPolicy::Explicit);
let a = AckPolicy::None;
let dbg = format!("{a:?}");
assert!(dbg.contains("None"));
let copy = a;
assert_eq!(a, copy);
assert_ne!(a, AckPolicy::All);
}
#[test]
fn stream_config_debug_clone() {
let cfg = StreamConfig::new("TEST");
let dbg = format!("{cfg:?}");
assert!(dbg.contains("StreamConfig"));
assert!(dbg.contains("TEST"));
let cloned = cfg;
assert_eq!(cloned.name, "TEST");
}
#[test]
fn stream_config_new_defaults() {
let cfg = StreamConfig::new("EVENTS");
assert_eq!(cfg.name, "EVENTS");
assert!(cfg.subjects.is_empty());
assert_eq!(cfg.retention, RetentionPolicy::Limits);
assert_eq!(cfg.storage, StorageType::File);
assert_eq!(cfg.discard, DiscardPolicy::Old);
assert_eq!(cfg.replicas, 1);
assert!(cfg.max_msgs.is_none());
assert!(cfg.max_bytes.is_none());
assert!(cfg.max_age.is_none());
assert!(cfg.duplicate_window.is_none());
}
#[test]
fn stream_config_builder_chain() {
let cfg = StreamConfig::new("ORDERS")
.subjects(&["orders.>", "returns.>"])
.retention(RetentionPolicy::WorkQueue)
.storage(StorageType::Memory)
.max_messages(1000)
.max_bytes(1_000_000)
.max_age(Duration::from_secs(3600))
.replicas(3)
.duplicate_window(Duration::from_secs(120));
assert_eq!(cfg.subjects.len(), 2);
assert_eq!(cfg.retention, RetentionPolicy::WorkQueue);
assert_eq!(cfg.storage, StorageType::Memory);
assert_eq!(cfg.max_msgs, Some(1000));
assert_eq!(cfg.max_bytes, Some(1_000_000));
assert_eq!(cfg.max_age, Some(Duration::from_secs(3600)));
assert_eq!(cfg.replicas, 3);
assert_eq!(cfg.duplicate_window, Some(Duration::from_secs(120)));
}
#[test]
fn consumer_config_debug_clone() {
let cfg = ConsumerConfig::new("processor");
let dbg = format!("{cfg:?}");
assert!(dbg.contains("ConsumerConfig"));
let cloned = cfg;
assert_eq!(cloned.name, Some("processor".into()));
}
#[test]
fn consumer_config_new_defaults() {
let cfg = ConsumerConfig::new("worker");
assert_eq!(cfg.name, Some("worker".into()));
assert!(cfg.durable_name.is_none());
assert_eq!(cfg.deliver_policy, DeliverPolicy::All);
assert_eq!(cfg.ack_policy, AckPolicy::Explicit);
assert_eq!(cfg.ack_wait, Duration::from_secs(30));
assert_eq!(cfg.max_deliver, -1);
assert!(cfg.filter_subject.is_none());
assert_eq!(cfg.max_ack_pending, 1000);
}
#[test]
fn consumer_config_ephemeral() {
let cfg = ConsumerConfig::ephemeral();
assert!(cfg.name.is_none());
assert!(cfg.durable_name.is_none());
}
#[test]
fn consumer_config_builder_chain() {
let cfg = ConsumerConfig::new("c1")
.deliver_policy(DeliverPolicy::New)
.ack_policy(AckPolicy::All)
.ack_wait(Duration::from_secs(60))
.max_deliver(5)
.filter_subject("orders.new");
assert_eq!(cfg.deliver_policy, DeliverPolicy::New);
assert_eq!(cfg.ack_policy, AckPolicy::All);
assert_eq!(cfg.ack_wait, Duration::from_secs(60));
assert_eq!(cfg.max_deliver, 5);
assert_eq!(cfg.filter_subject, Some("orders.new".into()));
}
#[test]
fn stream_state_default_debug_clone() {
let state = StreamState::default();
assert_eq!(state.messages, 0);
assert_eq!(state.bytes, 0);
assert_eq!(state.first_seq, 0);
assert_eq!(state.last_seq, 0);
assert_eq!(state.consumer_count, 0);
let dbg = format!("{state:?}");
assert!(dbg.contains("StreamState"));
let cloned = state;
assert_eq!(cloned.messages, 0);
}
#[test]
fn pub_ack_debug_clone() {
let ack = PubAck {
stream: "ORDERS".into(),
seq: 42,
duplicate: false,
};
let dbg = format!("{ack:?}");
assert!(dbg.contains("PubAck"));
assert!(dbg.contains("ORDERS"));
let cloned = ack;
assert_eq!(cloned.seq, 42);
assert!(!cloned.duplicate);
}
#[test]
fn stream_info_debug_clone() {
let info = StreamInfo {
config: StreamConfig::new("S"),
state: StreamState::default(),
};
let dbg = format!("{info:?}");
assert!(dbg.contains("StreamInfo"));
let cloned = info;
assert_eq!(cloned.config.name, "S");
}
#[test]
fn retention_policy_debug_clone_copy_default_eq() {
let r = RetentionPolicy::default();
assert_eq!(r, RetentionPolicy::Limits);
let dbg = format!("{r:?}");
assert!(dbg.contains("Limits"), "{dbg}");
let copied: RetentionPolicy = r;
let cloned = r;
assert_eq!(copied, cloned);
assert_ne!(r, RetentionPolicy::WorkQueue);
}
#[test]
fn storage_type_debug_clone_copy_default_eq() {
let s = StorageType::default();
assert_eq!(s, StorageType::File);
let dbg = format!("{s:?}");
assert!(dbg.contains("File"), "{dbg}");
let copied: StorageType = s;
let cloned = s;
assert_eq!(copied, cloned);
assert_ne!(s, StorageType::Memory);
}
#[test]
fn discard_policy_debug_clone_copy_default_eq() {
let d = DiscardPolicy::default();
assert_eq!(d, DiscardPolicy::Old);
let dbg = format!("{d:?}");
assert!(dbg.contains("Old"), "{dbg}");
let copied: DiscardPolicy = d;
let cloned = d;
assert_eq!(copied, cloned);
assert_ne!(d, DiscardPolicy::New);
}
#[test]
fn stream_state_debug_clone_default() {
let s = StreamState::default();
let dbg = format!("{s:?}");
assert!(dbg.contains("StreamState"), "{dbg}");
assert_eq!(s.messages, 0);
let cloned = s;
assert_eq!(format!("{cloned:?}"), dbg);
}
#[test]
fn parse_js_message_dotted_stream_name() {
let reply = "$JS.ACK.orders.v2.my.consumer.1.42.3.1234567890.5";
let msg = Message {
subject: "test.subject".to_string(),
sid: 1,
payload: b"hello".to_vec(),
reply_to: Some(reply.to_string()),
};
let js_msg = Consumer::parse_js_message(msg).expect("should parse dotted names");
assert_eq!(js_msg.delivered, 1);
assert_eq!(js_msg.sequence, 42);
}
#[test]
fn parse_js_message_simple_names() {
let reply = "$JS.ACK.mystream.myconsumer.2.100.50.9999999.10";
let msg = Message {
subject: "test".to_string(),
sid: 1,
payload: vec![],
reply_to: Some(reply.to_string()),
};
let js_msg = Consumer::parse_js_message(msg).expect("should parse simple names");
assert_eq!(js_msg.delivered, 2);
assert_eq!(js_msg.sequence, 100);
}
#[test]
fn error_detection_no_false_positive() {
let response = r#"{"stream":"error-handler","seq":1}"#;
assert!(
!response.contains("\"error\":{\"code\":"),
"data containing 'error' in name should not match error envelope"
);
let error_response = r#"{"error":{"code":404,"description":"not found"}}"#;
assert!(
error_response.contains("\"error\":{\"code\":"),
"actual error envelope should match"
);
}
#[test]
fn parse_api_error_uses_err_code_for_stream_not_found() {
let json = r#"{"error":{"code":404,"err_code":10059,"description":"stream not found"}}"#;
let err = JetStreamContext::parse_api_error(json);
assert!(
matches!(err, JsError::StreamNotFound(ref d) if d.contains("stream not found")),
"should classify as StreamNotFound, got: {err:?}"
);
let json2 = r#"{"error":{"code":404,"description":"generic not found"}}"#;
let err2 = JetStreamContext::parse_api_error(json2);
assert!(
matches!(err2, JsError::Api { code: 404, .. }),
"should be generic Api error, got: {err2:?}"
);
}
#[test]
fn test_extract_json_string_handles_unicode_escape() {
let json = r#"{"name":"hello\u0020world","other":"val"}"#;
let result = extract_json_string_simple(json, "name");
assert_eq!(
result,
Some("hello world".to_string()),
"unicode escape should be correctly parsed"
);
}
#[test]
fn jetstream_message_ack_format_snapshot_scrubs_sequences() {
insta::assert_json_snapshot!(
"jetstream_message_ack_format_scrubbed",
json!({
"happy": jetstream_ack_snapshot(
"orders.created",
br#"{"event":"created","status":"ok"}"#,
"$JS.ACK.orders.consumer.1.42.7.1713790000000000000.0",
"+ACK",
),
"redeliver": jetstream_ack_snapshot(
"orders.retry",
br#"{"event":"retry","reason":"redelivery"}"#,
"$JS.ACK.orders.v2.retry.worker.3.108.14.1713790000000001234.2",
"-NAK",
),
"term": jetstream_ack_snapshot(
"orders.poison",
br#"{"event":"poison","resolution":"term"}"#,
"$JS.ACK.orders.deadletter.processor.5.512.44.1713790000000005678.1",
"+TERM",
),
})
);
}
}