jetstream_extra/batch_publish_fast.rs
1// Copyright 2026 Synadia Communications Inc.
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Fast-ingest batch publishing for NATS JetStream (ADR-50 fast ingest).
15//!
16//! Fast ingest is a high-throughput, non-atomic batch publisher. Unlike atomic
17//! [`batch_publish`](crate::batch_publish), which stages an entire batch and
18//! commits it or drops it, fast ingest persists each message as it arrives and
19//! uses a persistent inbox subscription plus server-driven flow control to
20//! coordinate throughput across many concurrent publishers.
21//!
22//! Requires nats-server 2.14 or later and `allow_batched: true` on the stream.
23//!
24//! # Architecture
25//!
26//! A [`FastPublisher`] owns its own [`async_nats::Subscriber`] and drives it
27//! inline from `add` / `commit` / `close`. There is no background task, no
28//! shared state, and no locks. This mirrors the single-task pattern used by
29//! `nats-extra/src/request_many.rs`.
30//!
31//! # Not safe for concurrent use
32//!
33//! A `FastPublisher` holds per-batch state (sequence counters, cached reply
34//! subject prefix, effective flow) and is driven via `&mut self`. Use one
35//! publisher per task. Clone the underlying JetStream context if you need
36//! independent publishers.
37
38use std::{
39 fmt::{self, Debug, Display},
40 task::{Context as TaskContext, Poll},
41 time::Duration,
42};
43
44use async_nats::Subject;
45use async_nats::jetstream::message::OutboundMessage;
46use async_nats::subject::ToSubject;
47use bytes::Bytes;
48use futures::StreamExt;
49use futures::task::noop_waker_ref;
50use serde::Deserialize;
51
52use crate::batch_publish::BatchPubAck;
53
/// Policy the server applies when it notices a hole in the batch sequence.
///
/// A hole appears when one or more batch messages never reach the stream
/// leader — for example because they were dropped from a buffer under load
/// somewhere between the client and the server.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum GapMode {
    /// Tolerate gaps: the batch keeps going and the server reports each gap
    /// with a gap event. Suitable when losing some messages is acceptable
    /// (metrics, telemetry).
    Ok,
    /// Abort the batch at the first gap. Suitable when delivery must be
    /// in-order and gap-free (object store chunks, ordered events). This is
    /// the default.
    #[default]
    Fail,
}

