use super::nats::{
Message, NatsClient, NatsError, validate_nats_publish_subject,
validate_nats_subscription_pattern,
};
use crate::cx::Cx;
use crate::time::{timeout_at, wall_now};
use crate::tracing_compat::warn;
use crate::types::Time;
use std::borrow::Cow;
use std::fmt;
use std::fmt::Write as _;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::collections::HashSet;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64};
/// Byte cap applied to stream and consumer names during validation.
const MAX_NAME_BYTES: usize = 256;
/// Character cap applied to consumer names during validation.
const MAX_CONSUMER_NAME_CHARS: usize = 128;
/// Presumably the byte cap for one stream subject pattern; its use is not
/// visible in this part of the file (TODO confirm against
/// `validate_stream_subject_pattern`).
const MAX_STREAM_SUBJECT_BYTES: usize = 4 * 1024;
/// Seconds added to an ack token's timestamp to obtain its expiry in
/// `AckTokenTracker`.
const ACK_TOKEN_TIMEOUT_SECS: u64 = 300;
/// Number of remembered ack tokens at which `AckTokenTracker` evicts the
/// entries with the smallest expiry before inserting a new one.
const MAX_ACK_TOKEN_HISTORY: usize = 10000;
/// Pulls arriving less than this many milliseconds after the last accepted
/// pull are rejected by `PullRateLimiter`.
const MIN_PULL_INTERVAL_MS: u64 = 50;
/// Growth factor applied per consecutive rapid pull when computing the backoff.
const PULL_BACKOFF_MULTIPLIER: f64 = 1.5;
/// Ceiling, in milliseconds, for the computed pull backoff.
const MAX_PULL_BACKOFF_MS: u64 = 5000;
/// Estimated in-flight pull memory, in MiB, above which the global tracker
/// rejects pulls; batch sizes start shrinking at half this value.
const MEMORY_PRESSURE_THRESHOLD_MB: u64 = 512;
/// Floor used when a pull batch is reduced because of rate limiting or
/// memory pressure.
const MIN_BATCH_SIZE_UNDER_PRESSURE: usize = 16;
/// Maximum number of pulls `GlobalPullRateTracker` admits within one second.
const GLOBAL_PULL_RATE_LIMIT: u64 = 1000;
/// Largest batch size accepted by `validate_pull_batch_size`.
const MAX_PULL_BATCH: usize = 1024;
/// Replay guard for ack tokens: remembers which tokens were already accepted.
#[derive(Debug)]
struct AckTokenTracker {
/// `(token hash, expiry)` pairs. The hash is the hex string produced by
/// `hash_token`; the expiry is the token timestamp plus
/// `ACK_TOKEN_TIMEOUT_SECS` and is compared against wall-clock seconds.
used_tokens: Mutex<HashSet<(String, u64)>>, }
/// Per-consumer pull throttle state, held in atomics so it can be updated
/// through a shared reference.
#[derive(Debug)]
struct PullRateLimiter {
/// Timestamp (nanoseconds) of the most recent accepted pull; 0 until then.
last_pull_ns: AtomicU64,
/// Backoff currently in force, in milliseconds; 0 means none.
current_backoff_ms: AtomicU64,
/// Number of pulls rejected for arriving faster than `MIN_PULL_INTERVAL_MS`;
/// reset to 0 once the limiter recovers.
rapid_request_count: AtomicU64,
/// Set when a rapid pull is rejected and cleared when the limiter recovers;
/// read through `is_rate_limiting_active`.
rate_limiting_active: AtomicBool,
}
/// Process-wide pull accounting: a fixed-size ring of recent pull times plus a
/// running memory estimate.
#[derive(Debug)]
struct GlobalPullRateTracker {
/// Ring buffer of accepted-pull timestamps in milliseconds (0 = unused slot).
recent_pulls: Vec<u64>,
/// Index of the slot the next accepted pull will overwrite.
buffer_position: usize,
/// Running estimate, in bytes, of memory attributed to admitted pulls.
estimated_memory_usage: u64,
}
/// Lazily-initialised, mutex-guarded tracker shared by every consumer in the
/// process.
static GLOBAL_PULL_RATE_TRACKER: std::sync::LazyLock<Mutex<GlobalPullRateTracker>> =
std::sync::LazyLock::new(|| Mutex::new(GlobalPullRateTracker::new()));
// NOTE(review): this static is marked dead code and a separate
// `ACK_TOKEN_TRACKER` exists further down in this file; the two do not share
// state. Confirm whether this one can be removed.
#[allow(dead_code)]
static GLOBAL_ACK_TOKEN_TRACKER: std::sync::LazyLock<AckTokenTracker> =
std::sync::LazyLock::new(AckTokenTracker::new);
impl AckTokenTracker {
    /// Creates a tracker with an empty token history.
    fn new() -> Self {
        Self {
            used_tokens: Mutex::new(HashSet::new()),
        }
    }

    /// Records `token` as used and reports whether it was acceptable.
    ///
    /// Returns `false` when the token was already recorded (replay) or when
    /// the current wall-clock second is past `timestamp +
    /// ACK_TOKEN_TIMEOUT_SECS`. Returns `true` after storing the token.
    ///
    /// `timestamp` is expected in seconds, matching the wall clock value it is
    /// compared against.
    fn validate_and_mark_token(&self, token: &str, timestamp: u64) -> bool {
        let now = wall_now().as_nanos() / 1_000_000_000;
        let token_hash = self.hash_token(token);
        // `timestamp` is caller-supplied; saturate so a huge value cannot
        // overflow (panic in debug builds, wrap in release builds).
        let expires_at = timestamp.saturating_add(ACK_TOKEN_TIMEOUT_SECS);

        // Expire, check and insert under a single lock acquisition so another
        // thread cannot interleave between the cleanup and the replay check.
        let mut tokens = self.used_tokens.lock().unwrap();
        tokens.retain(|(_, entry_expiry)| *entry_expiry > now);

        if tokens.iter().any(|(hash, _)| hash == &token_hash) {
            return false;
        }
        if now > expires_at {
            return false;
        }
        if tokens.len() >= MAX_ACK_TOKEN_HISTORY {
            // At capacity: drop every entry sharing the smallest expiry.
            let min_timestamp = tokens.iter().map(|(_, ts)| *ts).min().unwrap_or(0);
            tokens.retain(|(_, ts)| *ts > min_timestamp);
        }
        tokens.insert((token_hash, expires_at));
        true
    }

    /// Removes every remembered token whose expiry is not after `now`
    /// (seconds).
    fn cleanup_expired_tokens(&self, now: u64) {
        let mut tokens = self.used_tokens.lock().unwrap();
        tokens.retain(|(_, expires_at)| *expires_at > now);
    }

    /// Returns the 64-bit FNV-1a hash of `token` as 16 lowercase hex digits.
    fn hash_token(&self, token: &str) -> String {
        let mut hash = 0xcbf2_9ce4_8422_2325_u64;
        for byte in token.as_bytes() {
            hash ^= u64::from(*byte);
            hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
        }
        format!("{hash:016x}")
    }
}
impl PullRateLimiter {
    /// Creates a limiter with no recorded pulls and no backoff in force.
    fn new() -> Self {
        Self {
            last_pull_ns: AtomicU64::new(0),
            current_backoff_ms: AtomicU64::new(0),
            rapid_request_count: AtomicU64::new(0),
            rate_limiting_active: AtomicBool::new(false),
        }
    }

    /// Decides whether a pull issued at `now_ns` (nanoseconds) may proceed.
    ///
    /// Returns `Ok(())` and records the pull when allowed. Returns
    /// `Err(delay)` with the time the caller should wait when the pull came
    /// sooner than `MIN_PULL_INTERVAL_MS` after the last accepted one, or
    /// while an earlier backoff is still running.
    fn check_pull_request(&self, now_ns: u64) -> Result<(), Duration> {
        let last_pull_ns = self.last_pull_ns.load(Ordering::Relaxed);
        // Saturate: `now_ns` may be behind the stored value (clock step or a
        // concurrent caller that stored a later time). Plain subtraction would
        // panic in debug builds and wrap in release builds.
        let time_since_last_ms = now_ns.saturating_sub(last_pull_ns) / 1_000_000;
        if time_since_last_ms < MIN_PULL_INTERVAL_MS {
            let rapid_count = self.rapid_request_count.fetch_add(1, Ordering::Relaxed);
            let backoff_ms = self.calculate_backoff(rapid_count);
            self.current_backoff_ms.store(backoff_ms, Ordering::Relaxed);
            self.rate_limiting_active.store(true, Ordering::Relaxed);
            warn!(
                "JetStream pull rate limit exceeded - backoff required: {}ms (rapid requests: {})",
                backoff_ms,
                rapid_count + 1
            );
            return Err(Duration::from_millis(backoff_ms));
        }
        let current_backoff_ms = self.current_backoff_ms.load(Ordering::Relaxed);
        if current_backoff_ms > 0 && time_since_last_ms < current_backoff_ms {
            let remaining_backoff = current_backoff_ms - time_since_last_ms;
            return Err(Duration::from_millis(remaining_backoff));
        }
        self.last_pull_ns.store(now_ns, Ordering::Relaxed);
        // Recover fully only after the caller stayed quiet for more than twice
        // the backoff that was in force.
        if time_since_last_ms > current_backoff_ms * 2 {
            self.rapid_request_count.store(0, Ordering::Relaxed);
            self.current_backoff_ms.store(0, Ordering::Relaxed);
            self.rate_limiting_active.store(false, Ordering::Relaxed);
        }
        Ok(())
    }

    /// Computes the backoff in milliseconds for the given number of prior
    /// rapid requests: `2 * MIN_PULL_INTERVAL_MS * PULL_BACKOFF_MULTIPLIER^n`,
    /// capped at `MAX_PULL_BACKOFF_MS`.
    fn calculate_backoff(&self, rapid_count: u64) -> u64 {
        let base_backoff = MIN_PULL_INTERVAL_MS * 2;
        // Clamp the exponent: an `as i32` cast would wrap large counts to a
        // negative value and shrink the backoff towards zero.
        let exponent = i32::try_from(rapid_count).unwrap_or(i32::MAX);
        let exponential_backoff =
            (base_backoff as f64 * PULL_BACKOFF_MULTIPLIER.powi(exponent)) as u64;
        exponential_backoff.min(MAX_PULL_BACKOFF_MS)
    }

    /// Reports whether a backoff is currently flagged as active.
    fn is_rate_limiting_active(&self) -> bool {
        self.rate_limiting_active.load(Ordering::Relaxed)
    }
}
impl GlobalPullRateTracker {
    /// Creates a tracker with a 2000-slot ring of zeroed timestamps and no
    /// estimated memory usage.
    fn new() -> Self {
        Self {
            recent_pulls: vec![0; 2000],
            buffer_position: 0,
            estimated_memory_usage: 0,
        }
    }

    /// Admits or rejects a pull against the process-wide limits.
    ///
    /// `now_ns` is the current time in nanoseconds and
    /// `estimated_batch_memory` the bytes the pull is expected to hold.
    /// Returns `Err(100ms)` when `GLOBAL_PULL_RATE_LIMIT` pulls were already
    /// recorded in the last second, `Err(500ms)` when admitting the pull would
    /// push the memory estimate past `MEMORY_PRESSURE_THRESHOLD_MB`, and
    /// `Ok(())` after recording the pull otherwise.
    fn check_global_pull_request(
        &mut self,
        now_ns: u64,
        estimated_batch_memory: u64,
    ) -> Result<(), Duration> {
        let now_ms = now_ns / 1_000_000;
        let one_second_ago = now_ms.saturating_sub(1000);
        let recent_count = self
            .recent_pulls
            .iter()
            .filter(|&&timestamp| timestamp > one_second_ago)
            .count();
        if recent_count >= GLOBAL_PULL_RATE_LIMIT as usize {
            warn!(
                "JetStream global pull rate limit exceeded: {} requests/second (limit: {})",
                recent_count, GLOBAL_PULL_RATE_LIMIT
            );
            return Err(Duration::from_millis(100));
        }
        // Saturate so an oversized estimate reads as "over threshold" instead
        // of overflowing.
        let new_memory_usage = self
            .estimated_memory_usage
            .saturating_add(estimated_batch_memory);
        if new_memory_usage > MEMORY_PRESSURE_THRESHOLD_MB * 1_024 * 1_024 {
            warn!(
                "JetStream memory pressure detected: {}MB estimated usage (threshold: {}MB)",
                new_memory_usage / (1_024 * 1_024),
                MEMORY_PRESSURE_THRESHOLD_MB
            );
            return Err(Duration::from_millis(500));
        }
        self.recent_pulls[self.buffer_position] = now_ms;
        self.buffer_position = (self.buffer_position + 1) % self.recent_pulls.len();
        self.estimated_memory_usage = new_memory_usage;
        // Every 100th slot, decay the estimate by 50 batches' worth. Nothing
        // in this block reports freed memory, so this keeps the estimate from
        // growing without bound.
        if self.buffer_position % 100 == 0 {
            self.estimated_memory_usage = self
                .estimated_memory_usage
                .saturating_sub(estimated_batch_memory.saturating_mul(50));
        }
        Ok(())
    }
}
/// Process-wide ack token tracker, created on first use.
static ACK_TOKEN_TRACKER: std::sync::OnceLock<AckTokenTracker> = std::sync::OnceLock::new();

/// Returns the shared ack token tracker, initialising it on the first call.
fn get_ack_token_tracker() -> &'static AckTokenTracker {
    let tracker: &'static AckTokenTracker = ACK_TOKEN_TRACKER.get_or_init(AckTokenTracker::new);
    tracker
}
/// Builds a non-reversible label for `value` that is safe to put in error
/// messages: its byte length plus its 64-bit FNV-1a hash in hex.
fn redacted_name_fingerprint(value: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let hash = value
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    format!("bytes={},fnv1a64={hash:016x}", value.len())
}
/// Checks that a requested pull batch size is within `1..=MAX_PULL_BATCH`.
///
/// # Errors
/// Returns `JsError::InvalidConfig` for a zero batch or one above the cap.
fn validate_pull_batch_size(batch: usize) -> Result<(), JsError> {
    match batch {
        0 => Err(JsError::InvalidConfig(String::from(
            "pull batch size must be > 0",
        ))),
        oversized if oversized > MAX_PULL_BATCH => Err(JsError::InvalidConfig(format!(
            "pull batch size {batch} exceeds {MAX_PULL_BATCH}-message cap; \
             issue multiple smaller pulls or raise the cap deliberately"
        ))),
        _ => Ok(()),
    }
}
/// Validates `requested_batch` and shrinks it when the consumer is being rate
/// limited or the global memory estimate is high.
///
/// The result never exceeds `requested_batch`. The
/// `MIN_BATCH_SIZE_UNDER_PRESSURE` floor applies only when the request itself
/// is at least that large.
///
/// # Errors
/// Propagates the `JsError::InvalidConfig` from `validate_pull_batch_size`.
fn validate_and_clamp_pull_batch_size(
    requested_batch: usize,
    consumer: &Consumer,
) -> Result<usize, JsError> {
    validate_pull_batch_size(requested_batch)?;
    let clamped_batch = if consumer.pull_rate_limiter.is_rate_limiting_active() {
        // Halve the batch, but do not let the floor push the result above
        // what the caller asked for.
        let reduced_batch = (requested_batch / 2)
            .max(MIN_BATCH_SIZE_UNDER_PRESSURE)
            .min(requested_batch);
        warn!(
            stream = %consumer.stream,
            consumer = %consumer.name,
            requested = requested_batch,
            clamped = reduced_batch,
            "JetStream batch size reduced due to rate limiting"
        );
        reduced_batch
    } else {
        let global_tracker = GLOBAL_PULL_RATE_TRACKER.lock().unwrap();
        let current_memory_mb = global_tracker.estimated_memory_usage / (1_024 * 1_024);
        if current_memory_mb > MEMORY_PRESSURE_THRESHOLD_MB / 2 {
            // Scale the batch down by up to 50% as usage nears the threshold.
            let pressure_factor =
                (current_memory_mb as f64 / MEMORY_PRESSURE_THRESHOLD_MB as f64).min(1.0);
            let reduced_batch = ((requested_batch as f64 * (1.0 - pressure_factor * 0.5)) as usize)
                .max(MIN_BATCH_SIZE_UNDER_PRESSURE)
                .min(requested_batch);
            if reduced_batch < requested_batch {
                warn!(
                    stream = %consumer.stream,
                    consumer = %consumer.name,
                    requested = requested_batch,
                    clamped = reduced_batch,
                    memory_mb = current_memory_mb,
                    "JetStream batch size reduced due to memory pressure"
                );
            }
            reduced_batch
        } else {
            requested_batch
        }
    };
    Ok(clamped_batch)
}
/// Default number of publishes allowed to be in flight at once.
const DEFAULT_MAX_IN_FLIGHT_PUBLISHES: usize = 1;
/// Default `max_waiters` value; in the code visible here it is only reported
/// in the refusal message.
const DEFAULT_MAX_PUBLISH_WAITERS: usize = 0;
/// Default in-flight limit once the pressure degradation level reaches 4;
/// 0 means every publish is refused in that state.
const DEFAULT_EMERGENCY_MAX_IN_FLIGHT_PUBLISHES: usize = 0;
/// Errors surfaced by the JetStream layer.
#[derive(Debug)]
pub enum JsError {
/// Failure reported by the underlying NATS client.
Nats(NatsError),
/// Error object returned by the JetStream API.
Api {
/// The `code` field of the error object (0 when absent).
code: u32,
/// The `description` field of the error object.
description: String,
},
/// The API reported error code 10059. As built by `parse_api_error`, the
/// payload is the server's description text.
StreamNotFound(String),
/// A consumer lookup failed. Not constructed in the part of the file
/// visible here.
ConsumerNotFound {
/// Stream the consumer was looked up on.
stream: String,
/// Name of the missing consumer.
consumer: String,
},
/// A message was not acknowledged; classified as transient and as a timeout.
NotAcked,
/// The message was already acknowledged, nacked or terminated.
AlreadyAcknowledged,
/// A configuration or argument failed local validation.
InvalidConfig(String),
/// A response could not be parsed into the expected shape.
ParseError(String),
}
impl fmt::Display for JsError {
    /// Writes a one-line, human-readable description prefixed with
    /// "JetStream".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            Self::NotAcked => f.write_str("JetStream message not acknowledged"),
            Self::AlreadyAcknowledged => {
                f.write_str("JetStream message already acknowledged/nacked/terminated")
            }
            Self::Nats(e) => write!(f, "JetStream NATS error: {e}"),
            Self::Api { code, description } => {
                write!(f, "JetStream API error {code}: {description}")
            }
            Self::StreamNotFound(name) => write!(f, "JetStream stream not found: {name}"),
            Self::ConsumerNotFound { stream, consumer } => {
                write!(f, "JetStream consumer not found: {stream}/{consumer}")
            }
            Self::InvalidConfig(msg) => write!(f, "JetStream invalid config: {msg}"),
            Self::ParseError(msg) => write!(f, "JetStream parse error: {msg}"),
        }
    }
}
impl std::error::Error for JsError {
    /// Exposes the wrapped `NatsError` as the cause; other variants have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Nats(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}
impl From<NatsError> for JsError {
fn from(err: NatsError) -> Self {
Self::Nats(err)
}
}
impl JsError {
    /// True when the failure may go away on its own: a transient client
    /// error, an API 503/408, or an unacknowledged message.
    #[must_use]
    pub fn is_transient(&self) -> bool {
        match self {
            Self::Nats(inner) => inner.is_transient(),
            Self::Api { code, .. } => *code == 503 || *code == 408,
            other => matches!(other, Self::NotAcked),
        }
    }

    /// True when the wrapped client error is a connection error.
    #[must_use]
    pub fn is_connection_error(&self) -> bool {
        if let Self::Nats(inner) = self {
            inner.is_connection_error()
        } else {
            false
        }
    }

    /// True for an API error carrying code 429.
    #[must_use]
    pub fn is_capacity_error(&self) -> bool {
        match self {
            Self::Api { code, .. } => *code == 429,
            _ => false,
        }
    }

    /// True for a client timeout, an API 408, or an unacknowledged message.
    #[must_use]
    pub fn is_timeout(&self) -> bool {
        match self {
            Self::Nats(inner) => inner.is_timeout(),
            Self::NotAcked => true,
            Self::Api { code, .. } => *code == 408,
            _ => false,
        }
    }

    /// True when retrying is reasonable; currently the same as
    /// `is_transient`.
    #[must_use]
    pub fn is_retryable(&self) -> bool {
        Self::is_transient(self)
    }
}
/// Settings sent to the server when creating a stream.
#[derive(Debug, Clone)]
pub struct StreamConfig {
/// Stream name; validated as non-empty ASCII letters, digits, '-' or '_'.
pub name: String,
/// Subject patterns for the stream; omitted from the request when empty.
pub subjects: Vec<String>,
/// Retention policy, serialized as `retention`.
pub retention: RetentionPolicy,
/// Storage backend, serialized as `storage`.
pub storage: StorageType,
/// Optional message-count limit; must be >= 0 when set.
pub max_msgs: Option<i64>,
/// Optional byte limit; must be >= 0 when set.
pub max_bytes: Option<i64>,
/// Optional maximum age, serialized in nanoseconds.
pub max_age: Option<Duration>,
/// Optional per-message size limit; must be >= 0 when set.
pub max_msg_size: Option<i32>,
/// Discard policy, serialized as `discard`.
pub discard: DiscardPolicy,
/// Replica count, serialized as `num_replicas`; must be >= 1.
pub replicas: u32,
/// Optional duplicate window, serialized in nanoseconds.
pub duplicate_window: Option<Duration>,
}
impl StreamConfig {
#[must_use]
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
subjects: Vec::new(),
retention: RetentionPolicy::Limits,
storage: StorageType::File,
max_msgs: None,
max_bytes: None,
max_age: None,
max_msg_size: None,
discard: DiscardPolicy::Old,
replicas: 1,
duplicate_window: None,
}
}
#[must_use]
pub fn subjects(mut self, subjects: &[&str]) -> Self {
self.subjects = subjects.iter().map(|s| (*s).to_string()).collect();
self
}
#[must_use]
pub fn retention(mut self, policy: RetentionPolicy) -> Self {
self.retention = policy;
self
}
#[must_use]
pub fn storage(mut self, storage: StorageType) -> Self {
self.storage = storage;
self
}
#[must_use]
pub fn max_messages(mut self, max: i64) -> Self {
self.max_msgs = Some(max);
self
}
#[must_use]
pub fn max_bytes(mut self, max: i64) -> Self {
self.max_bytes = Some(max);
self
}
#[must_use]
pub fn max_age(mut self, age: Duration) -> Self {
self.max_age = Some(age);
self
}
#[must_use]
pub fn replicas(mut self, count: u32) -> Self {
self.replicas = count;
self
}
#[must_use]
pub fn duplicate_window(mut self, window: Duration) -> Self {
self.duplicate_window = Some(window);
self
}
fn validate(&self) -> Result<(), JsError> {
ConsumerConfig::validate_stream_name(&self.name)?;
for (index, subject) in self.subjects.iter().enumerate() {
validate_stream_subject_pattern(subject).map_err(|reason| {
JsError::InvalidConfig(format!("stream subjects[{index}] {reason}: {subject:?}"))
})?;
}
if let Some(max_msgs) = self.max_msgs
&& max_msgs < 0
{
return Err(JsError::InvalidConfig(
"stream max_msgs must be >= 0 when set".to_string(),
));
}
if let Some(max_bytes) = self.max_bytes
&& max_bytes < 0
{
return Err(JsError::InvalidConfig(
"stream max_bytes must be >= 0 when set".to_string(),
));
}
if let Some(max_msg_size) = self.max_msg_size
&& max_msg_size < 0
{
return Err(JsError::InvalidConfig(
"stream max_msg_size must be >= 0 when set".to_string(),
));
}
if self.replicas == 0 {
return Err(JsError::InvalidConfig(
"stream replicas must be >= 1".to_string(),
));
}
Ok(())
}
fn to_json(&self) -> String {
let mut json = String::from("{");
write!(&mut json, "\"name\":\"{}\"", json_escape(&self.name)).expect("write to String");
if !self.subjects.is_empty() {
json.push_str(",\"subjects\":[");
for (i, s) in self.subjects.iter().enumerate() {
if i > 0 {
json.push(',');
}
write!(&mut json, "\"{}\"", json_escape(s)).expect("write to String");
}
json.push(']');
}
write!(&mut json, ",\"retention\":\"{}\"", self.retention.as_str())
.expect("write to String");
write!(&mut json, ",\"storage\":\"{}\"", self.storage.as_str()).expect("write to String");
write!(&mut json, ",\"discard\":\"{}\"", self.discard.as_str()).expect("write to String");
write!(&mut json, ",\"num_replicas\":{}", self.replicas).expect("write to String");
if let Some(max) = self.max_msgs {
write!(&mut json, ",\"max_msgs\":{max}").expect("write to String");
}
if let Some(max) = self.max_bytes {
write!(&mut json, ",\"max_bytes\":{max}").expect("write to String");
}
if let Some(age) = self.max_age {
write!(&mut json, ",\"max_age\":{}", age.as_nanos()).expect("write to String");
}
if let Some(size) = self.max_msg_size {
write!(&mut json, ",\"max_msg_size\":{size}").expect("write to String");
}
if let Some(window) = self.duplicate_window {
write!(&mut json, ",\"duplicate_window\":{}", window.as_nanos())
.expect("write to String");
}
json.push('}');
json
}
}
/// How a stream decides which messages to keep.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RetentionPolicy {
    /// Serialized as `"limits"`; the default.
    #[default]
    Limits,
    /// Serialized as `"interest"`.
    Interest,
    /// Serialized as `"workqueue"`.
    WorkQueue,
}

impl RetentionPolicy {
    /// Returns the wire name used in the stream configuration JSON.
    fn as_str(self) -> &'static str {
        let wire_name = match self {
            RetentionPolicy::Limits => "limits",
            RetentionPolicy::Interest => "interest",
            RetentionPolicy::WorkQueue => "workqueue",
        };
        wire_name
    }
}
/// Where a stream stores its messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageType {
    /// Serialized as `"file"`; the default.
    #[default]
    File,
    /// Serialized as `"memory"`.
    Memory,
}

impl StorageType {
    /// Returns the wire name used in the stream configuration JSON.
    fn as_str(self) -> &'static str {
        let wire_name = match self {
            StorageType::File => "file",
            StorageType::Memory => "memory",
        };
        wire_name
    }
}
/// What a stream drops once a limit is reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DiscardPolicy {
    /// Serialized as `"old"`; the default.
    #[default]
    Old,
    /// Serialized as `"new"`.
    New,
}

