use std::{
fmt::{self, Debug, Display},
task::{Context as TaskContext, Poll},
time::Duration,
};
use async_nats::Subject;
use async_nats::jetstream::message::OutboundMessage;
use async_nats::subject::ToSubject;
use bytes::Bytes;
use futures::StreamExt;
use futures::task::noop_waker_ref;
use serde::Deserialize;
use crate::batch_publish::BatchPubAck;
/// Gap-handling mode requested for a fast batch.
///
/// The mode is written into every reply subject as a lowercase token and also
/// decides whether a gap / flow-error notification becomes fatal for the
/// local publisher.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum GapMode {
    /// Encoded as `ok`: notifications are reported to the error handler but
    /// are not recorded as fatal.
    Ok,
    /// Encoded as `fail` (the default): a notification is recorded as fatal.
    #[default]
    Fail,
}

impl GapMode {
    /// Returns the token that represents this mode inside a reply subject.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            GapMode::Fail => "fail",
            GapMode::Ok => "ok",
        }
    }
}
/// Operation code carried in the reply subject of every fast-batch message.
///
/// `build_reply` writes the numeric discriminant (`op as u8`) into the subject.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum Operation {
/// Used for the message that receives batch sequence 1.
Start = 0,
/// Used for every later message added through `add` / `add_message`.
Append = 1,
/// Used for the final message sent by `commit` / `commit_message`.
Commit = 2,
/// Used for the empty final message sent by `close`.
CommitEob = 3,
/// Used for the keep-alive messages sent while stalled on flow control.
Ping = 4,
}
/// Error type returned by the fast batch publish API; its kind is a
/// [`FastPublishErrorKind`].
pub type FastPublishError = async_nats::error::Error<FastPublishErrorKind>;
/// Classification of the failures a fast batch publish operation can report.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FastPublishErrorKind {
/// Fast batch publish is not enabled on the stream.
NotEnabled,
/// The reply subject pattern was rejected.
InvalidPattern,
/// The batch id is invalid (exceeds 64 characters).
InvalidBatchId,
/// The batch id is unknown to the server.
UnknownBatchId,
/// Too many fast batches are in flight on the server.
TooManyInflight,
/// A gap notification was received for the batch.
GapDetected,
/// Any other API error reported for the batch.
FlowError,
/// `close` was called before any message was added.
EmptyBatch,
/// The inbox does not consist of exactly two tokens.
InvalidInboxShape,
/// The builder configuration was rejected.
InvalidConfig,
/// The publisher is closed, or its inbox subscription ended.
Closed,
/// No reply arrived within the configured ack timeout.
Timeout,
/// Subscribing to the batch inbox failed.
Subscribe,
/// Publishing a batch message failed.
Publish,
/// A reply payload could not be (de)serialized.
Serialization,
/// The operation is not allowed in the current publisher state.
InvalidState,
/// Any other fast batch publish error.
Other,
}
impl FastPublishErrorKind {
    /// Translates a JetStream API error into a fast-publish error kind.
    ///
    /// Only the batch-publish specific error codes have a dedicated kind;
    /// every other code is reported as [`FastPublishErrorKind::FlowError`].
    pub(crate) fn from_api_error(error: &async_nats::jetstream::Error) -> Self {
        use async_nats::jetstream::ErrorCode;

        let code = error.error_code();
        if code == ErrorCode::BATCH_PUBLISH_DISABLED {
            Self::NotEnabled
        } else if code == ErrorCode::BATCH_PUBLISH_INVALID_PATTERN {
            Self::InvalidPattern
        } else if code == ErrorCode::BATCH_PUBLISH_INVALID_BATCH_ID {
            Self::InvalidBatchId
        } else if code == ErrorCode::BATCH_PUBLISH_UNKNOWN_BATCH_ID {
            Self::UnknownBatchId
        } else if code == ErrorCode::BATCH_PUBLISH_TOO_MANY_INFLIGHT {
            Self::TooManyInflight
        } else {
            Self::FlowError
        }
    }
}
impl Display for FastPublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select the static text first so there is a single write at the end.
        let description = match self {
            Self::NotEnabled => "fast batch publish not enabled on stream",
            Self::InvalidPattern => "fast batch publish invalid reply subject pattern",
            Self::InvalidBatchId => "fast batch publish id is invalid (exceeds 64 characters)",
            Self::UnknownBatchId => "fast batch publish id is unknown to the server",
            Self::TooManyInflight => "too many in-flight fast batches on the server",
            Self::GapDetected => "gap detected in fast batch (gap_mode=fail)",
            Self::FlowError => "fast batch flow error",
            Self::EmptyBatch => "cannot close an empty batch",
            Self::InvalidInboxShape => "inbox must have exactly two tokens (e.g. _INBOX.<id>)",
            Self::InvalidConfig => "invalid fast publisher configuration",
            Self::Closed => "fast publisher is closed",
            Self::Timeout => "timeout waiting for fast batch ack",
            Self::Subscribe => "failed to subscribe to fast batch inbox",
            Self::Publish => "failed to publish fast batch message",
            Self::Serialization => "failed to (de)serialize fast batch message",
            Self::InvalidState => "operation not allowed in current fast publisher state",
            Self::Other => "other fast batch publish error",
        };
        f.write_str(description)
    }
}
/// Flow-control acknowledgement received on the batch inbox
/// (JSON payload tagged `"type":"ack"`).
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
pub(crate) struct BatchFlowAck {
/// JSON field `seq`; used to advance the publisher's last acknowledged sequence.
#[serde(rename = "seq")]
pub sequence: u64,
/// JSON field `msgs`; adopted (clamped to at least 1) as the effective flow.
#[serde(rename = "msgs")]
pub messages: u16,
}
/// Gap notification received on the batch inbox
/// (JSON payload tagged `"type":"gap"`).
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
pub(crate) struct BatchFlowGap {
/// JSON field `last_seq`.
#[serde(rename = "last_seq")]
pub expected_last_sequence: u64,
/// JSON field `seq`.
#[serde(rename = "seq")]
pub current_sequence: u64,
}
/// Error notification received on the batch inbox
/// (JSON payload tagged `"type":"err"`).
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct BatchFlowErr {
/// JSON field `seq`; logged as the batch sequence the error refers to.
#[serde(rename = "seq")]
pub sequence: u64,
/// The JetStream API error carried by the notification.
pub error: async_nats::jetstream::Error,
}
/// Result of classifying one payload received on the batch inbox (see `classify`).
#[derive(Debug)]
pub(crate) enum Classified {
/// Tagged flow-control acknowledgement.
FlowAck(BatchFlowAck),
/// Tagged gap notification.
FlowGap(BatchFlowGap),
/// Tagged error notification.
FlowErr(BatchFlowErr),
/// Untagged successful response: the publish acknowledgement of the batch.
PubAck(BatchPubAck),
/// Untagged error response.
InitError(async_nats::jetstream::Error),
}
/// Deserialization helper for the flow messages, which are discriminated by a
/// lowercase `type` field (`ack`, `gap`, `err`).
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
enum TaggedFlow {
Ack(BatchFlowAck),
Gap(BatchFlowGap),
Err(BatchFlowErr),
}
pub(crate) fn classify(payload: &[u8]) -> Result<Classified, FastPublishError> {
if let Ok(tagged) = serde_json::from_slice::<TaggedFlow>(payload) {
return Ok(match tagged {
TaggedFlow::Ack(a) => Classified::FlowAck(a),
TaggedFlow::Gap(g) => Classified::FlowGap(g),
TaggedFlow::Err(e) => Classified::FlowErr(e),
});
}
let resp: async_nats::jetstream::response::Response<BatchPubAck> =
serde_json::from_slice(payload)
.map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Serialization, e))?;
Ok(match resp {
async_nats::jetstream::response::Response::Ok(pa) => Classified::PubAck(pa),
async_nats::jetstream::response::Response::Err { error } => Classified::InitError(error),
})
}
/// Builds the constant part of every reply subject of a batch:
/// `<inbox>.<flow>.<gap mode>.` (note the trailing dot).
pub(crate) fn build_reply_prefix(inbox: &str, flow: u16, gap: GapMode) -> String {
    let flow_token = flow.to_string();
    // The empty last element makes `join` emit the trailing separator.
    [inbox, flow_token.as_str(), gap.as_str(), ""].join(".")
}
/// Builds the full reply subject for one message:
/// `<prefix><seq>.<operation code>.$FI`.
///
/// `prefix` is expected to already end with a dot (see `build_reply_prefix`).
pub(crate) fn build_reply(prefix: &str, seq: u64, op: Operation) -> Subject {
    use std::fmt::Write as _;

    let op_code = op as u8;
    // 26 extra bytes: up to 20 digits of `seq`, a dot, one digit of the
    // operation code and the 4-byte ".$FI" suffix.
    let mut subject = String::with_capacity(prefix.len() + 26);
    subject += prefix;
    write!(subject, "{seq}.{op_code}.$FI").expect("String write is infallible");
    Subject::from(subject)
}
/// Checks that `inbox` consists of exactly two non-empty, dot-separated tokens.
///
/// # Errors
///
/// Returns [`FastPublishErrorKind::InvalidInboxShape`] for any other shape
/// (no dot, more than one dot, or an empty token).
pub(crate) fn validate_inbox_shape(inbox: &str) -> Result<(), FastPublishError> {
    match inbox.split_once('.') {
        // Exactly one dot, with something on both sides of it.
        Some((head, tail)) if !head.is_empty() && !tail.is_empty() && !tail.contains('.') => {
            Ok(())
        }
        _ => Err(FastPublishError::new(
            FastPublishErrorKind::InvalidInboxShape,
        )),
    }
}
/// Decides whether the publisher has to wait for a flow acknowledgement
/// before sending the message with sequence `next_sequence`.
///
/// The permitted window ends at
/// `last_ack_sequence + effective_flow * max_outstanding_acks` (computed with
/// saturating arithmetic); the publisher stalls as soon as `next_sequence`
/// reaches or exceeds that bound.
#[inline]
pub(crate) fn should_stall(
    last_ack_sequence: u64,
    effective_flow: u16,
    max_outstanding_acks: u16,
    next_sequence: u64,
) -> bool {
    let credit = u64::from(effective_flow).saturating_mul(u64::from(max_outstanding_acks));
    let window_end = last_ack_sequence.saturating_add(credit);
    next_sequence >= window_end
}
/// Flow value used by the builder unless overridden.
pub(crate) const DEFAULT_FLOW: u16 = 100;
/// Number of outstanding acks used by the builder unless overridden.
pub(crate) const DEFAULT_MAX_OUTSTANDING_ACKS: u16 = 2;
/// Smallest `max_outstanding_acks` value accepted by `FastPublisherBuilder::build`.
pub(crate) const MIN_MAX_OUTSTANDING_ACKS: u16 = 1;
/// Largest `max_outstanding_acks` value accepted by `FastPublisherBuilder::build`.
pub(crate) const MAX_MAX_OUTSTANDING_ACKS: u16 = 3;
/// Extension trait that adds fast batch publishing to any type that can
/// provide a NATS client and a default timeout.
pub trait FastPublishExt:
async_nats::jetstream::context::traits::ClientProvider
+ async_nats::jetstream::context::traits::TimeoutProvider
{
/// Starts configuring a fast publisher that uses this provider's client
/// and takes the provider's timeout as its initial ack timeout.
fn fast_publish(&self) -> FastPublisherBuilder {
FastPublisherBuilder::new(self.client(), self.timeout())
}
}
// Blanket implementation: every type that is both a `ClientProvider` and a
// `TimeoutProvider` gets `fast_publish()` for free.
impl<T> FastPublishExt for T where
T: async_nats::jetstream::context::traits::ClientProvider
+ async_nats::jetstream::context::traits::TimeoutProvider
{
}
/// Boxed callback invoked with gap and flow errors reported for the batch.
pub type FastPublishErrorHandler = Box<dyn FnMut(FastPublishError) + Send + 'static>;
/// Builder for a [`FastPublisher`].
pub struct FastPublisherBuilder {
// Client used for publishing and for the inbox subscription.
client: async_nats::Client,
// Requested flow value; written into the reply subject prefix.
flow: u16,
// Multiplier applied to the flow when computing the stall window.
max_outstanding_acks: u16,
// Maximum time to wait for replies from the server.
ack_timeout: Duration,
// Gap handling mode; written into the reply subject prefix.
gap_mode: GapMode,
// Optional callback for gap and flow errors.
on_error: Option<FastPublishErrorHandler>,
}
impl FastPublisherBuilder {
pub(crate) fn new(client: async_nats::Client, ack_timeout: Duration) -> Self {
Self {
client,
flow: DEFAULT_FLOW,
max_outstanding_acks: DEFAULT_MAX_OUTSTANDING_ACKS,
ack_timeout,
gap_mode: GapMode::default(),
on_error: None,
}
}
pub fn flow(mut self, flow: u16) -> Self {
self.flow = flow.max(1);
self
}
pub fn max_outstanding_acks(mut self, n: u16) -> Self {
self.max_outstanding_acks = n;
self
}
pub fn ack_timeout(mut self, timeout: Duration) -> Self {
self.ack_timeout = timeout;
self
}
pub fn gap_mode(mut self, mode: GapMode) -> Self {
self.gap_mode = mode;
self
}
pub fn on_error<F>(mut self, handler: F) -> Self
where
F: FnMut(FastPublishError) + Send + 'static,
{
self.on_error = Some(Box::new(handler));
self
}
pub fn build(self) -> Result<FastPublisher, FastPublishError> {
if !(MIN_MAX_OUTSTANDING_ACKS..=MAX_MAX_OUTSTANDING_ACKS)
.contains(&self.max_outstanding_acks)
{
return Err(FastPublishError::new(FastPublishErrorKind::InvalidConfig));
}
let inbox = self.client.new_inbox();
validate_inbox_shape(&inbox)?;
let reply_prefix = build_reply_prefix(&inbox, self.flow, self.gap_mode);
Ok(FastPublisher {
client: self.client,
inbox,
flow: self.flow,
gap_mode: self.gap_mode,
max_outstanding_acks: self.max_outstanding_acks,
ack_timeout: self.ack_timeout,
reply_prefix,
subscriber: None,
sequence: 0,
effective_flow: self.flow,
last_ack_sequence: 0,
initial_ack_received: false,
pending_pub_ack: None,
first_subject: None,
closed: false,
fatal: None,
on_error: self.on_error,
})
}
}
/// Publisher for a single fast batch.
///
/// Created through [`FastPublisherBuilder::build`]. Messages are added with
/// `add` / `add_message` and the batch is finished with `commit`,
/// `commit_message` or `close`.
pub struct FastPublisher {
// Client used for publishing and for the inbox subscription.
client: async_nats::Client,
// Inbox of this batch; also returned as the batch id.
inbox: String,
// Requested flow and gap mode, as encoded in `reply_prefix`.
flow: u16, gap_mode: GapMode,
// Multiplier applied to `effective_flow` when computing the stall window.
max_outstanding_acks: u16,
// Maximum time to wait for replies from the server.
ack_timeout: Duration,
// Cached `<inbox>.<flow>.<gap mode>.` prefix of every reply subject.
reply_prefix: String,
// Wildcard subscription on the inbox; created lazily by `ensure_subscribed`.
subscriber: Option<async_nats::Subscriber>,
// Number of batch messages sent so far (sequence of the last one).
sequence: u64,
// Flow currently in effect (updated from flow acks) and the highest
// sequence acknowledged so far.
effective_flow: u16, last_ack_sequence: u64,
// Set once the first flow ack has been received.
initial_ack_received: bool,
// Publish acknowledgement received but not yet handed to the caller.
pending_pub_ack: Option<BatchPubAck>,
// Subject of the first message; reused for pings and for `close`.
first_subject: Option<async_nats::Subject>,
// Set once the batch was committed or the subscription ended.
closed: bool,
// Sticky fatal error kind; once set, further adds are rejected with it.
fatal: Option<FastPublishErrorKind>,
// Optional callback for gap and flow errors.
on_error: Option<FastPublishErrorHandler>,
}
impl FastPublisher {
/// Returns the number of messages sent in this batch so far.
pub fn size(&self) -> u64 {
self.sequence
}
/// Returns `true` once the publisher has been closed.
pub fn is_closed(&self) -> bool {
self.closed
}
/// Returns the batch id, which is the inbox of this publisher.
pub fn batch_id(&self) -> &str {
&self.inbox
}
/// Returns the gap mode this publisher was built with.
pub fn gap_mode(&self) -> GapMode {
self.gap_mode
}
/// Returns the highest batch sequence acknowledged by a flow ack so far.
pub fn last_ack_sequence(&self) -> u64 {
self.last_ack_sequence
}
}
impl Debug for FastPublisher {
    /// Formats the publisher's configuration and progress counters.
    ///
    /// The client, ack timeout, reply prefix, subscriber, initial-ack flag,
    /// pending publish ack, first subject and error handler are intentionally
    /// left out (the handler is an opaque closure), so the output is marked
    /// as non-exhaustive instead of pretending to be the complete struct.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FastPublisher")
            .field("inbox", &self.inbox)
            .field("flow", &self.flow)
            .field("effective_flow", &self.effective_flow)
            .field("gap_mode", &self.gap_mode)
            .field("max_outstanding_acks", &self.max_outstanding_acks)
            .field("sequence", &self.sequence)
            .field("last_ack_sequence", &self.last_ack_sequence)
            .field("closed", &self.closed)
            .field("fatal", &self.fatal)
            .finish_non_exhaustive()
    }
}
// Compile-time check that `FastPublisher` is `Send`; the closure is never
// called, it only has to type-check.
const _: fn() = || {
fn assert_send<T: Send>() {}
assert_send::<FastPublisher>();
};
/// Progress information returned after a message was added to the batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FastPubAck {
/// Batch sequence assigned to the message that was just added.
pub batch_sequence: u64,
/// Highest batch sequence acknowledged by a flow ack at that point.
pub ack_sequence: u64,
}
/// Selects which commit operation code `commit_message_inner` sends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommitKind {
/// Sent as `Operation::Commit` (used by `commit` / `commit_message`).
Final,
/// Sent as `Operation::CommitEob` (used by `close`).
Eob,
}
impl FastPublisher {
/// Adds a header-less message with the given subject and payload to the batch.
///
/// Convenience wrapper around `add_message`; see there for the error cases.
pub async fn add<S: ToSubject>(
    &mut self,
    subject: S,
    payload: Bytes,
) -> Result<FastPubAck, FastPublishError> {
    let message = OutboundMessage {
        subject: subject.to_subject(),
        payload,
        headers: None,
    };
    self.add_message(message).await
}
/// Adds `msg` to the batch as the next message.
///
/// Replies already waiting on the inbox are processed first. If the flow
/// window is exhausted the call waits (pinging the server) until a flow ack
/// reopens it. The very first message is additionally followed by a wait for
/// the server's first reply.
///
/// # Errors
///
/// * [`FastPublishErrorKind::Closed`] if the publisher is closed or the inbox
///   subscription ended.
/// * The recorded fatal kind if a fatal error was received earlier or while
///   processing replies.
/// * [`FastPublishErrorKind::Subscribe`] / [`FastPublishErrorKind::Publish`]
///   if the client operation fails.
/// * [`FastPublishErrorKind::Timeout`] if an awaited reply does not arrive in time.
/// * [`FastPublishErrorKind::Serialization`] if a reply cannot be parsed.
pub async fn add_message(
    &mut self,
    msg: OutboundMessage,
) -> Result<FastPubAck, FastPublishError> {
    if self.closed {
        return Err(FastPublishError::new(FastPublishErrorKind::Closed));
    }
    if let Some(kind) = self.fatal {
        return Err(FastPublishError::new(kind));
    }
    self.ensure_subscribed().await?;
    // Pick up acks / errors that arrived since the previous call.
    self.drain_nonblocking()?;
    if let Some(kind) = self.fatal {
        return Err(FastPublishError::new(kind));
    }
    let next_sequence = self.sequence + 1;
    // While nothing has been published (`sequence == 0`) the server cannot
    // send any ack, so stalling would only end in a ping failure or timeout.
    // That situation arises with `flow == 1` and `max_outstanding_acks == 1`,
    // where the window check is already true for the first message.
    let must_stall = self.sequence > 0
        && should_stall(
            self.last_ack_sequence,
            self.effective_flow,
            self.max_outstanding_acks,
            next_sequence,
        );
    if must_stall {
        self.wait_for_flow_event_with_pings().await?;
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }
    }
    self.sequence += 1;
    let op = if self.sequence == 1 {
        Operation::Start
    } else {
        Operation::Append
    };
    // Remember the first subject: pings and `close` publish to it.
    if self.first_subject.is_none() {
        self.first_subject = Some(msg.subject.clone());
    }
    let reply = build_reply(&self.reply_prefix, self.sequence, op);
    self.publish_raw(msg, reply).await?;
    if self.sequence == 1 {
        // The first message must be answered before the batch continues.
        self.await_first_reply().await?;
    }
    self.drain_nonblocking()?;
    if let Some(kind) = self.fatal {
        return Err(FastPublishError::new(kind));
    }
    Ok(FastPubAck {
        batch_sequence: self.sequence,
        ack_sequence: self.last_ack_sequence,
    })
}
/// Sends a final, header-less message with the given subject and payload and
/// commits the batch, consuming the publisher.
///
/// Convenience wrapper around `commit_message`.
pub async fn commit<S: ToSubject>(
    self,
    subject: S,
    payload: Bytes,
) -> Result<BatchPubAck, FastPublishError> {
    let final_message = OutboundMessage {
        subject: subject.to_subject(),
        payload,
        headers: None,
    };
    self.commit_message(final_message).await
}
/// Sends `msg` as the final message of the batch (operation `Commit`),
/// consumes the publisher and waits for the batch's publish acknowledgement.
pub async fn commit_message(
mut self,
msg: OutboundMessage,
) -> Result<BatchPubAck, FastPublishError> {
self.commit_message_inner(msg, CommitKind::Final).await
}
/// Commits the batch without adding a payload message.
///
/// An empty message is published to the subject of the first batch message
/// using the `CommitEob` operation, then the publish acknowledgement is
/// awaited.
///
/// # Errors
///
/// * [`FastPublishErrorKind::Closed`] if the publisher is already closed.
/// * [`FastPublishErrorKind::EmptyBatch`] if no message has been added.
/// * Any error produced while committing.
pub async fn close(mut self) -> Result<BatchPubAck, FastPublishError> {
    let rejection = if self.closed {
        Some(FastPublishErrorKind::Closed)
    } else if self.sequence == 0 {
        Some(FastPublishErrorKind::EmptyBatch)
    } else {
        None
    };
    if let Some(kind) = rejection {
        return Err(FastPublishError::new(kind));
    }

    let end_of_batch = OutboundMessage {
        subject: self
            .first_subject
            .clone()
            .expect("first_subject set once sequence > 0"),
        payload: Bytes::new(),
        headers: None,
    };
    self.commit_message_inner(end_of_batch, CommitKind::Eob).await
}
/// Publishes the final message of the batch and waits for the publish ack.
///
/// `kind` selects the operation code (`Commit` or `CommitEob`). If a fatal
/// error is already recorded, nothing is published: the publisher is marked
/// closed, a best-effort attempt is made to collect a pending reply, and the
/// fatal kind is returned.
///
/// # Errors
///
/// * [`FastPublishErrorKind::Closed`] if the publisher is already closed or
///   the inbox subscription ended.
/// * The recorded fatal kind, if any.
/// * [`FastPublishErrorKind::Subscribe`] / [`FastPublishErrorKind::Publish`]
///   if the client operation fails.
/// * [`FastPublishErrorKind::Timeout`] if an awaited reply does not arrive in time.
/// * [`FastPublishErrorKind::Serialization`] if a reply cannot be parsed.
async fn commit_message_inner(
    &mut self,
    msg: OutboundMessage,
    kind: CommitKind,
) -> Result<BatchPubAck, FastPublishError> {
    if self.closed {
        return Err(FastPublishError::new(FastPublishErrorKind::Closed));
    }
    self.ensure_subscribed().await?;
    // Pick up acks / errors that arrived since the previous call.
    self.drain_nonblocking()?;
    if let Some(fatal_kind) = self.fatal {
        self.closed = true;
        // Best effort only: the fatal kind is what gets reported.
        let _pub_ack = self.drain_until_pub_ack().await.ok();
        return Err(FastPublishError::new(fatal_kind));
    }
    let next_sequence = self.sequence + 1;
    // While nothing has been published (`sequence == 0`) the server cannot
    // send any ack, so stalling would only end in a ping failure or timeout.
    // That situation arises with `flow == 1` and `max_outstanding_acks == 1`,
    // where the window check is already true for the first message.
    let must_stall = self.sequence > 0
        && should_stall(
            self.last_ack_sequence,
            self.effective_flow,
            self.max_outstanding_acks,
            next_sequence,
        );
    if must_stall {
        self.wait_for_flow_event_with_pings().await?;
        if let Some(fatal_kind) = self.fatal {
            self.closed = true;
            let _pub_ack = self.drain_until_pub_ack().await.ok();
            return Err(FastPublishError::new(fatal_kind));
        }
    }
    self.sequence += 1;
    let op = match kind {
        CommitKind::Final => Operation::Commit,
        CommitKind::Eob => Operation::CommitEob,
    };
    // A commit may be the first message of the batch.
    if self.first_subject.is_none() {
        self.first_subject = Some(msg.subject.clone());
    }
    let reply = build_reply(&self.reply_prefix, self.sequence, op);
    self.publish_raw(msg, reply).await?;
    self.closed = true;
    self.drain_until_pub_ack().await
}
/// Creates the wildcard subscription `<inbox>.>` on first use; later calls
/// are no-ops.
///
/// # Errors
///
/// Returns [`FastPublishErrorKind::Subscribe`] if the client cannot subscribe.
async fn ensure_subscribed(&mut self) -> Result<(), FastPublishError> {
    if self.subscriber.is_none() {
        let subject = format!("{}.>", self.inbox);
        let subscription = self.client.subscribe(subject).await.map_err(|source| {
            FastPublishError::with_source(FastPublishErrorKind::Subscribe, source)
        })?;
        self.subscriber = Some(subscription);
    }
    Ok(())
}
/// Publishes `msg` with `reply` as its reply subject, forwarding the headers
/// when the message has any.
///
/// # Errors
///
/// Returns [`FastPublishErrorKind::Publish`] if the client rejects the publish.
async fn publish_raw(
    &mut self,
    msg: OutboundMessage,
    reply: Subject,
) -> Result<(), FastPublishError> {
    let outcome = if let Some(headers) = msg.headers {
        self.client
            .publish_with_reply_and_headers(msg.subject, reply, headers, msg.payload)
            .await
    } else {
        self.client
            .publish_with_reply(msg.subject, reply, msg.payload)
            .await
    };
    outcome.map_err(|source| FastPublishError::with_source(FastPublishErrorKind::Publish, source))
}
/// Processes every reply that is already available on the inbox without
/// waiting for new ones.
///
/// Does nothing when no subscription exists yet. Returns `Closed` (and marks
/// the publisher closed) if the subscription has ended, or the error produced
/// while handling a reply.
fn drain_nonblocking(&mut self) -> Result<(), FastPublishError> {
let Some(sub) = self.subscriber.as_mut() else {
return Ok(());
};
// Poll by hand with a waker that does nothing: only messages that are
// ready right now are wanted, no wake-up is needed.
let waker = noop_waker_ref();
let mut cx = TaskContext::from_waker(waker);
loop {
match sub.poll_next_unpin(&mut cx) {
Poll::Ready(Some(msg)) => {
// Lend the handler exactly the fields it updates, so that `sub`
// (a borrow of `self.subscriber`) can stay alive.
let shared = SharedHandlerState {
gap_mode: self.gap_mode,
effective_flow: &mut self.effective_flow,
last_ack_sequence: &mut self.last_ack_sequence,
initial_ack_received: &mut self.initial_ack_received,
pending_pub_ack: &mut self.pending_pub_ack,
fatal: &mut self.fatal,
on_error: self.on_error.as_mut(),
};
handle_inbox_message(shared, msg)?;
}
// The subscription stream ended.
Poll::Ready(None) => {
self.closed = true;
return Err(FastPublishError::new(FastPublishErrorKind::Closed));
}
// Nothing more is ready.
Poll::Pending => return Ok(()),
}
}
}
/// Waits, for at most `ack_timeout` in total, until the first message of the
/// batch has been answered.
///
/// Returns `Ok` once a flow ack or a publish ack has been received, the
/// fatal kind if a handled reply recorded one, `Timeout` when the deadline
/// passes, and `Closed` if the subscription ends.
///
/// Panics if no subscription has been installed.
async fn await_first_reply(&mut self) -> Result<(), FastPublishError> {
let deadline = tokio::time::Instant::now() + self.ack_timeout;
loop {
// The deadline covers the whole wait, not each individual reply.
let now = tokio::time::Instant::now();
if now >= deadline {
return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
}
let remaining = deadline - now;
let sub = self
.subscriber
.as_mut()
.expect("subscriber installed by ensure_subscribed");
let msg = match tokio::time::timeout(remaining, sub.next()).await {
Ok(Some(m)) => m,
Ok(None) => {
self.closed = true;
return Err(FastPublishError::new(FastPublishErrorKind::Closed));
}
Err(_) => return Err(FastPublishError::new(FastPublishErrorKind::Timeout)),
};
let shared = SharedHandlerState {
gap_mode: self.gap_mode,
effective_flow: &mut self.effective_flow,
last_ack_sequence: &mut self.last_ack_sequence,
initial_ack_received: &mut self.initial_ack_received,
pending_pub_ack: &mut self.pending_pub_ack,
fatal: &mut self.fatal,
on_error: self.on_error.as_mut(),
};
handle_inbox_message(shared, msg)?;
// A fatal error takes precedence over any ack seen in the same reply.
if let Some(kind) = self.fatal {
return Err(FastPublishError::new(kind));
}
// A publish ack also counts as an answer; it stays in
// `pending_pub_ack` for the commit path to pick up.
if self.pending_pub_ack.is_some() {
return Ok(());
}
if self.initial_ack_received {
return Ok(());
}
}
}
/// Waits, for at most `ack_timeout` in total, for the publish acknowledgement
/// of the batch and returns it.
///
/// An acknowledgement that was already received is returned immediately.
/// When an acknowledgement or a fatal error ends the wait, the inbox
/// subscription is removed (best effort). Returns `Timeout` when the
/// deadline passes and `Closed` if the subscription ends.
///
/// Panics if the wait is needed and no subscription has been installed.
async fn drain_until_pub_ack(&mut self) -> Result<BatchPubAck, FastPublishError> {
// Fast path: the ack arrived while handling earlier replies.
if let Some(pa) = self.pending_pub_ack.take() {
self.unsubscribe_best_effort().await;
return Ok(pa);
}
let deadline = tokio::time::Instant::now() + self.ack_timeout;
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
}
let remaining = deadline - now;
let sub = self
.subscriber
.as_mut()
.expect("subscriber installed by ensure_subscribed");
let msg = match tokio::time::timeout(remaining, sub.next()).await {
Ok(Some(m)) => m,
Ok(None) => {
self.closed = true;
return Err(FastPublishError::new(FastPublishErrorKind::Closed));
}
Err(_) => return Err(FastPublishError::new(FastPublishErrorKind::Timeout)),
};
let shared = SharedHandlerState {
gap_mode: self.gap_mode,
effective_flow: &mut self.effective_flow,
last_ack_sequence: &mut self.last_ack_sequence,
initial_ack_received: &mut self.initial_ack_received,
pending_pub_ack: &mut self.pending_pub_ack,
fatal: &mut self.fatal,
on_error: self.on_error.as_mut(),
};
handle_inbox_message(shared, msg)?;
// A publish ack wins over a fatal error recorded for the batch.
if let Some(pa) = self.pending_pub_ack.take() {
self.unsubscribe_best_effort().await;
return Ok(pa);
}
// Note: this clears the recorded fatal kind while reporting it.
if let Some(kind) = self.fatal.take() {
self.unsubscribe_best_effort().await;
return Err(FastPublishError::new(kind));
}
}
}
/// Removes the inbox subscription, if there is one, ignoring any error the
/// unsubscribe reports.
async fn unsubscribe_best_effort(&mut self) {
    let Some(mut subscription) = self.subscriber.take() else {
        return;
    };
    // Best effort by design: a failed unsubscribe is not reported.
    let _ = subscription.unsubscribe().await;
}
/// Waits until the flow window allows the next message to be sent, pinging
/// the server whenever no reply arrived for a ping interval.
///
/// The ping interval is a third of `ack_timeout`, but at least 100 ms; the
/// whole wait is bounded by `ack_timeout`.
///
/// Returns `Ok` as soon as `should_stall` is false for the next sequence, the
/// fatal kind if one is recorded, `Timeout` when the deadline passes,
/// `Closed` if the subscription ends, or the error of a failed ping.
///
/// Panics if no subscription has been installed.
async fn wait_for_flow_event_with_pings(&mut self) -> Result<(), FastPublishError> {
let ping_interval = (self.ack_timeout / 3).max(Duration::from_millis(100));
let deadline = tokio::time::Instant::now() + self.ack_timeout;
let mut ping_at = tokio::time::Instant::now() + ping_interval;
loop {
// Checked at the top so that a fatal error recorded by the previous
// reply ends the wait.
if let Some(kind) = self.fatal {
return Err(FastPublishError::new(kind));
}
let now = tokio::time::Instant::now();
if now >= deadline {
return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
}
// Wake up at the next ping or at the deadline, whichever is first.
let next_wake = ping_at.min(deadline);
let wait = next_wake.saturating_duration_since(now);
let sub = self
.subscriber
.as_mut()
.expect("subscriber installed before stall");
match tokio::time::timeout(wait, sub.next()).await {
Ok(Some(msg)) => {
let shared = SharedHandlerState {
gap_mode: self.gap_mode,
effective_flow: &mut self.effective_flow,
last_ack_sequence: &mut self.last_ack_sequence,
initial_ack_received: &mut self.initial_ack_received,
pending_pub_ack: &mut self.pending_pub_ack,
fatal: &mut self.fatal,
on_error: self.on_error.as_mut(),
};
handle_inbox_message(shared, msg)?;
// Re-evaluate the window with the state the reply just updated.
let next_sequence = self.sequence + 1;
if !should_stall(
self.last_ack_sequence,
self.effective_flow,
self.max_outstanding_acks,
next_sequence,
) {
return Ok(());
}
}
Ok(None) => {
self.closed = true;
return Err(FastPublishError::new(FastPublishErrorKind::Closed));
}
// No reply within `wait`: either the deadline passed or a ping is due.
Err(_) => {
let now = tokio::time::Instant::now();
if now >= deadline {
return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
}
self.send_ping().await?;
ping_at = tokio::time::Instant::now() + ping_interval;
}
}
}
}
/// Publishes an empty `Ping` message to the subject of the first batch
/// message, using the current batch sequence in the reply subject.
///
/// # Errors
///
/// * [`FastPublishErrorKind::InvalidState`] if no message has been sent yet,
///   so there is no subject to ping.
/// * [`FastPublishErrorKind::Publish`] if the client rejects the publish.
async fn send_ping(&mut self) -> Result<(), FastPublishError> {
    let subject = self
        .first_subject
        .clone()
        .ok_or_else(|| FastPublishError::new(FastPublishErrorKind::InvalidState))?;
    let reply = build_reply(&self.reply_prefix, self.sequence, Operation::Ping);
    self.client
        .publish_with_reply(subject, reply, Bytes::new())
        .await
        .map_err(|source| FastPublishError::with_source(FastPublishErrorKind::Publish, source))
}
}
/// Mutable view of the publisher fields that `handle_inbox_message` reads and
/// updates.
///
/// Borrowing the fields individually lets callers keep a separate mutable
/// borrow of the subscriber while a reply is being handled.
struct SharedHandlerState<'a> {
// Decides whether gap and flow errors are recorded as fatal.
gap_mode: GapMode,
// Overwritten with the flow announced by a flow ack.
effective_flow: &'a mut u16,
// Raised to the sequence of a flow ack when that is higher.
last_ack_sequence: &'a mut u64,
// Set to `true` by the first flow ack.
initial_ack_received: &'a mut bool,
// Receives the publish acknowledgement of the batch.
pending_pub_ack: &'a mut Option<BatchPubAck>,
// Receives the kind of a fatal error.
fatal: &'a mut Option<FastPublishErrorKind>,
// Optional callback for gap and flow errors.
on_error: Option<&'a mut FastPublishErrorHandler>,
}
fn handle_inbox_message(
mut state: SharedHandlerState<'_>,
msg: async_nats::Message,
) -> Result<(), FastPublishError> {
match classify(&msg.payload)? {
Classified::FlowAck(ack) => {
*state.initial_ack_received = true;
if ack.sequence > *state.last_ack_sequence {
*state.last_ack_sequence = ack.sequence;
}
let new_flow = ack.messages.max(1);
if new_flow != *state.effective_flow {
*state.effective_flow = new_flow;
}
}
Classified::FlowGap(gap) => {
tracing::debug!(
expected_last = gap.expected_last_sequence,
current = gap.current_sequence,
"fast batch gap detected"
);
if let Some(h) = state.on_error.as_deref_mut() {
h(FastPublishError::new(FastPublishErrorKind::GapDetected));
}
if state.gap_mode == GapMode::Fail {
*state.fatal = Some(FastPublishErrorKind::GapDetected);
}
}
Classified::FlowErr(ferr) => {
tracing::debug!(
batch_sequence = ferr.sequence,
err_code = ferr.error.error_code().0,
"fast batch flow error"
);
let kind = FastPublishErrorKind::from_api_error(&ferr.error);
if let Some(h) = state.on_error.as_deref_mut() {
h(FastPublishError::new(kind));
}
if state.gap_mode == GapMode::Fail {
*state.fatal = Some(kind);
}
}
Classified::PubAck(pa) => {
*state.pending_pub_ack = Some(pa);
}
Classified::InitError(err) => {
*state.fatal = Some(FastPublishErrorKind::from_api_error(&err));
}
}
Ok(())
}
// Unit tests for the pure helpers (subject building, inbox validation,
// payload classification, stall window, error mapping) followed by builder
// tests that start a local NATS server.
#[cfg(test)]
mod tests {
use super::*;
// --- reply subject construction ---
#[test]
fn reply_prefix_format_ok_mode() {
let p = build_reply_prefix("_INBOX.abc123", 100, GapMode::Ok);
assert_eq!(p, "_INBOX.abc123.100.ok.");
}
#[test]
fn reply_prefix_format_fail_mode() {
let p = build_reply_prefix("_INBOX.abc123", 50, GapMode::Fail);
assert_eq!(p, "_INBOX.abc123.50.fail.");
}
// Every operation must appear as its numeric code in the subject.
#[test]
fn reply_full_all_operations() {
let prefix = build_reply_prefix("_INBOX.x", 10, GapMode::Fail);
for (op, code) in [
(Operation::Start, 0_u8),
(Operation::Append, 1),
(Operation::Commit, 2),
(Operation::CommitEob, 3),
(Operation::Ping, 4),
] {
let r = build_reply(&prefix, 42, op);
assert_eq!(r.as_str(), format!("_INBOX.x.10.fail.42.{code}.$FI"));
}
}
#[test]
fn reply_full_both_gap_modes() {
for (mode, tag) in [(GapMode::Ok, "ok"), (GapMode::Fail, "fail")] {
let prefix = build_reply_prefix("_INBOX.abc", 25, mode);
let r = build_reply(&prefix, 1, Operation::Start);
assert_eq!(r.as_str(), format!("_INBOX.abc.25.{tag}.1.0.$FI"));
}
}
// --- inbox shape validation ---
#[test]
fn inbox_shape_accepts_two_tokens() {
assert!(validate_inbox_shape("_INBOX.abc123").is_ok());
assert!(validate_inbox_shape("X.Y").is_ok());
}
#[test]
fn inbox_shape_rejects_zero_dots() {
assert!(matches!(
validate_inbox_shape("INBOX").unwrap_err().kind(),
FastPublishErrorKind::InvalidInboxShape
));
}
#[test]
fn inbox_shape_rejects_three_or_more_tokens() {
assert!(matches!(
validate_inbox_shape("_INBOX.myapp.abc123")
.unwrap_err()
.kind(),
FastPublishErrorKind::InvalidInboxShape
));
assert!(matches!(
validate_inbox_shape("a.b.c.d").unwrap_err().kind(),
FastPublishErrorKind::InvalidInboxShape
));
}
#[test]
fn inbox_shape_rejects_empty_tokens() {
assert!(validate_inbox_shape(".abc").is_err());
assert!(validate_inbox_shape("abc.").is_err());
assert!(validate_inbox_shape("").is_err());
}
// --- payload classification ---
#[test]
fn parse_batch_flow_ack() {
let payload = br#"{"type":"ack","seq":10,"msgs":15}"#;
match classify(payload).unwrap() {
Classified::FlowAck(a) => {
assert_eq!(a.sequence, 10);
assert_eq!(a.messages, 15);
}
other => panic!("expected FlowAck, got {other:?}"),
}
}
#[test]
fn parse_batch_flow_gap() {
let payload = br#"{"type":"gap","last_seq":10,"seq":15}"#;
match classify(payload).unwrap() {
Classified::FlowGap(g) => {
assert_eq!(g.expected_last_sequence, 10);
assert_eq!(g.current_sequence, 15);
}
other => panic!("expected FlowGap, got {other:?}"),
}
}
#[test]
fn parse_batch_flow_err() {
let payload = br#"{"type":"err","seq":7,"error":{"code":400,"err_code":10071,"description":"wrong last sequence: 1"}}"#;
match classify(payload).unwrap() {
Classified::FlowErr(e) => {
assert_eq!(e.sequence, 7);
assert_eq!(e.error.error_code().0, 10071);
}
other => panic!("expected FlowErr, got {other:?}"),
}
}
// An untagged success response is the publish ack of the batch.
#[test]
fn parse_terminal_pub_ack() {
let payload = br#"{"stream":"TEST","seq":42,"batch":"inbox-id","count":10}"#;
match classify(payload).unwrap() {
Classified::PubAck(pa) => {
assert_eq!(pa.stream, "TEST");
assert_eq!(pa.sequence, 42);
assert_eq!(pa.batch_id, "inbox-id");
assert_eq!(pa.batch_size, 10);
}
other => panic!("expected PubAck, got {other:?}"),
}
}
// An untagged error response is classified as an init error.
#[test]
fn parse_init_error_response() {
let payload =
br#"{"error":{"code":400,"err_code":10205,"description":"fast batch publish not enabled"}}"#;
match classify(payload).unwrap() {
Classified::InitError(err) => {
assert_eq!(err.error_code().0, 10205);
assert_eq!(
FastPublishErrorKind::from_api_error(&err),
FastPublishErrorKind::NotEnabled
);
}
other => panic!("expected InitError, got {other:?}"),
}
}
#[test]
fn classify_malformed_json_returns_error() {
let payload = b"not json at all";
let err = classify(payload).unwrap_err();
assert!(matches!(err.kind(), FastPublishErrorKind::Serialization));
}
// --- stall window ---
#[test]
fn stall_no_wait_when_window_is_strictly_greater() {
assert!(!should_stall(0, 10, 2, 19));
}
#[test]
fn stall_waits_at_exact_boundary() {
assert!(should_stall(0, 10, 2, 20));
}
#[test]
fn stall_waits_past_boundary() {
assert!(should_stall(0, 10, 2, 21));
}
#[test]
fn stall_honors_last_ack() {
assert!(should_stall(10, 10, 2, 30));
assert!(!should_stall(10, 10, 2, 29));
}
#[test]
fn stall_with_single_outstanding_ack() {
assert!(!should_stall(0, 10, 1, 9));
assert!(should_stall(0, 10, 1, 10));
assert!(should_stall(0, 10, 1, 11));
}
#[test]
fn stall_with_max_outstanding_three() {
assert!(!should_stall(0, 10, 3, 29));
assert!(should_stall(0, 10, 3, 30));
}
// Extreme inputs must saturate instead of overflowing.
#[test]
fn stall_saturates_on_pathological_inputs() {
let waited = should_stall(u64::MAX - 5, u16::MAX, u16::MAX, u64::MAX);
assert!(waited);
}
// --- API error code mapping ---
// Builds a synthetic JetStream API error with the given `err_code`.
fn api_err(code: u64) -> async_nats::jetstream::Error {
let json = format!(r#"{{"code":400,"err_code":{code},"description":"test"}}"#);
serde_json::from_str(&json).expect("synthetic api error parses")
}
#[test]
fn error_code_mapping_verified_against_server() {
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(10205)),
FastPublishErrorKind::NotEnabled
);
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(10206)),
FastPublishErrorKind::InvalidPattern
);
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(10207)),
FastPublishErrorKind::InvalidBatchId
);
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(10208)),
FastPublishErrorKind::UnknownBatchId
);
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(10211)),
FastPublishErrorKind::TooManyInflight
);
}
#[test]
fn error_code_mapping_unknown_is_flow_error() {
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(10071)),
FastPublishErrorKind::FlowError
);
assert_eq!(
FastPublishErrorKind::from_api_error(&api_err(99999)),
FastPublishErrorKind::FlowError
);
}
// Every error kind must render a non-empty description.
#[test]
fn error_kind_display_non_empty() {
for kind in [
FastPublishErrorKind::NotEnabled,
FastPublishErrorKind::InvalidPattern,
FastPublishErrorKind::InvalidBatchId,
FastPublishErrorKind::UnknownBatchId,
FastPublishErrorKind::TooManyInflight,
FastPublishErrorKind::GapDetected,
FastPublishErrorKind::FlowError,
FastPublishErrorKind::EmptyBatch,
FastPublishErrorKind::InvalidInboxShape,
FastPublishErrorKind::InvalidConfig,
FastPublishErrorKind::Closed,
FastPublishErrorKind::Timeout,
FastPublishErrorKind::Subscribe,
FastPublishErrorKind::Publish,
FastPublishErrorKind::Serialization,
FastPublishErrorKind::InvalidState,
FastPublishErrorKind::Other,
] {
let s = format!("{kind}");
assert!(!s.is_empty(), "empty Display for {kind:?}");
}
}
#[test]
fn gap_mode_default_is_fail() {
assert_eq!(GapMode::default(), GapMode::Fail);
}
// --- builder tests (need a running NATS server) ---
// Starts a server from the JetStream test config, connects a client and
// returns the server handle together with a builder using a 5 s ack timeout.
async fn dummy_builder() -> (nats_server::Server, FastPublisherBuilder) {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
(
server,
FastPublisherBuilder::new(client, Duration::from_secs(5)),
)
}
#[tokio::test]
async fn builder_rejects_max_outstanding_zero() {
let (_s, b) = dummy_builder().await;
let err = b.max_outstanding_acks(0).build().unwrap_err();
assert!(matches!(err.kind(), FastPublishErrorKind::InvalidConfig));
}
#[tokio::test]
async fn builder_rejects_max_outstanding_four() {
let (_s, b) = dummy_builder().await;
let err = b.max_outstanding_acks(4).build().unwrap_err();
assert!(matches!(err.kind(), FastPublishErrorKind::InvalidConfig));
}
#[tokio::test]
async fn builder_accepts_all_valid_max_outstanding() {
for n in 1..=3 {
let (_s, b) = dummy_builder().await;
let fp = b.max_outstanding_acks(n).build().expect("valid config");
assert_eq!(fp.max_outstanding_acks, n);
}
}
#[tokio::test]
async fn builder_clamps_flow_zero_to_one() {
let (_s, b) = dummy_builder().await;
let fp = b.flow(0).build().expect("flow clamped to 1");
assert_eq!(fp.flow, 1);
}
#[tokio::test]
async fn builder_default_values() {
let (_s, b) = dummy_builder().await;
let fp = b.build().expect("defaults build ok");
assert_eq!(fp.flow, DEFAULT_FLOW);
assert_eq!(fp.effective_flow, DEFAULT_FLOW);
assert_eq!(fp.max_outstanding_acks, DEFAULT_MAX_OUTSTANDING_ACKS);
assert_eq!(fp.gap_mode, GapMode::Fail);
assert_eq!(fp.sequence, 0);
assert_eq!(fp.last_ack_sequence, 0);
assert!(fp.subscriber.is_none());
assert!(!fp.is_closed());
assert_eq!(fp.size(), 0);
}
// The reply prefix is computed once in `build` from inbox, flow and gap mode.
#[tokio::test]
async fn builder_produces_cached_reply_prefix() {
let (_s, b) = dummy_builder().await;
let fp = b.flow(42).gap_mode(GapMode::Ok).build().unwrap();
assert!(fp.reply_prefix.starts_with(&fp.inbox));
assert!(fp.reply_prefix.ends_with(".42.ok."));
}
#[tokio::test]
async fn builder_batch_id_is_the_inbox() {
let (_s, b) = dummy_builder().await;
let fp = b.build().unwrap();
assert_eq!(fp.batch_id(), fp.inbox);
assert_eq!(fp.inbox.matches('.').count(), 1);
}
}