impl GapMode {
    /// Token used for this mode inside the fast-ingest reply subject.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            GapMode::Ok => "ok",
            GapMode::Fail => "fail",
        }
    }
}
79
/// Operation codes carried in a fast-ingest reply subject.
///
/// The numeric value is written as the token immediately preceding the
/// trailing `$FI` token, i.e. `<prefix><seq>.<op>.$FI`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum Operation {
    /// First message of a batch (batch sequence 1).
    Start = 0,
    /// Every later, non-final message.
    Append = 1,
    /// Final message that is itself stored.
    Commit = 2,
    /// End-of-batch commit; the commit message itself is not stored.
    CommitEob = 3,
    /// Probe sent while stalled, used to recover from lost flow acks.
    Ping = 4,
}
92
/// Error type returned by fast-ingest batch publish operations.
///
/// It wraps a [`FastPublishErrorKind`] describing which failure occurred,
/// optionally together with an underlying source error.
pub type FastPublishError = async_nats::error::Error<FastPublishErrorKind>;
95
/// Every way a fast-ingest batch publish can fail.
///
/// Server-originated variants list the matching JetStream API error name and
/// code, checked against `errors.json` in `nats-server` 2.14.
///
/// The enum is `#[non_exhaustive]`, so new variants may be added without a
/// breaking release; include a `_` arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FastPublishErrorKind {
    /// The stream was not created with `allow_batched: true`.
    /// (`BATCH_PUBLISH_DISABLED`, 10205)
    NotEnabled,
    /// The server refused the reply subject pattern.
    /// (`BATCH_PUBLISH_INVALID_PATTERN`, 10206)
    InvalidPattern,
    /// The batch id is invalid, e.g. longer than 64 characters.
    /// (`BATCH_PUBLISH_INVALID_BATCH_ID`, 10207)
    InvalidBatchId,
    /// The server no longer knows this batch (it timed out, a leader change
    /// happened under `GapMode::Fail`, ...).
    /// (`BATCH_PUBLISH_UNKNOWN_BATCH_ID`, 10208)
    UnknownBatchId,
    /// The server already has too many fast batches in flight.
    /// (`BATCH_PUBLISH_TOO_MANY_INFLIGHT`, 10211)
    TooManyInflight,
    /// A gap was seen while running in [`GapMode::Fail`]; the final ack
    /// tells which messages were persisted.
    GapDetected,
    /// Any other server-side error delivered through a `BatchFlowErr`.
    FlowError,
    /// `close()` was called before a single message was added.
    EmptyBatch,
    /// `build()` got an inbox that is not exactly two tokens; the
    /// reply-subject parser needs the `<prefix>.<id>` shape.
    InvalidInboxShape,
    /// `build()` got an out-of-range setting (e.g. `max_outstanding_acks`
    /// not in `1..=3`).
    InvalidConfig,
    /// A method was called after the publisher committed, closed, or hit a
    /// fatal error.
    Closed,
    /// No flow ack or final pub ack arrived in time.
    Timeout,
    /// Subscribing to the batch inbox failed.
    Subscribe,
    /// Publishing a batch message failed.
    Publish,
    /// A server response could not be parsed.
    Serialization,
    /// An internal step needed a runtime invariant that did not hold yet.
    /// Today this only happens when a ping is required before any message
    /// was published, so there is no first subject to send the ping to.
    InvalidState,
    /// Anything not covered above.
    Other,
}
150
151impl FastPublishErrorKind {
152 /// Map a JetStream API error code to the matching fast-ingest error kind.
153 pub(crate) fn from_api_error(error: &async_nats::jetstream::Error) -> Self {
154 use async_nats::jetstream::ErrorCode;
155 match error.error_code() {
156 ErrorCode::BATCH_PUBLISH_DISABLED => Self::NotEnabled,
157 ErrorCode::BATCH_PUBLISH_INVALID_PATTERN => Self::InvalidPattern,
158 ErrorCode::BATCH_PUBLISH_INVALID_BATCH_ID => Self::InvalidBatchId,
159 ErrorCode::BATCH_PUBLISH_UNKNOWN_BATCH_ID => Self::UnknownBatchId,
160 ErrorCode::BATCH_PUBLISH_TOO_MANY_INFLIGHT => Self::TooManyInflight,
161 _ => Self::FlowError,
162 }
163 }
164}
165
166impl Display for FastPublishErrorKind {
167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168 match self {
169 Self::NotEnabled => write!(f, "fast batch publish not enabled on stream"),
170 Self::InvalidPattern => write!(f, "fast batch publish invalid reply subject pattern"),
171 Self::InvalidBatchId => write!(
172 f,
173 "fast batch publish id is invalid (exceeds 64 characters)"
174 ),
175 Self::UnknownBatchId => write!(f, "fast batch publish id is unknown to the server"),
176 Self::TooManyInflight => write!(f, "too many in-flight fast batches on the server"),
177 Self::GapDetected => write!(f, "gap detected in fast batch (gap_mode=fail)"),
178 Self::FlowError => write!(f, "fast batch flow error"),
179 Self::EmptyBatch => write!(f, "cannot close an empty batch"),
180 Self::InvalidInboxShape => {
181 write!(f, "inbox must have exactly two tokens (e.g. _INBOX.<id>)")
182 }
183 Self::InvalidConfig => write!(f, "invalid fast publisher configuration"),
184 Self::Closed => write!(f, "fast publisher is closed"),
185 Self::Timeout => write!(f, "timeout waiting for fast batch ack"),
186 Self::Subscribe => write!(f, "failed to subscribe to fast batch inbox"),
187 Self::Publish => write!(f, "failed to publish fast batch message"),
188 Self::Serialization => write!(f, "failed to (de)serialize fast batch message"),
189 Self::InvalidState => {
190 write!(f, "operation not allowed in current fast publisher state")
191 }
192 Self::Other => write!(f, "other fast batch publish error"),
193 }
194 }
195}
196
197/// Flow-control message sent by the server when a batch of messages has been
198/// persisted. Wire tag: `"type":"ack"`.
199#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
200pub(crate) struct BatchFlowAck {
201 /// Highest batch sequence covered by this ack.
202 ///
203 /// In `GapMode::Fail` this means all messages up to and including this
204 /// sequence were persisted. In `GapMode::Ok` some may have been lost.
205 #[serde(rename = "seq")]
206 pub sequence: u64,
207 /// How often the server will send subsequent flow acks (every N messages).
208 #[serde(rename = "msgs")]
209 pub messages: u16,
210}
211
212/// Gap notification sent when the server detects missing messages in a batch.
213/// Wire tag: `"type":"gap"`.
214#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
215pub(crate) struct BatchFlowGap {
216 /// The last sequence the server expected to receive before the gap.
217 ///
218 /// Messages with sequences `[expected_last_sequence+1, current_sequence)`
219 /// were lost.
220 #[serde(rename = "last_seq")]
221 pub expected_last_sequence: u64,
222 /// The sequence of the message that triggered gap detection.
223 #[serde(rename = "seq")]
224 pub current_sequence: u64,
225}
226
227/// Per-message error sent when a batch message fails a server-side check
228/// (e.g. expected-last-seq mismatch). Wire tag: `"type":"err"`.
229#[derive(Debug, Clone, Deserialize)]
230pub(crate) struct BatchFlowErr {
231 /// The batch sequence of the message that triggered the error.
232 #[serde(rename = "seq")]
233 pub sequence: u64,
234 /// The full API error as returned by the server.
235 pub error: async_nats::jetstream::Error,
236}
237
238/// Result of classifying a message received on the batch inbox.
239///
240/// The classifier dispatches on the `type` field (a serde-tagged enum) and
241/// falls back to a full `Response<BatchPubAck>` parse for messages without a
242/// `type` discriminator (the terminal pub-ack or an init-time error).
243#[derive(Debug)]
244pub(crate) enum Classified {
245 FlowAck(BatchFlowAck),
246 FlowGap(BatchFlowGap),
247 FlowErr(BatchFlowErr),
248 /// Terminal publish acknowledgment — delivered on `commit`/`close` or on
249 /// the single-message immediate-commit fast path.
250 PubAck(BatchPubAck),
251 /// Init-time API error — returned in response to the `0` (Start) or
252 /// `2`/`3` (Commit/CommitEob) operation when the server rejects the batch
253 /// before it even begins.
254 InitError(async_nats::jetstream::Error),
255}
256
257/// Tagged enum for `type`-discriminated messages.
258#[derive(Debug, Deserialize)]
259#[serde(tag = "type", rename_all = "lowercase")]
260enum TaggedFlow {
261 Ack(BatchFlowAck),
262 Gap(BatchFlowGap),
263 Err(BatchFlowErr),
264}
265
266/// Classify a payload received on the batch inbox into one of the five
267/// possible shapes.
268///
269/// Implementation strategy: try the tagged-enum deserializer first; if that
270/// fails (no `type` field), fall back to parsing the payload as a terminal
271/// `Response<BatchPubAck>`. This matches the Go byte-search optimization in
272/// spirit while remaining fully serde-driven.
273pub(crate) fn classify(payload: &[u8]) -> Result<Classified, FastPublishError> {
274 if let Ok(tagged) = serde_json::from_slice::<TaggedFlow>(payload) {
275 return Ok(match tagged {
276 TaggedFlow::Ack(a) => Classified::FlowAck(a),
277 TaggedFlow::Gap(g) => Classified::FlowGap(g),
278 TaggedFlow::Err(e) => Classified::FlowErr(e),
279 });
280 }
281
282 let resp: async_nats::jetstream::response::Response<BatchPubAck> =
283 serde_json::from_slice(payload)
284 .map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Serialization, e))?;
285
286 Ok(match resp {
287 async_nats::jetstream::response::Response::Ok(pa) => Classified::PubAck(pa),
288 async_nats::jetstream::response::Response::Err { error } => Classified::InitError(error),
289 })
290}
291
292/// Build the stable prefix of the reply subject:
293/// `<inbox>.<flow>.<gap>.`
294///
295/// The caller appends `<seq>.<op>.$FI` per message via [`build_reply`]. Cached
296/// on the publisher and rebuilt only when the server dictates a new flow via
297/// a [`BatchFlowAck`].
298pub(crate) fn build_reply_prefix(inbox: &str, flow: u16, gap: GapMode) -> String {
299 format!("{inbox}.{flow}.{}.", gap.as_str())
300}
301
302/// Build a full per-message reply subject: `<prefix><seq>.<op>.$FI`.
303///
304/// `$FI` marks the subject as a fast-ingest reply to the server's parser
305/// (`server/stream.go:getFastBatch`). Returns a [`Subject`] so the caller can
306/// pass it to `publish_with_reply` without an extra `String → Bytes` copy.
307pub(crate) fn build_reply(prefix: &str, seq: u64, op: Operation) -> Subject {
308 use std::fmt::Write as _;
309 // Hot path: pre-size for prefix + up to 20 digits of u64 + ".N.$FI" (6).
310 let mut s = String::with_capacity(prefix.len() + 26);
311 s.push_str(prefix);
312 write!(s, "{seq}.{}.$FI", op as u8).expect("String write is infallible");
313 // `From<String> for Subject` is zero-copy — moves the buffer into Bytes.
314 Subject::from(s)
315}
316
317/// Validate that an inbox has the shape the fast-ingest reply subject parser
318/// requires: exactly two tokens separated by a single dot (e.g. `_INBOX.<id>`).
319///
320/// The server (`server/stream.go:5174`) parses the reply subject from the
321/// right, expecting `<prefix>.<uuid>.<flow>.<gap>.<seq>.<op>.$FI`. Our scheme
322/// uses the inbox as the `<prefix>.<uuid>` portion — which only aligns if the
323/// inbox itself is exactly two tokens. A custom multi-token inbox prefix
324/// (e.g. `_INBOX.myapp.xyz`) would misalign the parser and cause cryptic
325/// `InvalidPattern` errors.
326pub(crate) fn validate_inbox_shape(inbox: &str) -> Result<(), FastPublishError> {
327 let mut parts = inbox.splitn(3, '.');
328 let first = parts.next().unwrap_or("");
329 let second = parts.next().unwrap_or("");
330 let no_third = parts.next().is_none();
331 if first.is_empty() || second.is_empty() || !no_third {
332 return Err(FastPublishError::new(
333 FastPublishErrorKind::InvalidInboxShape,
334 ));
335 }
336 Ok(())
337}
338
/// Decide whether the publisher must stall before sending the message with
/// batch sequence `next_sequence`.
///
/// At most `effective_flow * max_outstanding_acks` messages may be sent
/// beyond the last acknowledged sequence. The publisher therefore waits iff
///
/// ```text
/// last_ack_sequence + effective_flow * max_outstanding_acks < next_sequence
/// ```
///
/// This is the ADR-50 condition
/// `lastAck.Sequence + lastAck.Messages*maxOutstandingAcks <= batchSeq` when
/// `batchSeq` is read before being incremented for the new message, i.e.
/// `batchSeq == next_sequence - 1`. That is also the Go reference's
/// `ack + flow*max < next_seq`.
///
/// The comparison must be strict. The server sends a flow ack every
/// `effective_flow` messages, so the message at
/// `last_ack_sequence + effective_flow` is what triggers the next ack, and it
/// has to be sendable without waiting.
///
/// An inclusive `<=` would stall one message short of that trigger:
/// - with `max_outstanding_acks = 1`, every window would hang until a ping
///   recovered it;
/// - with `flow = 1, max_outstanding_acks = 1`, even the Start message would
///   be blocked.
///
/// Both factors are widened to `u64` and combined with saturating
/// arithmetic, so pathological inputs cannot overflow.
#[inline]
pub(crate) fn should_stall(
    last_ack_sequence: u64,
    effective_flow: u16,
    max_outstanding_acks: u16,
    next_sequence: u64,
) -> bool {
    let in_flight_allowance = u64::from(effective_flow).saturating_mul(u64::from(max_outstanding_acks));
    let window = last_ack_sequence.saturating_add(in_flight_allowance);
    window < next_sequence
}
370
/// Flow (ack every N messages) the client asks the server for by default.
pub(crate) const DEFAULT_FLOW: u16 = 100;

/// Smallest accepted `max_outstanding_acks`.
pub(crate) const MIN_MAX_OUTSTANDING_ACKS: u16 = 1;