impl DiscardPolicy {
    /// Returns the wire name used in the stream configuration JSON.
    fn as_str(self) -> &'static str {
        let wire_name = match self {
            DiscardPolicy::Old => "old",
            DiscardPolicy::New => "new",
        };
        wire_name
    }
}
/// Stream description returned by the create and info calls.
#[derive(Debug, Clone)]
pub struct StreamInfo {
/// Stream configuration. As built by `parse_stream_info`, only `name` comes
/// from the response; the other fields hold `StreamConfig::new` defaults.
pub config: StreamConfig,
/// Counters parsed from the response.
pub state: StreamState,
}
/// Counters parsed from a stream info response; each is 0 when the field is
/// absent.
#[derive(Debug, Clone, Default)]
pub struct StreamState {
/// Value of the `messages` field.
pub messages: u64,
/// Value of the `bytes` field.
pub bytes: u64,
/// Value of the `first_seq` field.
pub first_seq: u64,
/// Value of the `last_seq` field.
pub last_seq: u64,
/// Value of the `consumer_count` field, saturated at `u32::MAX`.
pub consumer_count: u32,
}
/// Settings sent to the server when creating a consumer.
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
/// Consumer name; `None` for an ephemeral consumer.
pub name: Option<String>,
/// Alternative way to supply the name. Validation folds it into `name`
/// (the two must match when both are set) and then clears it.
pub durable_name: Option<String>,
/// Optional delivery subject, validated as a publish subject.
pub deliver_subject: Option<String>,
/// Delivery start policy, serialized as `deliver_policy`.
pub deliver_policy: DeliverPolicy,
/// Acknowledgement policy, serialized as `ack_policy`.
pub ack_policy: AckPolicy,
/// Ack wait, serialized in nanoseconds; defaults to 30 seconds.
pub ack_wait: Duration,
/// Serialized as `max_deliver`; defaults to -1 (presumably "unlimited";
/// TODO confirm against server semantics).
pub max_deliver: i64,
/// Optional filter, validated as a subscription pattern.
pub filter_subject: Option<String>,
/// Optional rate limit in bits per second; rejected by validation unless
/// `deliver_subject` is also set.
pub rate_limit_bps: Option<u64>,
/// Serialized as `max_ack_pending`; defaults to 1000.
pub max_ack_pending: i64,
}
impl ConsumerConfig {
#[must_use]
pub fn new(name: impl Into<String>) -> Self {
Self {
name: Some(name.into()),
durable_name: None,
deliver_subject: None,
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
ack_wait: Duration::from_secs(30),
max_deliver: -1,
filter_subject: None,
rate_limit_bps: None,
max_ack_pending: 1000,
}
}
#[must_use]
pub fn ephemeral() -> Self {
Self {
name: None,
durable_name: None,
deliver_subject: None,
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
ack_wait: Duration::from_secs(30),
max_deliver: -1,
filter_subject: None,
rate_limit_bps: None,
max_ack_pending: 1000,
}
}
#[must_use]
pub fn deliver_subject(mut self, subject: impl Into<String>) -> Self {
self.deliver_subject = Some(subject.into());
self
}
#[must_use]
pub fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
self.deliver_policy = policy;
self
}
#[must_use]
pub fn ack_policy(mut self, policy: AckPolicy) -> Self {
self.ack_policy = policy;
self
}
#[must_use]
pub fn ack_wait(mut self, wait: Duration) -> Self {
self.ack_wait = wait;
self
}
#[must_use]
pub fn max_deliver(mut self, max: i64) -> Self {
self.max_deliver = max;
self
}
#[must_use]
pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
self.filter_subject = Some(subject.into());
self
}
#[must_use]
pub fn rate_limit_bps(mut self, bits_per_second: u64) -> Self {
self.rate_limit_bps = Some(bits_per_second);
self
}
#[must_use]
pub fn max_ack_pending(mut self, max_ack_pending: i64) -> Self {
self.max_ack_pending = max_ack_pending;
self
}
fn validate(&mut self) -> Result<(), JsError> {
self.normalize_identity()?;
if let Some(deliver_subject) = self.deliver_subject.as_deref() {
validate_nats_publish_subject(deliver_subject, "deliver_subject")
.map_err(|err| JsError::InvalidConfig(err.to_string()))?;
}
if let Some(filter_subject) = self.filter_subject.as_deref() {
validate_nats_subscription_pattern(filter_subject, "filter_subject")
.map_err(|err| JsError::InvalidConfig(err.to_string()))?;
}
if self.rate_limit_bps.is_some() && self.deliver_subject.is_none() {
return Err(JsError::InvalidConfig(
"consumer rate_limit_bps requires deliver_subject for push consumers".to_string(),
));
}
Ok(())
}
fn normalize_identity(&mut self) -> Result<(), JsError> {
let name = Self::validate_consumer_name("name", self.name.as_deref())?;
let durable_name =
Self::validate_consumer_name("durable_name", self.durable_name.as_deref())?;
let canonical_name = match (name, durable_name) {
(Some(name), Some(durable_name)) if name != durable_name => {
return Err(JsError::InvalidConfig(format!(
"consumer name mismatch: name '{name}' != durable_name '{durable_name}'"
)));
}
(Some(name), _) => Some(name.to_string()),
(None, Some(durable_name)) => Some(durable_name.to_string()),
(None, None) => None,
};
self.name = canonical_name;
self.durable_name = None;
Ok(())
}
fn validate_consumer_name<'a>(
field: &str,
value: Option<&'a str>,
) -> Result<Option<&'a str>, JsError> {
let Some(value) = value else {
return Ok(None);
};
if value.is_empty() {
return Err(JsError::InvalidConfig(format!(
"consumer {field} must be non-empty when set"
)));
}
let char_count = value.chars().count();
if char_count > MAX_CONSUMER_NAME_CHARS {
return Err(JsError::InvalidConfig(format!(
"consumer {field} exceeds JetStream spec limit of {MAX_CONSUMER_NAME_CHARS} characters (got {char_count})"
)));
}
if value.len() > MAX_NAME_BYTES {
return Err(JsError::InvalidConfig(format!(
"consumer {field} exceeds {MAX_NAME_BYTES}-byte cap (got {} bytes)",
value.len(),
)));
}
if value
.chars()
.any(|ch| !matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
{
let fingerprint = redacted_name_fingerprint(value);
return Err(JsError::InvalidConfig(format!(
"consumer {field} must contain only ASCII letters, digits, '-' or '_' per JetStream spec (fingerprint {fingerprint}, {char_count} chars)"
)));
}
Ok(Some(value))
}
pub(crate) fn validate_stream_name(name: &str) -> Result<(), JsError> {
if name.is_empty() {
return Err(JsError::InvalidConfig(
"stream name must be non-empty".to_string(),
));
}
if name.len() > MAX_NAME_BYTES {
return Err(JsError::InvalidConfig(format!(
"stream name exceeds {MAX_NAME_BYTES}-byte cap (got {} bytes)",
name.len(),
)));
}
if name
.chars()
.any(|ch| !matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
{
let fingerprint = redacted_name_fingerprint(name);
return Err(JsError::InvalidConfig(format!(
"stream name must contain only ASCII letters, digits, '-' or '_' (fingerprint {fingerprint})"
)));
}
Ok(())
}
fn to_json(&self) -> String {
let mut json = String::from("{");
let mut parts = Vec::new();
if let Some(ref name) = self.name {
parts.push(format!("\"name\":\"{}\"", json_escape(name)));
}
if let Some(ref durable) = self.durable_name {
parts.push(format!("\"durable_name\":\"{}\"", json_escape(durable)));
}
if let Some(ref deliver_subject) = self.deliver_subject {
parts.push(format!(
"\"deliver_subject\":\"{}\"",
json_escape(deliver_subject)
));
}
parts.push(format!(
"\"deliver_policy\":\"{}\"",
self.deliver_policy.as_str()
));
match self.deliver_policy {
DeliverPolicy::ByStartSequence(seq) => {
parts.push(format!("\"opt_start_seq\":{seq}"));
}
DeliverPolicy::ByStartTime(start_time) => {
parts.push(format!(
"\"opt_start_time\":\"{}\"",
json_escape(&format_system_time_rfc3339(start_time))
));
}
DeliverPolicy::All
| DeliverPolicy::New
| DeliverPolicy::Last
| DeliverPolicy::LastPerSubject => {}
}
parts.push(format!("\"ack_policy\":\"{}\"", self.ack_policy.as_str()));
parts.push(format!("\"ack_wait\":{}", self.ack_wait.as_nanos()));
parts.push(format!("\"max_deliver\":{}", self.max_deliver));
if let Some(rate_limit_bps) = self.rate_limit_bps {
parts.push(format!("\"rate_limit_bps\":{rate_limit_bps}"));
}
parts.push(format!("\"max_ack_pending\":{}", self.max_ack_pending));
if let Some(ref filter) = self.filter_subject {
parts.push(format!("\"filter_subject\":\"{}\"", json_escape(filter)));
}
json.push_str(&parts.join(","));
json.push('}');
json
}
}
/// Where in the stream a new consumer starts delivering from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeliverPolicy {
    /// Serialized as `"all"`; the default.
    #[default]
    All,
    /// Serialized as `"new"`.
    New,
    /// Serialized as `"by_start_sequence"`; carries the start sequence.
    ByStartSequence(u64),
    /// Serialized as `"by_start_time"`; carries the start time.
    ByStartTime(SystemTime),
    /// Serialized as `"last"`.
    Last,
    /// Serialized as `"last_per_subject"`.
    LastPerSubject,
}

impl DeliverPolicy {
    /// Returns the wire name used in the consumer configuration JSON. The
    /// payload of the "start at" variants is not part of the name.
    fn as_str(self) -> &'static str {
        let wire_name = match self {
            DeliverPolicy::All => "all",
            DeliverPolicy::New => "new",
            DeliverPolicy::ByStartSequence(_) => "by_start_sequence",
            DeliverPolicy::ByStartTime(_) => "by_start_time",
            DeliverPolicy::Last => "last",
            DeliverPolicy::LastPerSubject => "last_per_subject",
        };
        wire_name
    }
}
/// How a consumer expects messages to be acknowledged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AckPolicy {
    /// Serialized as `"explicit"`; the default.
    #[default]
    Explicit,
    /// Serialized as `"none"`.
    None,
    /// Serialized as `"all"`.
    All,
}

impl AckPolicy {
    /// Returns the wire name used in the consumer configuration JSON.
    fn as_str(self) -> &'static str {
        let wire_name = match self {
            AckPolicy::Explicit => "explicit",
            AckPolicy::None => "none",
            AckPolicy::All => "all",
        };
        wire_name
    }
}
/// Acknowledgement parsed from the server's reply to a publish.
#[derive(Debug, Clone)]
pub struct PubAck {
/// Value of the `stream` field in the reply.
pub stream: String,
/// Value of the `seq` field in the reply.
pub seq: u64,
/// Value of the `duplicate` field in the reply; `false` when absent.
pub duplicate: bool,
}
/// A message received from a JetStream consumer, with its ack bookkeeping.
pub struct JsMessage {
/// Subject of the message.
pub subject: String,
/// Message body.
pub payload: Vec<u8>,
/// Sequence number of the message (presumably the stream sequence; TODO
/// confirm where it is parsed).
pub sequence: u64,
/// Delivery counter (presumably the number of delivery attempts; TODO
/// confirm where it is parsed).
pub delivered: u32,
/// Reply subject carried with the message (presumably where ack/nack
/// responses are sent; the sending code is not visible here).
reply_subject: String,
/// Ack state flag; anything other than `ACK_STATE_PENDING` counts as acked.
ack_state: AtomicU8,
/// Shared pending-ack counter, decremented on drop if the message is still
/// pending.
pending_acks: Option<Arc<AtomicUsize>>,
}
impl fmt::Debug for JsMessage {
    /// Prints the message metadata; the payload is shown by length only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let payload_len = self.payload.len();
        let acked = self.is_acked();

        let mut out = f.debug_struct("JsMessage");
        out.field("subject", &self.subject);
        out.field("sequence", &self.sequence);
        out.field("delivered", &self.delivered);
        out.field("payload_len", &payload_len);
        out.field("reply_subject", &self.reply_subject);
        out.field("acked", &acked);
        out.finish()
    }
}
impl JsMessage {
    /// Reports whether the message has left the pending state.
    pub fn is_acked(&self) -> bool {
        let state = self.ack_state.load(Ordering::Acquire);
        state != ACK_STATE_PENDING
    }
}
impl Drop for JsMessage {
    /// Warns when a message is dropped while still pending and releases its
    /// slot in the shared pending-ack counter.
    fn drop(&mut self) {
        let still_pending = self.ack_state.load(Ordering::Acquire) == ACK_STATE_PENDING;
        if !still_pending {
            return;
        }

        warn!(
            subject = %self.subject,
            sequence = self.sequence,
            "JetStream message dropped without ack/nack - will be redelivered"
        );
        if let Some(counter) = self.pending_acks.as_ref() {
            decrement_pending_counter(counter);
        }
    }
}
/// Limits applied by the local publish backpressure gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct JetStreamPublishBackpressurePolicy {
/// In-flight publish limit under normal conditions.
max_in_flight_publishes: usize,
/// Waiter limit; in the code visible here it is only reported in the
/// refusal message.
max_waiters: usize,
/// In-flight publish limit once the pressure degradation level reaches 4
/// (never applied above `max_in_flight_publishes`).
emergency_max_in_flight_publishes: usize,
}
impl Default for JetStreamPublishBackpressurePolicy {
    /// Builds the policy from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        let max_in_flight_publishes = DEFAULT_MAX_IN_FLIGHT_PUBLISHES;
        let max_waiters = DEFAULT_MAX_PUBLISH_WAITERS;
        let emergency_max_in_flight_publishes = DEFAULT_EMERGENCY_MAX_IN_FLIGHT_PUBLISHES;
        Self {
            max_in_flight_publishes,
            max_waiters,
            emergency_max_in_flight_publishes,
        }
    }
}
/// Admission gate that bounds the number of concurrent publishes.
#[derive(Debug)]
struct JetStreamPublishBackpressureGate {
/// Limits the gate enforces.
policy: JetStreamPublishBackpressurePolicy,
/// Number of publishes currently holding a permit.
in_flight_publishes: AtomicUsize,
/// Running count of publishes refused by the gate.
refused_publishes: AtomicUsize,
}
impl JetStreamPublishBackpressureGate {
    /// Creates a gate with no publishes in flight and none refused.
    fn new(policy: JetStreamPublishBackpressurePolicy) -> Self {
        Self {
            policy,
            in_flight_publishes: AtomicUsize::new(0),
            refused_publishes: AtomicUsize::new(0),
        }
    }

    /// Returns the pressure level label for `cx`, or "detached" when the
    /// context exposes no pressure handle.
    fn pressure_level_label(cx: &Cx) -> &'static str {
        match cx.pressure() {
            Some(pressure) => crate::types::SystemPressure::level_label(pressure),
            None => "detached",
        }
    }

    /// Returns the in-flight limit that applies right now: the emergency limit
    /// (capped at the normal one) when the degradation level is 4 or higher,
    /// the normal limit otherwise.
    fn effective_max_in_flight_publishes(&self, cx: &Cx) -> usize {
        let policy = &self.policy;
        let emergency = cx
            .pressure()
            .is_some_and(|pressure| pressure.degradation_level() >= 4);
        if !emergency {
            return policy.max_in_flight_publishes;
        }
        policy
            .emergency_max_in_flight_publishes
            .min(policy.max_in_flight_publishes)
    }

    /// Counts a refusal and builds the 429 `JsError::Api` describing it.
    fn refuse(&self, cx: &Cx, subject: &str, current: usize, limit: usize) -> JsError {
        self.refused_publishes.fetch_add(1, Ordering::Relaxed);
        let description = format!(
            "local publish backpressure: subject={subject} in_flight={current} limit={limit} \
             max_waiters={} pressure={}",
            self.policy.max_waiters,
            Self::pressure_level_label(cx),
        );
        JsError::Api {
            code: 429,
            description,
        }
    }

    /// Tries to reserve an in-flight slot for a publish to `subject`.
    ///
    /// On success the returned permit releases the slot when dropped. When
    /// the gate is at its effective limit the publish is refused immediately
    /// with a 429 error; nothing is queued.
    fn begin_publish<'a>(
        &'a self,
        cx: &Cx,
        subject: &str,
    ) -> Result<JetStreamPublishPermit<'a>, JsError> {
        let limit = self.effective_max_in_flight_publishes(cx);
        // Atomically bump the counter only while it is below the limit; on
        // refusal the error carries the count observed at that moment.
        let reservation = self.in_flight_publishes.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |in_flight| (in_flight < limit).then(|| in_flight.saturating_add(1)),
        );
        match reservation {
            Ok(_) => Ok(JetStreamPublishPermit { gate: self }),
            Err(observed) => Err(self.refuse(cx, subject, observed, limit)),
        }
    }
}
/// Guard for one reserved in-flight publish slot; the slot is released when
/// the guard is dropped.
struct JetStreamPublishPermit<'a> {
/// Gate whose in-flight counter this permit holds a slot in.
gate: &'a JetStreamPublishBackpressureGate,
}
impl Drop for JetStreamPublishPermit<'_> {
    /// Gives the reserved slot back to the gate.
    fn drop(&mut self) {
        let in_flight = &self.gate.in_flight_publishes;
        in_flight.fetch_sub(1, Ordering::Release);
    }
}
/// Decrements `counter` by one unless it is already zero, so it can never
/// wrap below zero.
fn decrement_pending_counter(counter: &AtomicUsize) {
    // `checked_sub` yields `None` at zero, which stops the update with the
    // counter left untouched; that outcome is not an error here.
    let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |pending| {
        pending.checked_sub(1)
    });
}
/// Entry point for JetStream operations on top of a NATS client.
pub struct JetStreamContext {
/// Client used for every request.
client: NatsClient,
/// Subject prefix for API requests; `$JS.API` when built with `new`.
prefix: String,
/// Gate bounding the number of concurrent publishes.
publish_backpressure: JetStreamPublishBackpressureGate,
}
impl fmt::Debug for JetStreamContext {
    /// Prints the prefix and backpressure policy; the client is left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let policy = &self.publish_backpressure.policy;

        let mut out = f.debug_struct("JetStreamContext");
        out.field("prefix", &self.prefix);
        out.field("publish_backpressure_policy", policy);
        out.finish_non_exhaustive()
    }
}
impl JetStreamContext {
pub fn new(client: NatsClient) -> Self {
Self {
client,
prefix: "$JS.API".to_string(),
publish_backpressure: JetStreamPublishBackpressureGate::new(Default::default()),
}
}
pub fn with_prefix(client: NatsClient, prefix: impl Into<String>) -> Self {
Self {
client,
prefix: prefix.into(),
publish_backpressure: JetStreamPublishBackpressureGate::new(Default::default()),
}
}
pub async fn create_stream(
&mut self,
cx: &Cx,
config: StreamConfig,
) -> Result<StreamInfo, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
config.validate()?;
let subject = format!("{}.STREAM.CREATE.{}", self.prefix, config.name);
let payload = config.to_json();
let response = self
.client
.request(cx, &subject, payload.as_bytes())
.await?;
Self::parse_stream_info(&response.payload)
}
pub async fn get_stream(&mut self, cx: &Cx, name: &str) -> Result<StreamInfo, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
ConsumerConfig::validate_stream_name(name)?;
let subject = format!("{}.STREAM.INFO.{}", self.prefix, name);
let response = self.client.request(cx, &subject, b"").await?;
Self::parse_stream_info(&response.payload)
}
pub async fn delete_stream(&mut self, cx: &Cx, name: &str) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
ConsumerConfig::validate_stream_name(name)?;
let subject = format!("{}.STREAM.DELETE.{}", self.prefix, name);
let response = self.client.request(cx, &subject, b"").await?;
let response_str = String::from_utf8_lossy(&response.payload);
if has_json_api_error(&response_str) {
return Err(Self::parse_api_error(&response_str));
}
Ok(())
}
pub async fn publish(
&mut self,
cx: &Cx,
subject: &str,
payload: &[u8],
) -> Result<PubAck, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let _permit = self.publish_backpressure.begin_publish(cx, subject)?;
let response = self.client.request(cx, subject, payload).await?;
Self::parse_pub_ack(&response.payload)
}
pub async fn publish_with_id(
&mut self,
cx: &Cx,
subject: &str,
msg_id: &str,
payload: &[u8],
) -> Result<PubAck, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
if msg_id.is_empty() {
return Err(JsError::InvalidConfig(
"publish_with_id: msg_id must be non-empty".to_string(),
));
}
let _permit = self.publish_backpressure.begin_publish(cx, subject)?;
let headers: [(&str, &[u8]); 1] = [("Nats-Msg-Id", msg_id.as_bytes())];
let response = self
.client
.request_with_headers(cx, subject, &headers, payload)
.await?;
Self::parse_pub_ack(&response.payload)
}
pub async fn create_consumer(
&mut self,
cx: &Cx,
stream: &str,
mut config: ConsumerConfig,
) -> Result<Consumer, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
ConsumerConfig::validate_stream_name(stream)?;
config.validate()?;
let consumer_name = config.name.clone().unwrap_or_default();
let subject = if consumer_name.is_empty() {
format!("{}.CONSUMER.CREATE.{}", self.prefix, stream)
} else {
format!(
"{}.CONSUMER.CREATE.{}.{}",
self.prefix, stream, consumer_name
)
};
let payload = format!(
"{{\"stream_name\":\"{}\",\"config\":{}}}",
json_escape(stream),
config.to_json()
);
let response = self
.client
.request(cx, &subject, payload.as_bytes())
.await?;
let response_str = String::from_utf8_lossy(&response.payload);
if has_json_api_error(&response_str) {
return Err(Self::parse_api_error(&response_str));
}
let name = extract_json_string_simple(&response_str, "name")
.unwrap_or_else(|| consumer_name.clone());
Ok(Consumer {
stream: stream.to_string(),
name,
prefix: self.prefix.clone(),
pending_acks: Arc::new(AtomicUsize::new(0)),
max_ack_pending: config.max_ack_pending.max(1) as usize,
pull_rate_limiter: PullRateLimiter::new(),
})
}
pub async fn get_consumer(
&mut self,
cx: &Cx,
stream: &str,
consumer: &str,
) -> Result<Consumer, JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.CONSUMER.INFO.{}.{}", self.prefix, stream, consumer);
let response = self.client.request(cx, &subject, b"").await?;
let response_str = String::from_utf8_lossy(&response.payload);
if has_json_api_error(&response_str) {
return Err(Self::parse_api_error(&response_str));
}
let max_ack_pending = extract_json_i64_simple(&response_str, "max_ack_pending")
.unwrap_or(1000)
.max(1) as usize;
Ok(Consumer {
stream: stream.to_string(),
name: consumer.to_string(),
prefix: self.prefix.clone(),
pending_acks: Arc::new(AtomicUsize::new(0)),
max_ack_pending,
pull_rate_limiter: PullRateLimiter::new(),
})
}
pub async fn delete_consumer(
&mut self,
cx: &Cx,
stream: &str,
consumer: &str,
) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
let subject = format!("{}.CONSUMER.DELETE.{}.{}", self.prefix, stream, consumer);
let response = self.client.request(cx, &subject, b"").await?;
let response_str = String::from_utf8_lossy(&response.payload);
if has_json_api_error(&response_str) {
return Err(Self::parse_api_error(&response_str));
}
Ok(())
}
pub fn client(&mut self) -> &mut NatsClient {
&mut self.client
}
fn parse_stream_info(payload: &[u8]) -> Result<StreamInfo, JsError> {
let json = String::from_utf8_lossy(payload);
if has_json_api_error(&json) {
return Err(Self::parse_api_error(&json));
}
let name = extract_json_string_simple(&json, "name")
.ok_or_else(|| JsError::ParseError("missing stream name".to_string()))?;
let state = StreamState {
messages: extract_json_u64(&json, "messages").unwrap_or(0),
bytes: extract_json_u64(&json, "bytes").unwrap_or(0),
first_seq: extract_json_u64(&json, "first_seq").unwrap_or(0),
last_seq: extract_json_u64(&json, "last_seq").unwrap_or(0),
consumer_count: extract_json_u64(&json, "consumer_count")
.unwrap_or(0)
.min(u64::from(u32::MAX)) as u32,
};
Ok(StreamInfo {
config: StreamConfig::new(name),
state,
})
}
/// Parses a publish acknowledgement reply into a [`PubAck`].
///
/// `stream` and `seq` are mandatory; `duplicate` defaults to `false` when
/// absent.
///
/// # Errors
///
/// Returns the parsed API error when the reply carries one, or
/// `JsError::ParseError` when `stream` or `seq` is missing.
fn parse_pub_ack(payload: &[u8]) -> Result<PubAck, JsError> {
    let body = String::from_utf8_lossy(payload);
    if has_json_api_error(&body) {
        return Err(Self::parse_api_error(&body));
    }
    let Some(stream) = extract_json_string_simple(&body, "stream") else {
        return Err(JsError::ParseError("missing stream in PubAck".to_string()));
    };
    let Some(seq) = extract_json_u64(&body, "seq") else {
        return Err(JsError::ParseError("missing seq in PubAck".to_string()));
    };
    let duplicate = extract_json_bool(&body, "duplicate").unwrap_or(false);
    Ok(PubAck {
        stream,
        seq,
        duplicate,
    })
}
/// Converts a JetStream API error reply into a [`JsError`].
///
/// Fields are read from the nested `error` object when present, otherwise
/// from the top-level document. Missing numeric fields default to `0` and a
/// missing description becomes `"unknown error"`.
///
/// Numeric fields that do not fit in `u32` saturate to `u32::MAX` instead of
/// wrapping, so an oversized `err_code` can never alias the 10059
/// "stream not found" code.
fn parse_api_error(json: &str) -> JsError {
    let error_json = extract_json_object(json, "error").unwrap_or(json);
    let field_u32 = |key: &str| {
        let raw = extract_json_u64(error_json, key).unwrap_or(0);
        u32::try_from(raw).unwrap_or(u32::MAX)
    };
    let code = field_u32("code");
    let err_code = field_u32("err_code");
    let description = extract_json_string_simple(error_json, "description")
        .unwrap_or_else(|| "unknown error".to_string());
    // 10059 is mapped to the dedicated stream-not-found variant.
    if err_code == 10059 {
        return JsError::StreamNotFound(description);
    }
    JsError::Api { code, description }
}
}
/// Handle for a JetStream consumer, used to pull messages and track
/// outstanding acknowledgements.
pub struct Consumer {
/// Stream name; interpolated into the `CONSUMER.MSG.NEXT` request subject.
stream: String,
/// Consumer name; interpolated into the `CONSUMER.MSG.NEXT` request subject.
name: String,
/// API subject prefix placed in front of JetStream API subjects.
prefix: String,
/// Shared count of messages handed out but not yet acknowledged; a clone of
/// this `Arc` is attached to every message parsed during a pull.
pending_acks: Arc<AtomicUsize>,
/// Upper bound on `pending_acks` enforced by `increment_pending`.
max_ack_pending: usize,
/// Per-consumer limiter consulted at the start of every pull.
pull_rate_limiter: PullRateLimiter,
}
/// Debug output shows a snapshot of the pending counter and the limiter's
/// active flag rather than the raw atomic and limiter structures.
impl fmt::Debug for Consumer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pending_snapshot = self.pending_acks.load(Ordering::Relaxed);
        let limiter_active = self.pull_rate_limiter.is_rate_limiting_active();
        let mut out = f.debug_struct("Consumer");
        out.field("stream", &self.stream);
        out.field("name", &self.name);
        out.field("prefix", &self.prefix);
        out.field("pending_acks", &pending_snapshot);
        out.field("max_ack_pending", &self.max_ack_pending);
        out.field("rate_limiting_active", &limiter_active);
        out.finish()
    }
}
impl Consumer {
/// Timeout used by [`Consumer::pull`] when the caller does not supply one.
pub const DEFAULT_PULL_TIMEOUT: Duration = Duration::from_secs(30);
/// Slack passed to `compute_client_deadline` together with the pull timeout
/// when deriving the client-side deadline.
const CLIENT_TIMEOUT_SLACK: Duration = Duration::from_millis(100);
/// Returns the consumer name.
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
/// Returns the name of the stream this consumer reads from.
#[must_use]
pub fn stream(&self) -> &str {
&self.stream
}
/// Returns a relaxed snapshot of the pending-acknowledgement counter.
#[must_use]
pub fn pending_acks(&self) -> usize {
self.pending_acks.load(Ordering::Relaxed)
}
/// Returns `true` while the pending counter is strictly below
/// `max_ack_pending`. This is a relaxed snapshot and reserves nothing.
#[must_use]
pub fn can_accept_message(&self) -> bool {
self.pending_acks.load(Ordering::Relaxed) < self.max_ack_pending
}
/// Tries to reserve one pending-acknowledgement slot.
///
/// Returns `true` and increments the counter when it is below
/// `max_ack_pending`; returns `false` and leaves it untouched otherwise.
///
/// The reservation is a single compare-and-swap, so the counter never rises
/// above the limit, even transiently. An add-then-undo sequence would briefly
/// publish an inflated count and could make concurrent callers refuse a slot
/// that is actually free.
fn increment_pending(&self) -> bool {
    let limit = self.max_ack_pending;
    self.pending_acks
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            // `current < limit` guarantees `current + 1` cannot overflow.
            (current < limit).then(|| current + 1)
        })
        .is_ok()
}
/// Releases one pending-acknowledgement slot via `decrement_pending_counter`.
/// Only compiled for tests and the `test-internals` feature.
#[cfg(any(test, feature = "test-internals"))]
fn decrement_pending(&self) {
decrement_pending_counter(&self.pending_acks);
}
/// Acknowledges `msg` by delegating to [`JsMessage::ack`].
///
/// # Errors
///
/// Propagates whatever `JsMessage::ack` returns.
pub async fn ack_message(
&self,
client: &mut NatsClient,
cx: &Cx,
msg: &JsMessage,
) -> Result<(), JsError> {
msg.ack(client, cx).await
}
/// Negatively acknowledges `msg` by delegating to [`JsMessage::nack`].
///
/// # Errors
///
/// Propagates whatever `JsMessage::nack` returns.
pub async fn nack_message(
&self,
client: &mut NatsClient,
cx: &Cx,
msg: &JsMessage,
) -> Result<(), JsError> {
msg.nack(client, cx).await
}
/// Negatively acknowledges `msg` with a delay by delegating to
/// [`JsMessage::nack_with_delay`].
///
/// # Errors
///
/// Propagates whatever `JsMessage::nack_with_delay` returns.
pub async fn nack_message_with_delay(
&self,
client: &mut NatsClient,
cx: &Cx,
msg: &JsMessage,
delay: Duration,
) -> Result<(), JsError> {
msg.nack_with_delay(client, cx, delay).await
}
/// Pulls up to `batch` messages using [`Consumer::DEFAULT_PULL_TIMEOUT`].
///
/// # Errors
///
/// Propagates whatever [`Consumer::pull_with_timeout`] returns.
pub async fn pull(
&self,
client: &mut NatsClient,
cx: &Cx,
batch: usize,
) -> Result<Vec<JsMessage>, JsError> {
self.pull_with_timeout(client, cx, batch, Self::DEFAULT_PULL_TIMEOUT)
.await
}
/// Pulls up to `batch` messages from this consumer, waiting at most
/// `pull_timeout`.
///
/// Steps, in order:
/// 1. Checkpoint, then consult the per-consumer and the process-wide pull
///    rate limiters; either may reject the call.
/// 2. Validate and clamp `batch` via `validate_and_clamp_pull_batch_size`.
/// 3. Subscribe to a fresh `_INBOX.*` subject and publish the pull request
///    to `<prefix>.CONSUMER.MSG.NEXT.<stream>.<name>`.
/// 4. Alternate between draining buffered messages from the subscription and
///    driving `client.process` until the batch is complete, the client-side
///    deadline passes, the connection closes, or processing fails.
/// 5. Unsubscribe (a failure is only logged) and return the result.
///
/// Messages that parse as JetStream deliveries but arrive while the pending
/// counter is at `max_ack_pending` are dropped with a warning; messages
/// without a valid ack reply subject are ignored.
///
/// # Errors
///
/// * Cancellation when the checkpoint fails.
/// * `JsError::InvalidConfig` when a rate limiter rejects the request.
/// * Whatever batch validation, subscribe, or publish returns.
/// * The recorded processing error, if the loop ended on one. In that case
///   the messages collected so far are not returned.
// NOTE(review): messages collected before a processing error have already
// incremented `pending_acks`; confirm that dropping them releases their slots.
pub async fn pull_with_timeout(
    &self,
    client: &mut NatsClient,
    cx: &Cx,
    batch: usize,
    pull_timeout: Duration,
) -> Result<Vec<JsMessage>, JsError> {
    cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
    let now_ns = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;
    if let Err(delay) = self.pull_rate_limiter.check_pull_request(now_ns) {
        return Err(JsError::InvalidConfig(format!(
            "JetStream pull request rate limited - retry after {}ms",
            delay.as_millis()
        )));
    }
    let effective_batch = validate_and_clamp_pull_batch_size(batch, self)?;
    // The global tracker is given a flat 2048-byte estimate per message.
    let estimated_batch_memory = effective_batch as u64 * 2048;
    {
        // Recover the guard from a poisoned mutex: a panic in one task must
        // not turn every later pull in the process into a panic as well.
        // The guard is dropped before any `.await`.
        let mut global_tracker = GLOBAL_PULL_RATE_TRACKER
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Err(delay) =
            global_tracker.check_global_pull_request(now_ns, estimated_batch_memory)
        {
            return Err(JsError::InvalidConfig(format!(
                "JetStream global rate limit or memory pressure - retry after {}ms",
                delay.as_millis()
            )));
        }
    }
    let subject = format!(
        "{}.CONSUMER.MSG.NEXT.{}.{}",
        self.prefix, self.stream, self.name
    );
    // Nanoseconds for the request's expiry, saturated to `i64::MAX`; a zero
    // timeout maps to 0.
    let expires = i64::try_from(pull_timeout.as_nanos()).unwrap_or(i64::MAX);
    let request = build_pull_request_json(effective_batch, expires, None);
    let mut sub = client
        .subscribe(cx, &format!("_INBOX.{}", random_id(cx)))
        .await?;
    let sid = sub.sid();
    if let Err(err) = client
        .publish_request(cx, &subject, sub.subject(), request.as_bytes())
        .await
    {
        // Best-effort cleanup of the inbox subscription before bailing out.
        let _ = client.unsubscribe(cx, sid).await;
        return Err(err.into());
    }
    let mut messages = Vec::with_capacity(effective_batch);
    let mut pull_state = PullSubscriberState::new(effective_batch);
    // Prefer the context's timer driver clock; fall back to wall time.
    let now = cx
        .timer_driver()
        .map_or_else(wall_now, |driver| driver.now());
    let client_deadline =
        compute_client_deadline(now, pull_timeout, Self::CLIENT_TIMEOUT_SLACK);
    loop {
        if !pull_state.is_active() {
            break;
        }
        // Drain everything already buffered on the subscription.
        while pull_state.is_active() && pull_state.received() < effective_batch {
            let Some(msg) = sub.try_next() else {
                break;
            };
            if let Some(js_msg) = Self::parse_js_message(msg, Some(self.pending_acks.clone())) {
                if self.increment_pending() {
                    messages.push(js_msg);
                    pull_state.observe_parsed_message();
                } else {
                    warn!(
                        stream = %self.stream,
                        consumer = %self.name,
                        pending = self.pending_acks(),
                        max_ack_pending = self.max_ack_pending,
                        sequence = js_msg.sequence,
                        "JetStream flow control: dropping message - max_ack_pending exceeded"
                    );
                    pull_state.observe_ignored_message();
                }
            } else {
                pull_state.observe_ignored_message();
            }
        }
        if !pull_state.is_active() {
            break;
        }
        // Drive the client once, bounded by the client-side deadline if any.
        let process_result = if let Some(deadline) = client_deadline {
            let next = std::pin::pin!(client.process(cx));
            timeout_at(deadline, next).await
        } else {
            Ok(client.process(cx).await)
        };
        match process_result {
            Ok(Ok(())) => pull_state.observe_process_ready(),
            Ok(Err(NatsError::Closed)) => pull_state.observe_closed(),
            Err(_) => pull_state.observe_timeout(),
            Ok(Err(err)) => pull_state.observe_error(err.into()),
        }
    }
    #[allow(unused_variables)]
    if let Err(err) = client.unsubscribe(cx, sid).await {
        warn!(
            subject = %sub.subject(),
            sid,
            error = ?err,
            "JetStream pull unsubscribe failed"
        );
        #[cfg(not(feature = "tracing-integration"))]
        let _ = &err;
    }
    finish_pull(messages, pull_state)
}
/// Builds a [`JsMessage`] from a raw NATS message, using the ack reply
/// subject to recover delivery metadata.
///
/// Returns `None` when the message has no reply subject, the reply subject
/// does not start with `$JS.ACK.`, it has fewer than nine tokens, or the
/// delivery-count / stream-sequence tokens are not valid integers.
///
/// Two ack-subject layouts are recognised:
///
/// * v1 (9 tokens):
///   `$JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<ts>.<pending>`
/// * v2 (11 tokens, optionally followed by a random token):
///   `$JS.ACK.<domain>.<hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<ts>.<pending>[.<token>]`
///
/// For fewer than 12 tokens the fields are located relative to the end of
/// the subject (the historical behavior). From 12 tokens on, the trailing
/// random token would shift an end-relative lookup by one, so the fixed v2
/// positions (6 and 7) are used instead.
fn parse_js_message(msg: Message, pending_acks: Option<Arc<AtomicUsize>>) -> Option<JsMessage> {
    let reply = msg.reply_to?;
    if !reply.starts_with("$JS.ACK.") {
        return None;
    }
    let parts: Vec<&str> = reply.split('.').collect();
    if parts.len() < 9 {
        return None;
    }
    let delivered_idx = if parts.len() >= 12 {
        6
    } else {
        parts.len() - 5
    };
    let delivered: u32 = parts[delivered_idx].parse().ok()?;
    let sequence: u64 = parts[delivered_idx + 1].parse().ok()?;
    Some(JsMessage {
        subject: msg.subject,
        payload: msg.payload,
        sequence,
        delivered,
        reply_subject: reply,
        ack_state: AtomicU8::new(ACK_STATE_PENDING),
        pending_acks,
    })
}
}
/// Turns the final pull state into the pull result: the collected messages
/// when no error was recorded, otherwise the recorded error.
fn finish_pull(
    messages: Vec<JsMessage>,
    pull_state: PullSubscriberState,
) -> Result<Vec<JsMessage>, JsError> {
    match pull_state.result() {
        Ok(()) => Ok(messages),
        Err(err) => Err(err),
    }
}
/// Bookkeeping for one pull request: how many messages were accepted and
/// why (if at all) the pull has stopped.
#[derive(Debug)]
struct PullSubscriberState {
/// Number of messages requested; reaching it completes the pull.
batch: usize,
/// Number of accepted messages so far, never above `batch`.
received: usize,
/// Current lifecycle state; only `Active` allows further transitions.
termination: PullSubscriberTermination,
/// Error recorded by `observe_error`, returned from `result`.
error: Option<JsError>,
}
/// Lifecycle state of a pull request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PullSubscriberTermination {
/// Still collecting messages.
Active,
/// The requested batch size was reached.
Completed,
/// Processing reported that the connection closed.
Closed,
/// The client-side deadline elapsed.
TimedOut,
/// Processing failed with an error.
Error,
}
impl PullSubscriberState {
fn new(batch: usize) -> Self {
debug_assert!(batch > 0);
Self {
batch,
received: 0,
termination: PullSubscriberTermination::Active,
error: None,
}
}
fn received(&self) -> usize {
self.received
}
fn is_active(&self) -> bool {
matches!(self.termination, PullSubscriberTermination::Active)
}
#[cfg(any(test, feature = "test-internals"))]
fn termination(&self) -> PullSubscriberTermination {
self.termination
}
fn observe_parsed_message(&mut self) {
if !self.is_active() {
return;
}
self.received = self.received.saturating_add(1).min(self.batch);
if self.received >= self.batch {
self.termination = PullSubscriberTermination::Completed;
}
}
fn observe_ignored_message(&mut self) {}
fn observe_process_ready(&mut self) {}
fn observe_closed(&mut self) {
if self.is_active() {
self.termination = PullSubscriberTermination::Closed;
}
}
fn observe_timeout(&mut self) {
if self.is_active() {
self.termination = PullSubscriberTermination::TimedOut;
}
}
fn observe_error(&mut self, err: JsError) {
if self.is_active() {
self.termination = PullSubscriberTermination::Error;
self.error = Some(err);
}
}
fn result(self) -> Result<(), JsError> {
match self.error {
Some(err) => Err(err),
None => Ok(()),
}
}
}
// Values stored in `JsMessage::ack_state`. Each terminal acknowledgement kind
// has an "in flight" value (publish started) and a "committed" value
// (publish succeeded); see `TerminalAckKind`.
/// No terminal acknowledgement has been claimed yet.
const ACK_STATE_PENDING: u8 = 0;
/// An ack publish is in progress.
const ACK_STATE_ACK_IN_FLIGHT: u8 = 1;
/// The ack was published.
const ACK_STATE_ACKED: u8 = 2;
/// A nak publish is in progress.
const ACK_STATE_NAK_IN_FLIGHT: u8 = 3;
/// The nak was published.
const ACK_STATE_NAKED: u8 = 4;
/// A term publish is in progress.
const ACK_STATE_TERM_IN_FLIGHT: u8 = 5;
/// The term was published.
const ACK_STATE_TERMED: u8 = 6;
/// The acknowledgement kinds that end a message's ack lifecycle.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TerminalAckKind {
/// Sent with the `+ACK` payload.
Ack,
/// Sent with a `-NAK` payload.
Nak,
/// Sent with the `+TERM` payload.
Term,
}
impl TerminalAckKind {
    /// Ack-state value stored while this kind's publish is in progress.
    const fn in_flight_state(self) -> u8 {
        match self {
            Self::Term => ACK_STATE_TERM_IN_FLIGHT,
            Self::Nak => ACK_STATE_NAK_IN_FLIGHT,
            Self::Ack => ACK_STATE_ACK_IN_FLIGHT,
        }
    }

    /// Ack-state value stored once this kind's publish has succeeded.
    const fn committed_state(self) -> u8 {
        match self {
            Self::Term => ACK_STATE_TERMED,
            Self::Nak => ACK_STATE_NAKED,
            Self::Ack => ACK_STATE_ACKED,
        }
    }

    /// Whether repeating this acknowledgement after it was committed is
    /// treated as success. Only `Ack` is.
    const fn is_idempotent(self) -> bool {
        match self {
            Self::Ack => true,
            Self::Nak | Self::Term => false,
        }
    }
}
/// Plain-data view of a parsed JetStream message, returned by
/// `fuzz_parse_js_message`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzJsAckMetadata {
/// Subject of the parsed message.
pub subject: String,
/// Sequence number parsed from the ack reply subject.
pub sequence: u64,
/// Delivery count parsed from the ack reply subject.
pub delivered: u32,
/// Length of the message payload in bytes.
pub payload_len: usize,
}
/// Test-internals entry point for `JetStreamContext::parse_stream_info`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_parse_stream_info(payload: &[u8]) -> Result<StreamInfo, JsError> {
JetStreamContext::parse_stream_info(payload)
}
/// Test-internals entry point for `JetStreamContext::parse_pub_ack`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_parse_pub_ack(payload: &[u8]) -> Result<PubAck, JsError> {
JetStreamContext::parse_pub_ack(payload)
}
/// Test-internals entry point for `JetStreamContext::parse_api_error`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_parse_api_error(json: &str) -> JsError {
JetStreamContext::parse_api_error(json)
}
/// Test-internals entry point for `Consumer::parse_js_message`.
///
/// Parses `msg` without a pending-ack counter and returns only the extracted
/// metadata, or `None` when the message is not a JetStream delivery.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_parse_js_message(msg: Message) -> Option<FuzzJsAckMetadata> {
Consumer::parse_js_message(msg, None).map(|parsed| FuzzJsAckMetadata {
subject: parsed.subject.clone(),
sequence: parsed.sequence,
delivered: parsed.delivered,
payload_len: parsed.payload.len(),
})
}
/// Classification of an acknowledgement payload, produced by
/// `fuzz_parse_ack_control`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzJsAckControl {
/// Payload `+ACK`.
Ack,
/// Payload `-NAK`.
Nak,
/// Payload `+WPI`.
InProgress,
/// Payload `+TERM`.
Term,
/// Any other payload.
Unknown,
}
/// Classifies an acknowledgement payload by exact match against the four
/// control strings; anything else (including non-UTF-8 input) is `Unknown`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_parse_ack_control(payload: &[u8]) -> FuzzJsAckControl {
    // All control payloads are ASCII, so matching on the decoded text is
    // equivalent to matching on the raw bytes.
    match std::str::from_utf8(payload) {
        Ok("+ACK") => FuzzJsAckControl::Ack,
        Ok("-NAK") => FuzzJsAckControl::Nak,
        Ok("+WPI") => FuzzJsAckControl::InProgress,
        Ok("+TERM") => FuzzJsAckControl::Term,
        Ok(_) | Err(_) => FuzzJsAckControl::Unknown,
    }
}
/// Runs `ConsumerConfig::normalize_identity` on an ephemeral config carrying
/// the given names and returns the resulting `name`.
///
/// # Errors
///
/// Propagates the error from `normalize_identity`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_normalize_consumer_identity(
    name: Option<&str>,
    durable_name: Option<&str>,
) -> Result<Option<String>, JsError> {
    let mut probe = ConsumerConfig::ephemeral();
    probe.durable_name = durable_name.map(str::to_owned);
    probe.name = name.map(str::to_owned);
    probe.normalize_identity()?;
    let normalized_name = probe.name;
    Ok(normalized_name)
}
/// Validates an ephemeral consumer config carrying the given names and
/// filter subject, returning the config's `name` on success.
///
/// # Errors
///
/// Returns the validation error rendered as a string.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_validate_consumer_config(
    name: Option<&str>,
    durable_name: Option<&str>,
    filter_subject: Option<&str>,
) -> Result<Option<String>, String> {
    let mut probe = ConsumerConfig::ephemeral();
    probe.filter_subject = filter_subject.map(str::to_owned);
    probe.durable_name = durable_name.map(str::to_owned);
    probe.name = name.map(str::to_owned);
    if let Err(err) = probe.validate() {
        return Err(err.to_string());
    }
    Ok(probe.name)
}
/// Validates an ephemeral consumer config carrying push-delivery settings
/// (deliver subject and rate limit) in addition to names and filter subject,
/// returning the config's `name` on success.
///
/// # Errors
///
/// Returns the validation error rendered as a string.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_validate_push_consumer_config(
    name: Option<&str>,
    durable_name: Option<&str>,
    deliver_subject: Option<&str>,
    filter_subject: Option<&str>,
    rate_limit_bps: Option<u64>,
) -> Result<Option<String>, String> {
    let mut probe = ConsumerConfig::ephemeral();
    probe.rate_limit_bps = rate_limit_bps;
    probe.filter_subject = filter_subject.map(str::to_owned);
    probe.deliver_subject = deliver_subject.map(str::to_owned);
    probe.durable_name = durable_name.map(str::to_owned);
    probe.name = name.map(str::to_owned);
    if let Err(err) = probe.validate() {
        return Err(err.to_string());
    }
    Ok(probe.name)
}
/// Exposes `MAX_NAME_BYTES` to test-internals consumers.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub const fn fuzz_stream_name_max_bytes() -> usize {
MAX_NAME_BYTES
}
/// Runs `ConsumerConfig::validate_stream_name` on `name`, rendering any
/// error as a string.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_validate_stream_name(name: &str) -> Result<(), String> {
ConsumerConfig::validate_stream_name(name).map_err(|err| err.to_string())
}
/// Exposes `MAX_STREAM_SUBJECT_BYTES` to test-internals consumers.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub const fn fuzz_stream_subject_max_bytes() -> usize {
MAX_STREAM_SUBJECT_BYTES
}
/// Validates `config` and, on success, returns its JSON rendering.
/// A validation error is returned as a string.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_validate_stream_config(config: &StreamConfig) -> Result<String, String> {
config.validate().map_err(|err| err.to_string())?;
Ok(config.to_json())
}
/// Test-internals entry point for `format_system_time_rfc3339`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_format_deliver_by_start_time_rfc3339(time: SystemTime) -> String {
format_system_time_rfc3339(time)
}
/// Returns the JSON rendering of an ephemeral consumer config whose deliver
/// policy is `DeliverPolicy::ByStartTime(time)`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_consumer_config_deliver_by_start_time_json(time: SystemTime) -> String {
ConsumerConfig::ephemeral()
.deliver_policy(DeliverPolicy::ByStartTime(time))
.to_json()
}
/// Result of a single publish back-pressure probe
/// (`fuzz_probe_publish_backpressure`).
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzJetStreamPublishBackpressureSnapshot {
/// Limit reported by the gate for the probe's context.
pub effective_max_in_flight_publishes: usize,
/// `max_waiters` from the gate's policy.
pub max_waiters: usize,
/// Whether the probe publish obtained a permit.
pub acquired: bool,
/// Gate's in-flight counter after the probe (and after the permit was dropped).
pub in_flight_publishes_after: usize,
/// Gate's refused-publish counter after the probe.
pub refused_publishes: usize,
/// Pressure level label, when a headroom was supplied.
pub pressure_level: Option<String>,
/// Rendered error when the probe was refused.
pub error: Option<String>,
}
#[cfg(feature = "test-internals")]
/// Returns the `numerator / denominator` quantile of an ascending-sorted
/// sample slice.
///
/// The rank is `ceil((len - 1) * numerator / denominator)`. An empty slice
/// yields `0`.
///
/// Out-of-range requests are clamped instead of panicking: a zero
/// `denominator` is treated as `1`, and any rank past the end of the slice
/// (for example when `numerator > denominator`) resolves to the last sample.
fn quantile_from_sorted_micros(samples: &[u64], numerator: usize, denominator: usize) -> u64 {
    let Some(&last) = samples.last() else {
        return 0;
    };
    let span = samples.len() - 1;
    // Avoid a division by zero for a degenerate denominator.
    let denominator = denominator.max(1);
    // Ceiling division, saturating so extreme inputs cannot overflow.
    let rank = span
        .saturating_mul(numerator)
        .saturating_add(denominator - 1)
        / denominator;
    samples.get(rank).copied().unwrap_or(last)
}
/// Result of repeated publish probes against one gate
/// (`fuzz_probe_publish_backpressure_tail_evidence`).
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzJetStreamPublishBackpressureTailSnapshot {
/// Number of wait samples recorded (one per attempt).
pub tail_sample_count: usize,
/// Attempts that obtained a permit.
pub accepted_count: usize,
/// Gate's refused-publish counter after all attempts.
pub refused_count: usize,
/// `true` when `DEFAULT_MAX_PUBLISH_WAITERS` is zero.
pub waiter_queue_absent: bool,
/// Fixed descriptive label set by the probe.
pub waiter_fairness_mode: String,
/// `true` when `DEFAULT_MAX_PUBLISH_WAITERS` is zero.
pub refusal_only_policy: bool,
/// Fixed descriptive label set by the probe.
pub tail_evidence_mode: String,
/// Pressure level label, when a headroom was supplied.
pub pressure_level: Option<String>,
/// 95th percentile of the recorded wait samples.
pub publish_wait_latency_p95_micros: u64,
/// 99th percentile of the recorded wait samples.
pub publish_wait_latency_p99_micros: u64,
/// 99.9th percentile of the recorded wait samples.
pub publish_wait_latency_p999_micros: u64,
}
/// Result of probing a cohort of independent publisher gates
/// (`fuzz_probe_publish_backpressure_cohort_tail_evidence`).
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzJetStreamPublishBackpressureCohortSnapshot {
/// Number of publishers probed (at least one).
pub publisher_count: usize,
/// Number of publishers whose gate started with one in-flight publish.
pub occupied_publisher_count: usize,
/// Publishers whose probe obtained a permit.
pub accepted_count: usize,
/// Publishers whose probe was refused.
pub refused_count: usize,
/// `true` when `DEFAULT_MAX_PUBLISH_WAITERS` is zero.
pub waiter_queue_absent: bool,
/// Fixed descriptive label set by the probe.
pub waiter_fairness_mode: String,
/// `true` when `DEFAULT_MAX_PUBLISH_WAITERS` is zero.
pub refusal_only_policy: bool,
/// Fixed descriptive label set by the probe.
pub queueing_model: String,
/// Always set to `true` by the probe.
pub multi_publisher_tail_evidence_present: bool,
/// 95th percentile of the recorded wait samples.
pub publish_wait_latency_p95_micros: u64,
/// 99th percentile of the recorded wait samples.
pub publish_wait_latency_p99_micros: u64,
/// 99.9th percentile of the recorded wait samples.
pub publish_wait_latency_p999_micros: u64,
}
/// Probes a default-policy publish gate once and reports what happened.
///
/// The gate's in-flight counter is preset to
/// `preexisting_in_flight_publishes`. When `pressure_headroom` is given, the
/// probe context carries a `SystemPressure` built from it and the snapshot
/// records its level label. Any acquired permit is dropped before the
/// counters are read.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_probe_publish_backpressure(
    pressure_headroom: Option<f32>,
    preexisting_in_flight_publishes: usize,
) -> FuzzJetStreamPublishBackpressureSnapshot {
    let gate = JetStreamPublishBackpressureGate::new(Default::default());
    gate.in_flight_publishes
        .store(preexisting_in_flight_publishes, Ordering::Relaxed);
    let base_cx = Cx::new(
        crate::types::RegionId::testing_default(),
        crate::types::TaskId::testing_default(),
        crate::types::Budget::INFINITE,
    );
    let (cx, pressure_level) = match pressure_headroom {
        Some(headroom) => {
            let pressure = Arc::new(crate::types::SystemPressure::with_headroom(headroom));
            let label = pressure.level_label().to_string();
            (base_cx.with_pressure(pressure), Some(label))
        }
        None => (base_cx, None),
    };
    let effective_max_in_flight_publishes = gate.effective_max_in_flight_publishes(&cx);
    let (acquired, error) = match gate.begin_publish(&cx, "audit.subject") {
        Err(err) => (false, Some(err.to_string())),
        Ok(permit) => {
            // Release the slot before sampling the counters below.
            drop(permit);
            (true, None)
        }
    };
    let in_flight_publishes_after = gate.in_flight_publishes.load(Ordering::Relaxed);
    let refused_publishes = gate.refused_publishes.load(Ordering::Relaxed);
    FuzzJetStreamPublishBackpressureSnapshot {
        effective_max_in_flight_publishes,
        max_waiters: gate.policy.max_waiters,
        acquired,
        in_flight_publishes_after,
        refused_publishes,
        pressure_level,
        error,
    }
}
/// Probes one default-policy publish gate `attempts` times (at least once)
/// and summarises acceptance, refusal and wait-latency quantiles.
///
/// Every attempt records a wait sample of `0`, whether it was accepted or
/// refused, so the reported quantiles are all zero by construction.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_probe_publish_backpressure_tail_evidence(
    pressure_headroom: Option<f32>,
    preexisting_in_flight_publishes: usize,
    attempts: usize,
) -> FuzzJetStreamPublishBackpressureTailSnapshot {
    let gate = JetStreamPublishBackpressureGate::new(Default::default());
    gate.in_flight_publishes
        .store(preexisting_in_flight_publishes, Ordering::Relaxed);
    let base_cx = Cx::new(
        crate::types::RegionId::testing_default(),
        crate::types::TaskId::testing_default(),
        crate::types::Budget::INFINITE,
    );
    let (cx, pressure_level) = match pressure_headroom {
        Some(headroom) => {
            let pressure = Arc::new(crate::types::SystemPressure::with_headroom(headroom));
            let label = pressure.level_label().to_string();
            (base_cx.with_pressure(pressure), Some(label))
        }
        None => (base_cx, None),
    };
    let attempts = attempts.max(1);
    let mut accepted_count = 0usize;
    let mut wait_samples_micros = Vec::with_capacity(attempts);
    for _attempt in 0..attempts {
        if let Ok(permit) = gate.begin_publish(&cx, "audit.subject") {
            accepted_count += 1;
            drop(permit);
        }
        // The gate never waits, so both outcomes contribute a zero sample.
        wait_samples_micros.push(0);
    }
    wait_samples_micros.sort_unstable();
    let no_waiter_queue = DEFAULT_MAX_PUBLISH_WAITERS == 0;
    let p95 = quantile_from_sorted_micros(&wait_samples_micros, 95, 100);
    let p99 = quantile_from_sorted_micros(&wait_samples_micros, 99, 100);
    let p999 = quantile_from_sorted_micros(&wait_samples_micros, 999, 1000);
    FuzzJetStreamPublishBackpressureTailSnapshot {
        tail_sample_count: wait_samples_micros.len(),
        accepted_count,
        refused_count: gate.refused_publishes.load(Ordering::Relaxed),
        waiter_queue_absent: no_waiter_queue,
        waiter_fairness_mode: "vacuous_zero_wait_refusal".to_string(),
        refusal_only_policy: no_waiter_queue,
        tail_evidence_mode: "zero_wait_refusal_only".to_string(),
        pressure_level,
        publish_wait_latency_p95_micros: p95,
        publish_wait_latency_p99_micros: p99,
        publish_wait_latency_p999_micros: p999,
    }
}
/// Probes `publisher_count` independent default-policy gates (at least one),
/// the first `occupied_publisher_count` of which start with one in-flight
/// publish, and summarises the outcomes.
///
/// Every probe records a wait sample of `0`, so the reported quantiles are
/// all zero by construction.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_probe_publish_backpressure_cohort_tail_evidence(
    publisher_count: usize,
    occupied_publisher_count: usize,
) -> FuzzJetStreamPublishBackpressureCohortSnapshot {
    let publisher_count = publisher_count.max(1);
    let occupied_publisher_count = occupied_publisher_count.min(publisher_count);
    let mut wait_samples_micros = Vec::with_capacity(publisher_count);
    let mut accepted_count = 0usize;
    let mut refused_count = 0usize;
    for publisher_index in 0..publisher_count {
        let gate = JetStreamPublishBackpressureGate::new(Default::default());
        let starts_occupied = publisher_index < occupied_publisher_count;
        if starts_occupied {
            gate.in_flight_publishes.store(1, Ordering::Relaxed);
        }
        let cx = Cx::new(
            crate::types::RegionId::testing_default(),
            crate::types::TaskId::testing_default(),
            crate::types::Budget::INFINITE,
        );
        if let Ok(permit) = gate.begin_publish(&cx, "audit.subject") {
            accepted_count += 1;
            drop(permit);
        } else {
            refused_count += 1;
        }
        // The gate never waits, so both outcomes contribute a zero sample.
        wait_samples_micros.push(0);
    }
    wait_samples_micros.sort_unstable();
    let no_waiter_queue = DEFAULT_MAX_PUBLISH_WAITERS == 0;
    let p95 = quantile_from_sorted_micros(&wait_samples_micros, 95, 100);
    let p99 = quantile_from_sorted_micros(&wait_samples_micros, 99, 100);
    let p999 = quantile_from_sorted_micros(&wait_samples_micros, 999, 1000);
    FuzzJetStreamPublishBackpressureCohortSnapshot {
        publisher_count,
        occupied_publisher_count,
        accepted_count,
        refused_count,
        waiter_queue_absent: no_waiter_queue,
        waiter_fairness_mode: "vacuous_zero_wait_refusal".to_string(),
        refusal_only_policy: no_waiter_queue,
        queueing_model: "mg11_loss_system".to_string(),
        multi_publisher_tail_evidence_present: true,
        publish_wait_latency_p95_micros: p95,
        publish_wait_latency_p99_micros: p99,
        publish_wait_latency_p999_micros: p999,
    }
}
/// Builds a detached `Consumer` fixture with fixed stream, name and prefix,
/// a zeroed pending counter and a fresh rate limiter.
/// `max_ack_pending` is raised to at least 1.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_create_test_consumer(max_ack_pending: usize) -> Consumer {
Consumer {
stream: "TEST_STREAM".to_string(),
name: "test_consumer".to_string(),
prefix: "$JS.API".to_string(),
pending_acks: Arc::new(AtomicUsize::new(0)),
max_ack_pending: max_ack_pending.max(1),
pull_rate_limiter: PullRateLimiter::new(),
}
}
/// Returns the consumer's `max_ack_pending` limit.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_consumer_max_ack_pending(consumer: &Consumer) -> usize {
consumer.max_ack_pending
}
/// Test-internals entry point for `Consumer::increment_pending`; returns
/// whether a pending slot was reserved.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_consumer_increment_pending(consumer: &Consumer) -> bool {
consumer.increment_pending()
}
/// Test-internals entry point for `Consumer::decrement_pending`.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_consumer_decrement_pending(consumer: &Consumer) {
consumer.decrement_pending();
}
/// Builds a pending `JsMessage` fixture with a fixed subject, payload,
/// delivery count of 1 and the given `sequence`.
///
/// When `consumer` is supplied, the message shares that consumer's
/// pending-ack counter.
// NOTE(review): the reply subject is a fixed literal, so its embedded
// sequence tokens do not follow the `sequence` argument. Confirm callers do
// not rely on the two agreeing.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_create_test_js_message(sequence: u64, consumer: Option<&Consumer>) -> JsMessage {
JsMessage {
subject: "orders.new".to_string(),
payload: b"test payload".to_vec(),
sequence,
delivered: 1,
reply_subject: "$JS.ACK.TEST_STREAM.test_consumer.1.1.1.1234567890.0".to_string(),
ack_state: AtomicU8::new(ACK_STATE_PENDING),
pending_acks: consumer.map(|consumer| Arc::clone(&consumer.pending_acks)),
}
}
/// Public mirror of the internal `PullSubscriberTermination` states,
/// mapped one-to-one by `fuzz_apply_pull_subscriber_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzPullSubscriberTerminal {
/// Still collecting messages.
Active,
/// Batch size reached.
Completed,
/// Connection closed.
Closed,
/// Deadline elapsed.
TimedOut,
/// Processing failed.
Error,
}
/// Events that `fuzz_apply_pull_subscriber_step` can feed into the pull
/// state machine.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzPullSubscriberStep {
/// A message was parsed and accepted.
ParsedMessage,
/// A message was ignored.
IgnoredMessage,
/// A processing round succeeded.
ProcessReady,
/// Processing reported a closed connection.
ProcessClosed,
/// Processing timed out.
ProcessTimedOut,
/// Processing failed with an error.
ProcessError,
}
/// Externally visible pull state driven by `fuzz_apply_pull_subscriber_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzPullSubscriberState {
/// Requested batch size (normalised to at least 1 on every step).
pub batch: usize,
/// Accepted message count.
pub received: usize,
/// Count of ignored messages observed while the state was active.
pub ignored: usize,
/// Current termination state.
pub terminal: FuzzPullSubscriberTerminal,
}
/// Applies one `step` to `state` by round-tripping through the internal
/// `PullSubscriberState` machine.
///
/// `batch` is normalised to at least 1 and `received` is clamped to it before
/// the step runs. The `ignored` counter is kept on the fuzz state only and is
/// bumped for ignored messages seen while the machine is active.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_apply_pull_subscriber_step(
    state: &mut FuzzPullSubscriberState,
    step: FuzzPullSubscriberStep,
) {
    /// Fuzz-facing state -> internal state.
    fn to_internal(terminal: FuzzPullSubscriberTerminal) -> PullSubscriberTermination {
        match terminal {
            FuzzPullSubscriberTerminal::Active => PullSubscriberTermination::Active,
            FuzzPullSubscriberTerminal::Completed => PullSubscriberTermination::Completed,
            FuzzPullSubscriberTerminal::Closed => PullSubscriberTermination::Closed,
            FuzzPullSubscriberTerminal::TimedOut => PullSubscriberTermination::TimedOut,
            FuzzPullSubscriberTerminal::Error => PullSubscriberTermination::Error,
        }
    }

    /// Internal state -> fuzz-facing state.
    fn to_external(termination: PullSubscriberTermination) -> FuzzPullSubscriberTerminal {
        match termination {
            PullSubscriberTermination::Active => FuzzPullSubscriberTerminal::Active,
            PullSubscriberTermination::Completed => FuzzPullSubscriberTerminal::Completed,
            PullSubscriberTermination::Closed => FuzzPullSubscriberTerminal::Closed,
            PullSubscriberTermination::TimedOut => FuzzPullSubscriberTerminal::TimedOut,
            PullSubscriberTermination::Error => FuzzPullSubscriberTerminal::Error,
        }
    }

    let batch = state.batch.max(1);
    let mut machine = PullSubscriberState {
        batch,
        received: state.received.min(batch),
        termination: to_internal(state.terminal),
        error: None,
    };
    match step {
        FuzzPullSubscriberStep::ParsedMessage => machine.observe_parsed_message(),
        FuzzPullSubscriberStep::IgnoredMessage => {
            if machine.is_active() {
                state.ignored = state.ignored.saturating_add(1);
            }
            machine.observe_ignored_message();
        }
        FuzzPullSubscriberStep::ProcessReady => machine.observe_process_ready(),
        FuzzPullSubscriberStep::ProcessClosed => machine.observe_closed(),
        FuzzPullSubscriberStep::ProcessTimedOut => machine.observe_timeout(),
        FuzzPullSubscriberStep::ProcessError => {
            machine.observe_error(JsError::InvalidConfig("fuzz-process-error".to_string()));
        }
    }
    state.batch = batch;
    state.received = machine.received();
    state.terminal = to_external(machine.termination());
}
/// Phase of the ordered-consumer model used by
/// `fuzz_apply_ordered_consumer_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzOrderedConsumerPhase {
/// Accepting contiguous first-delivery messages.
Tracking,
/// A gap or redelivery was seen; observations are ignored until reset.
ResetPending,
}
/// Events accepted by `fuzz_apply_ordered_consumer_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzOrderedConsumerStep {
/// A message with the given sequence and delivery count was observed.
Observe { sequence: u64, delivered: u32 },
/// A pending reset finished.
CompleteReset,
}
/// State of the ordered-consumer model driven by
/// `fuzz_apply_ordered_consumer_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzOrderedConsumerState {
/// Current phase.
pub phase: FuzzOrderedConsumerPhase,
/// Sequence of the last accepted message, if any since the last reset.
pub last_sequence: Option<u64>,
/// Total number of accepted messages.
pub accepted_messages: u64,
/// Number of times a reset was triggered.
pub reset_count: u32,
/// Sequence that was expected when the pending reset was triggered.
pub pending_gap_from: Option<u64>,
}
/// Advances the ordered-consumer model by one `step`.
///
/// While tracking, an observation is accepted only if it is a first delivery
/// (`delivered == 1`) and its sequence directly follows the last accepted one
/// (or nothing has been accepted yet). Anything else triggers a reset, during
/// which further observations are ignored until `CompleteReset` arrives.
/// `CompleteReset` outside a pending reset is a no-op.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_apply_ordered_consumer_step(
    state: &mut FuzzOrderedConsumerState,
    step: FuzzOrderedConsumerStep,
) {
    match (step, state.phase) {
        (
            FuzzOrderedConsumerStep::Observe {
                sequence,
                delivered,
            },
            FuzzOrderedConsumerPhase::Tracking,
        ) => {
            let in_order = match state.last_sequence {
                None => true,
                Some(last) => sequence == last.saturating_add(1),
            };
            if delivered == 1 && in_order {
                state.last_sequence = Some(sequence);
                state.accepted_messages = state.accepted_messages.saturating_add(1);
            } else {
                state.phase = FuzzOrderedConsumerPhase::ResetPending;
                state.reset_count = state.reset_count.saturating_add(1);
                state.pending_gap_from = state.last_sequence.map(|last| last.saturating_add(1));
            }
        }
        (FuzzOrderedConsumerStep::Observe { .. }, FuzzOrderedConsumerPhase::ResetPending) => {}
        (FuzzOrderedConsumerStep::CompleteReset, FuzzOrderedConsumerPhase::ResetPending) => {
            state.phase = FuzzOrderedConsumerPhase::Tracking;
            state.last_sequence = None;
            state.pending_gap_from = None;
        }
        (FuzzOrderedConsumerStep::CompleteReset, FuzzOrderedConsumerPhase::Tracking) => {}
    }
}
/// Outcome state of the max-deliver model used by
/// `fuzz_apply_max_deliver_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzMaxDeliverTerminal {
/// The message may still be delivered.
Pending,
/// The message was acknowledged.
Acked,
/// The delivery limit was exceeded.
DeadLettered,
}
/// Events accepted by `fuzz_apply_max_deliver_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[doc(hidden)]
pub enum FuzzMaxDeliverStep {
/// Attempt another delivery.
Redeliver,
/// Acknowledge the message.
Ack,
/// Start over with a fresh message.
ResetMessage,
}
/// State of the max-deliver model driven by `fuzz_apply_max_deliver_step`.
#[cfg(feature = "test-internals")]
#[derive(Debug, Clone, PartialEq, Eq)]
#[doc(hidden)]
pub struct FuzzMaxDeliverState {
/// Delivery limit; negative values disable the limit check.
pub max_deliver: i64,
/// Delivery attempts made for the current message.
pub delivered: u32,
/// Deliveries that were within the limit.
pub accepted_deliveries: u32,
/// Deliveries refused (over the limit, or attempted after a terminal state).
pub rejected_deliveries: u32,
/// Number of times the limit was exceeded.
pub dlq_messages: u32,
/// Current outcome state.
pub terminal: FuzzMaxDeliverTerminal,
}
/// Advances the max-deliver model by one `step`.
///
/// * `Redeliver` while pending bumps the delivery count; if a non-negative
///   limit is exceeded the message is dead-lettered, otherwise the delivery
///   is accepted. After a terminal state it only counts as rejected.
/// * `Ack` takes effect only while pending and after at least one delivery.
/// * `ResetMessage` clears the delivery count and returns to pending; the
///   cumulative counters are left alone.
#[cfg(feature = "test-internals")]
#[doc(hidden)]
pub fn fuzz_apply_max_deliver_step(state: &mut FuzzMaxDeliverState, step: FuzzMaxDeliverStep) {
    // Any value below -1 is treated like -1.
    let limit = state.max_deliver.max(-1);
    match (step, state.terminal) {
        (FuzzMaxDeliverStep::Redeliver, FuzzMaxDeliverTerminal::Pending) => {
            state.delivered = state.delivered.saturating_add(1);
            let over_limit = limit >= 0 && i64::from(state.delivered) > limit;
            if over_limit {
                state.rejected_deliveries = state.rejected_deliveries.saturating_add(1);
                state.dlq_messages = state.dlq_messages.saturating_add(1);
                state.terminal = FuzzMaxDeliverTerminal::DeadLettered;
            } else {
                state.accepted_deliveries = state.accepted_deliveries.saturating_add(1);
            }
        }
        (
            FuzzMaxDeliverStep::Redeliver,
            FuzzMaxDeliverTerminal::Acked | FuzzMaxDeliverTerminal::DeadLettered,
        ) => {
            state.rejected_deliveries = state.rejected_deliveries.saturating_add(1);
        }
        (FuzzMaxDeliverStep::Ack, FuzzMaxDeliverTerminal::Pending) if state.delivered > 0 => {
            state.terminal = FuzzMaxDeliverTerminal::Acked;
        }
        (FuzzMaxDeliverStep::Ack, _) => {}
        (FuzzMaxDeliverStep::ResetMessage, _) => {
            state.delivered = 0;
            state.terminal = FuzzMaxDeliverTerminal::Pending;
        }
    }
}
impl JsMessage {
/// Acknowledges this message by publishing `+ACK` as a terminal
/// acknowledgement.
///
/// # Errors
///
/// Propagates whatever `publish_terminal_ack` returns.
pub async fn ack(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
self.publish_terminal_ack(client, cx, Cow::Borrowed(b"+ACK"), TerminalAckKind::Ack)
.await
}
/// Negatively acknowledges this message with no delay (the payload is built
/// by `build_nak_payload(Duration::ZERO)`).
///
/// # Errors
///
/// Propagates whatever `publish_terminal_ack` returns.
pub async fn nack(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
self.publish_terminal_ack(
client,
cx,
build_nak_payload(Duration::ZERO),
TerminalAckKind::Nak,
)
.await
}
/// Negatively acknowledges this message, passing `delay` to
/// `build_nak_payload` to build the payload.
///
/// # Errors
///
/// Propagates whatever `publish_terminal_ack` returns.
pub async fn nack_with_delay(
&self,
client: &mut NatsClient,
cx: &Cx,
delay: Duration,
) -> Result<(), JsError> {
self.publish_terminal_ack(client, cx, build_nak_payload(delay), TerminalAckKind::Nak)
.await
}
/// Publishes a `+WPI` payload to the message's reply subject.
///
/// Unlike the terminal acknowledgements, this neither reads nor changes the
/// message's ack state, and it publishes to the plain reply subject.
///
/// # Errors
///
/// Returns a cancellation error when the checkpoint fails, or the publish
/// error.
pub async fn in_progress(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
client.publish(cx, &self.reply_subject, b"+WPI").await?;
Ok(())
}
/// Terminates this message by publishing `+TERM` as a terminal
/// acknowledgement.
///
/// # Errors
///
/// Propagates whatever `publish_terminal_ack` returns.
pub async fn term(&self, client: &mut NatsClient, cx: &Cx) -> Result<(), JsError> {
self.publish_terminal_ack(client, cx, Cow::Borrowed(b"+TERM"), TerminalAckKind::Term)
.await
}
/// Publishes a terminal acknowledgement (`Ack`, `Nak` or `Term`) at most once
/// per message.
///
/// The message's ack state is claimed with a compare-and-swap from pending to
/// the kind's in-flight value. If the state is already this kind's committed
/// value and the kind is idempotent, the call succeeds without publishing;
/// any other non-pending state yields `JsError::AlreadyAcknowledged`.
///
/// After a successful claim, a token is generated and checked against the
/// ack-token tracker, then `payload` is published to the derived reply
/// subject. On success the state becomes committed and the shared
/// pending-ack counter (if any) is decremented.
///
/// Every failure after the claim restores the pending state so the caller can
/// retry. This includes rejection by the token tracker, which would otherwise
/// leave the message stuck in the in-flight state for good.
///
/// # Errors
///
/// * Cancellation when the checkpoint fails.
/// * `JsError::AlreadyAcknowledged` when a conflicting acknowledgement was
///   already claimed or committed.
/// * `JsError::InvalidConfig` when the token tracker rejects the token.
/// * `JsError::Nats` when the publish fails.
// NOTE(review): if this future is dropped while the publish is awaited, the
// state stays in-flight; confirm whether a drop guard is wanted.
// NOTE(review): the ack is published to the reply subject with an extra
// trailing token; confirm the server accepts acks on that subject.
async fn publish_terminal_ack(
    &self,
    client: &mut NatsClient,
    cx: &Cx,
    payload: Cow<'_, [u8]>,
    kind: TerminalAckKind,
) -> Result<(), JsError> {
    cx.checkpoint().map_err(|_| NatsError::Cancelled)?;
    let in_flight = kind.in_flight_state();
    let committed = kind.committed_state();
    loop {
        match self.ack_state.load(Ordering::Acquire) {
            ACK_STATE_PENDING => {
                if self
                    .ack_state
                    .compare_exchange(
                        ACK_STATE_PENDING,
                        in_flight,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    )
                    .is_ok()
                {
                    break;
                }
                // Lost the race; re-read the state and decide again.
            }
            state if state == committed && kind.is_idempotent() => return Ok(()),
            _ => return Err(JsError::AlreadyAcknowledged),
        }
    }
    let (protected_reply_subject, ack_token) = self.generate_protected_ack_token();
    let now = wall_now().as_nanos() / 1_000_000_000;
    let tracker = get_ack_token_tracker();
    if !tracker.validate_and_mark_token(&ack_token, now) {
        // Release the in-flight claim so the message can be acknowledged again.
        self.ack_state.store(ACK_STATE_PENDING, Ordering::Release);
        return Err(JsError::InvalidConfig(
            "Acknowledgment token replay detected or expired".to_string(),
        ));
    }
    match client
        .publish(cx, &protected_reply_subject, payload.as_ref())
        .await
    {
        Ok(()) => {
            self.ack_state.store(committed, Ordering::Release);
            if let Some(ref pending) = self.pending_acks {
                decrement_pending_counter(pending);
            }
            Ok(())
        }
        Err(err) => {
            // Publish failed: release the claim so the caller can retry.
            self.ack_state.store(ACK_STATE_PENDING, Ordering::Release);
            Err(JsError::Nats(err))
        }
    }
}
/// Builds the token and the publish subject used for a terminal
/// acknowledgement.
///
/// Returns `(protected_reply_subject, ack_token)` where
/// * `ack_token` is `<time>.<sequence>.<delivered>.<nonce>.<subject-hash>`,
///   with `<time>` being `wall_now().as_nanos() / 1_000_000_000`, and
/// * `protected_reply_subject` is the message's reply subject with the
///   token's digest appended as one extra `.`-separated token.
fn generate_protected_ack_token(&self) -> (String, String) {
    let issued_at = wall_now().as_nanos() / 1_000_000_000;
    let nonce = self.generate_secure_nonce();
    let sequence = self.sequence;
    let delivered = self.delivered;
    let subject_hash = self.hash_reply_subject_components();
    let ack_token = format!("{issued_at}.{sequence}.{delivered}.{nonce}.{subject_hash}");
    let digest = self.generate_ack_token_hmac(&ack_token);
    let reply_subject = &self.reply_subject;
    let protected_reply_subject = format!("{reply_subject}.{digest}");
    (protected_reply_subject, ack_token)
}
/// Derives a 64-bit nonce by xor-multiply mixing the sequence, the delivery
/// count, the current wall-clock nanoseconds and the reply subject bytes, in
/// that order.
// NOTE(review): every input is derivable from the message and the clock;
// despite the name this is not a cryptographically secure nonce.
fn generate_secure_nonce(&self) -> u64 {
    const SEED: u64 = 0xcbf2_9ce4_8422_2325;
    const MULTIPLIER: u64 = 0x0000_0100_0000_01b3;
    let now = wall_now().as_nanos();
    let header = [self.sequence, u64::from(self.delivered), now];
    IntoIterator::into_iter(header)
        .chain(self.reply_subject.bytes().map(u64::from))
        .fold(SEED, |acc, word| (acc ^ word).wrapping_mul(MULTIPLIER))
}
/// Computes a 16-hex-digit digest over the reply subject bytes followed by
/// the token bytes, using xor-multiply mixing from a fixed seed.
// NOTE(review): there is no secret key involved, so this is a plain hash
// rather than an HMAC in the cryptographic sense.
fn generate_ack_token_hmac(&self, token: &str) -> String {
    const SEED: u64 = 0xa5a5_a5a5_a5a5_a5a5;
    const MULTIPLIER: u64 = 0x0000_0100_0000_01b3;
    let digest = self
        .reply_subject
        .bytes()
        .chain(token.bytes())
        .fold(SEED, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(MULTIPLIER)
        });
    format!("{digest:016x}")
}
/// Hashes the message subject bytes followed by the reply subject bytes with
/// xor-multiply mixing from a fixed seed.
fn hash_reply_subject_components(&self) -> u64 {
    const SEED: u64 = 0xfeed_face_cafe_babe;
    const MULTIPLIER: u64 = 0x0000_0100_0000_01b3;
    self.subject
        .bytes()
        .chain(self.reply_subject.bytes())
        .fold(SEED, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(MULTIPLIER)
        })
}
}
/// Builds the payload for a negative acknowledgement.
///
/// A zero `delay` yields the bare `-NAK` payload without allocating.
/// Otherwise the payload is `-NAK {"delay": <nanoseconds>}`.
///
/// The nanosecond value is clamped to `i64::MAX` so the emitted number always
/// fits a signed 64-bit integer, matching the clamp applied to the pull
/// request's expiry.
fn build_nak_payload(delay: Duration) -> Cow<'static, [u8]> {
    if delay.is_zero() {
        return Cow::Borrowed(b"-NAK");
    }
    let delay_nanos = i64::try_from(delay.as_nanos()).unwrap_or(i64::MAX);
    Cow::Owned(format!("-NAK {{\"delay\": {}}}", delay_nanos).into_bytes())
}
/// Escapes `s` for embedding inside a JSON string literal.
///
/// Quotes, backslashes, newline, carriage return and tab use their short
/// escapes; any other control character becomes a `\uXXXX` escape; every
/// other character is copied unchanged.
fn json_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let short_form = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(sequence) = short_form {
            escaped.push_str(sequence);
        } else if ch.is_control() {
            write!(&mut escaped, "\\u{:04x}", ch as u32).expect("write to String");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
fn has_json_api_error(json: &str) -> bool {
extract_json_object(json, "error")
.is_some_and(|error_json| extract_json_u64(error_json, "code").is_some())
}
/// Finds the first string literal equal to `key` that is followed by a colon
/// and returns the text after that colon, with leading whitespace trimmed.
///
/// String literals are scanned one after another from the start of `json`.
/// A literal that equals `key` but is not followed by `:` is skipped.
/// Returns `None` when no such key exists or a string literal is unterminated.
fn json_value_after_key<'a>(json: &'a str, key: &str) -> Option<&'a str> {
    let mut cursor = 0;
    while cursor < json.len() {
        let open_quote = cursor + json[cursor..].find('"')?;
        let (is_key, literal_end) = scan_json_string_literal(json, open_quote, key)?;
        cursor = literal_end;
        if is_key {
            let tail = json[literal_end..].trim_start();
            if let Some(value) = tail.strip_prefix(':') {
                return Some(value.trim_start());
            }
        }
    }
    None
}
/// Scans the JSON string literal whose opening quote sits at byte `quote_start`.
///
/// Returns `(matches, end)` where `end` is the byte offset just past the
/// closing quote and `matches` tells whether the literal's raw content equals
/// `key` exactly. A literal containing any backslash escape never matches.
/// Returns `None` when nothing follows the opening quote or the literal is
/// not terminated.
fn scan_json_string_literal(json: &str, quote_start: usize, key: &str) -> Option<(bool, usize)> {
    let body_start = quote_start.saturating_add(1);
    if body_start >= json.len() {
        return None;
    }
    let mut expected = key.chars();
    let mut still_matching = true;
    let mut after_backslash = false;
    for (offset, ch) in json[body_start..].char_indices() {
        if after_backslash {
            // The escaped character is consumed without being compared.
            after_backslash = false;
            still_matching = false;
            continue;
        }
        if ch == '"' {
            // The key must also be fully consumed, otherwise the literal is only a prefix of it.
            let exact = still_matching && expected.next().is_none();
            return Some((exact, body_start + offset + 1));
        }
        if ch == '\\' {
            after_backslash = true;
            still_matching = false;
        } else if expected.next() != Some(ch) {
            still_matching = false;
        }
    }
    None
}
/// Returns the text of the JSON object stored under `key`, braces included.
///
/// The value must start with `{`; the matching `}` is found by counting
/// braces while ignoring any that appear inside string literals. Returns
/// `None` when the key is missing, the value is not an object, or the object
/// is never closed.
fn extract_json_object<'a>(json: &'a str, key: &str) -> Option<&'a str> {
    let value = json_value_after_key(json, key)?;
    if !value.starts_with('{') {
        return None;
    }
    let mut open_braces = 0usize;
    let mut inside_string = false;
    // Only ever set while inside a string: the character after a backslash is skipped.
    let mut skip_next = false;
    for (pos, ch) in value.char_indices() {
        if skip_next {
            skip_next = false;
            continue;
        }
        match (inside_string, ch) {
            (true, '\\') => skip_next = true,
            (true, '"') => inside_string = false,
            (false, '"') => inside_string = true,
            (false, '{') => open_braces += 1,
            (false, '}') => {
                open_braces = open_braces.saturating_sub(1);
                if open_braces == 0 {
                    return Some(&value[..=pos]);
                }
            }
            _ => {}
        }
    }
    None
}
/// Extracts and unescapes the JSON string stored under `key`.
///
/// Returns `None` when the key is missing, the value is not a string, or the
/// string (or one of its escapes) is cut short by the end of input.
///
/// Escape handling:
/// - `\b`, `\f`, `\n`, `\r`, `\t` map to their control characters.
/// - `\uXXXX` decodes one UTF-16 code unit; a high surrogate immediately
///   followed by a `\uXXXX` low surrogate is combined into a single scalar
///   value, as JSON uses that form for characters outside the Basic
///   Multilingual Plane.
/// - Unpaired surrogates and `\u` escapes whose four characters are not all
///   hexadecimal digits become U+FFFD.
/// - Any other escaped character (e.g. `\"`, `\\`, `\/`) is copied literally.
fn extract_json_string_simple(json: &str, key: &str) -> Option<String> {
    let rest = json_value_after_key(json, key)?;
    let slice = rest.strip_prefix('"')?;
    let mut chars = slice.chars();
    let mut result = String::new();
    loop {
        match chars.next()? {
            '"' => return Some(result),
            '\\' => match chars.next()? {
                'b' => result.push('\x08'),
                'f' => result.push('\x0C'),
                'n' => result.push('\n'),
                'r' => result.push('\r'),
                't' => result.push('\t'),
                'u' => {
                    let unit = read_json_hex4(&mut chars)?;
                    result.push(decode_json_unicode_escape(unit, &mut chars));
                }
                other => result.push(other),
            },
            c => result.push(c),
        }
    }
}