/// Largest accepted `max_outstanding_acks` (ADR-50 recommends `1..=3`).
pub(crate) const MAX_MAX_OUTSTANDING_ACKS: u16 = 3;

/// Default outstanding-ack window; ADR-50 calls this "generally optimal".
pub(crate) const DEFAULT_MAX_OUTSTANDING_ACKS: u16 = 2;
382
383/// Extension trait adding [`fast_publish`](FastPublishExt::fast_publish) to any
384/// JetStream-context-like type.
385///
386/// Implemented automatically for [`async_nats::jetstream::Context`] and any
387/// other type that can provide an [`async_nats::Client`] and a default timeout.
388///
389/// # Example
390///
391/// ```no_run
392/// # async fn example(js: async_nats::jetstream::Context) -> Result<(), Box<dyn std::error::Error>> {
393/// use jetstream_extra::batch_publish_fast::FastPublishExt;
394///
395/// let mut batch = js.fast_publish().build()?;
396/// # let _ = batch;
397/// # Ok(())
398/// # }
399/// ```
400pub trait FastPublishExt:
401 async_nats::jetstream::context::traits::ClientProvider
402 + async_nats::jetstream::context::traits::TimeoutProvider
403{
404 /// Start building a [`FastPublisher`] bound to the underlying connection
405 /// and default timeout of this context.
406 fn fast_publish(&self) -> FastPublisherBuilder {
407 FastPublisherBuilder::new(self.client(), self.timeout())
408 }
409}
410
411impl<T> FastPublishExt for T where
412 T: async_nats::jetstream::context::traits::ClientProvider
413 + async_nats::jetstream::context::traits::TimeoutProvider
414{
415}
416
417/// Callback type for asynchronous fast-publish errors (gaps, flow errors,
418/// per-message server errors). Invoked synchronously on the publisher's task
419/// whenever such an event is drained from the inbox. Keep the callback fast
420/// and non-blocking.
421pub type FastPublishErrorHandler = Box<dyn FnMut(FastPublishError) + Send + 'static>;
422
423/// Builder for a [`FastPublisher`].
424///
425/// Obtained via [`FastPublishExt::fast_publish`]. Call [`build`](Self::build)
426/// to validate configuration and produce a ready-to-use publisher.
427///
428/// All setters are optional; defaults match ADR-50 recommendations:
429/// - `flow = 100` (ack every 100 messages ceiling)
430/// - `max_outstanding_acks = 2`
431/// - `gap_mode = Fail`
432/// - `ack_timeout = <JetStream context default>`
433pub struct FastPublisherBuilder {
434 client: async_nats::Client,
435 flow: u16,
436 max_outstanding_acks: u16,
437 ack_timeout: Duration,
438 gap_mode: GapMode,
439 on_error: Option<FastPublishErrorHandler>,
440}
441
442impl FastPublisherBuilder {
443 /// Create a new builder with default settings.
444 ///
445 /// Prefer [`FastPublishExt::fast_publish`] for public use.
446 pub(crate) fn new(client: async_nats::Client, ack_timeout: Duration) -> Self {
447 Self {
448 client,
449 flow: DEFAULT_FLOW,
450 max_outstanding_acks: DEFAULT_MAX_OUTSTANDING_ACKS,
451 ack_timeout,
452 gap_mode: GapMode::default(),
453 on_error: None,
454 }
455 }
456
457 /// Set the client-requested maximum flow — the upper bound on how often
458 /// the server will send flow acks. The server may choose a lower effective
459 /// flow.
460 ///
461 /// Must be at least 1. Values of 0 are clamped to 1.
462 pub fn flow(mut self, flow: u16) -> Self {
463 self.flow = flow.max(1);
464 self
465 }
466
467 /// Set the number of flow-ack-batches that can be in flight before the
468 /// publisher stalls and waits for an ack. Valid range is `1..=3`.
469 ///
470 /// - `1` behaves like synchronous async publish throttled to flow N
471 /// - `2` is the ADR-50 recommended default (optimal for most cases)
472 /// - `3` is useful on higher-RTT links
473 ///
474 /// Values outside the range are returned as an error from [`build`](Self::build).
475 pub fn max_outstanding_acks(mut self, n: u16) -> Self {
476 self.max_outstanding_acks = n;
477 self
478 }
479
480 /// Set the timeout for waiting on flow acks and the final commit ack.
481 ///
482 /// When the publisher is stalled waiting for an ack, it will auto-send
483 /// pings every `ack_timeout / 3` to recover from lost acks, giving up
484 /// after the full `ack_timeout` elapses.
485 pub fn ack_timeout(mut self, timeout: Duration) -> Self {
486 self.ack_timeout = timeout;
487 self
488 }
489
490 /// Set the gap handling mode. Default: [`GapMode::Fail`].
491 pub fn gap_mode(mut self, mode: GapMode) -> Self {
492 self.gap_mode = mode;
493 self
494 }
495
496 /// Register a callback invoked for asynchronous events: gap detections,
497 /// per-message flow errors, and server-side fast-batch errors.
498 ///
499 /// The callback is called on the publisher's task synchronously from the
500 /// middle of `add` / `commit` / `close`, so it must be fast and
501 /// non-blocking.
502 pub fn on_error<F>(mut self, handler: F) -> Self
503 where
504 F: FnMut(FastPublishError) + Send + 'static,
505 {
506 self.on_error = Some(Box::new(handler));
507 self
508 }
509
510 /// Validate configuration and produce a [`FastPublisher`].
511 ///
512 /// The subscription to the batch inbox is NOT created here — it is lazily
513 /// opened on the first `add` / `commit` / `close`. This matches the Go
514 /// reference implementation and avoids wasted subscriptions when a
515 /// publisher is built and then dropped unused.
516 ///
517 /// # Errors
518 ///
519 /// - [`FastPublishErrorKind::InvalidConfig`] if `max_outstanding_acks` is
520 /// outside `1..=3`.
521 /// - [`FastPublishErrorKind::InvalidInboxShape`] if the client's
522 /// `new_inbox()` does not produce a two-token inbox (required by the
523 /// server's reply-subject parser).
524 pub fn build(self) -> Result<FastPublisher, FastPublishError> {
525 if !(MIN_MAX_OUTSTANDING_ACKS..=MAX_MAX_OUTSTANDING_ACKS)
526 .contains(&self.max_outstanding_acks)
527 {
528 return Err(FastPublishError::new(FastPublishErrorKind::InvalidConfig));
529 }
530
531 let inbox = self.client.new_inbox();
532 validate_inbox_shape(&inbox)?;
533
534 let reply_prefix = build_reply_prefix(&inbox, self.flow, self.gap_mode);
535
536 Ok(FastPublisher {
537 client: self.client,
538 inbox,
539 flow: self.flow,
540 gap_mode: self.gap_mode,
541 max_outstanding_acks: self.max_outstanding_acks,
542 ack_timeout: self.ack_timeout,
543 reply_prefix,
544 subscriber: None,
545 sequence: 0,
546 effective_flow: self.flow,
547 last_ack_sequence: 0,
548 initial_ack_received: false,
549 pending_pub_ack: None,
550 first_subject: None,
551 closed: false,
552 fatal: None,
553 on_error: self.on_error,
554 })
555 }
556}
557
558/// A non-atomic, high-throughput JetStream batch publisher using the
559/// fast-ingest protocol (ADR-50, nats-server 2.14+).
560///
561/// Obtain via [`FastPublishExt::fast_publish`] → [`FastPublisherBuilder::build`].
562///
563/// A `FastPublisher` is `Send` but NOT `Sync`: methods require `&mut self` and
564/// the publisher must be driven from a single task. Dropping the publisher
565/// mid-batch is safe — the underlying [`async_nats::Subscriber`] drops with
566/// it, the server-side interest is torn down, and the server will time out
567/// the abandoned batch after 10 seconds.
568pub struct FastPublisher {
569 // --- configuration (immutable after build) ---
570 client: async_nats::Client,
571 inbox: String,
572 flow: u16, // client-requested ceiling
573 gap_mode: GapMode,
574 max_outstanding_acks: u16,
575 ack_timeout: Duration,
576
577 // --- cached reply subject prefix ---
578 // "<inbox>.<effective_flow>.<gap>." — rebuilt when effective_flow changes.
579 reply_prefix: String,
580
581 // --- lazily-created inbox subscription ---
582 subscriber: Option<async_nats::Subscriber>,
583
584 // --- per-batch state ---
585 sequence: u64,
586 effective_flow: u16, // dictated by server; equals `flow` until first ack
587 last_ack_sequence: u64,
588 /// Set to true as soon as the server sends the initial `BatchFlowAck` in
589 /// response to the first `Start` message. This is tracked separately
590 /// from `last_ack_sequence` because the initial ack has `seq:0` (no
591 /// messages persisted yet — just confirming the batch is alive).
592 initial_ack_received: bool,
593 /// Terminal `PubAck` stashed if seen out of band (e.g. single-msg
594 /// immediate-commit fast path during `await_first_reply`).
595 pending_pub_ack: Option<BatchPubAck>,
596 /// Subject of the first published message, used by [`close`](Self::close)
597 /// to construct the EOB commit message and by `ping` as the publish
598 /// target.
599 first_subject: Option<async_nats::Subject>,
600 closed: bool,
601 fatal: Option<FastPublishErrorKind>,
602
603 // --- async error callback ---
604 on_error: Option<FastPublishErrorHandler>,
605}
606
607impl FastPublisher {
608 /// Returns the number of messages added to (and published in) this batch
609 /// so far, excluding any pending commit message.
610 pub fn size(&self) -> u64 {
611 self.sequence
612 }
613
614 /// Returns `true` if the batch has been committed, closed, or failed
615 /// fatally.
616 pub fn is_closed(&self) -> bool {
617 self.closed
618 }
619
620 /// Returns the batch's inbox, which is also the batch identifier as seen
621 /// by the server.
622 pub fn batch_id(&self) -> &str {
623 &self.inbox
624 }
625
626 /// Returns the currently-configured gap mode.
627 pub fn gap_mode(&self) -> GapMode {
628 self.gap_mode
629 }
630
631 /// Returns the highest batch sequence acknowledged by the server so far.
632 /// `0` before the first flow ack arrives.
633 pub fn last_ack_sequence(&self) -> u64 {
634 self.last_ack_sequence
635 }
636}
637
638impl Debug for FastPublisher {
639 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640 f.debug_struct("FastPublisher")
641 .field("inbox", &self.inbox)
642 .field("flow", &self.flow)
643 .field("effective_flow", &self.effective_flow)
644 .field("gap_mode", &self.gap_mode)
645 .field("max_outstanding_acks", &self.max_outstanding_acks)
646 .field("sequence", &self.sequence)
647 .field("last_ack_sequence", &self.last_ack_sequence)
648 .field("closed", &self.closed)
649 .field("fatal", &self.fatal)
650 .finish()
651 }
652}
653
654// Compile-time check: `FastPublisher` must be `Send` so users can spawn it
655// onto tokio tasks. It is intentionally NOT `Sync`: per-batch state is driven
656// through `&mut self` and the publisher is single-consumer.
657const _: fn() = || {
658 fn assert_send<T: Send>() {}
659 assert_send::<FastPublisher>();
660};
661
662// ---------------------------------------------------------------------------
663// Public return type
664// ---------------------------------------------------------------------------
665
/// What a successful [`FastPublisher::add`] or [`FastPublisher::add_message`]
/// returns.
///
/// `ack_sequence` is the highest batch sequence the server has acknowledged
/// at the moment of return:
/// - in [`GapMode::Fail`], everything up to and including it is persisted;
/// - in [`GapMode::Ok`], gaps may exist, so treat it only as a progress hint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FastPubAck {
    /// Batch sequence assigned to the message just added.
    pub batch_sequence: u64,
    /// Highest batch sequence acknowledged by the server so far.
    pub ack_sequence: u64,
}
681
682// ---------------------------------------------------------------------------
683// State machine: add / commit / close
684// ---------------------------------------------------------------------------
685
/// Which kind of commit `commit_message_inner` should perform.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommitKind {
    /// The commit message is stored as the batch's last message
    /// (operation code 2).
    Final,
    /// End-of-batch: the commit message is not stored (operation code 3).
    /// This is what [`FastPublisher::close`] uses.
    Eob,
}
695
696impl FastPublisher {
697 /// Add a message to the batch with the given subject and payload.
698 ///
699 /// Convenience wrapper around [`add_message`](Self::add_message) for
700 /// callers that don't need custom headers.
701 pub async fn add<S: ToSubject>(
702 &mut self,
703 subject: S,
704 payload: Bytes,
705 ) -> Result<FastPubAck, FastPublishError> {
706 self.add_message(OutboundMessage {
707 subject: subject.to_subject(),
708 payload,
709 headers: None,
710 })
711 .await
712 }
713
    /// Add a pre-constructed message to the batch.
    ///
    /// The message's `subject`, `payload`, and `headers` fields are forwarded
    /// to the server; the reply subject is always set by the publisher.
    ///
    /// On the first call, a subscription to the batch inbox is created and
    /// the publisher waits for the initial flow ack from the server to
    /// confirm the batch has been accepted.
    ///
    /// On success, returns a [`FastPubAck`] with this message's batch
    /// sequence and the highest sequence the server has acknowledged so far.
    ///
    /// # Errors
    ///
    /// - [`FastPublishErrorKind::Closed`] if the publisher has already been
    ///   committed, closed, or failed fatally.
    /// - [`FastPublishErrorKind::Subscribe`] if the initial subscription
    ///   fails.
    /// - [`FastPublishErrorKind::Publish`] if publishing the message fails.
    /// - [`FastPublishErrorKind::Timeout`] if the initial ack does not arrive
    ///   within `ack_timeout`.
    /// - Any mapped API error from the server's init response
    ///   (`NotEnabled`, `InvalidPattern`, `InvalidBatchId`, `UnknownBatchId`,
    ///   `TooManyInflight`).
    /// - The recorded fatal kind, once one has been set. The `fatal` field is
    ///   re-checked after every step that may drain inbox events.
    pub async fn add_message(
        &mut self,
        msg: OutboundMessage,
    ) -> Result<FastPubAck, FastPublishError> {
        // Refuse work on a finished publisher, or on one that already failed.
        if self.closed {
            return Err(FastPublishError::new(FastPublishErrorKind::Closed));
        }
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }

        // Lazy bootstrap on the very first publish.
        self.ensure_subscribed().await?;

        // Drain any events that arrived while the caller was computing the
        // next payload. Cost in the common case: one `Pending` poll.
        self.drain_nonblocking()?;
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }

        // Stall gate: if the outstanding-ack window is saturated, wait for a
        // flow ack before emitting the next message. Check BEFORE incrementing
        // sequence so the next-message sequence is what we gate on.
        let next_sequence = self.sequence + 1;
        if should_stall(
            self.last_ack_sequence,
            self.effective_flow,
            self.max_outstanding_acks,
            next_sequence,
        ) {
            self.wait_for_flow_event_with_pings().await?;
            // Presumably waiting can drain a gap/flow error that marks the
            // batch fatal; re-check before publishing either way.
            if let Some(kind) = self.fatal {
                return Err(FastPublishError::new(kind));
            }
        }

        // Sequence 1 opens the batch (Start); every later message is Append.
        // NOTE(review): the sequence advances before `publish_raw`, so a
        // failed publish consumes this number. Confirm that publish failures
        // are treated as terminal, otherwise a retry would create a gap.
        self.sequence += 1;
        let op = if self.sequence == 1 {
            Operation::Start
        } else {
            Operation::Append
        };

        // Remember the first subject: `close` uses it as the EOB commit
        // target and `ping` publishes to it.
        if self.first_subject.is_none() {
            self.first_subject = Some(msg.subject.clone());
        }

        let reply = build_reply(&self.reply_prefix, self.sequence, op);
        self.publish_raw(msg, reply).await?;

        // The Start message must be confirmed by the server (initial flow ack
        // or init error) before the call returns.
        if self.sequence == 1 {
            self.await_first_reply().await?;
        }

        // Drain any acks that landed while we were awaiting the first reply
        // or just now publishing.
        self.drain_nonblocking()?;
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }

        Ok(FastPubAck {
            batch_sequence: self.sequence,
            ack_sequence: self.last_ack_sequence,
        })
    }