/// Consumes exactly four characters and interprets them as one hexadecimal UTF-16 code unit.
///
/// The outer `None` means the input ended early; `Some(None)` means four
/// characters were consumed but at least one was not a hexadecimal digit.
fn read_json_hex4(chars: &mut std::str::Chars<'_>) -> Option<Option<u32>> {
    let mut value = Some(0u32);
    for _ in 0..4 {
        // Keep consuming after an invalid digit so the escape always spans four characters.
        let digit = chars.next()?.to_digit(16);
        value = value.zip(digit).map(|(acc, d)| acc * 16 + d);
    }
    Some(value)
}

/// Turns a decoded `\uXXXX` code unit into a character, pairing surrogates when possible.
///
/// `chars` is only advanced past a following `\uXXXX` escape when that escape
/// is a valid low surrogate completing the pair.
fn decode_json_unicode_escape(unit: Option<u32>, chars: &mut std::str::Chars<'_>) -> char {
    let Some(unit) = unit else {
        return char::REPLACEMENT_CHARACTER;
    };
    if (0xD800..=0xDBFF).contains(&unit) {
        // Look ahead on a copy so a non-matching continuation is left untouched.
        let mut lookahead = chars.clone();
        if lookahead.next() == Some('\\') && lookahead.next() == Some('u') {
            if let Some(Some(low)) = read_json_hex4(&mut lookahead) {
                if (0xDC00..=0xDFFF).contains(&low) {
                    *chars = lookahead;
                    let scalar = 0x1_0000 + ((unit - 0xD800) << 10) + (low - 0xDC00);
                    return char::from_u32(scalar).unwrap_or(char::REPLACEMENT_CHARACTER);
                }
            }
        }
    }
    // Lone surrogates are not valid scalar values and fall back to U+FFFD here.
    char::from_u32(unit).unwrap_or(char::REPLACEMENT_CHARACTER)
}
/// Parses the unsigned integer stored under `key`.
///
/// Only the leading run of ASCII digits of the value is considered; `None` is
/// returned when the key is missing, the run is empty, or it overflows `u64`.
fn extract_json_u64(json: &str, key: &str) -> Option<u64> {
    let value = json_value_after_key(json, key)?;
    // Digits are single-byte ASCII, so the byte count is a valid slice boundary.
    let digit_len = value.bytes().take_while(u8::is_ascii_digit).count();
    value[..digit_len].parse().ok()
}
/// Parses the signed integer stored under `key`.
///
/// The leading run of ASCII digits and `-` characters of the value is handed
/// to the integer parser; `None` is returned when the key is missing or that
/// run is not a valid `i64`.
fn extract_json_i64_simple(json: &str, key: &str) -> Option<i64> {
    let value = json_value_after_key(json, key)?;
    // Digits and '-' are single-byte ASCII, so the byte count is a valid slice boundary.
    let numeric_len = value
        .bytes()
        .take_while(|byte| byte.is_ascii_digit() || *byte == b'-')
        .count();
    value[..numeric_len].parse().ok()
}
/// Reads the boolean stored under `key`.
///
/// The value only needs to *start* with `true` or `false`; anything else,
/// including a missing key, yields `None`.
fn extract_json_bool(json: &str, key: &str) -> Option<bool> {
    match json_value_after_key(json, key)? {
        value if value.starts_with("true") => Some(true),
        value if value.starts_with("false") => Some(false),
        _ => None,
    }
}
/// Encodes `data` as padded base64 using the standard `A-Z a-z 0-9 + /` alphabet.
#[cfg(test)]
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut encoded = String::new();
    for group in data.chunks(3) {
        // Left-align the (possibly short) group in a zero-filled 24-bit word.
        let mut bytes = [0u8; 3];
        bytes[..group.len()].copy_from_slice(group);
        let word =
            (u32::from(bytes[0]) << 16) | (u32::from(bytes[1]) << 8) | u32::from(bytes[2]);
        // n input bytes produce n + 1 output symbols.
        for position in 0..=group.len() {
            let sextet = (word >> (18 - 6 * position)) & 0x3F;
            encoded.push(char::from(ALPHABET[sextet as usize]));
        }
        // Only the final group can be short, so padding lands at the very end.
        for _ in group.len()..3 {
            encoded.push('=');
        }
    }
    encoded
}
/// Draws one `u64` from `cx` and renders it as 16 zero-padded lowercase hex digits.
fn random_id(cx: &Cx) -> String {
    let raw = cx.random_u64();
    format!("{raw:016x}")
}
/// Converts `duration` to whole nanoseconds, clamping to `u64::MAX` when it does not fit.
fn duration_to_nanos_saturating(duration: Duration) -> u64 {
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
/// Checks that `subject` is a well-formed subject pattern for a stream.
///
/// Checks run in this order, and the first failure is reported:
/// 1. the subject is non-empty;
/// 2. it is at most `MAX_STREAM_SUBJECT_BYTES` bytes long;
/// 3. no dot-separated token is empty or contains whitespace or an ASCII
///    control character;
/// 4. `*` appears only as a whole token and `>` only as the whole final token.
fn validate_stream_subject_pattern(subject: &str) -> Result<(), &'static str> {
    if subject.is_empty() {
        return Err("must be non-empty");
    }
    if subject.len() > MAX_STREAM_SUBJECT_BYTES {
        return Err("exceeds the 4096-byte NATS subject bound");
    }
    // First pass: token shape. It runs over every token before wildcard
    // placement is examined, so this error takes precedence.
    let has_malformed_token = subject.split('.').any(|token| {
        token.is_empty()
            || token.contains(|ch: char| ch.is_ascii_control() || ch.is_whitespace())
    });
    if has_malformed_token {
        return Err("contains empty tokens, whitespace, or control characters");
    }
    // Second pass: wildcard placement.
    let mut tokens = subject.split('.').peekable();
    while let Some(token) = tokens.next() {
        let is_last = tokens.peek().is_none();
        let placement_ok = match token {
            "*" => true,
            ">" => is_last,
            other => !other.contains(|ch: char| matches!(ch, '*' | '>')),
        };
        if !placement_ok {
            return Err("contains an invalid NATS wildcard placement");
        }
    }
    Ok(())
}
/// Formats `time` as a UTC timestamp of the form
/// `YYYY-MM-DDTHH:MM:SS.nnnnnnnnnZ` (nanosecond precision).
///
/// Times before the Unix epoch are supported. The calendar date is derived
/// from the day count with the proleptic-Gregorian "civil from days"
/// computation, which works on 400-year eras anchored at 0000-03-01.
///
/// Years outside `0000..=9999` are still formatted, but the year field is
/// then no longer a plain four-digit number.
fn format_system_time_rfc3339(time: SystemTime) -> String {
    const NANOS_PER_SEC: i128 = 1_000_000_000;
    const SECS_PER_DAY: i128 = 86_400;
    const DAYS_PER_ERA: i128 = 146_097;
    // Signed nanoseconds relative to the epoch; negative for pre-epoch times.
    let total_nanos = match time.duration_since(UNIX_EPOCH) {
        Ok(duration) => i128::try_from(duration.as_nanos()).unwrap_or(i128::MAX),
        Err(err) => -i128::try_from(err.duration().as_nanos()).unwrap_or(i128::MAX),
    };
    // Euclidean division keeps the sub-second and time-of-day parts
    // non-negative for pre-epoch instants.
    let total_secs = total_nanos.div_euclid(NANOS_PER_SEC);
    let nanos = total_nanos.rem_euclid(NANOS_PER_SEC) as u32; // always < 1_000_000_000
    let days = total_secs.div_euclid(SECS_PER_DAY);
    let secs_of_day = total_secs.rem_euclid(SECS_PER_DAY) as u32; // always < 86_400
    // Shift the day count so that day 0 is 0000-03-01.
    let z = days + 719_468;
    // Floor division alone selects the right era for negative day counts.
    // Subtracting `DAYS_PER_ERA - 1` first is only needed with truncating
    // division; combined with `div_euclid` it adjusts twice and picks the
    // wrong era for dates before 0000-02-29.
    let era = z.div_euclid(DAYS_PER_ERA);
    let doe = z.rem_euclid(DAYS_PER_ERA); // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, 0..=399
    let mut year = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, 0..=365
    let mp = (5 * doy + 2) / 153; // March-based month index, 0..=11
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = mp + if mp < 10 { 3 } else { -9 };
    // January and February belong to the following civil year.
    if month <= 2 {
        year += 1;
    }
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;
    format!(
        "{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{nanos:09}Z",
        month = month as u32,
        day = day as u32
    )
}
/// Computes the client-side deadline for a pull request.
///
/// A zero `pull_timeout` yields `None`. Otherwise the deadline is `now` plus
/// `pull_timeout + slack`, with both additions saturating instead of
/// overflowing.
fn compute_client_deadline(now: Time, pull_timeout: Duration, slack: Duration) -> Option<Time> {
    if pull_timeout.is_zero() {
        return None;
    }
    let budget = pull_timeout.saturating_add(slack);
    let budget_nanos = duration_to_nanos_saturating(budget);
    Some(now.saturating_add_nanos(budget_nanos))
}
/// Renders the JSON body of a pull request.
///
/// `batch` and `expires` are always present; `max_bytes` is appended only
/// when a limit is supplied.
fn build_pull_request_json(batch: usize, expires: i64, max_bytes: Option<usize>) -> String {
    let max_bytes_field = max_bytes
        .map(|limit| format!(",\"max_bytes\":{limit}"))
        .unwrap_or_default();
    format!("{{\"batch\":{batch},\"expires\":{expires}{max_bytes_field}}}")
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::messaging::NatsConfig;
use crate::test_utils::run_test_with_cx;
use crate::types::{Budget, RegionId, TaskId};
use serde_json::json;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Instant;
/// Replaces the last four dot-separated tokens of `reply` with fixed
/// placeholders so snapshots do not depend on their values.
///
/// Subjects with fewer than nine tokens are returned unchanged.
fn scrub_js_ack_reply_subject(reply: &str) -> String {
    const PLACEHOLDERS: [&str; 4] = [
        "[STREAM_SEQ]",
        "[CONSUMER_SEQ]",
        "[TIMESTAMP]",
        "[PENDING]",
    ];
    let mut tokens: Vec<&str> = reply.split('.').collect();
    if tokens.len() >= 9 {
        let tail_start = tokens.len() - PLACEHOLDERS.len();
        tokens[tail_start..].copy_from_slice(&PLACEHOLDERS);
    }
    tokens.join(".")
}
fn jetstream_ack_snapshot(
subject: &str,
payload: &[u8],
reply_subject: &str,
ack_payload: &str,
) -> serde_json::Value {
let msg = Message {
subject: subject.to_string(),
sid: 7,
headers: None,
payload: payload.to_vec(),
reply_to: Some(reply_subject.to_string()),
};
let js_msg = Consumer::parse_js_message(msg, None).expect("valid JetStream reply subject");
json!({
"subject": js_msg.subject,
"payload_utf8": String::from_utf8_lossy(&js_msg.payload),
"delivered": js_msg.delivered,
"sequence": "[STREAM_SEQ]",
"reply_subject": scrub_js_ack_reply_subject(&js_msg.reply_subject),
"ack": {
"payload": ack_payload,
"terminal": matches!(ack_payload, "+ACK" | "-NAK" | "+TERM"),
}
})
}
#[test]
fn test_stream_config_to_json() {
let config = StreamConfig::new("TEST")
.subjects(&["test.>"])
.max_messages(1000)
.replicas(1);
let json = config.to_json();
assert!(json.contains("\"name\":\"TEST\""));
assert!(json.contains("\"subjects\":[\"test.>\"]"));
assert!(json.contains("\"max_msgs\":1000"));
}
#[test]
fn test_consumer_config_to_json() {
let config = ConsumerConfig::new("my-consumer")
.ack_policy(AckPolicy::Explicit)
.filter_subject("orders.>");
let json = config.to_json();
assert!(json.contains("\"name\":\"my-consumer\""));
assert!(json.contains("\"ack_policy\":\"explicit\""));
assert!(json.contains("\"filter_subject\":\"orders.>\""));
}
#[test]
fn consumer_config_to_json_includes_push_rate_limit_tick146() {
let config = ConsumerConfig::new("push-consumer")
.deliver_subject("deliver.orders")
.rate_limit_bps(8192)
.ack_policy(AckPolicy::Explicit);
let json = config.to_json();
assert!(json.contains("\"deliver_subject\":\"deliver.orders\""));
assert!(json.contains("\"rate_limit_bps\":8192"));
}
#[test]
fn consumer_config_to_json_includes_start_time_for_deliver_by_start_time_tick137() {
let config = ConsumerConfig::new("time-consumer")
.deliver_policy(DeliverPolicy::ByStartTime(
UNIX_EPOCH + Duration::new(42, 123_456_789),
))
.ack_policy(AckPolicy::Explicit);
let json = config.to_json();
assert!(json.contains("\"deliver_policy\":\"by_start_time\""));
assert!(json.contains("\"opt_start_time\":\"1970-01-01T00:00:42.123456789Z\""));
}
#[test]
fn test_ephemeral_consumer_config_to_json() {
let config = ConsumerConfig::ephemeral();
let json = config.to_json();
assert!(json.starts_with("{\"deliver_policy\""));
assert!(!json.contains("{,"));
assert!(json.contains("\"deliver_policy\":\"all\""));
assert!(json.contains("\"ack_policy\":\"explicit\""));
}
#[test]
fn consumer_config_normalizes_deprecated_durable_alias() {
let mut cfg = ConsumerConfig::ephemeral();
cfg.durable_name = Some("worker_1".into());
cfg.normalize_identity().unwrap();
assert_eq!(cfg.name.as_deref(), Some("worker_1"));
assert!(cfg.durable_name.is_none());
assert!(cfg.to_json().contains("\"name\":\"worker_1\""));
assert!(!cfg.to_json().contains("durable_name"));
}
#[test]
fn consumer_config_rejects_mismatched_durable_alias() {
let mut cfg = ConsumerConfig::new("worker_1");
cfg.durable_name = Some("worker_2".into());
let err = cfg.normalize_identity().unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(err.to_string().contains("consumer name mismatch"));
}
#[test]
fn consumer_config_rejects_subject_injecting_names() {
let raw_name = "worker.bad";
let mut cfg = ConsumerConfig::new(raw_name);
let err = cfg.normalize_identity().unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(
err.to_string()
.contains("must contain only ASCII letters, digits, '-' or '_'")
);
assert!(err.to_string().contains("fingerprint"));
assert!(!err.to_string().contains(raw_name));
}
#[test]
fn consumer_config_validate_rejects_invalid_filter_subject_tick140() {
let mut cfg = ConsumerConfig::new("worker");
cfg.filter_subject = Some("orders.>.archived".into());
let err = cfg.validate().unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(err.to_string().contains("filter_subject"));
assert!(err.to_string().contains("invalid NATS wildcard placement"));
}
#[test]
fn consumer_config_validate_rejects_pull_rate_limit_without_deliver_subject_tick146() {
let mut cfg = ConsumerConfig::new("push-worker");
cfg.rate_limit_bps = Some(4096);
let err = cfg.validate().unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(
err.to_string()
.contains("rate_limit_bps requires deliver_subject")
);
}
#[test]
fn consumer_config_validate_rejects_wildcard_deliver_subject_tick146() {
let mut cfg = ConsumerConfig::new("push-worker");
cfg.deliver_subject = Some("deliver.>".into());
let err = cfg.validate().unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(err.to_string().contains("deliver_subject"));
assert!(err.to_string().contains("fully specified NATS subject"));
}
#[test]
fn stream_config_rejects_unicode_confusables() {
let raw_confusable = "orders.prod";
let err = ConsumerConfig::validate_stream_name(raw_confusable).unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(err.to_string().contains("ASCII letters"));
assert!(err.to_string().contains("fingerprint"));
assert!(!err.to_string().contains(raw_confusable));
let raw_slash = "orders/prod";
let err = ConsumerConfig::validate_stream_name(raw_slash).unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(err.to_string().contains("ASCII letters"));
assert!(err.to_string().contains("fingerprint"));
assert!(!err.to_string().contains(raw_slash));
assert!(ConsumerConfig::validate_stream_name("orders_prod-1").is_ok());
}
#[test]
fn stream_name_validation_enforces_byte_boundary_and_keeps_valid_configs() {
let at_cap = "A".repeat(MAX_NAME_BYTES);
let over_cap = "A".repeat(MAX_NAME_BYTES + 1);
let empty = ConsumerConfig::validate_stream_name("").unwrap_err();
assert!(matches!(empty, JsError::InvalidConfig(_)));
assert!(empty.to_string().contains("must be non-empty"));
assert!(ConsumerConfig::validate_stream_name(&at_cap).is_ok());
let cfg = StreamConfig::new(at_cap.clone()).subjects(&["orders.>"]);
assert!(cfg.validate().is_ok());
assert!(cfg.to_json().contains(&format!("\"name\":\"{at_cap}\"")));
let err = ConsumerConfig::validate_stream_name(&over_cap).unwrap_err();
assert!(matches!(err, JsError::InvalidConfig(_)));
assert!(err.to_string().contains("256-byte cap"));
assert!(!err.to_string().contains(&over_cap));
}
#[test]
fn consumer_name_validation_enforces_char_and_byte_boundaries() {
let empty = ConsumerConfig::validate_consumer_name("name", Some("")).unwrap_err();
assert!(matches!(empty, JsError::InvalidConfig(_)));
assert!(empty.to_string().contains("must be non-empty"));
let at_char_cap = "a".repeat(MAX_CONSUMER_NAME_CHARS);
let over_char_cap = "a".repeat(MAX_CONSUMER_NAME_CHARS + 1);
let over_byte_cap = "🙂".repeat(70);
let mut cfg = ConsumerConfig::new(at_char_cap.clone());
assert!(cfg.validate().is_ok());
assert!(
cfg.to_json()
.contains(&format!("\"name\":\"{at_char_cap}\""))
);
let char_err = ConsumerConfig::new(over_char_cap.clone())
.validate()
.unwrap_err();
assert!(matches!(char_err, JsError::InvalidConfig(_)));
assert!(char_err.to_string().contains("128 characters"));
assert!(!char_err.to_string().contains(&over_char_cap));
let byte_err =
ConsumerConfig::validate_consumer_name("name", Some(&over_byte_cap)).unwrap_err();
assert!(matches!(byte_err, JsError::InvalidConfig(_)));
assert!(byte_err.to_string().contains("256-byte cap"));
assert!(!byte_err.to_string().contains(&over_byte_cap));
}
#[test]
fn pull_batch_validation_enforces_cap_and_keeps_request_shape() {
let zero = validate_pull_batch_size(0).unwrap_err();
assert!(matches!(zero, JsError::InvalidConfig(_)));
assert!(zero.to_string().contains("must be > 0"));
assert!(validate_pull_batch_size(MAX_PULL_BATCH).is_ok());
let over = validate_pull_batch_size(MAX_PULL_BATCH + 1).unwrap_err();
assert!(matches!(over, JsError::InvalidConfig(_)));
assert!(over.to_string().contains("1024-message cap"));
let request = build_pull_request_json(MAX_PULL_BATCH, 0, Some(4096));
assert_eq!(request, r#"{"batch":1024,"expires":0,"max_bytes":4096}"#);
}
#[test]
fn jetstream_length_cap_boundary_matrix_logs_structured_evidence() {
const EXACT_RCH_COMMAND: &str = "rch exec -- env CARGO_TARGET_DIR=${TMPDIR:-/tmp}/rch_target_asupersync_s4p7iq_jetstream cargo test -p asupersync --lib jetstream_length_cap_boundary_matrix_logs_structured_evidence -- --nocapture";
fn log_scenario(
id: &str,
field_under_test: &str,
input_length: usize,
length_unit: &str,
configured_cap: usize,
result: &Result<(), JsError>,
sanitized_name_fingerprint: Option<String>,
) {
let (accepted_rejected_verdict, error_kind) = match result {
Ok(()) => ("accepted", "none"),
Err(JsError::InvalidConfig(_)) => ("rejected", "invalid_config"),
Err(JsError::Nats(_)) => ("rejected", "nats"),
Err(JsError::Api { .. }) => ("rejected", "api"),
Err(JsError::StreamNotFound(_)) => ("rejected", "stream_not_found"),
Err(JsError::ConsumerNotFound { .. }) => ("rejected", "consumer_not_found"),
Err(JsError::NotAcked) => ("rejected", "not_acked"),
Err(JsError::AlreadyAcknowledged) => ("rejected", "already_acknowledged"),
Err(JsError::ParseError(_)) => ("rejected", "parse_error"),
};
eprintln!(
"{}",
json!({
"id": id,
"field_under_test": field_under_test,
"input_length": input_length,
"length_unit": length_unit,
"configured_cap": configured_cap,
"accepted_rejected_verdict": accepted_rejected_verdict,
"error_kind": error_kind,
"sanitized_name_fingerprint": sanitized_name_fingerprint,
"rch_command": EXACT_RCH_COMMAND,
"artifact_paths": [],
"final_length_cap_verdict": "PASS",
})
);
}
let stream_at_cap = "A".repeat(MAX_NAME_BYTES);
let stream_over_cap = "A".repeat(MAX_NAME_BYTES + 1);
let consumer_at_char_cap = "a".repeat(MAX_CONSUMER_NAME_CHARS);
let consumer_over_char_cap = "a".repeat(MAX_CONSUMER_NAME_CHARS + 1);
let consumer_over_byte_cap = "🙂".repeat(70);
let invalid_stream = "orders.bad";
let invalid_consumer = "worker.bad";
let scenarios = [
(
"JETSTREAM-LEN-1",
"stream_name_bytes",
MAX_NAME_BYTES,
"bytes",
MAX_NAME_BYTES,
true,
ConsumerConfig::validate_stream_name(&stream_at_cap),
Some(stream_at_cap.as_str()),
Some(redacted_name_fingerprint(&stream_at_cap)),
),
(
"JETSTREAM-LEN-2",
"stream_name_bytes",
MAX_NAME_BYTES + 1,
"bytes",
MAX_NAME_BYTES,
false,
ConsumerConfig::validate_stream_name(&stream_over_cap),
Some(stream_over_cap.as_str()),
Some(redacted_name_fingerprint(&stream_over_cap)),
),
(
"JETSTREAM-LEN-3",
"stream_name_charset",
invalid_stream.len(),
"bytes",
MAX_NAME_BYTES,
false,
ConsumerConfig::validate_stream_name(invalid_stream),
Some(invalid_stream),
Some(redacted_name_fingerprint(invalid_stream)),
),
(
"JETSTREAM-LEN-4",
"consumer_name_chars",
MAX_CONSUMER_NAME_CHARS,
"chars",
MAX_CONSUMER_NAME_CHARS,
true,
{
let mut cfg = ConsumerConfig::new(consumer_at_char_cap.clone());
cfg.validate()
},
Some(consumer_at_char_cap.as_str()),
Some(redacted_name_fingerprint(&consumer_at_char_cap)),
),
(
"JETSTREAM-LEN-5",
"consumer_name_chars",
MAX_CONSUMER_NAME_CHARS + 1,
"chars",
MAX_CONSUMER_NAME_CHARS,
false,
{
let mut cfg = ConsumerConfig::new(consumer_over_char_cap.clone());
cfg.validate()
},
Some(consumer_over_char_cap.as_str()),
Some(redacted_name_fingerprint(&consumer_over_char_cap)),
),
(
"JETSTREAM-LEN-6",
"consumer_name_bytes",
consumer_over_byte_cap.len(),
"bytes",
MAX_NAME_BYTES,
false,
ConsumerConfig::validate_consumer_name("name", Some(&consumer_over_byte_cap))
.map(|_| ()),
Some(consumer_over_byte_cap.as_str()),
Some(redacted_name_fingerprint(&consumer_over_byte_cap)),
),
(
"JETSTREAM-LEN-7",
"consumer_name_charset",
invalid_consumer.len(),
"bytes",
MAX_CONSUMER_NAME_CHARS,
false,
ConsumerConfig::validate_consumer_name("name", Some(invalid_consumer)).map(|_| ()),
Some(invalid_consumer),
Some(redacted_name_fingerprint(invalid_consumer)),
),
(
"JETSTREAM-LEN-8",
"pull_batch",
0,
"messages",
MAX_PULL_BATCH,
false,
validate_pull_batch_size(0),
None,
None,
),
(
"JETSTREAM-LEN-9",
"pull_batch",
MAX_PULL_BATCH,
"messages",
MAX_PULL_BATCH,
true,
validate_pull_batch_size(MAX_PULL_BATCH),
None,
None,
),
(
"JETSTREAM-LEN-10",
"pull_batch",
MAX_PULL_BATCH + 1,
"messages",
MAX_PULL_BATCH,
false,
validate_pull_batch_size(MAX_PULL_BATCH + 1),
None,
None,
),
];
for (id, field, input_length, unit, cap, expect_ok, result, raw_input, fingerprint) in
scenarios
{
assert_eq!(
result.is_ok(),
expect_ok,
"{id} drifted for {field}: expected ok={expect_ok}, got {result:?}"
);
if let (Err(JsError::InvalidConfig(msg)), Some(raw_input)) = (&result, raw_input) {
assert!(
!msg.contains(raw_input),
"{id} leaked raw input in validation error: {msg}"
);
}
log_scenario(id, field, input_length, unit, cap, &result, fingerprint);
}
eprintln!(
"{}",
json!({
"id": "JETSTREAM-LEN-FINAL",
"rch_command": EXACT_RCH_COMMAND,
"artifact_paths": [],
"final_length_cap_verdict": "PASS",
})
);
}
#[test]
fn test_retention_policy_str() {
assert_eq!(RetentionPolicy::Limits.as_str(), "limits");
assert_eq!(RetentionPolicy::Interest.as_str(), "interest");
assert_eq!(RetentionPolicy::WorkQueue.as_str(), "workqueue");
}
#[test]
fn test_storage_type_str() {
assert_eq!(StorageType::File.as_str(), "file");
assert_eq!(StorageType::Memory.as_str(), "memory");
}
#[test]
fn test_ack_policy_str() {
assert_eq!(AckPolicy::Explicit.as_str(), "explicit");
assert_eq!(AckPolicy::None.as_str(), "none");
assert_eq!(AckPolicy::All.as_str(), "all");
}
#[test]
fn test_deliver_policy_str() {
assert_eq!(DeliverPolicy::All.as_str(), "all");
assert_eq!(DeliverPolicy::New.as_str(), "new");
assert_eq!(
DeliverPolicy::ByStartSequence(7).as_str(),
"by_start_sequence"
);
assert_eq!(
DeliverPolicy::ByStartTime(UNIX_EPOCH).as_str(),
"by_start_time"
);
assert_eq!(DeliverPolicy::Last.as_str(), "last");
assert_eq!(DeliverPolicy::LastPerSubject.as_str(), "last_per_subject");
}
/// Checks `base64_encode` against known vectors covering every padding length.
#[test]
fn test_base64_encode() {
    let vectors = [
        ("", ""),
        ("f", "Zg=="),
        ("fo", "Zm8="),
        ("foo", "Zm9v"),
        ("foob", "Zm9vYg=="),
        ("hello", "aGVsbG8="),
    ];
    for (input, expected) in vectors {
        assert_eq!(base64_encode(input.as_bytes()), expected);
    }
}
/// Checks `extract_json_u64` for a compact key, a key with whitespace around
/// the colon, and a key that is absent.
#[test]
fn test_extract_json_u64() {
    let json = r#"{"seq":12345,"messages" : 100}"#;
    let cases = [
        ("seq", Some(12345_u64)),
        ("messages", Some(100_u64)),
        ("missing", None),
    ];
    for (key, expected) in cases {
        assert_eq!(extract_json_u64(json, key), expected);
    }
}
#[test]
fn test_js_error_display() {
assert_eq!(
format!("{}", JsError::StreamNotFound("TEST".to_string())),
"JetStream stream not found: TEST"
);
assert_eq!(
format!(
"{}",
JsError::Api {
code: 10059,
description: "not found".to_string()
}
),
"JetStream API error 10059: not found"
);
assert_eq!(
format!("{}", JsError::NotAcked),
"JetStream message not acknowledged"
);
}
/// `Duration::MAX` does not fit in 64 bits of nanoseconds and must clamp to `u64::MAX`.
#[test]
fn test_duration_to_nanos_saturating_max_duration() {
    let nanos = duration_to_nanos_saturating(Duration::MAX);
    assert_eq!(nanos, u64::MAX);
}
/// A maximal pull timeout must saturate the deadline at `Time::MAX` rather than overflow.
#[test]
fn test_compute_client_deadline_saturates_for_large_timeout() {
    let start = Time::from_nanos(1);
    let slack = Consumer::CLIENT_TIMEOUT_SLACK;
    assert_eq!(
        compute_client_deadline(start, Duration::MAX, slack),
        Some(Time::MAX)
    );
}
/// Pins the exact pull-request body produced when a byte limit is supplied.
#[test]
fn pull_request_json_matches_nats_go_pull_max_messages_with_bytes_limit() {
    let expected = r#"{"batch":2,"expires":50000000,"max_bytes":1024}"#;
    assert_eq!(build_pull_request_json(2, 50_000_000, Some(1024)), expected);
}
#[test]
fn pull_subscriber_state_completes_at_batch_and_ignores_late_terminal_tick126() {
let mut state = PullSubscriberState::new(2);
state.observe_parsed_message();
assert_eq!(state.termination(), PullSubscriberTermination::Active);
state.observe_parsed_message();
assert_eq!(state.termination(), PullSubscriberTermination::Completed);
assert_eq!(state.received(), 2);
state.observe_timeout();
state.observe_closed();
state.observe_error(JsError::InvalidConfig("late".to_string()));
assert_eq!(state.termination(), PullSubscriberTermination::Completed);
assert!(state.result().is_ok());
}
#[test]
fn pull_subscriber_state_error_is_sticky_tick126() {
let mut state = PullSubscriberState::new(3);
state.observe_parsed_message();
state.observe_error(JsError::InvalidConfig("boom".to_string()));
state.observe_parsed_message();
state.observe_closed();
assert_eq!(state.termination(), PullSubscriberTermination::Error);
assert_eq!(state.received(), 1);
assert!(matches!(state.result(), Err(JsError::InvalidConfig(msg)) if msg == "boom"));
}
#[test]
fn pull_timeout_without_messages_finishes_as_empty_batch() {
let mut state = PullSubscriberState::new(1);
state.observe_timeout();
let messages = finish_pull(Vec::new(), state)
.expect("an empty pull timeout is not proof of a JetStream API error");
assert!(messages.is_empty());
}
#[test]
fn ordered_consumer_gap_triggers_reset_pending_tick143() {
let mut state = FuzzOrderedConsumerState {
phase: FuzzOrderedConsumerPhase::Tracking,
last_sequence: None,
accepted_messages: 0,
reset_count: 0,
pending_gap_from: None,
};
fuzz_apply_ordered_consumer_step(
&mut state,
FuzzOrderedConsumerStep::Observe {
sequence: 10,
delivered: 1,
},
);
fuzz_apply_ordered_consumer_step(
&mut state,
FuzzOrderedConsumerStep::Observe {
sequence: 12,
delivered: 1,
},
);
assert_eq!(state.phase, FuzzOrderedConsumerPhase::ResetPending);
assert_eq!(state.last_sequence, Some(10));
assert_eq!(state.accepted_messages, 1);
assert_eq!(state.reset_count, 1);
assert_eq!(state.pending_gap_from, Some(11));
}
#[test]
fn ordered_consumer_reset_completion_clears_gap_and_restarts_tick143() {
let mut state = FuzzOrderedConsumerState {
phase: FuzzOrderedConsumerPhase::ResetPending,
last_sequence: Some(42),
accepted_messages: 3,
reset_count: 1,
pending_gap_from: Some(43),
};
fuzz_apply_ordered_consumer_step(&mut state, FuzzOrderedConsumerStep::CompleteReset);
assert_eq!(state.phase, FuzzOrderedConsumerPhase::Tracking);
assert_eq!(state.last_sequence, None);
assert_eq!(state.pending_gap_from, None);
assert_eq!(state.accepted_messages, 3);
fuzz_apply_ordered_consumer_step(
&mut state,
FuzzOrderedConsumerStep::Observe {
sequence: 100,
delivered: 1,
},
);
assert_eq!(state.phase, FuzzOrderedConsumerPhase::Tracking);
assert_eq!(state.last_sequence, Some(100));
assert_eq!(state.accepted_messages, 4);
assert_eq!(state.reset_count, 1);
}
#[test]
fn max_deliver_rejects_after_cap_and_advances_to_dlq_tick153() {
let mut state = FuzzMaxDeliverState {
max_deliver: 3,
delivered: 0,
accepted_deliveries: 0,
rejected_deliveries: 0,
dlq_messages: 0,
terminal: FuzzMaxDeliverTerminal::Pending,
};
fuzz_apply_max_deliver_step(&mut state, FuzzMaxDeliverStep::Redeliver);
fuzz_apply_max_deliver_step(&mut state, FuzzMaxDeliverStep::Redeliver);
fuzz_apply_max_deliver_step(&mut state, FuzzMaxDeliverStep::Redeliver);
assert_eq!(state.delivered, 3);
assert_eq!(state.accepted_deliveries, 3);
assert_eq!(state.rejected_deliveries, 0);
assert_eq!(state.dlq_messages, 0);
assert_eq!(state.terminal, FuzzMaxDeliverTerminal::Pending);
fuzz_apply_max_deliver_step(&mut state, FuzzMaxDeliverStep::Redeliver);
assert_eq!(state.delivered, 4);
assert_eq!(state.accepted_deliveries, 3);
assert_eq!(state.rejected_deliveries, 1);
assert_eq!(state.dlq_messages, 1);
assert_eq!(state.terminal, FuzzMaxDeliverTerminal::DeadLettered);
fuzz_apply_max_deliver_step(&mut state, FuzzMaxDeliverStep::Redeliver);
assert_eq!(state.delivered, 4);
assert_eq!(state.accepted_deliveries, 3);
assert_eq!(state.rejected_deliveries, 2);
assert_eq!(state.dlq_messages, 1);
assert_eq!(state.terminal, FuzzMaxDeliverTerminal::DeadLettered);
}
#[test]
fn max_deliver_negative_one_keeps_redelivery_unbounded_tick153() {
let mut state = FuzzMaxDeliverState {
max_deliver: -1,
delivered: 0,
accepted_deliveries: 0,
rejected_deliveries: 0,
dlq_messages: 0,
terminal: FuzzMaxDeliverTerminal::Pending,
};
for _ in 0..8 {
fuzz_apply_max_deliver_step(&mut state, FuzzMaxDeliverStep::Redeliver);
}
assert_eq!(state.delivered, 8);
assert_eq!(state.accepted_deliveries, 8);
assert_eq!(state.rejected_deliveries, 0);
assert_eq!(state.dlq_messages, 0);
assert_eq!(state.terminal, FuzzMaxDeliverTerminal::Pending);
}
#[test]
fn js_error_display_all_variants() {
let nats_err = JsError::Nats(NatsError::Io(std::io::Error::other("e")));
assert!(nats_err.to_string().contains("NATS error"));
let api_err = JsError::Api {
code: 404,
description: "not here".into(),
};
assert!(api_err.to_string().contains("404"));
assert!(api_err.to_string().contains("not here"));
let stream_err = JsError::StreamNotFound("ORDERS".into());
assert!(stream_err.to_string().contains("ORDERS"));
let consumer_err = JsError::ConsumerNotFound {
stream: "S".into(),
consumer: "C".into(),
};
assert!(consumer_err.to_string().contains("S/C"));
let not_acked = JsError::NotAcked;
assert!(not_acked.to_string().contains("not acknowledged"));
let invalid = JsError::InvalidConfig("bad".into());
assert!(invalid.to_string().contains("invalid config"));
let parse = JsError::ParseError("json".into());
assert!(parse.to_string().contains("parse error"));
}
#[test]
fn js_error_debug() {
let err = JsError::NotAcked;
let dbg = format!("{err:?}");
assert!(dbg.contains("NotAcked"));
}
#[test]
fn js_error_source_nats() {
let err = JsError::Nats(NatsError::Io(std::io::Error::other("x")));
assert!(std::error::Error::source(&err).is_some());
}
#[test]
fn js_error_source_none_for_others() {
let err = JsError::NotAcked;
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn js_error_from_nats_error() {
let nats = NatsError::Io(std::io::Error::other("z"));
let err: JsError = JsError::from(nats);
assert!(matches!(err, JsError::Nats(_)));
}
#[test]
fn retention_policy_default_debug_copy_eq() {
assert_eq!(RetentionPolicy::default(), RetentionPolicy::Limits);
let p = RetentionPolicy::Interest;
let dbg = format!("{p:?}");
assert!(dbg.contains("Interest"));
let copy = p;
assert_eq!(p, copy);
assert_ne!(p, RetentionPolicy::WorkQueue);
}
#[test]
fn storage_type_default_debug_copy_eq() {
assert_eq!(StorageType::default(), StorageType::File);
let s = StorageType::Memory;
let dbg = format!("{s:?}");
assert!(dbg.contains("Memory"));
let copy = s;
assert_eq!(s, copy);
assert_ne!(s, StorageType::File);
}
#[test]
fn discard_policy_default_debug_copy_eq() {
assert_eq!(DiscardPolicy::default(), DiscardPolicy::Old);
let d = DiscardPolicy::New;
let dbg = format!("{d:?}");
assert!(dbg.contains("New"));
let copy = d;
assert_eq!(d, copy);
}
#[test]
fn deliver_policy_default_debug_copy_eq() {
assert_eq!(DeliverPolicy::default(), DeliverPolicy::All);
let d = DeliverPolicy::Last;
let dbg = format!("{d:?}");
assert!(dbg.contains("Last"));
let copy = d;
assert_eq!(d, copy);
assert_ne!(d, DeliverPolicy::New);
}
#[test]
fn deliver_policy_by_start_sequence() {
let d = DeliverPolicy::ByStartSequence(42);
assert_eq!(d, DeliverPolicy::ByStartSequence(42));
assert_ne!(d, DeliverPolicy::ByStartSequence(99));
}
#[test]
fn deliver_policy_by_start_time_tick137() {
let d = DeliverPolicy::ByStartTime(UNIX_EPOCH + Duration::new(5, 6));
assert_eq!(
d,
DeliverPolicy::ByStartTime(UNIX_EPOCH + Duration::new(5, 6))
);
assert_ne!(
d,
DeliverPolicy::ByStartTime(UNIX_EPOCH + Duration::new(6, 6))
);
}
/// Checks formatting for one instant after the epoch (with a nanosecond
/// fraction) and one instant one second before it.
#[test]
fn format_system_time_rfc3339_handles_epoch_offsets_tick137() {
    let after_epoch = UNIX_EPOCH + Duration::new(42, 123_456_789);
    assert_eq!(
        format_system_time_rfc3339(after_epoch),
        "1970-01-01T00:00:42.123456789Z"
    );

    let before_epoch = UNIX_EPOCH
        .checked_sub(Duration::from_secs(1))
        .expect("one-second pre-epoch timestamp should be representable");
    assert_eq!(
        format_system_time_rfc3339(before_epoch),
        "1969-12-31T23:59:59.000000000Z"
    );
}
#[test]
fn deliver_by_start_time_serialization_survives_cross_epoch_skew_tick150() {
let base = UNIX_EPOCH + Duration::new(9, 250_000_000);
let skewed = base
.checked_sub(Duration::new(10, 500_000_000))
.expect("cross-epoch skew should stay representable");
let corrected = skewed
.checked_add(Duration::new(10, 500_000_000))
.expect("inverse skew should restore original timestamp");
assert_eq!(
format_system_time_rfc3339(base),
format_system_time_rfc3339(corrected)
);
let json = ConsumerConfig::ephemeral()
.deliver_policy(DeliverPolicy::ByStartTime(skewed))
.to_json();
assert!(json.contains("\"deliver_policy\":\"by_start_time\""));
assert!(json.contains("\"opt_start_time\":\"1969-12-31T23:59:58.750000000Z\""));
}
#[test]
fn ack_policy_default_debug_copy_eq() {
assert_eq!(AckPolicy::default(), AckPolicy::Explicit);
let a = AckPolicy::None;
let dbg = format!("{a:?}");
assert!(dbg.contains("None"));
let copy = a;
assert_eq!(a, copy);
assert_ne!(a, AckPolicy::All);
}
#[test]
fn stream_config_debug_clone() {
let cfg = StreamConfig::new("TEST");
let dbg = format!("{cfg:?}");
assert!(dbg.contains("StreamConfig"));
assert!(dbg.contains("TEST"));
let cloned = cfg;
assert_eq!(cloned.name, "TEST");
}
#[test]
fn stream_config_new_defaults() {
let cfg = StreamConfig::new("EVENTS");
assert_eq!(cfg.name, "EVENTS");
assert!(cfg.subjects.is_empty());
assert_eq!(cfg.retention, RetentionPolicy::Limits);
assert_eq!(cfg.storage, StorageType::File);
assert_eq!(cfg.discard, DiscardPolicy::Old);
assert_eq!(cfg.replicas, 1);
assert!(cfg.max_msgs.is_none());
assert!(cfg.max_bytes.is_none());
assert!(cfg.max_age.is_none());
assert!(cfg.duplicate_window.is_none());
}
/// Drives every `StreamConfig` builder method used here and checks each stored value.
#[test]
fn stream_config_builder_chain() {
    let one_hour = Duration::from_secs(3600);
    let two_minutes = Duration::from_secs(120);

    let built = StreamConfig::new("ORDERS")
        .subjects(&["orders.>", "returns.>"])
        .retention(RetentionPolicy::WorkQueue)
        .storage(StorageType::Memory)
        .max_messages(1000)
        .max_bytes(1_000_000)
        .max_age(one_hour)
        .replicas(3)
        .duplicate_window(two_minutes);

    assert_eq!(built.subjects.len(), 2);
    assert_eq!(built.retention, RetentionPolicy::WorkQueue);
    assert_eq!(built.storage, StorageType::Memory);
    assert_eq!(built.max_msgs, Some(1000));
    assert_eq!(built.max_bytes, Some(1_000_000));
    assert_eq!(built.max_age, Some(one_hour));
    assert_eq!(built.replicas, 3);
    assert_eq!(built.duplicate_window, Some(two_minutes));
}
/// A config whose subjects use `*` and a trailing `>` wildcard must validate.
#[test]
fn stream_config_validate_accepts_valid_subject_patterns_tick138() {
    let config = StreamConfig::new("ORDERS")
        .subjects(&["orders.*", "returns.>"])
        .replicas(1);
    let outcome = config.validate();
    assert!(outcome.is_ok());
}
/// A `>` wildcard that is not the final token must be rejected, and the error text
/// must name the offending subject index and the reason.
#[test]
fn stream_config_validate_rejects_invalid_subject_patterns_tick138() {
    let config = StreamConfig::new("ORDERS").subjects(&["orders.>.archived"]);
    let failure = config.validate().unwrap_err();
    assert!(matches!(failure, JsError::InvalidConfig(_)));
    assert!(failure.to_string().contains("subjects[0]"));
    assert!(
        failure
            .to_string()
            .contains("invalid NATS wildcard placement")
    );
}
/// A negative `max_bytes` must fail validation with an error that mentions the field.
#[test]
fn stream_config_validate_rejects_negative_limits_tick138() {
    let mut config = StreamConfig::new("ORDERS");
    // Set the field directly so the negative value reaches `validate` as-is.
    config.max_bytes = Some(-1);

    let failure = config.validate().unwrap_err();
    assert!(matches!(failure, JsError::InvalidConfig(_)));
    assert!(failure.to_string().contains("max_bytes"));
}
/// A replica count of zero must fail validation with an error that mentions `replicas`.
#[test]
fn stream_config_validate_rejects_zero_replicas_tick138() {
    let config = StreamConfig::new("ORDERS").replicas(0);
    let failure = config.validate().unwrap_err();
    assert!(matches!(failure, JsError::InvalidConfig(_)));
    assert!(failure.to_string().contains("replicas"));
}
/// Checks that the `Debug` output of `ConsumerConfig` names the type and that the
/// consumer name survives a move.
#[test]
fn consumer_config_debug_clone() {
    let config = ConsumerConfig::new("processor");

    let rendered = format!("{config:?}");
    assert!(rendered.contains("ConsumerConfig"));

    let moved = config;
    assert_eq!(moved.name, Some("processor".into()));
}
/// Pins the field values of a `ConsumerConfig` built with `new` and no builder calls.
#[test]
fn consumer_config_new_defaults() {
    let defaults = ConsumerConfig::new("worker");

    // Naming: `name` is set, `durable_name` is not.
    assert_eq!(defaults.name, Some("worker".into()));
    assert!(defaults.durable_name.is_none());

    // Delivery and acknowledgement settings.
    assert_eq!(defaults.deliver_policy, DeliverPolicy::All);
    assert_eq!(defaults.ack_policy, AckPolicy::Explicit);
    assert_eq!(defaults.ack_wait, Duration::from_secs(30));
    assert_eq!(defaults.max_deliver, -1);
    assert!(defaults.filter_subject.is_none());
    assert_eq!(defaults.max_ack_pending, 1000);
}
/// An ephemeral consumer config carries neither a name nor a durable name.
#[test]
fn consumer_config_ephemeral() {
    let ephemeral = ConsumerConfig::ephemeral();
    assert!(ephemeral.name.is_none());
    assert!(ephemeral.durable_name.is_none());
}
/// Drives the `ConsumerConfig` builder methods used here and checks each stored value.
#[test]
fn consumer_config_builder_chain() {
    let one_minute = Duration::from_secs(60);

    let built = ConsumerConfig::new("c1")
        .deliver_policy(DeliverPolicy::New)
        .ack_policy(AckPolicy::All)
        .ack_wait(one_minute)
        .max_deliver(5)
        .filter_subject("orders.new");

    assert_eq!(built.deliver_policy, DeliverPolicy::New);
    assert_eq!(built.ack_policy, AckPolicy::All);
    assert_eq!(built.ack_wait, one_minute);
    assert_eq!(built.max_deliver, 5);
    assert_eq!(built.filter_subject, Some("orders.new".into()));
}
/// A default `StreamState` has all counters at zero, names its type in `Debug`
/// output, and keeps its values across a move.
#[test]
fn stream_state_default_debug_clone() {
    let initial = StreamState::default();
    assert_eq!(initial.messages, 0);
    assert_eq!(initial.bytes, 0);
    assert_eq!(initial.first_seq, 0);
    assert_eq!(initial.last_seq, 0);
    assert_eq!(initial.consumer_count, 0);

    let rendered = format!("{initial:?}");
    assert!(rendered.contains("StreamState"));

    let moved = initial;
    assert_eq!(moved.messages, 0);
}
/// Builds a `PubAck` by hand and checks its `Debug` output and field values after a move.
#[test]
fn pub_ack_debug_clone() {
    let original = PubAck {
        stream: "ORDERS".into(),
        seq: 42,
        duplicate: false,
    };

    let rendered = format!("{original:?}");
    assert!(rendered.contains("PubAck"));
    assert!(rendered.contains("ORDERS"));

    let moved = original;
    assert_eq!(moved.seq, 42);
    assert!(!moved.duplicate);
}
/// Verifies that `JetStreamContext::parse_pub_ack` accepts a multi-line payload in which
/// whitespace surrounds every `:` separator, including the one before the `duplicate` boolean.
#[test]
fn parse_pub_ack_accepts_whitespace_around_duplicate_bool() {
// Raw byte string: the line breaks and the spaces around ':' are part of the input under test.
let payload = br#"{
"stream" : "ORDERS",
"seq" : 42,
"duplicate" : true
}"#;
let ack = JetStreamContext::parse_pub_ack(payload).expect("valid PubAck");
assert_eq!(ack.stream, "ORDERS");
assert_eq!(ack.seq, 42);
assert!(ack.duplicate);
}
/// Audits how `JetStreamContext::parse_pub_ack` treats the `duplicate` flag in three cases:
/// explicitly `false`, explicitly `true`, and absent from the payload.
#[test]
fn jetstream_puback_duplicate_detection_audit() {
// Case 1: an ordinary publish acknowledgement with `duplicate` set to false.
let normal_payload = br#"{
"stream": "TEST_STREAM",
"seq": 100,
"duplicate": false
}"#;
let normal_ack = JetStreamContext::parse_pub_ack(normal_payload)
.expect("normal PubAck should parse successfully");
assert_eq!(normal_ack.stream, "TEST_STREAM");
assert_eq!(normal_ack.seq, 100);
assert!(
!normal_ack.duplicate,
"normal publish should not be marked as duplicate"
);
// Case 2: the same stream and sequence flagged as a duplicate; parsing must succeed, not error.
let duplicate_payload = br#"{
"stream": "TEST_STREAM",
"seq": 100,
"duplicate": true
}"#;
let duplicate_ack = JetStreamContext::parse_pub_ack(duplicate_payload)
.expect("duplicate PubAck should parse successfully and NOT error");
assert_eq!(duplicate_ack.stream, "TEST_STREAM");
assert_eq!(duplicate_ack.seq, 100);
assert!(
duplicate_ack.duplicate,
"duplicate publish should be marked as duplicate"
);
// The flag is the only field that differs between the two acknowledgements above.
assert!(
normal_ack.duplicate != duplicate_ack.duplicate,
"duplicate flag should correctly distinguish between normal and duplicate publishes"
);
// Case 3: the `duplicate` key is omitted entirely and must read back as false.
let missing_duplicate_payload = br#"{
"stream": "TEST_STREAM",
"seq": 101
}"#;
let missing_dup_ack = JetStreamContext::parse_pub_ack(missing_duplicate_payload)
.expect("PubAck without duplicate field should parse successfully");
assert_eq!(missing_dup_ack.stream, "TEST_STREAM");
assert_eq!(missing_dup_ack.seq, 101);
assert!(
!missing_dup_ack.duplicate,
"missing duplicate field should default to false"
);
}
/// Builds a `StreamInfo` by hand and checks its `Debug` output and nested config name
/// after a move.
#[test]
fn stream_info_debug_clone() {
    let original = StreamInfo {
        config: StreamConfig::new("S"),
        state: StreamState::default(),
    };

    let rendered = format!("{original:?}");
    assert!(rendered.contains("StreamInfo"));

    let moved = original;
    assert_eq!(moved.config.name, "S");
}
/// Exercises the `Default`, `Debug`, `Copy` and equality behaviour of `RetentionPolicy`.
#[test]
fn retention_policy_debug_clone_copy_default_eq() {
    let policy = RetentionPolicy::default();
    assert_eq!(policy, RetentionPolicy::Limits);

    let rendered = format!("{policy:?}");
    assert!(rendered.contains("Limits"), "{rendered}");

    // Two by-value uses followed by another use only compile because the type is `Copy`.
    let first: RetentionPolicy = policy;
    let second = policy;
    assert_eq!(first, second);
    assert_ne!(policy, RetentionPolicy::WorkQueue);
}
/// Exercises the `Default`, `Debug`, `Copy` and equality behaviour of `StorageType`.
#[test]
fn storage_type_debug_clone_copy_default_eq() {
    let storage = StorageType::default();
    assert_eq!(storage, StorageType::File);

    let rendered = format!("{storage:?}");
    assert!(rendered.contains("File"), "{rendered}");

    // Two by-value uses followed by another use only compile because the type is `Copy`.
    let first: StorageType = storage;
    let second = storage;
    assert_eq!(first, second);
    assert_ne!(storage, StorageType::Memory);
}
/// Exercises the `Default`, `Debug`, `Copy` and equality behaviour of `DiscardPolicy`.
#[test]
fn discard_policy_debug_clone_copy_default_eq() {
    let policy = DiscardPolicy::default();
    assert_eq!(policy, DiscardPolicy::Old);

    let rendered = format!("{policy:?}");
    assert!(rendered.contains("Old"), "{rendered}");

    // Two by-value uses followed by another use only compile because the type is `Copy`.
    let first: DiscardPolicy = policy;
    let second = policy;
    assert_eq!(first, second);
    assert_ne!(policy, DiscardPolicy::New);
}
/// The `Debug` rendering of a default `StreamState` names the type and is unchanged
/// after the value is moved.
#[test]
fn stream_state_debug_clone_default() {
    let state = StreamState::default();

    let rendered = format!("{state:?}");
    assert!(rendered.contains("StreamState"), "{rendered}");
    assert_eq!(state.messages, 0);

    let moved = state;
    assert_eq!(format!("{moved:?}"), rendered);
}
/// `Consumer::parse_js_message` must still find the delivery count and stream sequence
/// when the stream and consumer tokens in the ACK reply subject themselves contain dots.
#[test]
fn parse_js_message_dotted_stream_name() {
    let ack_subject = "$JS.ACK.orders.v2.my.consumer.1.42.3.1234567890.5";
    let raw = Message {
        subject: "test.subject".to_string(),
        sid: 1,
        headers: None,
        payload: b"hello".to_vec(),
        reply_to: Some(ack_subject.to_string()),
    };

    let parsed = Consumer::parse_js_message(raw, None).expect("should parse dotted names");

    assert_eq!(parsed.delivered, 1);
    assert_eq!(parsed.sequence, 42);
}
/// `Consumer::parse_js_message` extracts the delivery count and stream sequence from an
/// ACK reply subject with single-token stream and consumer names.
#[test]
fn parse_js_message_simple_names() {
    let ack_subject = "$JS.ACK.mystream.myconsumer.2.100.50.9999999.10";
    let raw = Message {
        subject: "test".to_string(),
        sid: 1,
        headers: None,
        payload: vec![],
        reply_to: Some(ack_subject.to_string()),
    };

    let parsed = Consumer::parse_js_message(raw, None).expect("should parse simple names");

    assert_eq!(parsed.delivered, 2);
    assert_eq!(parsed.sequence, 100);
}
/// `has_json_api_error` must ignore the word "error" inside a data value but recognise
/// a real `"error"` envelope, even with spaces around the colon.
#[test]
fn error_detection_no_false_positive() {
    // "error" appears only inside the stream name value.
    let benign = r#"{"stream":"error-handler","seq":1}"#;
    assert!(
        !has_json_api_error(benign),
        "data containing 'error' in name should not match error envelope"
    );

    // A genuine error object keyed by "error".
    let envelope = r#"{"error" : {"code" : 404,"description":"not found"}}"#;
    assert!(
        has_json_api_error(envelope),
        "actual error envelope should match"
    );
}
/// A 404 with `err_code` 10059 must map to `JsError::StreamNotFound`; a 404 without an
/// `err_code` must stay a generic `JsError::Api`.
#[test]
fn parse_api_error_uses_err_code_for_stream_not_found() {
    let with_err_code =
        r#"{"error" : {"code" : 404,"err_code" : 10059,"description" : "stream not found"}}"#;
    let specific = JetStreamContext::parse_api_error(with_err_code);
    assert!(
        matches!(specific, JsError::StreamNotFound(ref d) if d.contains("stream not found")),
        "should classify as StreamNotFound, got: {specific:?}"
    );

    let without_err_code = r#"{"error":{"code":404,"description":"generic not found"}}"#;
    let generic = JetStreamContext::parse_api_error(without_err_code);
    assert!(
        matches!(generic, JsError::Api { code: 404, .. }),
        "should be generic Api error, got: {generic:?}"
    );
}
/// `JetStreamContext::parse_stream_info` must recognise an error envelope whose keys
/// are followed by spaces and classify it as `StreamNotFound`.
#[test]
fn parse_stream_info_detects_spaced_error_object() {
    let spaced_envelope =
        br#"{"error" : {"code" : 404,"err_code" : 10059,"description" : "stream not found"}}"#;
    let failure =
        JetStreamContext::parse_stream_info(spaced_envelope).expect_err("error response");
    assert!(
        matches!(failure, JsError::StreamNotFound(ref d) if d == "stream not found"),
        "spaced error envelope should be classified, got: {failure:?}"
    );
}
/// Checks that `JetStreamContext::parse_api_error` reads `code`, `err_code` and `description`
/// from the nested `"error"` object only, ignoring same-named keys on the outer wrapper or in
/// other nested objects.
#[test]
fn parse_api_error_ignores_consumer_info_wrapper_shadow_fields() {
// The wrapper and the nested "state" object both carry decoy "code"/"description" keys
// that appear before the real "error" object.
let json = r#"{
"type":"io.nats.jetstream.api.v1.consumer_info_response",
"stream_name":"ORDERS",
"name":"worker",
"code":200,
"description":"outer wrapper description",
"state":{"code":201,"description":"nested wrapper description"},
"error":{"code":404,"err_code":10059,"description":"stream not found"}
}"#;
let err = JetStreamContext::parse_api_error(json);
assert!(
matches!(err, JsError::StreamNotFound(ref d) if d == "stream not found"),
"wrapper fields must not override the nested error object, got: {err:?}"
);
// Same decoy keys on the wrapper, but the error has no err_code, so the generic variant is expected.
let json2 = r#"{
"stream_name":"ORDERS",
"name":"worker",
"code":200,
"description":"outer wrapper description",
"error":{"code":503,"description":"server busy"}
}"#;
let err2 = JetStreamContext::parse_api_error(json2);
assert!(
matches!(err2, JsError::Api { code: 503, ref description } if description == "server busy"),
"API error fields must come from the nested error object, got: {err2:?}"
);
}
/// `extract_json_string_simple` must decode a `\u0020` escape into a literal space.
#[test]
fn test_extract_json_string_handles_unicode_escape() {
    let document = r#"{"name" : "hello\u0020world","other":"val"}"#;
    let extracted = extract_json_string_simple(document, "name");
    let expected = Some("hello world".to_string());
    assert_eq!(
        extracted, expected,
        "unicode escape should be correctly parsed"
    );
}
/// Snapshot test named `jetstream_message_ack_format_scrubbed`: records the output of
/// `jetstream_ack_snapshot` for three subject / payload / reply-subject / ack-payload
/// combinations (`+ACK`, `-NAK`, `+TERM`).
#[test]
fn jetstream_message_ack_format_snapshot_scrubs_sequences() {
insta::assert_json_snapshot!(
"jetstream_message_ack_format_scrubbed",
json!({
// Plain acknowledgement with single-token stream and consumer names.
"happy": jetstream_ack_snapshot(
"orders.created",
br#"{"event":"created","status":"ok"}"#,
"$JS.ACK.orders.consumer.1.42.7.1713790000000000000.0",
"+ACK",
),
// Negative acknowledgement; this reply subject has extra dotted tokens in the name section.
"redeliver": jetstream_ack_snapshot(
"orders.retry",
br#"{"event":"retry","reason":"redelivery"}"#,
"$JS.ACK.orders.v2.retry.worker.3.108.14.1713790000000001234.2",
"-NAK",
),
// Terminal acknowledgement.
"term": jetstream_ack_snapshot(
"orders.poison",
br#"{"event":"poison","resolution":"term"}"#,
"$JS.ACK.orders.deadletter.processor.5.512.44.1713790000000005678.1",
"+TERM",
),
})
);
}
/// Pins the wire bytes of `build_nak_payload`: a bare `-NAK` for a zero delay and a JSON
/// `delay` in nanoseconds otherwise.
#[test]
fn jetstream_nack_with_delay_wire_matches_nats_go_reference_j3z2nb() {
    let immediate = build_nak_payload(Duration::ZERO);
    assert_eq!(immediate.as_ref(), b"-NAK");

    // 1500 ms is rendered as 1_500_000_000 ns.
    let delayed = build_nak_payload(Duration::from_millis(1500));
    assert_eq!(delayed.as_ref(), br#"-NAK {"delay": 1500000000}"#);
}
/// Scripted reply that the `capture_wire_transcript` test server sends after it has read
/// the client's `PUB` frame.
enum DeterministicServerReply {
/// Send nothing back to the client.
None,
/// Send these bytes as the payload of a `MSG` addressed to the client's subscription.
Request(Vec<u8>),
/// Send a `MSG` whose header also carries a reply subject.
Pull {
/// Reply subject placed in the `MSG` header.
reply_subject: String,
/// Bytes sent as the `MSG` payload.
payload: Vec<u8>,
},
}
/// Reads from `stream` one byte at a time until the collected bytes end with `\r\n`,
/// and returns them with the terminator included.
///
/// Reading byte-by-byte never consumes data past the terminator.
///
/// # Panics
///
/// Panics with "read line byte" if a read fails before a terminator is seen.
fn read_crlf_line(stream: &mut std::net::TcpStream) -> Vec<u8> {
    use std::io::Read;

    let mut collected = Vec::new();
    let mut next = [0u8; 1];
    while !collected.ends_with(b"\r\n") {
        stream.read_exact(&mut next).expect("read line byte");
        collected.push(next[0]);
    }
    collected
}
/// Extracts the payload length from a `PUB <subject> <reply-to> <len>` header line.
///
/// # Panics
///
/// Panics if the first token is not `PUB`, if the header does not have exactly four
/// whitespace-separated tokens, or if the last token is not a valid `usize`.
fn parse_pub_payload_len(header: &str) -> usize {
    let fields: Vec<&str> = header.split_whitespace().collect();
    assert_eq!(fields.first().copied(), Some("PUB"));
    assert_eq!(fields.len(), 4, "request publish must include reply-to");
    let length_field = fields[3];
    length_field.parse().expect("parse PUB payload length")
}
/// One `PUB` frame recorded by the `capture_publish_transcript` test server.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CapturedPublish {
/// Subject token from the `PUB` header.
subject: String,
/// Payload bytes that followed the header, without the trailing CRLF.
payload: Vec<u8>,
}
/// Everything the `capture_publish_transcript` test server read from its single client.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PublishTranscript {
/// The first line received after the server sent `INFO` (read as the client's CONNECT line).
connect: String,
/// The `PUB` frames received afterwards, in arrival order.
publishes: Vec<CapturedPublish>,
}
/// Splits a `PUB <subject> <len>` header line (no reply-to) into subject and payload length.
///
/// # Panics
///
/// Panics if the first token is not `PUB`, if the header does not have exactly three
/// whitespace-separated tokens, or if the last token is not a valid `usize`.
fn parse_plain_publish(header: &str) -> (String, usize) {
    let fields: Vec<&str> = header.split_whitespace().collect();
    assert_eq!(fields.first().copied(), Some("PUB"));
    assert_eq!(fields.len(), 3, "plain publish must not include reply-to");

    let subject = fields[1].to_string();
    let payload_len = fields[2].parse().expect("parse plain PUB payload length");
    (subject, payload_len)
}
/// Runs `action` against a one-shot fake NATS server on a loopback port and returns what the
/// server read: the client's first line plus `publish_count` plain `PUB` frames.
///
/// The server runs on its own thread; `action` is driven through `run_test_with_cx` on the
/// calling thread and receives the server's socket address.
///
/// # Panics
///
/// Panics if any socket operation fails, if a frame is not a three-token `PUB` header, if a
/// payload is not followed by CRLF, or if the server thread panicked.
fn capture_publish_transcript<F, Fut>(publish_count: usize, action: F) -> PublishTranscript
where
F: FnOnce(Cx, std::net::SocketAddr) -> Fut,
Fut: std::future::Future<Output = ()>,
{
// Port 0 asks the OS for any free port; the chosen address is handed to `action`.
let listener =
std::net::TcpListener::bind("127.0.0.1:0").expect("bind JetStream ack listener");
let addr = listener.local_addr().expect("listener addr");
let server = std::thread::spawn(move || {
use std::io::{Read, Write};
let (mut stream, _) = listener.accept().expect("accept test client");
// Bound every read so a client that sends too little fails the test instead of hanging it.
stream
.set_read_timeout(Some(Duration::from_secs(15)))
.expect("set read timeout");
// Greet the client with an INFO line before reading anything from it.
stream
.write_all(
b"INFO {\"server_id\":\"test\",\"server_name\":\"test\",\"version\":\"2.9.0\",\"proto\":1,\"max_payload\":1048576,\"tls_required\":false}\r\n",
)
.expect("write INFO");
stream.flush().expect("flush INFO");
let connect = String::from_utf8(read_crlf_line(&mut stream)).expect("CONNECT utf8");
let mut publishes = Vec::with_capacity(publish_count);
for _ in 0..publish_count {
// Each frame is a header line, then exactly `payload_len` bytes, then CRLF.
let publish = String::from_utf8(read_crlf_line(&mut stream)).expect("PUB utf8");
let (subject, payload_len) = parse_plain_publish(&publish);
let mut payload = vec![0_u8; payload_len];
stream.read_exact(&mut payload).expect("read PUB payload");
let mut crlf = [0_u8; 2];
stream.read_exact(&mut crlf).expect("read payload CRLF");
assert_eq!(&crlf, b"\r\n");
publishes.push(CapturedPublish { subject, payload });
}
PublishTranscript { connect, publishes }
});
run_test_with_cx(|cx| action(cx, addr));
// Joining returns the transcript built on the server thread.
server.join().expect("server thread join")
}
/// Returns the fourth-from-last dot-separated token of a `$JS.ACK.…` reply subject,
/// parsed as `u64`.
///
/// Counting from the end keeps the lookup independent of how many tokens come before
/// the numeric tail.
///
/// # Panics
///
/// Panics if the subject has fewer than nine tokens, does not start with `$JS.ACK`, or
/// if the selected token is not a valid `u64`.
fn parse_ack_floor_candidate(reply_subject: &str) -> u64 {
    let tokens: Vec<&str> = reply_subject.split('.').collect();
    let looks_like_ack = tokens.len() >= 9 && tokens.starts_with(&["$JS", "ACK"]);
    assert!(
        looks_like_ack,
        "expected JetStream ACK reply subject, got {reply_subject:?}"
    );

    let sequence_index = tokens.len() - 4;
    tokens[sequence_index]
        .parse()
        .expect("parse JetStream stream sequence")
}
/// Reference model of an ack floor: replays `subjects` in order and returns the floor
/// value after each one, starting from `initial_floor`.
///
/// * `AckPolicy::Explicit` — the floor only advances across a gap-free run of sequences;
///   out-of-order sequences are parked until the gap closes.
/// * `AckPolicy::All` — the floor jumps to the highest sequence seen so far.
///
/// # Panics
///
/// Panics for `AckPolicy::None`, and whenever `parse_ack_floor_candidate` rejects a subject.
fn reference_ack_floor_history(
    policy: AckPolicy,
    initial_floor: u64,
    subjects: &[String],
) -> Vec<u64> {
    let mut floor = initial_floor;
    let mut parked = std::collections::BTreeSet::new();

    subjects
        .iter()
        .map(|subject| {
            let candidate = parse_ack_floor_candidate(subject);
            match policy {
                AckPolicy::Explicit => {
                    parked.insert(candidate);
                    // Consume parked sequences for as long as the next one is present.
                    loop {
                        let next = floor.saturating_add(1);
                        if !parked.remove(&next) {
                            break;
                        }
                        floor = next;
                    }
                }
                AckPolicy::All => floor = floor.max(candidate),
                AckPolicy::None => panic!("tick130 models only acking JetStream policies"),
            }
            floor
        })
        .collect()
}
/// Starts with three pending acks and checks that each `decrement_pending_counter` call
/// releases exactly one, and that a further `Consumer::decrement_pending` stays at zero.
#[test]
fn terminal_ack_pending_counter_decrements_once_and_saturates_6xjxd7() {
    let pending_acks = Arc::new(AtomicUsize::new(3));
    // The consumer shares the counter, so its accessor observes every decrement.
    let consumer = Consumer {
        stream: "ORDERS".to_string(),
        name: "processor".to_string(),
        prefix: "$JS.API".to_string(),
        pending_acks: Arc::clone(&pending_acks),
        max_ack_pending: 1000,
        pull_rate_limiter: PullRateLimiter::new(),
    };

    let expectations = [
        (
            2,
            "first terminal ack must release exactly one pending credit",
        ),
        (
            1,
            "second terminal ack must release exactly one pending credit",
        ),
        (
            0,
            "third terminal ack must release the final pending credit",
        ),
    ];
    for (remaining, message) in expectations {
        decrement_pending_counter(&pending_acks);
        assert_eq!(consumer.pending_acks(), remaining, "{message}");
    }

    // One decrement too many must not wrap below zero.
    consumer.decrement_pending();
    assert_eq!(
        consumer.pending_acks(),
        0,
        "defensive pending decrement must saturate at zero"
    );

    assert_eq!(build_nak_payload(Duration::ZERO).as_ref(), b"-NAK");
}
/// Runs `action` against a one-shot fake NATS server on a loopback port and returns the
/// client's wire traffic as a single string.
///
/// The server reads, in this order, one line (CONNECT), one line (`SUB`), one `PUB` header
/// with a reply-to, and the `PUB` payload; it then sends the scripted `reply` and reads one
/// more line (UNSUB). The returned string is those five frames concatenated, with every
/// occurrence of the subscribed subject replaced by `[INBOX]` so transcripts from different
/// runs can be compared.
///
/// # Panics
///
/// Panics if any socket operation fails, if the frames do not arrive in the order above, if
/// a frame is not valid UTF-8, or if the server thread panicked.
fn capture_wire_transcript<F, Fut>(reply: DeterministicServerReply, action: F) -> String
where
F: FnOnce(Cx, std::net::SocketAddr) -> Fut,
Fut: std::future::Future<Output = ()>,
{
// Port 0 asks the OS for any free port; the chosen address is handed to `action`.
let listener =
std::net::TcpListener::bind("127.0.0.1:0").expect("bind JetStream wire listener");
let addr = listener.local_addr().expect("listener addr");
let server = std::thread::spawn(move || {
use std::io::{Read, Write};
let (mut stream, _) = listener.accept().expect("accept test client");
// Bound every read so a client that sends too little fails the test instead of hanging it.
stream
.set_read_timeout(Some(Duration::from_secs(15)))
.expect("set read timeout");
stream
.write_all(
b"INFO {\"server_id\":\"test\",\"server_name\":\"test\",\"version\":\"2.9.0\",\"proto\":1,\"max_payload\":1048576,\"tls_required\":false}\r\n",
)
.expect("write INFO");
stream.flush().expect("flush INFO");
let connect = String::from_utf8(read_crlf_line(&mut stream)).expect("CONNECT utf8");
let subscribe = String::from_utf8(read_crlf_line(&mut stream)).expect("SUB utf8");
let publish = String::from_utf8(read_crlf_line(&mut stream)).expect("PUB utf8");
let payload_len = parse_pub_payload_len(&publish);
// Two extra bytes: the payload is read together with the CRLF that follows it.
let mut payload = vec![0_u8; payload_len + 2];
stream.read_exact(&mut payload).expect("read PUB payload");
// The SUB line supplies the subject and sid that the scripted reply is addressed to.
let mut subscribe_parts = subscribe.split_whitespace();
assert_eq!(subscribe_parts.next(), Some("SUB"));
let inbox = subscribe_parts.next().expect("SUB subject").to_string();
let sid = subscribe_parts.next().expect("SUB sid").to_string();
match reply {
DeterministicServerReply::None => {}
DeterministicServerReply::Request(response_payload) => {
// MSG header without a reply subject: subject, sid, byte count.
let response_header =
format!("MSG {inbox} {sid} {}\r\n", response_payload.len());
stream
.write_all(response_header.as_bytes())
.expect("write response header");
stream
.write_all(&response_payload)
.expect("write response payload");
stream
.write_all(b"\r\n")
.expect("write response terminator");
stream.flush().expect("flush response");
}
DeterministicServerReply::Pull {
reply_subject,
payload: response_payload,
} => {
// MSG header with a reply subject inserted before the byte count.
let response_header = format!(
"MSG {inbox} {sid} {reply_subject} {}\r\n",
response_payload.len()
);
stream
.write_all(response_header.as_bytes())
.expect("write pull response header");
stream
.write_all(&response_payload)
.expect("write pull response payload");
stream
.write_all(b"\r\n")
.expect("write pull response terminator");
stream.flush().expect("flush pull response");
}
}
let unsubscribe = String::from_utf8(read_crlf_line(&mut stream)).expect("UNSUB utf8");
// Concatenate the frames in arrival order, masking the subscribed subject in each one.
[
connect,
subscribe,
publish,
String::from_utf8(payload).expect("payload utf8"),
unsubscribe,
]
.into_iter()
.map(|frame| frame.replace(&inbox, "[INBOX]"))
.collect::<String>()
});
run_test_with_cx(|cx| action(cx, addr));
// Joining returns the transcript built on the server thread.
server.join().expect("server thread join")
}
/// The in-flight publish counter of the backpressure gate must be 1 while a permit is
/// held and return to 0 once the permit is dropped.
#[test]
fn jetstream_publish_backpressure_releases_slot_after_response() {
    let gate = JetStreamPublishBackpressureGate::new(Default::default());
    let cx = crate::cx::Cx::new(
        RegionId::testing_default(),
        TaskId::testing_default(),
        Budget::INFINITE,
    );
    let in_flight = || gate.in_flight_publishes.load(Ordering::Relaxed);

    assert_eq!(in_flight(), 0);

    let permit = gate
        .begin_publish(&cx, "orders.created")
        .expect("first publish permit");
    assert_eq!(in_flight(), 1);

    // Releasing the permit is what frees the slot.
    drop(permit);
    assert_eq!(in_flight(), 0);
}
/// With the context's system pressure built from a headroom of 0.0, `publish` must fail
/// with a local 429 error and the fake server must not receive any `PUB` frame.
#[test]
fn jetstream_publish_refuses_before_wire_under_emergency_pressure() {
    // A publish count of zero means the server records only the client's first line.
    let transcript = capture_publish_transcript(0, |cx, addr| async move {
        let cx = cx.with_pressure(Arc::new(crate::types::SystemPressure::with_headroom(0.0)));

        let config = NatsConfig {
            host: addr.ip().to_string(),
            port: addr.port(),
            ..Default::default()
        };
        let client = NatsClient::connect_with_config(&cx, config)
            .await
            .expect("connect publish protocol server");
        let mut js = JetStreamContext::new(client);

        let refusal = js
            .publish(&cx, "orders.created", b"ping")
            .await
            .expect_err("emergency pressure should refuse publish");
        assert!(
            matches!(refusal, JsError::Api { code: 429, .. }),
            "expected local 429 backpressure error, got {refusal:?}"
        );
    });

    assert!(
        transcript.publishes.is_empty(),
        "emergency pressure refusal must happen before any PUB frame"
    );
}
/// Compares, for three operations, the wire transcript produced through the JetStream API
/// with the transcript produced by issuing the equivalent calls directly on `NatsClient`:
///
/// 1. `JetStreamContext::publish` vs. `NatsClient::request` on the same subject.
/// 2. `JetStreamContext::create_consumer` vs. a raw request to
///    `$JS.API.CONSUMER.CREATE.ORDERS.processor`.
/// 3. `Consumer::pull` vs. a raw subscribe / `publish_request` / unsubscribe sequence.
///
/// Each transcript comes from `capture_wire_transcript`, so every run talks to its own
/// fake server.
#[test]
fn jetstream_api_pub_sub_consume_match_raw_nats_wire_tick122() {
// --- 1. publish -------------------------------------------------------------------
let publish_reply = br#"{"stream":"ORDERS","seq":7}"#.to_vec();
let publish_wire = capture_wire_transcript(
DeterministicServerReply::Request(publish_reply.clone()),
|cx, addr| async move {
let mut js = JetStreamContext::new(
NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect publish protocol server"),
);
let ack = js
.publish(&cx, "orders.created", b"ping")
.await
.expect("JetStream publish");
assert_eq!(ack.stream, "ORDERS");
assert_eq!(ack.seq, 7);
},
);
// Raw reference: the same subject and payload sent as a plain request.
let raw_publish_wire = capture_wire_transcript(
DeterministicServerReply::Request(publish_reply.clone()),
move |cx, addr| {
let publish_reply = publish_reply.clone();
async move {
let mut client = NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect raw publish protocol server");
let response = client
.request(&cx, "orders.created", b"ping")
.await
.expect("raw publish request");
assert_eq!(response.payload, publish_reply);
}
},
);
assert_eq!(
publish_wire, raw_publish_wire,
"JetStream publish must emit the same NATS wire bytes as raw request"
);
// --- 2. create_consumer -----------------------------------------------------------
let create_reply = br#"{"name":"processor"}"#.to_vec();
let create_wire = capture_wire_transcript(
DeterministicServerReply::Request(create_reply.clone()),
|cx, addr| async move {
let mut js = JetStreamContext::new(
NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect create-consumer protocol server"),
);
let consumer = js
.create_consumer(&cx, "ORDERS", ConsumerConfig::new("processor"))
.await
.expect("JetStream create_consumer");
assert_eq!(consumer.stream(), "ORDERS");
assert_eq!(consumer.name(), "processor");
},
);
// Raw reference: the request body is assembled by hand from the same config.
let raw_create_wire = capture_wire_transcript(
DeterministicServerReply::Request(create_reply.clone()),
move |cx, addr| {
let create_reply = create_reply.clone();
async move {
let mut client = NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect raw create-consumer protocol server");
let config = ConsumerConfig::new("processor");
let payload = format!(
"{{\"stream_name\":\"{}\",\"config\":{}}}",
json_escape("ORDERS"),
config.to_json()
);
let response = client
.request(
&cx,
"$JS.API.CONSUMER.CREATE.ORDERS.processor",
payload.as_bytes(),
)
.await
.expect("raw create-consumer request");
assert_eq!(response.payload, create_reply);
}
},
);
assert_eq!(
create_wire, raw_create_wire,
"JetStream create_consumer must emit the same NATS wire bytes as raw request"
);
// --- 3. pull ----------------------------------------------------------------------
let pull_reply_subject =
"$JS.ACK.ORDERS.processor.1.42.7.1713790000000000000.0".to_string();
let pull_payload = b"msg".to_vec();
let pull_wire = capture_wire_transcript(
DeterministicServerReply::Pull {
reply_subject: pull_reply_subject.clone(),
payload: pull_payload.clone(),
},
move |cx, addr| {
let pull_reply_subject = pull_reply_subject.clone();
let pull_payload = pull_payload.clone();
async move {
let mut client = NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect pull protocol server");
// The consumer is built by hand, so no create-consumer traffic appears in this transcript.
let consumer = Consumer {
stream: "ORDERS".to_string(),
name: "processor".to_string(),
prefix: "$JS.API".to_string(),
pending_acks: Arc::new(AtomicUsize::new(0)),
max_ack_pending: 1000,
pull_rate_limiter: PullRateLimiter::new(),
};
let messages = consumer
.pull(&mut client, &cx, 1)
.await
.expect("JetStream pull");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload, pull_payload);
assert_eq!(messages[0].reply_subject, pull_reply_subject);
}
},
);
// Raw reference: the server sends no reply here, so only the client's outgoing frames are compared.
let raw_pull_wire =
capture_wire_transcript(DeterministicServerReply::None, |cx, addr| async move {
let mut client = NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect raw pull protocol server");
let inbox = format!("_INBOX.{}", random_id(&cx));
let sub = client
.subscribe(&cx, &inbox)
.await
.expect("raw pull subscribe");
let expires = duration_to_nanos_saturating(Consumer::DEFAULT_PULL_TIMEOUT);
let request = build_pull_request_json(1, expires as i64, None);
client
.publish_request(
&cx,
"$JS.API.CONSUMER.MSG.NEXT.ORDERS.processor",
&inbox,
request.as_bytes(),
)
.await
.expect("raw pull publish_request");
client
.unsubscribe(&cx, sub.sid())
.await
.expect("raw pull unsubscribe");
});
assert_eq!(
pull_wire, raw_pull_wire,
"JetStream pull must emit the same NATS wire bytes as the raw subscribe/publish_request sequence"
);
}
/// Creates a push consumer with a `deliver_subject` and `rate_limit_bps` through
/// `JetStreamContext::create_consumer` and checks that the wire transcript equals the one
/// from a hand-built raw request, and that both settings appear in the request body.
#[test]
fn push_consumer_rate_limit_matches_raw_nats_reference_tick146() {
let create_reply = br#"{"name":"push-rate"}"#.to_vec();
// JetStream API side.
let create_wire = capture_wire_transcript(
DeterministicServerReply::Request(create_reply.clone()),
|cx, addr| async move {
let mut js = JetStreamContext::new(
NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect JetStream push create-consumer protocol server"),
);
let consumer = js
.create_consumer(
&cx,
"ORDERS",
ConsumerConfig::new("push-rate")
.deliver_subject("deliver.orders")
.rate_limit_bps(8192)
.ack_policy(AckPolicy::Explicit),
)
.await
.expect("JetStream push create_consumer");
assert_eq!(consumer.stream(), "ORDERS");
assert_eq!(consumer.name(), "push-rate");
},
);
// Raw reference side: same config, request body assembled by hand.
let raw_create_wire = capture_wire_transcript(
DeterministicServerReply::Request(create_reply.clone()),
move |cx, addr| {
let create_reply = create_reply.clone();
async move {
let mut client = NatsClient::connect_with_config(
&cx,
NatsConfig {
host: addr.ip().to_string(),
port: addr.port(),
..Default::default()
},
)
.await
.expect("connect raw push create-consumer protocol server");
let config = ConsumerConfig::new("push-rate")
.deliver_subject("deliver.orders")
.rate_limit_bps(8192)
.ack_policy(AckPolicy::Explicit);
let payload = format!(
"{{\"stream_name\":\"{}\",\"config\":{}}}",
json_escape("ORDERS"),
config.to_json()
);
let response = client
.request(
&cx,
"$JS.API.CONSUMER.CREATE.ORDERS.push-rate",
payload.as_bytes(),
)
.await
.expect("raw push create-consumer request");
assert_eq!(response.payload, create_reply);
}
},
);
assert_eq!(
create_wire, raw_create_wire,
"JetStream push create_consumer must emit the same NATS wire bytes as raw request when rate limiting is configured"
);
// Equality alone would also hold if both sides dropped a field, so check the body directly.
assert!(
create_wire.contains("\"deliver_subject\":\"deliver.orders\""),
"push create_consumer wire body must serialize deliver_subject, got: {create_wire}"
);
assert!(
create_wire.contains("\"rate_limit_bps\":8192"),
"push create_consumer wire body must serialize rate_limit_bps, got: {create_wire}"
);
}
/// For the `Explicit` and `All` ack policies, checks that the create-consumer body
/// serializes the policy name and that `reference_ack_floor_history` yields the expected
/// floor progression when sequence 11 is acknowledged before sequence 10.
#[test]
fn durable_consumer_ack_floor_matches_raw_nats_reference_tick130() {
    let cases = [
        (AckPolicy::Explicit, "explicit", vec![9_u64, 11_u64]),
        (AckPolicy::All, "all", vec![11_u64, 11_u64]),
    ];

    for (policy, policy_name, expected_floor_history) in cases {
        let create_payload = format!(
            "{{\"stream_name\":\"{}\",\"config\":{}}}",
            json_escape("ORDERS"),
            ConsumerConfig::new("processor")
                .ack_policy(policy)
                .to_json()
        );
        let expected_fragment = format!("\"ack_policy\":\"{policy_name}\"");
        assert!(
            create_payload.contains(&expected_fragment),
            "durable create_consumer body must serialize ack_policy={policy_name}, got: {create_payload}"
        );

        // Sequence 11 arrives first, leaving a gap at 10 until the second ack.
        let reply_subjects = vec![
            "$JS.ACK.ORDERS.processor.1.11.11.1713790000000000001.0".to_string(),
            "$JS.ACK.ORDERS.processor.1.10.10.1713790000000000000.1".to_string(),
        ];
        let jetstream_floor_history = reference_ack_floor_history(policy, 9, &reply_subjects);
        let raw_floor_history = reference_ack_floor_history(policy, 9, &reply_subjects);

        assert_eq!(
            jetstream_floor_history, raw_floor_history,
            "JetStream durable ack floor must match the raw NATS reference model for ack_policy={policy_name}"
        );
        assert_eq!(
            jetstream_floor_history, expected_floor_history,
            "unexpected ack-floor progression for ack_policy={policy_name}"
        );
    }
}
/// Walks a `JsMessage` through pending → ack-in-flight → acked by hand and checks that,
/// once acked, a repeated explicit ACK is a no-op.
#[test]
fn explicit_ack_terminal_state_is_idempotent_tick112() {
    let message = JsMessage {
        subject: "orders.created".to_string(),
        payload: br#"{"event":"created"}"#.to_vec(),
        sequence: 42,
        delivered: 1,
        reply_subject: "$JS.ACK.ORDERS.processor.1.42.7.1713790000000000000.0".to_string(),
        ack_state: AtomicU8::new(ACK_STATE_PENDING),
        pending_acks: None,
    };
    assert!(!message.is_acked());

    // The state constants that `TerminalAckKind::Ack` maps to.
    assert_eq!(
        TerminalAckKind::Ack.in_flight_state(),
        ACK_STATE_ACK_IN_FLIGHT
    );
    assert_eq!(TerminalAckKind::Ack.committed_state(), ACK_STATE_ACKED);
    assert!(TerminalAckKind::Ack.is_idempotent());

    // First ack: reserve the terminal slot, then commit it.
    let reservation = message.ack_state.compare_exchange(
        ACK_STATE_PENDING,
        TerminalAckKind::Ack.in_flight_state(),
        Ordering::AcqRel,
        Ordering::Acquire,
    );
    reservation.expect("first explicit ack reserves the terminal slot");
    message
        .ack_state
        .store(TerminalAckKind::Ack.committed_state(), Ordering::Release);
    assert!(message.is_acked());

    // Second ack: the state is already committed and the kind is idempotent.
    let observed_state = message.ack_state.load(Ordering::Acquire);
    let repeated_ack_is_noop = observed_state == TerminalAckKind::Ack.committed_state()
        && TerminalAckKind::Ack.is_idempotent();
    assert!(
        repeated_ack_is_noop,
        "a repeated explicit ACK must be a terminal-state no-op"
    );
}
/// Structured logger for the integration tests; its methods write one JSON object per
/// line to stderr.
struct JetStreamTestLogger {
/// Value emitted as the `suite` field of every log line.
suite_name: String,
/// Value emitted as the `test` field of every log line.
test_name: String,
/// Captured at construction; used to compute `duration_ms` in the `test_end` line.
start_time: Instant,
/// Number of phases started so far; supplies `phase_num` in `phase_start` lines.
phase_counter: AtomicU32,
}
impl JetStreamTestLogger {
    /// Creates a logger for `suite`/`test`, starts its clock, and emits a `test_start` line.
    fn new(suite: &str, test: &str) -> Self {
        let created = Self {
            suite_name: suite.to_string(),
            test_name: test.to_string(),
            start_time: Instant::now(),
            phase_counter: AtomicU32::new(0),
        };
        let (suite_name, test_name) = (&created.suite_name, &created.test_name);
        eprintln!(
            "{{\"ts\":\"{}\",\"suite\":\"{}\",\"test\":\"{}\",\"event\":\"test_start\"}}",
            format_ts(),
            suite_name,
            test_name
        );
        created
    }

    /// Emits a `phase_start` line for `phase`; `phase_num` counts from zero.
    fn phase(&self, phase: &str) {
        // `fetch_add` returns the value before the increment.
        let ordinal = self.phase_counter.fetch_add(1, Ordering::Relaxed);
        eprintln!(
            "{{\"ts\":\"{}\",\"suite\":\"{}\",\"test\":\"{}\",\"phase\":\"{}\",\"phase_num\":{},\"event\":\"phase_start\"}}",
            format_ts(),
            self.suite_name,
            self.test_name,
            phase,
            ordinal
        );
    }

    /// Emits a `server_snapshot` line carrying the server URL and the given counts.
    fn server_snapshot(&self, url: &str, streams: usize, consumers: usize) {
        let timestamp = format_ts();
        eprintln!(
            "{{\"ts\":\"{}\",\"suite\":\"{}\",\"test\":\"{}\",\"event\":\"server_snapshot\",\"data\":{{\"url\":\"{}\",\"streams\":{},\"consumers\":{}}}}}",
            timestamp, self.suite_name, self.test_name, url, streams, consumers
        );
    }

    /// Emits a `test_end` line with `result` and the milliseconds elapsed since `new`.
    fn test_end(&self, result: &str) {
        let elapsed_ms = self.start_time.elapsed().as_millis();
        eprintln!(
            "{{\"ts\":\"{}\",\"suite\":\"{}\",\"test\":\"{}\",\"event\":\"test_end\",\"data\":{{\"result\":\"{}\",\"duration_ms\":{}}}}}",
            format_ts(),
            self.suite_name,
            self.test_name,
            result,
            elapsed_ms
        );
    }
}
/// Formats the current wall-clock time as `unix:<seconds>.<nanoseconds>`, with the
/// nanosecond part zero-padded to nine digits.
///
/// A clock reading before the Unix epoch is reported as zero.
fn format_ts() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let (whole_seconds, fraction_nanos) = (since_epoch.as_secs(), since_epoch.subsec_nanos());
    format!("unix:{}.{:09}", whole_seconds, fraction_nanos)
}
/// Shared fixture for the integration tests that talk to a real NATS server.
struct JetStreamTestHarness {
/// Structured logger for this test.
logger: JetStreamTestLogger,
/// Server URL, taken from `NATS_TEST_URL` or the localhost default.
nats_url: String,
/// Names of streams registered through `track_stream` and not yet removed by `cleanup`.
cleanup_streams: Vec<String>,
}
impl JetStreamTestHarness {
    /// Builds a harness for `suite`/`test`: resolves the server URL, creates the logger
    /// (which emits `test_start`), checks the URL, and emits an initial `server_snapshot`.
    ///
    /// # Panics
    ///
    /// Panics if the URL contains `prod` or `live`, or contains none of `localhost`,
    /// `127.0.0.1` and `test`.
    fn new(suite: &str, test: &str) -> Self {
        let nats_url = Self::get_test_nats_url();
        let logger = JetStreamTestLogger::new(suite, test);
        // NOTE(review): these are plain substring checks, so e.g. "deliver" matches "live";
        // confirm that is acceptable for the URLs used in CI.
        assert!(
            !nats_url.contains("prod")
                && !nats_url.contains("live")
                && (nats_url.contains("localhost")
                    || nats_url.contains("127.0.0.1")
                    || nats_url.contains("test")),
            "SAFETY: Test harness must not connect to production NATS. Got: {}",
            nats_url
        );
        logger.server_snapshot(&nats_url, 0, 0);
        Self {
            logger,
            nats_url,
            cleanup_streams: Vec::new(),
        }
    }