802
803 /// Commit the batch by publishing a final stored message.
804 ///
805 /// After this returns, the publisher is closed and no further messages
806 /// can be added. The returned [`BatchPubAck`] includes the batch id (the
807 /// publisher's inbox) and the total number of messages in the batch.
808 pub async fn commit<S: ToSubject>(
809 self,
810 subject: S,
811 payload: Bytes,
812 ) -> Result<BatchPubAck, FastPublishError> {
813 self.commit_message(OutboundMessage {
814 subject: subject.to_subject(),
815 payload,
816 headers: None,
817 })
818 .await
819 }
820
821 /// Commit the batch with a pre-constructed final message.
822 pub async fn commit_message(
823 mut self,
824 msg: OutboundMessage,
825 ) -> Result<BatchPubAck, FastPublishError> {
826 self.commit_message_inner(msg, CommitKind::Final).await
827 }
828
829 /// End the batch without storing a final message (end-of-batch commit).
830 ///
831 /// Uses the first message's subject as the publish target; the server
832 /// does not persist the commit message itself. Returns the same
833 /// [`BatchPubAck`] shape as [`commit`](Self::commit).
834 ///
835 /// Returns [`FastPublishErrorKind::EmptyBatch`] if no messages have been
836 /// added yet.
837 pub async fn close(mut self) -> Result<BatchPubAck, FastPublishError> {
838 if self.closed {
839 return Err(FastPublishError::new(FastPublishErrorKind::Closed));
840 }
841 if self.sequence == 0 {
842 return Err(FastPublishError::new(FastPublishErrorKind::EmptyBatch));
843 }
844 let subject = self
845 .first_subject
846 .clone()
847 .expect("first_subject set once sequence > 0");
848 let msg = OutboundMessage {
849 subject,
850 payload: Bytes::new(),
851 headers: None,
852 };
853 self.commit_message_inner(msg, CommitKind::Eob).await
854 }
855
856 // ---- internal helpers --------------------------------------------------
857
858 async fn commit_message_inner(
859 &mut self,
860 msg: OutboundMessage,
861 kind: CommitKind,
862 ) -> Result<BatchPubAck, FastPublishError> {
863 if self.closed {
864 return Err(FastPublishError::new(FastPublishErrorKind::Closed));
865 }
866
867 // Lazy bootstrap for first-op-is-commit (single-message immediate
868 // commit) — the commit path uses the same subscribe-on-first-publish
869 // as add_message.
870 self.ensure_subscribed().await?;
871
872 // Drain any pending events first.
873 self.drain_nonblocking()?;
874
875 // If a fatal flow error was observed (e.g. FlowErr or GapDetected in
876 // Fail mode), the batch is already terminal server-side. Don't publish
877 // the commit, but DO drain the inbox to pick up the terminal PubAck
878 // the server will send — it tells the user which messages were
879 // persisted. If the PubAck never arrives, we time out.
880 if let Some(fatal_kind) = self.fatal {
881 self.closed = true;
882 // Try to get the PubAck for diagnostics; ignore drain errors.
883 let _pub_ack = self.drain_until_pub_ack().await.ok();
884 return Err(FastPublishError::new(fatal_kind));
885 }
886
887 // Stall gate: also apply to commits. A commit is just another
888 // message from the server's perspective and counts against the
889 // outstanding-ack window.
890 let next_sequence = self.sequence + 1;
891 if should_stall(
892 self.last_ack_sequence,
893 self.effective_flow,
894 self.max_outstanding_acks,
895 next_sequence,
896 ) {
897 self.wait_for_flow_event_with_pings().await?;
898 if let Some(fatal_kind) = self.fatal {
899 self.closed = true;
900 let _pub_ack = self.drain_until_pub_ack().await.ok();
901 return Err(FastPublishError::new(fatal_kind));
902 }
903 }
904
905 self.sequence += 1;
906 let op = match kind {
907 CommitKind::Final => Operation::Commit,
908 CommitKind::Eob => Operation::CommitEob,
909 };
910
911 if self.first_subject.is_none() {
912 self.first_subject = Some(msg.subject.clone());
913 }
914
915 let reply = build_reply(&self.reply_prefix, self.sequence, op);
916 self.publish_raw(msg, reply).await?;
917 self.closed = true;
918
919 self.drain_until_pub_ack().await
920 }
921
922 /// Create the inbox subscription if it does not exist yet.
923 async fn ensure_subscribed(&mut self) -> Result<(), FastPublishError> {
924 if self.subscriber.is_some() {
925 return Ok(());
926 }
927 let wildcard = format!("{}.>", self.inbox);
928 let sub = self
929 .client
930 .subscribe(wildcard)
931 .await
932 .map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Subscribe, e))?;
933 self.subscriber = Some(sub);
934 Ok(())
935 }
936
937 /// Publish a message with a reply subject, fire-and-forget.
938 ///
939 /// Dispatches to `publish_with_reply_and_headers` when headers are
940 /// present, otherwise the simpler `publish_with_reply`.
941 ///
942 /// Takes `&mut self` to avoid `&FastPublisher` crossing `.await`, which
943 /// would require `FastPublisher: Sync` (not satisfied due to the boxed
944 /// `FnMut` error handler field).
945 async fn publish_raw(
946 &mut self,
947 msg: OutboundMessage,
948 reply: Subject,
949 ) -> Result<(), FastPublishError> {
950 let OutboundMessage {
951 subject,
952 payload,
953 headers,
954 } = msg;
955 let res = match headers {
956 Some(h) => {
957 self.client
958 .publish_with_reply_and_headers(subject, reply, h, payload)
959 .await
960 }
961 None => {
962 self.client
963 .publish_with_reply(subject, reply, payload)
964 .await
965 }
966 };
967 res.map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Publish, e))
968 }
969
970 /// Non-blocking drain: consume all messages currently buffered on the
971 /// subscription and apply them to the publisher state.
972 ///
973 /// The common case (no pending events) costs one `poll_next` returning
974 /// `Pending`, which is a few nanoseconds.
975 fn drain_nonblocking(&mut self) -> Result<(), FastPublishError> {
976 let Some(sub) = self.subscriber.as_mut() else {
977 return Ok(());
978 };
979 let waker = noop_waker_ref();
980 let mut cx = TaskContext::from_waker(waker);
981 loop {
982 match sub.poll_next_unpin(&mut cx) {
983 Poll::Ready(Some(msg)) => {
984 let shared = SharedHandlerState {
985 gap_mode: self.gap_mode,
986 effective_flow: &mut self.effective_flow,
987 last_ack_sequence: &mut self.last_ack_sequence,
988 initial_ack_received: &mut self.initial_ack_received,
989 pending_pub_ack: &mut self.pending_pub_ack,
990 fatal: &mut self.fatal,
991 on_error: self.on_error.as_mut(),
992 };
993 handle_inbox_message(shared, msg)?;
994 }
995 Poll::Ready(None) => {
996 // Subscription ended unexpectedly.
997 self.closed = true;
998 return Err(FastPublishError::new(FastPublishErrorKind::Closed));
999 }
1000 Poll::Pending => return Ok(()),
1001 }
1002 }
1003 }
1004
1005 /// After the first `Start` message is sent, wait for the server to
1006 /// either (a) send a `BatchFlowAck` confirming the batch, (b) return an
1007 /// init-time API error, or (c) send a terminal `PubAck` directly on the
1008 /// single-message immediate-commit fast path (stashed for later pickup).
1009 async fn await_first_reply(&mut self) -> Result<(), FastPublishError> {
1010 let deadline = tokio::time::Instant::now() + self.ack_timeout;
1011 loop {
1012 let now = tokio::time::Instant::now();
1013 if now >= deadline {
1014 return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
1015 }
1016 let remaining = deadline - now;
1017 let sub = self
1018 .subscriber
1019 .as_mut()
1020 .expect("subscriber installed by ensure_subscribed");
1021 let msg = match tokio::time::timeout(remaining, sub.next()).await {
1022 Ok(Some(m)) => m,
1023 Ok(None) => {
1024 self.closed = true;
1025 return Err(FastPublishError::new(FastPublishErrorKind::Closed));
1026 }
1027 Err(_) => return Err(FastPublishError::new(FastPublishErrorKind::Timeout)),
1028 };
1029 let shared = SharedHandlerState {
1030 gap_mode: self.gap_mode,
1031 effective_flow: &mut self.effective_flow,
1032 last_ack_sequence: &mut self.last_ack_sequence,
1033 initial_ack_received: &mut self.initial_ack_received,
1034 pending_pub_ack: &mut self.pending_pub_ack,
1035 fatal: &mut self.fatal,
1036 on_error: self.on_error.as_mut(),
1037 };
1038 handle_inbox_message(shared, msg)?;
1039
1040 if let Some(kind) = self.fatal {
1041 return Err(FastPublishError::new(kind));
1042 }
1043 if self.pending_pub_ack.is_some() {
1044 // Single-message immediate-commit fast path — the commit
1045 // path (drain_until_pub_ack) will pick this up.
1046 return Ok(());
1047 }
1048 if self.initial_ack_received {
1049 // First BatchFlowAck arrived (even with seq:0) — batch is
1050 // confirmed and we can continue publishing.
1051 return Ok(());
1052 }
1053 // Otherwise: gap/err/etc. — keep waiting for the terminal signal.
1054 }
1055 }
1056
1057 /// Drain the subscription until a terminal `PubAck` is received (or a
1058 /// fatal error surfaces).
1059 async fn drain_until_pub_ack(&mut self) -> Result<BatchPubAck, FastPublishError> {
1060 // Already stashed by a prior drain or first-reply?
1061 if let Some(pa) = self.pending_pub_ack.take() {
1062 self.unsubscribe_best_effort().await;
1063 return Ok(pa);
1064 }
1065
1066 let deadline = tokio::time::Instant::now() + self.ack_timeout;
1067 loop {
1068 let now = tokio::time::Instant::now();
1069 if now >= deadline {
1070 return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
1071 }
1072 let remaining = deadline - now;
1073 let sub = self
1074 .subscriber
1075 .as_mut()
1076 .expect("subscriber installed by ensure_subscribed");
1077 let msg = match tokio::time::timeout(remaining, sub.next()).await {
1078 Ok(Some(m)) => m,
1079 Ok(None) => {
1080 self.closed = true;
1081 return Err(FastPublishError::new(FastPublishErrorKind::Closed));
1082 }
1083 Err(_) => return Err(FastPublishError::new(FastPublishErrorKind::Timeout)),
1084 };
1085
1086 let shared = SharedHandlerState {
1087 gap_mode: self.gap_mode,
1088 effective_flow: &mut self.effective_flow,
1089 last_ack_sequence: &mut self.last_ack_sequence,
1090 initial_ack_received: &mut self.initial_ack_received,
1091 pending_pub_ack: &mut self.pending_pub_ack,
1092 fatal: &mut self.fatal,
1093 on_error: self.on_error.as_mut(),
1094 };
1095 handle_inbox_message(shared, msg)?;
1096
1097 if let Some(pa) = self.pending_pub_ack.take() {
1098 self.unsubscribe_best_effort().await;
1099 return Ok(pa);
1100 }
1101 if let Some(kind) = self.fatal.take() {
1102 self.unsubscribe_best_effort().await;
1103 return Err(FastPublishError::new(kind));
1104 }
1105 }
1106 }
1107
1108 async fn unsubscribe_best_effort(&mut self) {
1109 if let Some(mut sub) = self.subscriber.take() {
1110 let _ = sub.unsubscribe().await;
1111 }
1112 }
1113
1114 /// Block until the outstanding-ack window has room for another publish,
1115 /// sending periodic pings to the server to recover from lost acks.
1116 ///
1117 /// Matches Go orbit.go PR #32 `waitForStall`: split the ack_timeout into
1118 /// three intervals so up to two pings fit before the deadline.
1119 async fn wait_for_flow_event_with_pings(&mut self) -> Result<(), FastPublishError> {
1120 // Split timeout into 3 intervals for up to 2 pings before giving up.
1121 // Clamp floor to 100ms so tiny ack_timeouts don't spin.
1122 let ping_interval = (self.ack_timeout / 3).max(Duration::from_millis(100));
1123 let deadline = tokio::time::Instant::now() + self.ack_timeout;
1124 let mut ping_at = tokio::time::Instant::now() + ping_interval;
1125
1126 loop {
1127 if let Some(kind) = self.fatal {
1128 return Err(FastPublishError::new(kind));
1129 }
1130
1131 let now = tokio::time::Instant::now();
1132 if now >= deadline {
1133 return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
1134 }
1135
1136 // Compute the next wake time: either the ping instant or the
1137 // overall deadline, whichever comes first.
1138 let next_wake = ping_at.min(deadline);
1139 let wait = next_wake.saturating_duration_since(now);
1140
1141 let sub = self
1142 .subscriber
1143 .as_mut()
1144 .expect("subscriber installed before stall");
1145
1146 match tokio::time::timeout(wait, sub.next()).await {
1147 Ok(Some(msg)) => {
1148 let shared = SharedHandlerState {
1149 gap_mode: self.gap_mode,
1150 effective_flow: &mut self.effective_flow,
1151 last_ack_sequence: &mut self.last_ack_sequence,
1152 initial_ack_received: &mut self.initial_ack_received,
1153 pending_pub_ack: &mut self.pending_pub_ack,
1154 fatal: &mut self.fatal,
1155 on_error: self.on_error.as_mut(),
1156 };
1157 handle_inbox_message(shared, msg)?;
1158
1159 let next_sequence = self.sequence + 1;
1160 if !should_stall(
1161 self.last_ack_sequence,
1162 self.effective_flow,
1163 self.max_outstanding_acks,
1164 next_sequence,
1165 ) {
1166 // Window has room; we can publish again.
1167 return Ok(());
1168 }
1169 // Still stalled: keep waiting for the next event.
1170 }
1171 Ok(None) => {
1172 self.closed = true;
1173 return Err(FastPublishError::new(FastPublishErrorKind::Closed));
1174 }
1175 Err(_) => {
1176 // Timed out on this interval. Either send a ping (if the
1177 // ping interval expired) or give up (if the overall
1178 // deadline expired).
1179 let now = tokio::time::Instant::now();
1180 if now >= deadline {
1181 return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
1182 }
1183 // Must have hit the ping interval — send a ping and
1184 // schedule the next one.
1185 self.send_ping().await?;
1186 ping_at = tokio::time::Instant::now() + ping_interval;
1187 }
1188 }
1189 }
1190 }
1191
1192 /// Send a ping message (op=4) to recover from a possibly-lost ack.
1193 ///
1194 /// The ping does NOT increment the batch sequence. It is published to the
1195 /// first message's subject (required so the server routes it to the same
1196 /// stream), and triggers the server to resend the latest flow ack.
1197 ///
1198 /// Takes `&mut self` — not because the call mutates state, but because
1199 /// holding `&self` across `.await` would require `FastPublisher: Sync`,
1200 /// which the boxed `FnMut` error handler field prevents. Callers already
1201 /// hold `&mut self` so this is cheap.
1202 async fn send_ping(&mut self) -> Result<(), FastPublishError> {
1203 let Some(subject) = self.first_subject.clone() else {
1204 // Cannot ping before the first add — this should never happen
1205 // because the stall gate only fires after at least one publish.
1206 return Err(FastPublishError::new(FastPublishErrorKind::InvalidState));
1207 };
1208 let reply = build_reply(&self.reply_prefix, self.sequence, Operation::Ping);
1209 self.client
1210 .publish_with_reply(subject, reply, Bytes::new())
1211 .await
1212 .map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Publish, e))?;
1213 Ok(())
1214 }
1215}
1216
1217// ---------------------------------------------------------------------------
1218// Handler for inbox messages
1219// ---------------------------------------------------------------------------
1220
1221/// Mutable reference bundle passed to [`handle_inbox_message`] so the
1222/// function can update all the relevant fields without conflicting borrows
1223/// on `&mut FastPublisher` (which would prevent us from also holding a
1224/// mutable borrow on `self.subscriber` while processing drained messages).
1225struct SharedHandlerState<'a> {
1226 gap_mode: GapMode,
1227 effective_flow: &'a mut u16,
1228 last_ack_sequence: &'a mut u64,
1229 initial_ack_received: &'a mut bool,
1230 pending_pub_ack: &'a mut Option<BatchPubAck>,
1231 fatal: &'a mut Option<FastPublishErrorKind>,
1232 on_error: Option<&'a mut FastPublishErrorHandler>,
1233}
1234
1235fn handle_inbox_message(
1236 mut state: SharedHandlerState<'_>,
1237 msg: async_nats::Message,
1238) -> Result<(), FastPublishError> {
1239 match classify(&msg.payload)? {
1240 Classified::FlowAck(ack) => {
1241 // The initial BatchFlowAck for a fresh batch has `seq:0` (no
1242 // messages persisted yet — it just confirms the batch was
1243 // accepted). We must track it separately from `last_ack_sequence`
1244 // because `seq:0` is the same as the default `last_ack_sequence`.
1245 *state.initial_ack_received = true;
1246 // Lost-ack handling: any newer seq implicitly acks all below.
1247 if ack.sequence > *state.last_ack_sequence {
1248 *state.last_ack_sequence = ack.sequence;
1249 }
1250 // Server-dictated flow override. Update the effective flow used by
1251 // the stall gate. The reply prefix always uses the CLIENT's initial
1252 // flow ceiling (not the server-dictated value) because ADR-50 says
1253 // "We add the initial flow [...] for replica followers who might
1254 // have missed the first message due to limits." The server's
1255 // getFastBatch parser reads the flow from the reply subject to set
1256 // maxAckMessages on followers — using the server-dictated (lower)
1257 // value would cap follower ramp-up incorrectly in clustered setups.
1258 let new_flow = ack.messages.max(1);
1259 if new_flow != *state.effective_flow {
1260 *state.effective_flow = new_flow;
1261 // NOTE: reply_prefix is NOT rebuilt here — it always contains
1262 // the initial flow ceiling. Only effective_flow (used by the
1263 // stall gate) is updated.
1264 }
1265 }
1266 Classified::FlowGap(gap) => {
1267 tracing::debug!(
1268 expected_last = gap.expected_last_sequence,
1269 current = gap.current_sequence,
1270 "fast batch gap detected"
1271 );
1272 if let Some(h) = state.on_error.as_deref_mut() {
1273 h(FastPublishError::new(FastPublishErrorKind::GapDetected));
1274 }
1275 // In GapMode::Fail, stop publishing immediately. The server has
1276 // already abandoned the batch; further publishes would be silently
1277 // dropped. The terminal PubAck (containing persisted-message
1278 // count) will arrive on the next drain/commit.
1279 if state.gap_mode == GapMode::Fail {
1280 *state.fatal = Some(FastPublishErrorKind::GapDetected);
1281 }
1282 }
1283 Classified::FlowErr(ferr) => {
1284 tracing::debug!(
1285 batch_sequence = ferr.sequence,
1286 err_code = ferr.error.error_code().0,
1287 "fast batch flow error"
1288 );
1289 let kind = FastPublishErrorKind::from_api_error(&ferr.error);
1290 if let Some(h) = state.on_error.as_deref_mut() {
1291 h(FastPublishError::new(kind));
1292 }
1293 if state.gap_mode == GapMode::Fail {
1294 *state.fatal = Some(kind);
1295 }
1296 }
1297 Classified::PubAck(pa) => {
1298 *state.pending_pub_ack = Some(pa);
1299 }
1300 Classified::InitError(err) => {
1301 *state.fatal = Some(FastPublishErrorKind::from_api_error(&err));
1302 }
1303 }
1304 Ok(())
1305}
1306
1307// ---------------------------------------------------------------------------
1308// Tests
1309// ---------------------------------------------------------------------------
1310
#[cfg(test)]
mod tests {
    use super::*;