    /// Returns `NATS_TEST_URL` if it is set and readable, otherwise `nats://localhost:4222`.
    fn get_test_nats_url() -> String {
        std::env::var("NATS_TEST_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string())
    }

    /// Connects a `NatsClient` to the harness URL.
    ///
    /// # Panics
    ///
    /// Panics if the connection attempt fails.
    async fn connect_client(&self, cx: &Cx) -> NatsClient {
        NatsClient::connect(cx, &self.nats_url)
            .await
            .expect("connect NATS_TEST_URL JetStream server")
    }

    /// Registers `name` so that `cleanup` deletes the stream later.
    fn track_stream(&mut self, name: &str) {
        self.cleanup_streams.push(name.to_string());
    }

    /// Deletes every tracked stream, treating "stream not found" as success.
    ///
    /// Every tracked stream is attempted even if an earlier delete fails. Streams whose
    /// delete failed are put back on the tracked list, so the `Drop` warning still
    /// reports them.
    ///
    /// # Panics
    ///
    /// Panics after the last attempt if any delete failed with an error other than
    /// `JsError::StreamNotFound`; the message lists every failure.
    async fn cleanup(&mut self, js: &mut JetStreamContext, cx: &Cx) {
        let mut failures = Vec::new();
        for stream in std::mem::take(&mut self.cleanup_streams) {
            match js.delete_stream(cx, &stream).await {
                Ok(()) | Err(JsError::StreamNotFound(_)) => {}
                Err(err) => {
                    failures.push(format!("delete JetStream test stream {stream}: {err:?}"));
                    // Keep the stream tracked: it may still exist on the server.
                    self.cleanup_streams.push(stream);
                }
            }
        }
        assert!(failures.is_empty(), "{}", failures.join("; "));
    }
}
impl Drop for JetStreamTestHarness {
    /// Emits a `cleanup_warning` JSON line on stderr when the harness is
    /// dropped while streams are still listed in `cleanup_streams`.
    /// Nothing is deleted here; the event only reports the count.
    fn drop(&mut self) {
        let unclean = self.cleanup_streams.len();
        if unclean == 0 {
            return;
        }
        let ts = format_ts();
        let suite = &self.logger.suite_name;
        let test = &self.logger.test_name;
        eprintln!(
            "{{\"ts\":\"{}\",\"suite\":\"{}\",\"test\":\"{}\",\"event\":\"cleanup_warning\",\"data\":{{\"unclean_streams\":{}}}}}",
            ts, suite, test, unclean,
        );
    }
}
/// Builds a memory-backed stream configuration for one test.
///
/// The stream name is `TEST_<UPPERCASED test_name>_<pid>_<random below 10000>`
/// and the stream captures subjects matching `test.<test_name>.>`. Limits:
/// 1000 messages, 300 s maximum age, 60 s duplicate window.
fn create_test_stream_config(test_name: &str) -> StreamConfig {
    let upper_name = test_name.to_uppercase();
    let pid = std::process::id();
    let suffix = fastrand::u32(..10_000);
    let stream_name = format!("TEST_{}_{}_{}", upper_name, pid, suffix);
    let subject_filter = format!("test.{}.>", test_name);

    StreamConfig::new(stream_name)
        .subjects(&[&subject_filter])
        .storage(StorageType::Memory)
        .max_messages(1000)
        .max_age(Duration::from_secs(300))
        .duplicate_window(Duration::from_secs(60))
}
/// Builds an explicit-ack consumer configuration for one test.
///
/// The consumer name is `test_consumer_<test_name>_<pid>_<random below 10000>`.
/// Settings: 30 s ack wait and a delivery limit of 3.
fn create_test_consumer_config(test_name: &str) -> ConsumerConfig {
    let pid = std::process::id();
    let suffix = fastrand::u32(..10_000);
    let consumer_name = format!("test_consumer_{}_{}_{}", test_name, pid, suffix);

    ConsumerConfig::new(consumer_name)
        .ack_policy(AckPolicy::Explicit)
        .ack_wait(Duration::from_secs(30))
        .max_deliver(3)
}
// Real-server round trip: create a stream and a consumer, publish five
// messages, pull them in one batch, and acknowledge each one.
// Ignored by default because it needs a reachable NATS server.
#[ignore = "requires real NATS server - run with NATS_TEST_URL"]
#[test]
fn test_jetstream_consumer_pull_real_server() {
run_test_with_cx(|cx| async move {
let mut harness = JetStreamTestHarness::new("jetstream_integration", "consumer_pull");
harness.logger.phase("setup");
let client = harness.connect_client(&cx).await;
let mut js = JetStreamContext::new(client);
harness.logger.phase("create_stream");
let stream_config = create_test_stream_config("consumer_pull");
let stream_info = js
.create_stream(&cx, stream_config)
.await
.expect("create JetStream stream");
// Track the stream right after creation so `cleanup` deletes it.
harness.track_stream(&stream_info.config.name);
harness.logger.server_snapshot(&harness.nats_url, 1, 0);
harness.logger.phase("create_consumer");
let consumer_config = create_test_consumer_config("consumer_pull");
let consumer = js
.create_consumer(&cx, &stream_info.config.name, consumer_config)
.await
.expect("create JetStream consumer");
harness.logger.phase("publish_messages");
// Subjects match the stream's `test.consumer_pull.>` filter.
for i in 0..5 {
let subject = format!("test.consumer_pull.{i}");
let payload = format!("test message {i}");
let ack = js
.publish(&cx, &subject, payload.as_bytes())
.await
.expect("publish JetStream message");
// Each payload is published once, so none may be flagged as a duplicate.
assert!(!ack.duplicate);
}
harness.logger.phase("pull_messages");
// Request a batch of 5 with a 2 s timeout and expect all five back.
let messages = consumer
.pull_with_timeout(js.client(), &cx, 5, Duration::from_secs(2))
.await
.expect("pull JetStream messages");
assert_eq!(messages.len(), 5);
harness.logger.phase("ack_messages");
for msg in &messages {
msg.ack(js.client(), &cx)
.await
.expect("ack JetStream message");
assert!(msg.is_acked());
}
// NOTE(review): cleanup only runs when every step above succeeded; an
// earlier panic leaves the stream for `Drop` to report.
harness.cleanup(&mut js, &cx).await;
harness.logger.test_end("pass");
});
}
// Real-server nack/redelivery check: publish one message, nack the first
// delivery, then expect the same sequence to be delivered again and ack it.
// Ignored by default because it needs a reachable NATS server.
#[ignore = "requires real NATS server - run with NATS_TEST_URL"]
#[test]
fn test_jetstream_message_ack_nack_real_server() {
run_test_with_cx(|cx| async move {
let mut harness =
JetStreamTestHarness::new("jetstream_integration", "message_ack_nack");
harness.logger.phase("setup");
let client = harness.connect_client(&cx).await;
let mut js = JetStreamContext::new(client);
harness.logger.phase("create_stream");
let stream_config = create_test_stream_config("ack_nack");
let stream_info = js
.create_stream(&cx, stream_config)
.await
.expect("create ack/nack stream");
// Track the stream right after creation so `cleanup` deletes it.
harness.track_stream(&stream_info.config.name);
harness.logger.phase("create_consumer");
let consumer = js
.create_consumer(
&cx,
&stream_info.config.name,
create_test_consumer_config("ack_nack"),
)
.await
.expect("create ack/nack consumer");
harness.logger.phase("publish_message");
js.publish(&cx, "test.ack_nack.msg", b"ack-nack")
.await
.expect("publish ack/nack message");
harness.logger.phase("nack_message");
let first_delivery = consumer
.pull_with_timeout(js.client(), &cx, 1, Duration::from_secs(2))
.await
.expect("pull first delivery");
assert_eq!(first_delivery.len(), 1);
// Remember the stream sequence so the redelivery can be matched to it.
let sequence = first_delivery[0].sequence;
first_delivery[0]
.nack(js.client(), &cx)
.await
.expect("nack first delivery");
harness.logger.phase("ack_redelivery");
let redelivery = consumer
.pull_with_timeout(js.client(), &cx, 1, Duration::from_secs(2))
.await
.expect("pull redelivery after nack");
assert_eq!(redelivery.len(), 1);
// Same message (same sequence), delivered at least twice.
assert_eq!(redelivery[0].sequence, sequence);
assert!(redelivery[0].delivered >= 2);
redelivery[0]
.ack(js.client(), &cx)
.await
.expect("ack redelivery");
harness.cleanup(&mut js, &cx).await;
harness.logger.test_end("pass");
});
}
/// Offline checks around ack-wait configuration, redelivery bookkeeping and
/// the local ack state of `JsMessage`. No server is contacted.
mod jetstream_ack_timeout_redelivery_audit {
    use super::*;
    use std::sync::atomic::{AtomicU8, Ordering};