    // -- reply subject format ------------------------------------------------

    #[test]
    fn reply_prefix_format_ok_mode() {
        let p = build_reply_prefix("_INBOX.abc123", 100, GapMode::Ok);
        assert_eq!(p, "_INBOX.abc123.100.ok.");
    }

    #[test]
    fn reply_prefix_format_fail_mode() {
        let p = build_reply_prefix("_INBOX.abc123", 50, GapMode::Fail);
        assert_eq!(p, "_INBOX.abc123.50.fail.");
    }

    #[test]
    fn reply_full_all_operations() {
        let prefix = build_reply_prefix("_INBOX.x", 10, GapMode::Fail);
        for (op, code) in [
            (Operation::Start, 0_u8),
            (Operation::Append, 1),
            (Operation::Commit, 2),
            (Operation::CommitEob, 3),
            (Operation::Ping, 4),
        ] {
            let r = build_reply(&prefix, 42, op);
            assert_eq!(r.as_str(), format!("_INBOX.x.10.fail.42.{code}.$FI"));
        }
    }

    #[test]
    fn reply_full_both_gap_modes() {
        for (mode, tag) in [(GapMode::Ok, "ok"), (GapMode::Fail, "fail")] {
            let prefix = build_reply_prefix("_INBOX.abc", 25, mode);
            let r = build_reply(&prefix, 1, Operation::Start);
            assert_eq!(r.as_str(), format!("_INBOX.abc.25.{tag}.1.0.$FI"));
        }
    }