    /// The builder stores the configured ack-wait duration unchanged.
    #[test]
    fn ack_timeout_causes_server_side_redelivery() {
        let ack_wait = Duration::from_secs(5);
        let config = ConsumerConfig::new("timeout_test").ack_wait(ack_wait);
        assert_eq!(config.ack_wait, ack_wait);
    }

    /// Two deliveries of one message share `sequence` and differ in
    /// `delivered`, so a set of processed sequences can filter the repeat.
    #[test]
    fn redelivered_messages_carry_sequence_for_deduplication() {
        let first = JsMessage {
            subject: "orders.process".to_string(),
            payload: b"{\"order_id\": 12345}".to_vec(),
            sequence: 100,
            delivered: 1,
            reply_subject: "$JS.ACK.orders.processor.1.100.15.1234567890.0".to_string(),
            ack_state: AtomicU8::new(ACK_STATE_PENDING),
            pending_acks: None,
        };
        let second = JsMessage {
            subject: "orders.process".to_string(),
            payload: b"{\"order_id\": 12345}".to_vec(),
            sequence: 100,
            delivered: 2,
            reply_subject: "$JS.ACK.orders.processor.1.100.15.1234567890.1".to_string(),
            ack_state: AtomicU8::new(ACK_STATE_PENDING),
            pending_acks: None,
        };
        assert_eq!(first.sequence, second.sequence);
        assert_ne!(first.delivered, second.delivered);

        // Before anything was processed the original must be handled.
        let seen_before = std::collections::HashSet::<u64>::new();
        let original_is_new = !seen_before.contains(&first.sequence);
        assert!(original_is_new);

        // Once sequence 100 is recorded the redelivery must be skipped.
        let seen_after = std::collections::HashSet::from([100u64]);
        let redelivery_is_new = !seen_after.contains(&second.sequence);
        assert!(!redelivery_is_new);
    }