    // -- inbox shape validation ----------------------------------------------

    #[test]
    fn inbox_shape_accepts_two_tokens() {
        assert!(validate_inbox_shape("_INBOX.abc123").is_ok());
        assert!(validate_inbox_shape("X.Y").is_ok());
    }

    #[test]
    fn inbox_shape_rejects_zero_dots() {
        assert!(matches!(
            validate_inbox_shape("INBOX").unwrap_err().kind(),
            FastPublishErrorKind::InvalidInboxShape
        ));
    }

    #[test]
    fn inbox_shape_rejects_three_or_more_tokens() {
        assert!(matches!(
            validate_inbox_shape("_INBOX.myapp.abc123")
                .unwrap_err()
                .kind(),
            FastPublishErrorKind::InvalidInboxShape
        ));
        assert!(matches!(
            validate_inbox_shape("a.b.c.d").unwrap_err().kind(),
            FastPublishErrorKind::InvalidInboxShape
        ));
    }

    #[test]
    fn inbox_shape_rejects_empty_tokens() {
        assert!(validate_inbox_shape(".abc").is_err());
        assert!(validate_inbox_shape("abc.").is_err());
        assert!(validate_inbox_shape("").is_err());
    }

    // -- JSON parsing --------------------------------------------------------

    #[test]
    fn parse_batch_flow_ack() {
        let payload = br#"{"type":"ack","seq":10,"msgs":15}"#;
        match classify(payload).unwrap() {
            Classified::FlowAck(a) => {
                assert_eq!(a.sequence, 10);
                assert_eq!(a.messages, 15);
            }
            other => panic!("expected FlowAck, got {other:?}"),
        }
    }