    /// The builder stores the configured `max_ack_pending` limit.
    #[test]
    fn flow_control_prevents_redelivery_buildup() {
        let builder = ConsumerConfig::new("flow_test");
        let config = builder
            .max_ack_pending(100)
            .ack_wait(Duration::from_secs(10));
        assert_eq!(config.max_ack_pending, 100);
    }

    /// A freshly built pending message is not acked and can be dropped.
    #[test]
    fn dropped_messages_logged_for_redelivery_awareness() {
        let pending = JsMessage {
            subject: "test".to_string(),
            payload: vec![1, 2, 3],
            sequence: 42,
            delivered: 1,
            reply_subject: "$JS.ACK.test.consumer.1.42.1.1234567890.0".to_string(),
            ack_state: AtomicU8::new(ACK_STATE_PENDING),
            pending_acks: None,
        };
        assert!(!pending.is_acked());
        drop(pending);
    }

    /// Observing sequence 102 after 100 moves the ordered-consumer model to
    /// `ResetPending` and records 101 as the start of the gap.
    #[test]
    fn ordered_consumer_handles_redelivery_gaps() {
        let mut tracker = FuzzOrderedConsumerState {
            phase: FuzzOrderedConsumerPhase::Tracking,
            last_sequence: Some(100),
            accepted_messages: 1,
            reset_count: 0,
            pending_gap_from: None,
        };
        let skipped_ahead = FuzzOrderedConsumerStep::Observe {
            sequence: 102,
            delivered: 1,
        };
        fuzz_apply_ordered_consumer_step(&mut tracker, skipped_ahead);
        assert_eq!(tracker.phase, FuzzOrderedConsumerPhase::ResetPending);
        assert_eq!(tracker.pending_gap_from, Some(101));
    }

    /// `is_acked` follows the value stored in `ack_state`.
    #[test]
    fn ack_state_prevents_double_acknowledgment() {
        let redelivered = JsMessage {
            subject: "test".to_string(),
            payload: vec![],
            sequence: 1,
            delivered: 2,
            reply_subject: "$JS.ACK.test.consumer.1.1.2.1234567890.0".to_string(),
            ack_state: AtomicU8::new(ACK_STATE_PENDING),
            pending_acks: None,
        };
        assert!(!redelivered.is_acked());
        redelivered
            .ack_state
            .store(ACK_STATE_ACKED, Ordering::Release);
        assert!(redelivered.is_acked());
    }
}
// Real-server deduplication check: publishing twice with the same message id
// must flag the second publish as a duplicate and return the same sequence.
// Ignored by default because it needs a reachable NATS server.
#[ignore = "requires real NATS server - run with NATS_TEST_URL"]
#[test]
fn test_jetstream_publish_with_deduplication() {
run_test_with_cx(|cx| async move {
let mut harness = JetStreamTestHarness::new("jetstream_integration", "deduplication");
harness.logger.phase("setup");
let client = harness.connect_client(&cx).await;
let mut js = JetStreamContext::new(client);
harness.logger.phase("create_stream");
let stream_config = create_test_stream_config("deduplication");
let stream_info = js
.create_stream(&cx, stream_config)
.await
.expect("create deduplication stream");
// Track the stream right after creation so `cleanup` deletes it.
harness.track_stream(&stream_info.config.name);
harness.logger.phase("publish_duplicate_id");
// Both publishes use the id "dedup-key-1"; the stream config sets a 60 s
// duplicate window.
let first = js
.publish_with_id(&cx, "test.deduplication.msg", "dedup-key-1", b"payload")
.await
.expect("publish first message id");
let second = js
.publish_with_id(&cx, "test.deduplication.msg", "dedup-key-1", b"payload")
.await
.expect("publish duplicate message id");
assert!(!first.duplicate);
assert!(second.duplicate);
// The duplicate ack must point at the originally stored message.
assert_eq!(first.seq, second.seq);
harness.cleanup(&mut js, &cx).await;
harness.logger.test_end("pass");
});
}
// Real-server timeout check: pulling from an empty stream with a finite
// timeout must return an empty batch instead of hanging.
// Ignored by default because it needs a reachable NATS server.
#[ignore = "requires real NATS server - run with NATS_TEST_URL"]
#[test]
fn test_jetstream_consumer_timeout_behavior() {
run_test_with_cx(|cx| async move {
let mut harness =
JetStreamTestHarness::new("jetstream_integration", "consumer_timeout");
harness.logger.phase("setup");
let client = harness.connect_client(&cx).await;
let mut js = JetStreamContext::new(client);
harness.logger.phase("create_empty_stream");
let stream_config = create_test_stream_config("consumer_timeout");
let stream_info = js
.create_stream(&cx, stream_config)
.await
.expect("create timeout stream");
// Track the stream right after creation so `cleanup` deletes it.
harness.track_stream(&stream_info.config.name);
let consumer = js
.create_consumer(
&cx,
&stream_info.config.name,
create_test_consumer_config("consumer_timeout"),
)
.await
.expect("create timeout consumer");
harness.logger.phase("pull_empty_stream");
// Nothing was published, so the 150 ms pull should come back empty.
let started = Instant::now();
let messages = consumer
.pull_with_timeout(js.client(), &cx, 1, Duration::from_millis(150))
.await
.expect("pull empty stream with finite timeout");
assert!(messages.is_empty());
// Generous 5 s ceiling: this only guards against an indefinite hang, it
// does not check that the pull honoured the 150 ms timeout precisely.
assert!(
started.elapsed() < Duration::from_secs(5),
"finite pull timeout must not hang indefinitely"
);
harness.cleanup(&mut js, &cx).await;
harness.logger.test_end("pass");
});
}
// Connection-failure check: connecting to 127.0.0.1 port 0 must fail with an
// error for which `is_connection_error()` returns true.
// NOTE(review): despite the name, no reconnect or recovery step is exercised
// here; only the failure classification is asserted.
#[ignore = "requires real NATS server - run with NATS_TEST_URL"]
#[test]
fn test_jetstream_connection_failure_recovery() {
run_test_with_cx(|cx| async move {
// The harness is used for logging only; no client is connected through it.
let harness = JetStreamTestHarness::new("jetstream_integration", "connection_recovery");
harness.logger.phase("connection_refused");
let err = NatsClient::connect(&cx, "nats://127.0.0.1:0")
.await
.expect_err("port 0 must refuse a client connection");
assert!(
err.is_connection_error(),
"connection refusal must surface as a connection error, got {err:?}"
);
harness.logger.test_end("pass");
});
}
/// Checks the timeout-to-`expires` mapping written out in this test:
/// a zero duration maps to 0, any other duration to its nanosecond count.
#[test]
fn audit_timeout_interpretation_as_no_wait_flag() {
    // Local model of the conversion; zero is special-cased to 0.
    let expires_for = |timeout: Duration| -> i64 {
        if timeout.is_zero() {
            0_i64
        } else {
            timeout.as_nanos() as i64
        }
    };

    let zero_timeout = Duration::ZERO;
    assert!(
        zero_timeout.is_zero(),
        "Duration::ZERO must register as zero"
    );
    let expires_zero = expires_for(zero_timeout);
    assert_eq!(
        expires_zero, 0,
        "Zero timeout must convert to expires=0 (immediate return, no_wait mode)"
    );

    let wait_timeout = Duration::from_millis(100);
    assert!(
        !wait_timeout.is_zero(),
        "Non-zero duration must not register as zero"
    );
    let expires_nonzero = expires_for(wait_timeout);
    assert!(
        expires_nonzero > 0,
        "Non-zero timeout must convert to positive expires (wait mode)"
    );
}
/// Table-driven check that each listed fetch mode's timeout converts to the
/// `expires` value recorded next to it, and that exactly two modes are listed.
#[test]
fn audit_jetstream_fetch_modes_documented() {
    #[derive(Debug)]
    struct FetchMode {
        name: &'static str,
        timeout_value: Duration,
        expires_field: i64,
    }

    impl FetchMode {
        /// Zero timeout maps to 0; otherwise the timeout in nanoseconds.
        fn computed_expires(&self) -> i64 {
            if self.timeout_value.is_zero() {
                0_i64
            } else {
                self.timeout_value.as_nanos() as i64
            }
        }
    }

    let no_wait = FetchMode {
        name: "No-Wait Mode",
        timeout_value: Duration::ZERO,
        expires_field: 0,
    };
    let wait = FetchMode {
        name: "Wait Mode",
        timeout_value: Duration::from_millis(5000),
        expires_field: 5_000_000_000,
    };
    let modes = [no_wait, wait];

    modes.iter().for_each(|mode| {
        assert_eq!(
            mode.computed_expires(),
            mode.expires_field,
            "Timeout conversion must match expected expires value for {}",
            mode.name
        );
    });
    assert_eq!(
        modes.len(),
        2,
        "JetStream API defines exactly 2 fetch modes"
    );
}
/// Records five statements about pull semantics as assertion messages.
///
/// Every assertion condition is the constant `true`, so this test cannot
/// fail; it exercises no production code.
#[test]
fn audit_jetstream_pull_semantics_compliance() {
    let statements = [
        "JetStream batch is an upper limit, not exact count requirement",
        "expires=0 in JetStream pull request means immediate return",
        "expires>0 in JetStream pull request means wait for timeout",
        "JetStream allows returning fewer messages than batch size",
        "JetStream allows returning zero messages when none available",
    ];
    for statement in statements {
        assert!(true, "{}", statement);
    }
}
/// Offline checks of the pull rate limiter, the global pull tracker and the
/// batch-size clamp. Request times are synthetic offsets from "now".
mod pull_dos_protection_audit {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Nanoseconds in one millisecond, used to space synthetic requests.
    const NANOS_PER_MILLI: u64 = 1_000_000;

    /// Current wall-clock time as nanoseconds since the Unix epoch.
    fn unix_now_ns() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64
    }

    /// A request 1 ms after the first is rejected with a backoff; a request
    /// just past the minimum interval is still rejected; one just past the
    /// returned backoff is accepted.
    #[test]
    fn audit_pull_rate_limiter_enforces_minimum_interval() {
        let limiter = PullRateLimiter::new();
        let base_ns = unix_now_ns();
        assert!(limiter.check_pull_request(base_ns).is_ok());

        let rapid = limiter.check_pull_request(base_ns + NANOS_PER_MILLI);
        assert!(rapid.is_err());
        let backoff = rapid.expect_err("rapid request should return a backoff");

        let past_min_interval_ns =
            base_ns + (MIN_PULL_INTERVAL_MS * NANOS_PER_MILLI) + NANOS_PER_MILLI;
        assert!(limiter.check_pull_request(past_min_interval_ns).is_err());

        let past_backoff_ns = base_ns + backoff.as_nanos() as u64 + NANOS_PER_MILLI;
        assert!(limiter.check_pull_request(past_backoff_ns).is_ok());
    }

    /// Two consecutive rapid requests yield strictly increasing delays, both
    /// capped at `MAX_PULL_BACKOFF_MS`.
    #[test]
    fn audit_pull_rate_limiter_applies_exponential_backoff() {
        let limiter = PullRateLimiter::new();
        let base_ns = unix_now_ns();
        let _ = limiter.check_pull_request(base_ns);

        let first_rapid = limiter.check_pull_request(base_ns + NANOS_PER_MILLI);
        assert!(first_rapid.is_err());
        let first_delay = first_rapid.unwrap_err().as_millis();

        let second_rapid = limiter.check_pull_request(base_ns + 2 * NANOS_PER_MILLI);
        assert!(second_rapid.is_err());
        let second_delay = second_rapid.unwrap_err().as_millis();

        assert!(
            second_delay > first_delay,
            "Exponential backoff should increase delay: {} -> {}",
            first_delay,
            second_delay
        );
        let cap = u128::from(MAX_PULL_BACKOFF_MS);
        assert!(first_delay <= cap);
        assert!(second_delay <= cap);
    }

    /// Sends 100 requests spaced 1 ms apart and checks that some, but no
    /// more than the global limit, are accepted.
    #[test]
    fn audit_global_rate_tracker_enforces_system_wide_limits() {
        let mut tracker = GlobalPullRateTracker::new();
        let base_ns = unix_now_ns();
        let mut accepted = 0;
        for step in 0..100 {
            let request_ns = base_ns + (step * NANOS_PER_MILLI);
            let verdict = tracker.check_global_pull_request(request_ns, 1024);
            if verdict.is_ok() {
                accepted += 1;
            }
        }
        assert!(
            accepted > 0,
            "Should accept some requests under normal conditions"
        );
        assert!(
            accepted <= GLOBAL_PULL_RATE_LIMIT as usize,
            "Should not exceed global rate limit"
        );
    }

    /// A single request sized above the memory-pressure threshold is rejected.
    #[test]
    fn audit_global_rate_tracker_prevents_memory_exhaustion() {
        let mut tracker = GlobalPullRateTracker::new();
        let base_ns = unix_now_ns();
        let oversized_bytes = (MEMORY_PRESSURE_THRESHOLD_MB + 100) * 1_024 * 1_024;
        let verdict = tracker.check_global_pull_request(base_ns, oversized_bytes);
        assert!(
            verdict.is_err(),
            "Should reject requests that would cause memory pressure"
        );
    }

    /// Once rate limiting is active, a requested batch of 1000 is clamped
    /// lower but not below `MIN_BATCH_SIZE_UNDER_PRESSURE`.
    #[test]
    fn audit_dynamic_batch_sizing_reduces_under_pressure() {
        let consumer = fuzz_create_test_consumer(1000);
        let base_ns = unix_now_ns();
        // Two requests 1 ms apart to trip the limiter.
        let _ = consumer.pull_rate_limiter.check_pull_request(base_ns);
        let _ = consumer
            .pull_rate_limiter
            .check_pull_request(base_ns + NANOS_PER_MILLI);
        assert!(
            consumer.pull_rate_limiter.is_rate_limiting_active(),
            "Rate limiting should be active after rapid requests"
        );

        let requested = 1000;
        let clamped =
            validate_and_clamp_pull_batch_size(requested, &consumer).expect("Should validate");
        assert!(
            clamped < requested,
            "Batch size should be reduced under rate limiting pressure: {} -> {}",
            requested,
            clamped
        );
        assert!(
            clamped >= MIN_BATCH_SIZE_UNDER_PRESSURE,
            "Clamped batch size should not go below minimum: {}",
            clamped
        );
    }

    /// Zero and over-limit batch sizes are rejected; a valid size passes
    /// through unchanged when no rate limiting is active.
    #[test]
    fn audit_validate_and_clamp_batch_respects_base_limits() {
        let consumer = fuzz_create_test_consumer(1000);
        assert!(validate_and_clamp_pull_batch_size(0, &consumer).is_err());

        let too_large = MAX_PULL_BATCH + 1;
        assert!(validate_and_clamp_pull_batch_size(too_large, &consumer).is_err());

        let in_range = 512;
        let accepted =
            validate_and_clamp_pull_batch_size(in_range, &consumer).expect("Should validate");
        assert_eq!(accepted, in_range);
    }

    /// A fresh consumer is not rate limited; a second request 1 ms after the
    /// first is rejected and switches rate limiting on.
    #[test]
    fn audit_pull_dos_protection_integration() {
        let consumer = fuzz_create_test_consumer(1000);
        assert!(!consumer.pull_rate_limiter.is_rate_limiting_active());

        let base_ns = unix_now_ns();
        let _ = consumer.pull_rate_limiter.check_pull_request(base_ns);
        let rapid = consumer
            .pull_rate_limiter
            .check_pull_request(base_ns + NANOS_PER_MILLI);
        assert!(
            rapid.is_err(),
            "DoS protection should reject rapid pull requests"
        );
        assert!(
            consumer.pull_rate_limiter.is_rate_limiting_active(),
            "Rate limiting should be active after rapid requests"
        );
    }
}
/// Offline checks of `ConsumerConfig::validate` for consumer names:
/// length limit, allowed character set, and error classification.
mod durable_consumer_name_validation_audit {
    use super::*;

    /// A 128-character name is stored as given; a 129-character name fails
    /// validation with a message naming the 128-character limit.
    #[test]
    fn audit_durable_name_character_length_limit_jetstream_spec() {
        let at_limit = "a".repeat(128);
        let config = ConsumerConfig::new(&at_limit);
        // Only the stored length is checked here; `validate` is not called.
        let stored_len = config.name.as_ref().unwrap().len();
        assert!(
            stored_len == 128,
            "Should accept name with exactly 128 characters"
        );

        let over_limit = "a".repeat(129);
        let mut config = ConsumerConfig::new(&over_limit);
        let outcome = config.validate();
        assert!(
            outcome.is_err(),
            "Should reject name exceeding 128 character limit"
        );
        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("exceeds JetStream spec limit of 128 characters"),
            "Error should mention JetStream spec character limit: {}",
            message
        );
    }

    /// Names made of ASCII letters, digits, `-` and `_` validate; names with
    /// spaces, dots, stars, non-ASCII characters or `@` are rejected with a
    /// message describing the allowed character set.
    #[test]
    fn audit_durable_name_character_set_jetstream_compliance() {
        let accepted_names = [
            "validName123",
            "valid-name-123",
            "valid_name_123",
            "VALID_NAME_123",
            "a",
            "A",
            "1",
        ];
        for valid_name in accepted_names {
            let mut config = ConsumerConfig::ephemeral();
            config.name = Some(valid_name.to_string());
            assert!(
                config.validate().is_ok(),
                "Should accept valid name: {}",
                valid_name
            );
        }

        let rejected_names = [
            "name with spaces",
            "name.with.dots",
            "name*with*stars",
            "nameπwithπunicode",
            "name@with@at",
        ];
        for invalid_name in rejected_names {
            let mut config = ConsumerConfig::ephemeral();
            config.name = Some(invalid_name.to_string());
            let outcome = config.validate();
            assert!(
                outcome.is_err(),
                "Should reject invalid name: {}",
                invalid_name
            );
            let message = outcome.unwrap_err().to_string();
            assert!(
                message.contains("must contain only ASCII letters, digits, '-' or '_'"),
                "Error should mention allowed character set: {}",
                message
            );
        }
    }

    /// Empty, over-long and space-containing names all fail validation with
    /// `JsError::InvalidConfig`.
    #[test]
    fn audit_client_side_fail_fast_validation() {
        let too_long_name = "a".repeat(129);
        let cases = [
            ("", "empty name"),
            (too_long_name.as_str(), "too long name"),
            ("invalid name with spaces", "invalid characters"),
        ];
        for (invalid_name, test_case) in cases {
            let mut config = ConsumerConfig::ephemeral();
            config.name = Some(invalid_name.to_string());
            let outcome = config.validate();
            assert!(
                outcome.is_err(),
                "Should fail fast for {}: {}",
                test_case,
                invalid_name
            );
            let error = outcome.unwrap_err();
            assert!(
                matches!(error, JsError::InvalidConfig(_)),
                "Should return InvalidConfig error for {}, got: {:?}",
                test_case,
                error
            );
        }
    }
}
}
// Test-only module; its source is loaded from `jetstream_dedup_boundary_audit.rs`.
#[cfg(test)]
#[path = "jetstream_dedup_boundary_audit.rs"]
mod jetstream_dedup_boundary_audit;
// Test-only module; its source is loaded from `jetstream_flow_control_audit.rs`.
#[cfg(test)]
#[path = "jetstream_flow_control_audit.rs"]
mod jetstream_flow_control_audit;