    #[test]
    fn parse_batch_flow_gap() {
        let payload = br#"{"type":"gap","last_seq":10,"seq":15}"#;
        match classify(payload).unwrap() {
            Classified::FlowGap(g) => {
                assert_eq!(g.expected_last_sequence, 10);
                assert_eq!(g.current_sequence, 15);
            }
            other => panic!("expected FlowGap, got {other:?}"),
        }
    }

    #[test]
    fn parse_batch_flow_err() {
        let payload = br#"{"type":"err","seq":7,"error":{"code":400,"err_code":10071,"description":"wrong last sequence: 1"}}"#;
        match classify(payload).unwrap() {
            Classified::FlowErr(e) => {
                assert_eq!(e.sequence, 7);
                assert_eq!(e.error.error_code().0, 10071);
            }
            other => panic!("expected FlowErr, got {other:?}"),
        }
    }

    #[test]
    fn parse_terminal_pub_ack() {
        let payload = br#"{"stream":"TEST","seq":42,"batch":"inbox-id","count":10}"#;
        match classify(payload).unwrap() {
            Classified::PubAck(pa) => {
                assert_eq!(pa.stream, "TEST");
                assert_eq!(pa.sequence, 42);
                assert_eq!(pa.batch_id, "inbox-id");
                assert_eq!(pa.batch_size, 10);
            }
            other => panic!("expected PubAck, got {other:?}"),
        }
    }

    #[test]
    fn parse_init_error_response() {
        // The server sends init errors as a plain JSPubAckResponse with no
        // `type` field, just `{"error":{...}}`. The classifier falls through
        // the tagged-enum parse and hits `Response::Err`.
        let payload =
            br#"{"error":{"code":400,"err_code":10205,"description":"fast batch publish not enabled"}}"#;
        match classify(payload).unwrap() {
            Classified::InitError(err) => {
                assert_eq!(err.error_code().0, 10205);
                assert_eq!(
                    FastPublishErrorKind::from_api_error(&err),
                    FastPublishErrorKind::NotEnabled
                );
            }
            other => panic!("expected InitError, got {other:?}"),
        }
    }

    #[test]
    fn classify_malformed_json_returns_error() {
        let payload = b"not json at all";
        let err = classify(payload).unwrap_err();
        assert!(matches!(err.kind(), FastPublishErrorKind::Serialization));
    }

    // -- stall formula -------------------------------------------------------

    #[test]
    fn stall_no_wait_when_window_is_strictly_greater() {
        // seq=19, ack=0, flow=10, max=2 → window=20, 20 > 19 → no wait
        assert!(!should_stall(0, 10, 2, 19));
    }

    #[test]
    fn stall_waits_at_exact_boundary() {
        // seq=20, ack=0, flow=10, max=2 → window=20, 20 <= 20 → wait
        // This matches ADR-50's `<=` formulation.
        assert!(should_stall(0, 10, 2, 20));
    }

    #[test]
    fn stall_waits_past_boundary() {
        // seq=21 → window=20 <= 21 → wait
        assert!(should_stall(0, 10, 2, 21));
    }

    #[test]
    fn stall_honors_last_ack() {
        // ack=10, flow=10, max=2, seq=30 → window=30, 30 <= 30 → wait
        assert!(should_stall(10, 10, 2, 30));
        // ack=10, flow=10, max=2, seq=29 → window=30, 30 > 29 → no wait
        assert!(!should_stall(10, 10, 2, 29));
    }

    #[test]
    fn stall_with_single_outstanding_ack() {
        // max=1 matches "ack every N" throttling
        assert!(!should_stall(0, 10, 1, 9));
        assert!(should_stall(0, 10, 1, 10));
        assert!(should_stall(0, 10, 1, 11));
    }

    #[test]
    fn stall_with_max_outstanding_three() {
        // max=3 matches higher-RTT throughput mode
        assert!(!should_stall(0, 10, 3, 29));
        assert!(should_stall(0, 10, 3, 30));
    }

    #[test]
    fn stall_saturates_on_pathological_inputs() {
        // u64 overflow must be saturated and must never panic.
        let waited = should_stall(u64::MAX - 5, u16::MAX, u16::MAX, u64::MAX);
        // The window saturates to u64::MAX. Since `u64::MAX > u64::MAX` is
        // false, the gate must report a stall.
        assert!(waited);
    }

    // -- error code mapping --------------------------------------------------

    fn api_err(code: u64) -> async_nats::jetstream::Error {
        // Build a minimal jetstream::Error via JSON round-trip so we don't
        // depend on any constructor that may not exist in the public API.
        let json = format!(r#"{{"code":400,"err_code":{code},"description":"test"}}"#);
        serde_json::from_str(&json).expect("synthetic api error parses")
    }

    #[test]
    fn error_code_mapping_verified_against_server() {
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(10205)),
            FastPublishErrorKind::NotEnabled
        );
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(10206)),
            FastPublishErrorKind::InvalidPattern
        );
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(10207)),
            FastPublishErrorKind::InvalidBatchId
        );
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(10208)),
            FastPublishErrorKind::UnknownBatchId
        );
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(10211)),
            FastPublishErrorKind::TooManyInflight
        );
    }

    #[test]
    fn error_code_mapping_unknown_is_flow_error() {
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(10071)),
            FastPublishErrorKind::FlowError
        );
        assert_eq!(
            FastPublishErrorKind::from_api_error(&api_err(99999)),
            FastPublishErrorKind::FlowError
        );
    }

    // -- display impl --------------------------------------------------------

    #[test]
    fn error_kind_display_non_empty() {
        for kind in [
            FastPublishErrorKind::NotEnabled,
            FastPublishErrorKind::InvalidPattern,
            FastPublishErrorKind::InvalidBatchId,
            FastPublishErrorKind::UnknownBatchId,
            FastPublishErrorKind::TooManyInflight,
            FastPublishErrorKind::GapDetected,
            FastPublishErrorKind::FlowError,
            FastPublishErrorKind::EmptyBatch,
            FastPublishErrorKind::InvalidInboxShape,
            FastPublishErrorKind::InvalidConfig,
            FastPublishErrorKind::Closed,
            FastPublishErrorKind::Timeout,
            FastPublishErrorKind::Subscribe,
            FastPublishErrorKind::Publish,
            FastPublishErrorKind::Serialization,
            FastPublishErrorKind::InvalidState,
            FastPublishErrorKind::Other,
        ] {
            let s = format!("{kind}");
            assert!(!s.is_empty(), "empty Display for {kind:?}");
        }
    }

    // -- default impls -------------------------------------------------------

    #[test]
    fn gap_mode_default_is_fail() {
        assert_eq!(GapMode::default(), GapMode::Fail);
    }

    // -- builder validation --------------------------------------------------
    //
    // These tests exercise `FastPublisherBuilder::build()` validation and its
    // defaults. `build()` is synchronous, but an `async_nats::Client` can only
    // be obtained from a live connection. Each test therefore starts a
    // throwaway nats-server from `tests/configs/jetstream.conf` and connects
    // to it inside a tokio runtime. Keep the returned server handle (`_s`)
    // bound for the whole test so the server outlives the client.

    /// Start a test server and connect a client to it.
    async fn dummy_client() -> (nats_server::Server, async_nats::Client) {
        let server = nats_server::run_server("tests/configs/jetstream.conf");
        let client = async_nats::connect(server.client_url()).await.unwrap();
        (server, client)
    }

    /// Start a test server and return a builder with a 5s ack timeout.
    async fn dummy_builder() -> (nats_server::Server, FastPublisherBuilder) {
        let (server, client) = dummy_client().await;
        (
            server,
            FastPublisherBuilder::new(client, Duration::from_secs(5)),
        )
    }

    #[tokio::test]
    async fn builder_rejects_max_outstanding_zero() {
        let (_s, b) = dummy_builder().await;
        let err = b.max_outstanding_acks(0).build().unwrap_err();
        assert!(matches!(err.kind(), FastPublishErrorKind::InvalidConfig));
    }

    #[tokio::test]
    async fn builder_rejects_max_outstanding_four() {
        let (_s, b) = dummy_builder().await;
        let err = b.max_outstanding_acks(4).build().unwrap_err();
        assert!(matches!(err.kind(), FastPublishErrorKind::InvalidConfig));
    }

    #[tokio::test]
    async fn builder_accepts_all_valid_max_outstanding() {
        // One server and client shared across iterations; each iteration
        // needs its own builder because `build()` consumes it.
        let (_s, client) = dummy_client().await;
        for n in 1..=3 {
            let b = FastPublisherBuilder::new(client.clone(), Duration::from_secs(5));
            let fp = b.max_outstanding_acks(n).build().expect("valid config");
            assert_eq!(fp.max_outstanding_acks, n);
        }
    }

    #[tokio::test]
    async fn builder_clamps_flow_zero_to_one() {
        let (_s, b) = dummy_builder().await;
        let fp = b.flow(0).build().expect("flow clamped to 1");
        assert_eq!(fp.flow, 1);
    }

    #[tokio::test]
    async fn builder_default_values() {
        let (_s, b) = dummy_builder().await;
        let fp = b.build().expect("defaults build ok");
        assert_eq!(fp.flow, DEFAULT_FLOW);
        assert_eq!(fp.effective_flow, DEFAULT_FLOW);
        assert_eq!(fp.max_outstanding_acks, DEFAULT_MAX_OUTSTANDING_ACKS);
        assert_eq!(fp.gap_mode, GapMode::Fail);
        assert_eq!(fp.sequence, 0);
        assert_eq!(fp.last_ack_sequence, 0);
        assert!(fp.subscriber.is_none());
        assert!(!fp.is_closed());
        assert_eq!(fp.size(), 0);
    }

    #[tokio::test]
    async fn builder_produces_cached_reply_prefix() {
        let (_s, b) = dummy_builder().await;
        let fp = b.flow(42).gap_mode(GapMode::Ok).build().unwrap();
        assert!(fp.reply_prefix.starts_with(&fp.inbox));
        assert!(fp.reply_prefix.ends_with(".42.ok."));
    }

    #[tokio::test]
    async fn builder_batch_id_is_the_inbox() {
        let (_s, b) = dummy_builder().await;
        let fp = b.build().unwrap();
        assert_eq!(fp.batch_id(), fp.inbox);
        assert_eq!(fp.inbox.matches('.').count(), 1);
    }
}