Skip to main content

jetstream_extra/
batch_publish_fast.rs

1// Copyright 2026 Synadia Communications Inc.
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Fast-ingest batch publishing for NATS JetStream (ADR-50 fast ingest).
15//!
16//! Fast ingest is a high-throughput, non-atomic batch publisher. Unlike atomic
17//! [`batch_publish`](crate::batch_publish), which stages an entire batch and
18//! commits it or drops it, fast ingest persists each message as it arrives and
19//! uses a persistent inbox subscription plus server-driven flow control to
20//! coordinate throughput across many concurrent publishers.
21//!
22//! Requires nats-server 2.14 or later and `allow_batched: true` on the stream.
23//!
24//! # Architecture
25//!
26//! A [`FastPublisher`] owns its own [`async_nats::Subscriber`] and drives it
27//! inline from `add` / `commit` / `close`. There is no background task, no
28//! shared state, and no locks. This mirrors the single-task pattern used by
29//! `nats-extra/src/request_many.rs`.
30//!
31//! # Not safe for concurrent use
32//!
33//! A `FastPublisher` holds per-batch state (sequence counters, cached reply
34//! subject prefix, effective flow) and is driven via `&mut self`. Use one
35//! publisher per task. Clone the underlying JetStream context if you need
36//! independent publishers.
37
38use std::{
39    fmt::{self, Debug, Display},
40    task::{Context as TaskContext, Poll},
41    time::Duration,
42};
43
44use async_nats::Subject;
45use async_nats::jetstream::message::OutboundMessage;
46use async_nats::subject::ToSubject;
47use bytes::Bytes;
48use futures::StreamExt;
49use futures::task::noop_waker_ref;
50use serde::Deserialize;
51
52use crate::batch_publish::BatchPubAck;
53
/// Policy the server applies when it notices a hole in the batch sequence.
///
/// A hole appears when one or more batch messages never reached the stream
/// leader (for example, they were dropped from a buffer under load somewhere
/// between the client and the leader).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum GapMode {
    /// Tolerate gaps: the batch keeps going and the server reports each gap
    /// to the client as a gap event. Suitable when losing some messages is
    /// acceptable (metrics, telemetry).
    Ok,
    /// Abort the batch at the first gap. Suitable when delivery must be
    /// ordered and gap-free (object store chunks, ordered events). This is
    /// the default.
    #[default]
    Fail,
}

impl GapMode {
    /// Token for this mode as it is embedded in the fast-ingest reply
    /// subject: `"ok"` or `"fail"`.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            GapMode::Fail => "fail",
            GapMode::Ok => "ok",
        }
    }
}
79
/// Fast-ingest operation codes carried in the reply subject.
///
/// The numeric value is written as the second-to-last token of the reply
/// subject, i.e. the token immediately preceding the trailing `$FI`
/// (`...<seq>.<op>.$FI`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum Operation {
    /// First message of a batch (batch sequence 1).
    Start = 0,
    /// Any later message of a batch.
    Append = 1,
    /// Commit carrying a final message that is stored.
    Commit = 2,
    /// End-of-batch commit whose message is not stored.
    CommitEob = 3,
    /// Probe sent to recover from lost flow acks.
    Ping = 4,
}
92
93/// Error type for fast-ingest batch publish operations.
94pub type FastPublishError = async_nats::error::Error<FastPublishErrorKind>;
95
/// Categories of failure for fast-ingest batch publishing.
///
/// The API error codes quoted below were checked against the `errors.json`
/// shipped with `nats-server` 2.14.
///
/// The enum is `#[non_exhaustive]`, so new variants can appear in later
/// releases without a breaking change; include a `_` arm when matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FastPublishErrorKind {
    /// The stream was not created with `allow_batched: true`.
    /// (`BATCH_PUBLISH_DISABLED`, 10205)
    NotEnabled,
    /// The server refused the reply-subject pattern.
    /// (`BATCH_PUBLISH_INVALID_PATTERN`, 10206)
    InvalidPattern,
    /// The batch id is longer than 64 characters or otherwise malformed.
    /// (`BATCH_PUBLISH_INVALID_BATCH_ID`, 10207)
    InvalidBatchId,
    /// The server no longer knows this batch (it timed out, the leader
    /// changed while in `GapMode::Fail`, ...).
    /// (`BATCH_PUBLISH_UNKNOWN_BATCH_ID`, 10208)
    UnknownBatchId,
    /// The server already has too many fast batches in flight.
    /// (`BATCH_PUBLISH_TOO_MANY_INFLIGHT`, 10211)
    TooManyInflight,
    /// A gap occurred while running in [`GapMode::Fail`]. The final ack
    /// reports which messages were persisted.
    GapDetected,
    /// Any other server-side failure delivered as a `BatchFlowErr` message.
    FlowError,
    /// `close()` was invoked on a publisher that never received an `add`.
    EmptyBatch,
    /// `build()` refused the inbox because it is not exactly two tokens; the
    /// reply-subject parser needs the `<prefix>.<id>` form.
    InvalidInboxShape,
    /// `build()` refused a configuration value (for example,
    /// `max_outstanding_acks` outside `1..=3`).
    InvalidConfig,
    /// A method was called on a publisher that has already been committed
    /// or closed.
    Closed,
    /// No flow ack or final pub ack arrived in time.
    Timeout,
    /// Subscribing to the batch inbox failed.
    Subscribe,
    /// Publishing a batch message failed.
    Publish,
    /// A server response could not be parsed.
    Serialization,
    /// An operation ran before a runtime precondition it relies on was met.
    /// Today this is only produced when a ping is needed before any message
    /// has been published, so there is no first subject to send it to.
    InvalidState,
    /// Anything not covered by the variants above.
    Other,
}
150
151impl FastPublishErrorKind {
152    /// Map a JetStream API error code to the matching fast-ingest error kind.
153    pub(crate) fn from_api_error(error: &async_nats::jetstream::Error) -> Self {
154        use async_nats::jetstream::ErrorCode;
155        match error.error_code() {
156            ErrorCode::BATCH_PUBLISH_DISABLED => Self::NotEnabled,
157            ErrorCode::BATCH_PUBLISH_INVALID_PATTERN => Self::InvalidPattern,
158            ErrorCode::BATCH_PUBLISH_INVALID_BATCH_ID => Self::InvalidBatchId,
159            ErrorCode::BATCH_PUBLISH_UNKNOWN_BATCH_ID => Self::UnknownBatchId,
160            ErrorCode::BATCH_PUBLISH_TOO_MANY_INFLIGHT => Self::TooManyInflight,
161            _ => Self::FlowError,
162        }
163    }
164}
165
166impl Display for FastPublishErrorKind {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        match self {
169            Self::NotEnabled => write!(f, "fast batch publish not enabled on stream"),
170            Self::InvalidPattern => write!(f, "fast batch publish invalid reply subject pattern"),
171            Self::InvalidBatchId => write!(
172                f,
173                "fast batch publish id is invalid (exceeds 64 characters)"
174            ),
175            Self::UnknownBatchId => write!(f, "fast batch publish id is unknown to the server"),
176            Self::TooManyInflight => write!(f, "too many in-flight fast batches on the server"),
177            Self::GapDetected => write!(f, "gap detected in fast batch (gap_mode=fail)"),
178            Self::FlowError => write!(f, "fast batch flow error"),
179            Self::EmptyBatch => write!(f, "cannot close an empty batch"),
180            Self::InvalidInboxShape => {
181                write!(f, "inbox must have exactly two tokens (e.g. _INBOX.<id>)")
182            }
183            Self::InvalidConfig => write!(f, "invalid fast publisher configuration"),
184            Self::Closed => write!(f, "fast publisher is closed"),
185            Self::Timeout => write!(f, "timeout waiting for fast batch ack"),
186            Self::Subscribe => write!(f, "failed to subscribe to fast batch inbox"),
187            Self::Publish => write!(f, "failed to publish fast batch message"),
188            Self::Serialization => write!(f, "failed to (de)serialize fast batch message"),
189            Self::InvalidState => {
190                write!(f, "operation not allowed in current fast publisher state")
191            }
192            Self::Other => write!(f, "other fast batch publish error"),
193        }
194    }
195}
196
197/// Flow-control message sent by the server when a batch of messages has been
198/// persisted. Wire tag: `"type":"ack"`.
199#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
200pub(crate) struct BatchFlowAck {
201    /// Highest batch sequence covered by this ack.
202    ///
203    /// In `GapMode::Fail` this means all messages up to and including this
204    /// sequence were persisted. In `GapMode::Ok` some may have been lost.
205    #[serde(rename = "seq")]
206    pub sequence: u64,
207    /// How often the server will send subsequent flow acks (every N messages).
208    #[serde(rename = "msgs")]
209    pub messages: u16,
210}
211
212/// Gap notification sent when the server detects missing messages in a batch.
213/// Wire tag: `"type":"gap"`.
214#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
215pub(crate) struct BatchFlowGap {
216    /// The last sequence the server expected to receive before the gap.
217    ///
218    /// Messages with sequences `[expected_last_sequence+1, current_sequence)`
219    /// were lost.
220    #[serde(rename = "last_seq")]
221    pub expected_last_sequence: u64,
222    /// The sequence of the message that triggered gap detection.
223    #[serde(rename = "seq")]
224    pub current_sequence: u64,
225}
226
227/// Per-message error sent when a batch message fails a server-side check
228/// (e.g. expected-last-seq mismatch). Wire tag: `"type":"err"`.
229#[derive(Debug, Clone, Deserialize)]
230pub(crate) struct BatchFlowErr {
231    /// The batch sequence of the message that triggered the error.
232    #[serde(rename = "seq")]
233    pub sequence: u64,
234    /// The full API error as returned by the server.
235    pub error: async_nats::jetstream::Error,
236}
237
238/// Result of classifying a message received on the batch inbox.
239///
240/// The classifier dispatches on the `type` field (a serde-tagged enum) and
241/// falls back to a full `Response<BatchPubAck>` parse for messages without a
242/// `type` discriminator (the terminal pub-ack or an init-time error).
243#[derive(Debug)]
244pub(crate) enum Classified {
245    FlowAck(BatchFlowAck),
246    FlowGap(BatchFlowGap),
247    FlowErr(BatchFlowErr),
248    /// Terminal publish acknowledgment — delivered on `commit`/`close` or on
249    /// the single-message immediate-commit fast path.
250    PubAck(BatchPubAck),
251    /// Init-time API error — returned in response to the `0` (Start) or
252    /// `2`/`3` (Commit/CommitEob) operation when the server rejects the batch
253    /// before it even begins.
254    InitError(async_nats::jetstream::Error),
255}
256
257/// Tagged enum for `type`-discriminated messages.
258#[derive(Debug, Deserialize)]
259#[serde(tag = "type", rename_all = "lowercase")]
260enum TaggedFlow {
261    Ack(BatchFlowAck),
262    Gap(BatchFlowGap),
263    Err(BatchFlowErr),
264}
265
266/// Classify a payload received on the batch inbox into one of the five
267/// possible shapes.
268///
269/// Implementation strategy: try the tagged-enum deserializer first; if that
270/// fails (no `type` field), fall back to parsing the payload as a terminal
271/// `Response<BatchPubAck>`. This matches the Go byte-search optimization in
272/// spirit while remaining fully serde-driven.
273pub(crate) fn classify(payload: &[u8]) -> Result<Classified, FastPublishError> {
274    if let Ok(tagged) = serde_json::from_slice::<TaggedFlow>(payload) {
275        return Ok(match tagged {
276            TaggedFlow::Ack(a) => Classified::FlowAck(a),
277            TaggedFlow::Gap(g) => Classified::FlowGap(g),
278            TaggedFlow::Err(e) => Classified::FlowErr(e),
279        });
280    }
281
282    let resp: async_nats::jetstream::response::Response<BatchPubAck> =
283        serde_json::from_slice(payload)
284            .map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Serialization, e))?;
285
286    Ok(match resp {
287        async_nats::jetstream::response::Response::Ok(pa) => Classified::PubAck(pa),
288        async_nats::jetstream::response::Response::Err { error } => Classified::InitError(error),
289    })
290}
291
292/// Build the stable prefix of the reply subject:
293/// `<inbox>.<flow>.<gap>.`
294///
295/// The caller appends `<seq>.<op>.$FI` per message via [`build_reply`]. Cached
296/// on the publisher and rebuilt only when the server dictates a new flow via
297/// a [`BatchFlowAck`].
298pub(crate) fn build_reply_prefix(inbox: &str, flow: u16, gap: GapMode) -> String {
299    format!("{inbox}.{flow}.{}.", gap.as_str())
300}
301
302/// Build a full per-message reply subject: `<prefix><seq>.<op>.$FI`.
303///
304/// `$FI` marks the subject as a fast-ingest reply to the server's parser
305/// (`server/stream.go:getFastBatch`). Returns a [`Subject`] so the caller can
306/// pass it to `publish_with_reply` without an extra `String → Bytes` copy.
307pub(crate) fn build_reply(prefix: &str, seq: u64, op: Operation) -> Subject {
308    use std::fmt::Write as _;
309    // Hot path: pre-size for prefix + up to 20 digits of u64 + ".N.$FI" (6).
310    let mut s = String::with_capacity(prefix.len() + 26);
311    s.push_str(prefix);
312    write!(s, "{seq}.{}.$FI", op as u8).expect("String write is infallible");
313    // `From<String> for Subject` is zero-copy — moves the buffer into Bytes.
314    Subject::from(s)
315}
316
317/// Validate that an inbox has the shape the fast-ingest reply subject parser
318/// requires: exactly two tokens separated by a single dot (e.g. `_INBOX.<id>`).
319///
320/// The server (`server/stream.go:5174`) parses the reply subject from the
321/// right, expecting `<prefix>.<uuid>.<flow>.<gap>.<seq>.<op>.$FI`. Our scheme
322/// uses the inbox as the `<prefix>.<uuid>` portion — which only aligns if the
323/// inbox itself is exactly two tokens. A custom multi-token inbox prefix
324/// (e.g. `_INBOX.myapp.xyz`) would misalign the parser and cause cryptic
325/// `InvalidPattern` errors.
326pub(crate) fn validate_inbox_shape(inbox: &str) -> Result<(), FastPublishError> {
327    let mut parts = inbox.splitn(3, '.');
328    let first = parts.next().unwrap_or("");
329    let second = parts.next().unwrap_or("");
330    let no_third = parts.next().is_none();
331    if first.is_empty() || second.is_empty() || !no_third {
332        return Err(FastPublishError::new(
333            FastPublishErrorKind::InvalidInboxShape,
334        ));
335    }
336    Ok(())
337}
338
/// Decide whether the publisher must wait for a flow ack before sending the
/// message numbered `next_sequence`.
///
/// The publisher may run at most `effective_flow * max_outstanding_acks`
/// sequences ahead of the last acknowledged one. It stalls when
///
/// ```text
/// last_ack_sequence + effective_flow * max_outstanding_acks <= next_sequence
/// ```
///
/// In other words, it may send only while `next_sequence` is strictly below
/// that window end. For example, with `last_ack_sequence = 0`,
/// `effective_flow = 100` and `max_outstanding_acks = 2`, sequences `1..=199`
/// go out immediately and sequence `200` waits for an ack.
///
/// This is the inclusive (`<=`) comparison from the ADR-50 pseudocode
/// (`lastAck.Sequence + lastAck.Messages*maxOutstandingAcks <= f.batchSeq`),
/// applied to the *next* sequence to be sent.
///
/// NOTE(review): a strict `<` comparison would let sequence `200` through in
/// the example above. Confirm whether the ADR's `f.batchSeq` is the
/// last-sent or the next sequence. If it is the last-sent one, the equivalent
/// test here would be `window_end < next_sequence`.
///
/// Both factors are widened to `u64` and the sum saturates, so pathological
/// inputs cannot wrap around.
#[inline]
pub(crate) fn should_stall(
    last_ack_sequence: u64,
    effective_flow: u16,
    max_outstanding_acks: u16,
    next_sequence: u64,
) -> bool {
    // u16 * u16 always fits in u64; saturating keeps the intent explicit.
    let in_flight_budget =
        u64::from(effective_flow).saturating_mul(u64::from(max_outstanding_acks));
    let window_end = last_ack_sequence.saturating_add(in_flight_budget);
    window_end <= next_sequence
}
370
/// Flow (messages between server flow acks) the client requests unless
/// configured otherwise; the server may choose a lower value.
pub(crate) const DEFAULT_FLOW: u16 = 100;

/// Default number of flow-ack windows allowed in flight. ADR-50 calls `2`
/// generally optimal.
pub(crate) const DEFAULT_MAX_OUTSTANDING_ACKS: u16 = 2;

/// Smallest value accepted for `max_outstanding_acks`.
pub(crate) const MIN_MAX_OUTSTANDING_ACKS: u16 = 1;

/// Largest value accepted for `max_outstanding_acks`; ADR-50 recommends
/// staying within `1..=3`.
pub(crate) const MAX_MAX_OUTSTANDING_ACKS: u16 = 3;
382
383/// Extension trait adding [`fast_publish`](FastPublishExt::fast_publish) to any
384/// JetStream-context-like type.
385///
386/// Implemented automatically for [`async_nats::jetstream::Context`] and any
387/// other type that can provide an [`async_nats::Client`] and a default timeout.
388///
389/// # Example
390///
391/// ```no_run
392/// # async fn example(js: async_nats::jetstream::Context) -> Result<(), Box<dyn std::error::Error>> {
393/// use jetstream_extra::batch_publish_fast::FastPublishExt;
394///
395/// let mut batch = js.fast_publish().build()?;
396/// # let _ = batch;
397/// # Ok(())
398/// # }
399/// ```
400pub trait FastPublishExt:
401    async_nats::jetstream::context::traits::ClientProvider
402    + async_nats::jetstream::context::traits::TimeoutProvider
403{
404    /// Start building a [`FastPublisher`] bound to the underlying connection
405    /// and default timeout of this context.
406    fn fast_publish(&self) -> FastPublisherBuilder {
407        FastPublisherBuilder::new(self.client(), self.timeout())
408    }
409}
410
411impl<T> FastPublishExt for T where
412    T: async_nats::jetstream::context::traits::ClientProvider
413        + async_nats::jetstream::context::traits::TimeoutProvider
414{
415}
416
417/// Callback type for asynchronous fast-publish errors (gaps, flow errors,
418/// per-message server errors). Invoked synchronously on the publisher's task
419/// whenever such an event is drained from the inbox. Keep the callback fast
420/// and non-blocking.
421pub type FastPublishErrorHandler = Box<dyn FnMut(FastPublishError) + Send + 'static>;
422
423/// Builder for a [`FastPublisher`].
424///
425/// Obtained via [`FastPublishExt::fast_publish`]. Call [`build`](Self::build)
426/// to validate configuration and produce a ready-to-use publisher.
427///
428/// All setters are optional; defaults match ADR-50 recommendations:
429/// - `flow = 100` (ack every 100 messages ceiling)
430/// - `max_outstanding_acks = 2`
431/// - `gap_mode = Fail`
432/// - `ack_timeout = <JetStream context default>`
433pub struct FastPublisherBuilder {
434    client: async_nats::Client,
435    flow: u16,
436    max_outstanding_acks: u16,
437    ack_timeout: Duration,
438    gap_mode: GapMode,
439    on_error: Option<FastPublishErrorHandler>,
440}
441
442impl FastPublisherBuilder {
443    /// Create a new builder with default settings.
444    ///
445    /// Prefer [`FastPublishExt::fast_publish`] for public use.
446    pub(crate) fn new(client: async_nats::Client, ack_timeout: Duration) -> Self {
447        Self {
448            client,
449            flow: DEFAULT_FLOW,
450            max_outstanding_acks: DEFAULT_MAX_OUTSTANDING_ACKS,
451            ack_timeout,
452            gap_mode: GapMode::default(),
453            on_error: None,
454        }
455    }
456
457    /// Set the client-requested maximum flow — the upper bound on how often
458    /// the server will send flow acks. The server may choose a lower effective
459    /// flow.
460    ///
461    /// Must be at least 1. Values of 0 are clamped to 1.
462    pub fn flow(mut self, flow: u16) -> Self {
463        self.flow = flow.max(1);
464        self
465    }
466
467    /// Set the number of flow-ack-batches that can be in flight before the
468    /// publisher stalls and waits for an ack. Valid range is `1..=3`.
469    ///
470    /// - `1` behaves like synchronous async publish throttled to flow N
471    /// - `2` is the ADR-50 recommended default (optimal for most cases)
472    /// - `3` is useful on higher-RTT links
473    ///
474    /// Values outside the range are returned as an error from [`build`](Self::build).
475    pub fn max_outstanding_acks(mut self, n: u16) -> Self {
476        self.max_outstanding_acks = n;
477        self
478    }
479
480    /// Set the timeout for waiting on flow acks and the final commit ack.
481    ///
482    /// When the publisher is stalled waiting for an ack, it will auto-send
483    /// pings every `ack_timeout / 3` to recover from lost acks, giving up
484    /// after the full `ack_timeout` elapses.
485    pub fn ack_timeout(mut self, timeout: Duration) -> Self {
486        self.ack_timeout = timeout;
487        self
488    }
489
490    /// Set the gap handling mode. Default: [`GapMode::Fail`].
491    pub fn gap_mode(mut self, mode: GapMode) -> Self {
492        self.gap_mode = mode;
493        self
494    }
495
496    /// Register a callback invoked for asynchronous events: gap detections,
497    /// per-message flow errors, and server-side fast-batch errors.
498    ///
499    /// The callback is called on the publisher's task synchronously from the
500    /// middle of `add` / `commit` / `close`, so it must be fast and
501    /// non-blocking.
502    pub fn on_error<F>(mut self, handler: F) -> Self
503    where
504        F: FnMut(FastPublishError) + Send + 'static,
505    {
506        self.on_error = Some(Box::new(handler));
507        self
508    }
509
510    /// Validate configuration and produce a [`FastPublisher`].
511    ///
512    /// The subscription to the batch inbox is NOT created here — it is lazily
513    /// opened on the first `add` / `commit` / `close`. This matches the Go
514    /// reference implementation and avoids wasted subscriptions when a
515    /// publisher is built and then dropped unused.
516    ///
517    /// # Errors
518    ///
519    /// - [`FastPublishErrorKind::InvalidConfig`] if `max_outstanding_acks` is
520    ///   outside `1..=3`.
521    /// - [`FastPublishErrorKind::InvalidInboxShape`] if the client's
522    ///   `new_inbox()` does not produce a two-token inbox (required by the
523    ///   server's reply-subject parser).
524    pub fn build(self) -> Result<FastPublisher, FastPublishError> {
525        if !(MIN_MAX_OUTSTANDING_ACKS..=MAX_MAX_OUTSTANDING_ACKS)
526            .contains(&self.max_outstanding_acks)
527        {
528            return Err(FastPublishError::new(FastPublishErrorKind::InvalidConfig));
529        }
530
531        let inbox = self.client.new_inbox();
532        validate_inbox_shape(&inbox)?;
533
534        let reply_prefix = build_reply_prefix(&inbox, self.flow, self.gap_mode);
535
536        Ok(FastPublisher {
537            client: self.client,
538            inbox,
539            flow: self.flow,
540            gap_mode: self.gap_mode,
541            max_outstanding_acks: self.max_outstanding_acks,
542            ack_timeout: self.ack_timeout,
543            reply_prefix,
544            subscriber: None,
545            sequence: 0,
546            effective_flow: self.flow,
547            last_ack_sequence: 0,
548            initial_ack_received: false,
549            pending_pub_ack: None,
550            first_subject: None,
551            closed: false,
552            fatal: None,
553            on_error: self.on_error,
554        })
555    }
556}
557
558/// A non-atomic, high-throughput JetStream batch publisher using the
559/// fast-ingest protocol (ADR-50, nats-server 2.14+).
560///
561/// Obtain via [`FastPublishExt::fast_publish`] → [`FastPublisherBuilder::build`].
562///
563/// A `FastPublisher` is `Send` but NOT `Sync`: methods require `&mut self` and
564/// the publisher must be driven from a single task. Dropping the publisher
565/// mid-batch is safe — the underlying [`async_nats::Subscriber`] drops with
566/// it, the server-side interest is torn down, and the server will time out
567/// the abandoned batch after 10 seconds.
568pub struct FastPublisher {
569    // --- configuration (immutable after build) ---
570    client: async_nats::Client,
571    inbox: String,
572    flow: u16, // client-requested ceiling
573    gap_mode: GapMode,
574    max_outstanding_acks: u16,
575    ack_timeout: Duration,
576
577    // --- cached reply subject prefix ---
578    // "<inbox>.<effective_flow>.<gap>." — rebuilt when effective_flow changes.
579    reply_prefix: String,
580
581    // --- lazily-created inbox subscription ---
582    subscriber: Option<async_nats::Subscriber>,
583
584    // --- per-batch state ---
585    sequence: u64,
586    effective_flow: u16, // dictated by server; equals `flow` until first ack
587    last_ack_sequence: u64,
588    /// Set to true as soon as the server sends the initial `BatchFlowAck` in
589    /// response to the first `Start` message. This is tracked separately
590    /// from `last_ack_sequence` because the initial ack has `seq:0` (no
591    /// messages persisted yet — just confirming the batch is alive).
592    initial_ack_received: bool,
593    /// Terminal `PubAck` stashed if seen out of band (e.g. single-msg
594    /// immediate-commit fast path during `await_first_reply`).
595    pending_pub_ack: Option<BatchPubAck>,
596    /// Subject of the first published message, used by [`close`](Self::close)
597    /// to construct the EOB commit message and by `ping` as the publish
598    /// target.
599    first_subject: Option<async_nats::Subject>,
600    closed: bool,
601    fatal: Option<FastPublishErrorKind>,
602
603    // --- async error callback ---
604    on_error: Option<FastPublishErrorHandler>,
605}
606
607impl FastPublisher {
608    /// Returns the number of messages added to (and published in) this batch
609    /// so far, excluding any pending commit message.
610    pub fn size(&self) -> u64 {
611        self.sequence
612    }
613
614    /// Returns `true` if the batch has been committed, closed, or failed
615    /// fatally.
616    pub fn is_closed(&self) -> bool {
617        self.closed
618    }
619
620    /// Returns the batch's inbox, which is also the batch identifier as seen
621    /// by the server.
622    pub fn batch_id(&self) -> &str {
623        &self.inbox
624    }
625
626    /// Returns the currently-configured gap mode.
627    pub fn gap_mode(&self) -> GapMode {
628        self.gap_mode
629    }
630
631    /// Returns the highest batch sequence acknowledged by the server so far.
632    /// `0` before the first flow ack arrives.
633    pub fn last_ack_sequence(&self) -> u64 {
634        self.last_ack_sequence
635    }
636}
637
638impl Debug for FastPublisher {
639    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640        f.debug_struct("FastPublisher")
641            .field("inbox", &self.inbox)
642            .field("flow", &self.flow)
643            .field("effective_flow", &self.effective_flow)
644            .field("gap_mode", &self.gap_mode)
645            .field("max_outstanding_acks", &self.max_outstanding_acks)
646            .field("sequence", &self.sequence)
647            .field("last_ack_sequence", &self.last_ack_sequence)
648            .field("closed", &self.closed)
649            .field("fatal", &self.fatal)
650            .finish()
651    }
652}
653
654// Compile-time check: `FastPublisher` must be `Send` so users can spawn it
655// onto tokio tasks. It is intentionally NOT `Sync`: per-batch state is driven
656// through `&mut self` and the publisher is single-consumer.
657const _: fn() = || {
658    fn assert_send<T: Send>() {}
659    assert_send::<FastPublisher>();
660};
661
662// ---------------------------------------------------------------------------
663// Public return type
664// ---------------------------------------------------------------------------
665
/// Value returned by a successful [`FastPublisher::add`] or
/// [`FastPublisher::add_message`].
///
/// `ack_sequence` is the furthest point the server has acknowledged. With
/// [`GapMode::Fail`], every message up to and including it was persisted.
/// With [`GapMode::Ok`], gaps may exist, so treat it only as a progress hint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FastPubAck {
    /// Batch sequence assigned to the message that was just added.
    pub batch_sequence: u64,
    /// Highest batch sequence the server has acknowledged so far.
    pub ack_sequence: u64,
}
681
682// ---------------------------------------------------------------------------
683// State machine: add / commit / close
684// ---------------------------------------------------------------------------
685
/// Selects which commit operation `commit_message_inner` sends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommitKind {
    /// Commit with a final message that is stored (operation code 2).
    Final,
    /// End-of-batch commit that stores no final message (operation code 3);
    /// used by [`FastPublisher::close`].
    Eob,
}
695
696impl FastPublisher {
697    /// Add a message to the batch with the given subject and payload.
698    ///
699    /// Convenience wrapper around [`add_message`](Self::add_message) for
700    /// callers that don't need custom headers.
701    pub async fn add<S: ToSubject>(
702        &mut self,
703        subject: S,
704        payload: Bytes,
705    ) -> Result<FastPubAck, FastPublishError> {
706        self.add_message(OutboundMessage {
707            subject: subject.to_subject(),
708            payload,
709            headers: None,
710        })
711        .await
712    }
713
    /// Add a pre-constructed message to the batch.
    ///
    /// The message's `subject`, `payload`, and `headers` fields are forwarded
    /// to the server; the reply subject is always set by the publisher.
    ///
    /// On the first call, a subscription to the batch inbox is created and
    /// the publisher waits for the initial flow ack from the server to
    /// confirm the batch has been accepted.
    ///
    /// Before publishing, the call may block waiting for a flow ack if the
    /// outstanding-ack window (`last_ack_sequence`, `effective_flow`,
    /// `max_outstanding_acks`) is saturated.
    ///
    /// Returns a [`FastPubAck`] whose `batch_sequence` is the sequence assigned
    /// to this message and whose `ack_sequence` is the highest sequence
    /// acknowledged by a flow ack observed so far.
    ///
    /// # Errors
    ///
    /// - [`FastPublishErrorKind::Closed`] if the publisher has already been
    ///   committed, closed, or failed fatally, or if the inbox subscription
    ///   ends unexpectedly.
    /// - [`FastPublishErrorKind::Subscribe`] if the initial subscription
    ///   fails.
    /// - [`FastPublishErrorKind::Publish`] if publishing the message (or a
    ///   flow-control ping) fails.
    /// - [`FastPublishErrorKind::Timeout`] if the initial ack does not arrive
    ///   within `ack_timeout`, or if the outstanding-ack window stays
    ///   saturated for `ack_timeout`.
    /// - [`FastPublishErrorKind::Serialization`] if an inbox message cannot be
    ///   parsed.
    /// - [`FastPublishErrorKind::InvalidState`] if a flow-control ping is
    ///   needed before any message has been published.
    /// - Any fatal kind recorded from inbox events: a gap
    ///   ([`FastPublishErrorKind::GapDetected`]) or a mapped flow error when
    ///   the gap mode is `GapMode::Fail`. Once recorded, every later call
    ///   returns the same kind.
    /// - Any mapped API error from the server's init response
    ///   (`NotEnabled`, `InvalidPattern`, `InvalidBatchId`, `UnknownBatchId`,
    ///   `TooManyInflight`).
    pub async fn add_message(
        &mut self,
        msg: OutboundMessage,
    ) -> Result<FastPubAck, FastPublishError> {
        if self.closed {
            return Err(FastPublishError::new(FastPublishErrorKind::Closed));
        }
        // A fatal kind is sticky: surface it on every subsequent call.
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }

        // Lazy bootstrap on the very first publish.
        self.ensure_subscribed().await?;

        // Drain any events that arrived while the caller was computing the
        // next payload. Cost in the common case: one `Pending` poll.
        self.drain_nonblocking()?;
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }

        // Stall gate: if the outstanding-ack window is saturated, wait for a
        // flow ack before emitting the next message. Check BEFORE incrementing
        // sequence so the next-message sequence is what we gate on.
        let next_sequence = self.sequence + 1;
        if should_stall(
            self.last_ack_sequence,
            self.effective_flow,
            self.max_outstanding_acks,
            next_sequence,
        ) {
            self.wait_for_flow_event_with_pings().await?;
            if let Some(kind) = self.fatal {
                return Err(FastPublishError::new(kind));
            }
        }

        // The first message of the batch opens it (`Start`); all later
        // messages are `Append`.
        self.sequence += 1;
        let op = if self.sequence == 1 {
            Operation::Start
        } else {
            Operation::Append
        };

        // Remembered so `close` and `send_ping` can target the same subject.
        if self.first_subject.is_none() {
            self.first_subject = Some(msg.subject.clone());
        }

        let reply = build_reply(&self.reply_prefix, self.sequence, op);
        self.publish_raw(msg, reply).await?;

        // NOTE(review): if this first-reply wait fails (e.g. Timeout), neither
        // `closed` nor `fatal` is set and `sequence` has already advanced, so
        // a retry publishes an `Append` rather than a new `Start` — confirm
        // this is the intended retry semantics.
        if self.sequence == 1 {
            self.await_first_reply().await?;
        }

        // Drain any acks that landed while we were awaiting the first reply
        // or just now publishing.
        self.drain_nonblocking()?;
        if let Some(kind) = self.fatal {
            return Err(FastPublishError::new(kind));
        }

        Ok(FastPubAck {
            batch_sequence: self.sequence,
            ack_sequence: self.last_ack_sequence,
        })
    }
802
803    /// Commit the batch by publishing a final stored message.
804    ///
805    /// After this returns, the publisher is closed and no further messages
806    /// can be added. The returned [`BatchPubAck`] includes the batch id (the
807    /// publisher's inbox) and the total number of messages in the batch.
808    pub async fn commit<S: ToSubject>(
809        self,
810        subject: S,
811        payload: Bytes,
812    ) -> Result<BatchPubAck, FastPublishError> {
813        self.commit_message(OutboundMessage {
814            subject: subject.to_subject(),
815            payload,
816            headers: None,
817        })
818        .await
819    }
820
821    /// Commit the batch with a pre-constructed final message.
822    pub async fn commit_message(
823        mut self,
824        msg: OutboundMessage,
825    ) -> Result<BatchPubAck, FastPublishError> {
826        self.commit_message_inner(msg, CommitKind::Final).await
827    }
828
829    /// End the batch without storing a final message (end-of-batch commit).
830    ///
831    /// Uses the first message's subject as the publish target; the server
832    /// does not persist the commit message itself. Returns the same
833    /// [`BatchPubAck`] shape as [`commit`](Self::commit).
834    ///
835    /// Returns [`FastPublishErrorKind::EmptyBatch`] if no messages have been
836    /// added yet.
837    pub async fn close(mut self) -> Result<BatchPubAck, FastPublishError> {
838        if self.closed {
839            return Err(FastPublishError::new(FastPublishErrorKind::Closed));
840        }
841        if self.sequence == 0 {
842            return Err(FastPublishError::new(FastPublishErrorKind::EmptyBatch));
843        }
844        let subject = self
845            .first_subject
846            .clone()
847            .expect("first_subject set once sequence > 0");
848        let msg = OutboundMessage {
849            subject,
850            payload: Bytes::new(),
851            headers: None,
852        };
853        self.commit_message_inner(msg, CommitKind::Eob).await
854    }
855
856    // ---- internal helpers --------------------------------------------------
857
858    async fn commit_message_inner(
859        &mut self,
860        msg: OutboundMessage,
861        kind: CommitKind,
862    ) -> Result<BatchPubAck, FastPublishError> {
863        if self.closed {
864            return Err(FastPublishError::new(FastPublishErrorKind::Closed));
865        }
866
867        // Lazy bootstrap for first-op-is-commit (single-message immediate
868        // commit) — the commit path uses the same subscribe-on-first-publish
869        // as add_message.
870        self.ensure_subscribed().await?;
871
872        // Drain any pending events first.
873        self.drain_nonblocking()?;
874
875        // If a fatal flow error was observed (e.g. FlowErr or GapDetected in
876        // Fail mode), the batch is already terminal server-side. Don't publish
877        // the commit, but DO drain the inbox to pick up the terminal PubAck
878        // the server will send — it tells the user which messages were
879        // persisted. If the PubAck never arrives, we time out.
880        if let Some(fatal_kind) = self.fatal {
881            self.closed = true;
882            // Try to get the PubAck for diagnostics; ignore drain errors.
883            let _pub_ack = self.drain_until_pub_ack().await.ok();
884            return Err(FastPublishError::new(fatal_kind));
885        }
886
887        // Stall gate: also apply to commits. A commit is just another
888        // message from the server's perspective and counts against the
889        // outstanding-ack window.
890        let next_sequence = self.sequence + 1;
891        if should_stall(
892            self.last_ack_sequence,
893            self.effective_flow,
894            self.max_outstanding_acks,
895            next_sequence,
896        ) {
897            self.wait_for_flow_event_with_pings().await?;
898            if let Some(fatal_kind) = self.fatal {
899                self.closed = true;
900                let _pub_ack = self.drain_until_pub_ack().await.ok();
901                return Err(FastPublishError::new(fatal_kind));
902            }
903        }
904
905        self.sequence += 1;
906        let op = match kind {
907            CommitKind::Final => Operation::Commit,
908            CommitKind::Eob => Operation::CommitEob,
909        };
910
911        if self.first_subject.is_none() {
912            self.first_subject = Some(msg.subject.clone());
913        }
914
915        let reply = build_reply(&self.reply_prefix, self.sequence, op);
916        self.publish_raw(msg, reply).await?;
917        self.closed = true;
918
919        self.drain_until_pub_ack().await
920    }
921
922    /// Create the inbox subscription if it does not exist yet.
923    async fn ensure_subscribed(&mut self) -> Result<(), FastPublishError> {
924        if self.subscriber.is_some() {
925            return Ok(());
926        }
927        let wildcard = format!("{}.>", self.inbox);
928        let sub = self
929            .client
930            .subscribe(wildcard)
931            .await
932            .map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Subscribe, e))?;
933        self.subscriber = Some(sub);
934        Ok(())
935    }
936
937    /// Publish a message with a reply subject, fire-and-forget.
938    ///
939    /// Dispatches to `publish_with_reply_and_headers` when headers are
940    /// present, otherwise the simpler `publish_with_reply`.
941    ///
942    /// Takes `&mut self` to avoid `&FastPublisher` crossing `.await`, which
943    /// would require `FastPublisher: Sync` (not satisfied due to the boxed
944    /// `FnMut` error handler field).
945    async fn publish_raw(
946        &mut self,
947        msg: OutboundMessage,
948        reply: Subject,
949    ) -> Result<(), FastPublishError> {
950        let OutboundMessage {
951            subject,
952            payload,
953            headers,
954        } = msg;
955        let res = match headers {
956            Some(h) => {
957                self.client
958                    .publish_with_reply_and_headers(subject, reply, h, payload)
959                    .await
960            }
961            None => {
962                self.client
963                    .publish_with_reply(subject, reply, payload)
964                    .await
965            }
966        };
967        res.map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Publish, e))
968    }
969
970    /// Non-blocking drain: consume all messages currently buffered on the
971    /// subscription and apply them to the publisher state.
972    ///
973    /// The common case (no pending events) costs one `poll_next` returning
974    /// `Pending`, which is a few nanoseconds.
975    fn drain_nonblocking(&mut self) -> Result<(), FastPublishError> {
976        let Some(sub) = self.subscriber.as_mut() else {
977            return Ok(());
978        };
979        let waker = noop_waker_ref();
980        let mut cx = TaskContext::from_waker(waker);
981        loop {
982            match sub.poll_next_unpin(&mut cx) {
983                Poll::Ready(Some(msg)) => {
984                    let shared = SharedHandlerState {
985                        gap_mode: self.gap_mode,
986                        effective_flow: &mut self.effective_flow,
987                        last_ack_sequence: &mut self.last_ack_sequence,
988                        initial_ack_received: &mut self.initial_ack_received,
989                        pending_pub_ack: &mut self.pending_pub_ack,
990                        fatal: &mut self.fatal,
991                        on_error: self.on_error.as_mut(),
992                    };
993                    handle_inbox_message(shared, msg)?;
994                }
995                Poll::Ready(None) => {
996                    // Subscription ended unexpectedly.
997                    self.closed = true;
998                    return Err(FastPublishError::new(FastPublishErrorKind::Closed));
999                }
1000                Poll::Pending => return Ok(()),
1001            }
1002        }
1003    }
1004
1005    /// After the first `Start` message is sent, wait for the server to
1006    /// either (a) send a `BatchFlowAck` confirming the batch, (b) return an
1007    /// init-time API error, or (c) send a terminal `PubAck` directly on the
1008    /// single-message immediate-commit fast path (stashed for later pickup).
1009    async fn await_first_reply(&mut self) -> Result<(), FastPublishError> {
1010        let deadline = tokio::time::Instant::now() + self.ack_timeout;
1011        loop {
1012            let now = tokio::time::Instant::now();
1013            if now >= deadline {
1014                return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
1015            }
1016            let remaining = deadline - now;
1017            let sub = self
1018                .subscriber
1019                .as_mut()
1020                .expect("subscriber installed by ensure_subscribed");
1021            let msg = match tokio::time::timeout(remaining, sub.next()).await {
1022                Ok(Some(m)) => m,
1023                Ok(None) => {
1024                    self.closed = true;
1025                    return Err(FastPublishError::new(FastPublishErrorKind::Closed));
1026                }
1027                Err(_) => return Err(FastPublishError::new(FastPublishErrorKind::Timeout)),
1028            };
1029            let shared = SharedHandlerState {
1030                gap_mode: self.gap_mode,
1031                effective_flow: &mut self.effective_flow,
1032                last_ack_sequence: &mut self.last_ack_sequence,
1033                initial_ack_received: &mut self.initial_ack_received,
1034                pending_pub_ack: &mut self.pending_pub_ack,
1035                fatal: &mut self.fatal,
1036                on_error: self.on_error.as_mut(),
1037            };
1038            handle_inbox_message(shared, msg)?;
1039
1040            if let Some(kind) = self.fatal {
1041                return Err(FastPublishError::new(kind));
1042            }
1043            if self.pending_pub_ack.is_some() {
1044                // Single-message immediate-commit fast path — the commit
1045                // path (drain_until_pub_ack) will pick this up.
1046                return Ok(());
1047            }
1048            if self.initial_ack_received {
1049                // First BatchFlowAck arrived (even with seq:0) — batch is
1050                // confirmed and we can continue publishing.
1051                return Ok(());
1052            }
1053            // Otherwise: gap/err/etc. — keep waiting for the terminal signal.
1054        }
1055    }
1056
1057    /// Drain the subscription until a terminal `PubAck` is received (or a
1058    /// fatal error surfaces).
1059    async fn drain_until_pub_ack(&mut self) -> Result<BatchPubAck, FastPublishError> {
1060        // Already stashed by a prior drain or first-reply?
1061        if let Some(pa) = self.pending_pub_ack.take() {
1062            self.unsubscribe_best_effort().await;
1063            return Ok(pa);
1064        }
1065
1066        let deadline = tokio::time::Instant::now() + self.ack_timeout;
1067        loop {
1068            let now = tokio::time::Instant::now();
1069            if now >= deadline {
1070                return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
1071            }
1072            let remaining = deadline - now;
1073            let sub = self
1074                .subscriber
1075                .as_mut()
1076                .expect("subscriber installed by ensure_subscribed");
1077            let msg = match tokio::time::timeout(remaining, sub.next()).await {
1078                Ok(Some(m)) => m,
1079                Ok(None) => {
1080                    self.closed = true;
1081                    return Err(FastPublishError::new(FastPublishErrorKind::Closed));
1082                }
1083                Err(_) => return Err(FastPublishError::new(FastPublishErrorKind::Timeout)),
1084            };
1085
1086            let shared = SharedHandlerState {
1087                gap_mode: self.gap_mode,
1088                effective_flow: &mut self.effective_flow,
1089                last_ack_sequence: &mut self.last_ack_sequence,
1090                initial_ack_received: &mut self.initial_ack_received,
1091                pending_pub_ack: &mut self.pending_pub_ack,
1092                fatal: &mut self.fatal,
1093                on_error: self.on_error.as_mut(),
1094            };
1095            handle_inbox_message(shared, msg)?;
1096
1097            if let Some(pa) = self.pending_pub_ack.take() {
1098                self.unsubscribe_best_effort().await;
1099                return Ok(pa);
1100            }
1101            if let Some(kind) = self.fatal.take() {
1102                self.unsubscribe_best_effort().await;
1103                return Err(FastPublishError::new(kind));
1104            }
1105        }
1106    }
1107
1108    async fn unsubscribe_best_effort(&mut self) {
1109        if let Some(mut sub) = self.subscriber.take() {
1110            let _ = sub.unsubscribe().await;
1111        }
1112    }
1113
    /// Block until the outstanding-ack window has room for another publish,
    /// sending periodic pings to the server to recover from lost acks.
    ///
    /// Matches Go orbit.go PR #32 `waitForStall`: split the ack_timeout into
    /// three intervals so up to two pings fit before the deadline.
    ///
    /// The overall deadline is `ack_timeout` from entry and is not extended
    /// by flow acks that arrive but leave the window saturated.
    ///
    /// # Errors
    ///
    /// - The recorded fatal kind, as soon as one is observed.
    /// - [`FastPublishErrorKind::Timeout`] when the deadline passes with the
    ///   window still saturated.
    /// - [`FastPublishErrorKind::Closed`] if the subscription ends (the
    ///   publisher is marked closed).
    /// - Errors from parsing an inbox message or from `send_ping`.
    ///
    /// NOTE(review): a terminal `PubAck` received while stalled is only
    /// stashed in `pending_pub_ack`. This loop keeps waiting until the window
    /// opens or the deadline passes. Confirm that is intended.
    async fn wait_for_flow_event_with_pings(&mut self) -> Result<(), FastPublishError> {
        // Split timeout into 3 intervals for up to 2 pings before giving up.
        // Clamp floor to 100ms so tiny ack_timeouts don't spin.
        let ping_interval = (self.ack_timeout / 3).max(Duration::from_millis(100));
        let deadline = tokio::time::Instant::now() + self.ack_timeout;
        let mut ping_at = tokio::time::Instant::now() + ping_interval;

        loop {
            // Checked at the top so a fatal event applied in the previous
            // iteration ends the wait immediately.
            if let Some(kind) = self.fatal {
                return Err(FastPublishError::new(kind));
            }

            let now = tokio::time::Instant::now();
            if now >= deadline {
                return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
            }

            // Compute the next wake time: either the ping instant or the
            // overall deadline, whichever comes first.
            let next_wake = ping_at.min(deadline);
            let wait = next_wake.saturating_duration_since(now);

            let sub = self
                .subscriber
                .as_mut()
                .expect("subscriber installed before stall");

            match tokio::time::timeout(wait, sub.next()).await {
                Ok(Some(msg)) => {
                    let shared = SharedHandlerState {
                        gap_mode: self.gap_mode,
                        effective_flow: &mut self.effective_flow,
                        last_ack_sequence: &mut self.last_ack_sequence,
                        initial_ack_received: &mut self.initial_ack_received,
                        pending_pub_ack: &mut self.pending_pub_ack,
                        fatal: &mut self.fatal,
                        on_error: self.on_error.as_mut(),
                    };
                    handle_inbox_message(shared, msg)?;

                    // Re-evaluate the same gate the callers use, with the
                    // possibly-updated ack sequence and flow.
                    let next_sequence = self.sequence + 1;
                    if !should_stall(
                        self.last_ack_sequence,
                        self.effective_flow,
                        self.max_outstanding_acks,
                        next_sequence,
                    ) {
                        // Window has room; we can publish again.
                        return Ok(());
                    }
                    // Still stalled: keep waiting for the next event.
                }
                Ok(None) => {
                    self.closed = true;
                    return Err(FastPublishError::new(FastPublishErrorKind::Closed));
                }
                Err(_) => {
                    // Timed out on this interval. Either send a ping (if the
                    // ping interval expired) or give up (if the overall
                    // deadline expired).
                    let now = tokio::time::Instant::now();
                    if now >= deadline {
                        return Err(FastPublishError::new(FastPublishErrorKind::Timeout));
                    }
                    // Must have hit the ping interval — send a ping and
                    // schedule the next one.
                    self.send_ping().await?;
                    ping_at = tokio::time::Instant::now() + ping_interval;
                }
            }
        }
    }
1191
1192    /// Send a ping message (op=4) to recover from a possibly-lost ack.
1193    ///
1194    /// The ping does NOT increment the batch sequence. It is published to the
1195    /// first message's subject (required so the server routes it to the same
1196    /// stream), and triggers the server to resend the latest flow ack.
1197    ///
1198    /// Takes `&mut self` — not because the call mutates state, but because
1199    /// holding `&self` across `.await` would require `FastPublisher: Sync`,
1200    /// which the boxed `FnMut` error handler field prevents. Callers already
1201    /// hold `&mut self` so this is cheap.
1202    async fn send_ping(&mut self) -> Result<(), FastPublishError> {
1203        let Some(subject) = self.first_subject.clone() else {
1204            // Cannot ping before the first add — this should never happen
1205            // because the stall gate only fires after at least one publish.
1206            return Err(FastPublishError::new(FastPublishErrorKind::InvalidState));
1207        };
1208        let reply = build_reply(&self.reply_prefix, self.sequence, Operation::Ping);
1209        self.client
1210            .publish_with_reply(subject, reply, Bytes::new())
1211            .await
1212            .map_err(|e| FastPublishError::with_source(FastPublishErrorKind::Publish, e))?;
1213        Ok(())
1214    }
1215}
1216
1217// ---------------------------------------------------------------------------
1218// Handler for inbox messages
1219// ---------------------------------------------------------------------------
1220
1221/// Mutable reference bundle passed to [`handle_inbox_message`] so the
1222/// function can update all the relevant fields without conflicting borrows
1223/// on `&mut FastPublisher` (which would prevent us from also holding a
1224/// mutable borrow on `self.subscriber` while processing drained messages).
1225struct SharedHandlerState<'a> {
1226    gap_mode: GapMode,
1227    effective_flow: &'a mut u16,
1228    last_ack_sequence: &'a mut u64,
1229    initial_ack_received: &'a mut bool,
1230    pending_pub_ack: &'a mut Option<BatchPubAck>,
1231    fatal: &'a mut Option<FastPublishErrorKind>,
1232    on_error: Option<&'a mut FastPublishErrorHandler>,
1233}
1234
1235fn handle_inbox_message(
1236    mut state: SharedHandlerState<'_>,
1237    msg: async_nats::Message,
1238) -> Result<(), FastPublishError> {
1239    match classify(&msg.payload)? {
1240        Classified::FlowAck(ack) => {
1241            // The initial BatchFlowAck for a fresh batch has `seq:0` (no
1242            // messages persisted yet — it just confirms the batch was
1243            // accepted). We must track it separately from `last_ack_sequence`
1244            // because `seq:0` is the same as the default `last_ack_sequence`.
1245            *state.initial_ack_received = true;
1246            // Lost-ack handling: any newer seq implicitly acks all below.
1247            if ack.sequence > *state.last_ack_sequence {
1248                *state.last_ack_sequence = ack.sequence;
1249            }
1250            // Server-dictated flow override. Update the effective flow used by
1251            // the stall gate. The reply prefix always uses the CLIENT's initial
1252            // flow ceiling (not the server-dictated value) because ADR-50 says
1253            // "We add the initial flow [...] for replica followers who might
1254            // have missed the first message due to limits." The server's
1255            // getFastBatch parser reads the flow from the reply subject to set
1256            // maxAckMessages on followers — using the server-dictated (lower)
1257            // value would cap follower ramp-up incorrectly in clustered setups.
1258            let new_flow = ack.messages.max(1);
1259            if new_flow != *state.effective_flow {
1260                *state.effective_flow = new_flow;
1261                // NOTE: reply_prefix is NOT rebuilt here — it always contains
1262                // the initial flow ceiling. Only effective_flow (used by the
1263                // stall gate) is updated.
1264            }
1265        }
1266        Classified::FlowGap(gap) => {
1267            tracing::debug!(
1268                expected_last = gap.expected_last_sequence,
1269                current = gap.current_sequence,
1270                "fast batch gap detected"
1271            );
1272            if let Some(h) = state.on_error.as_deref_mut() {
1273                h(FastPublishError::new(FastPublishErrorKind::GapDetected));
1274            }
1275            // In GapMode::Fail, stop publishing immediately. The server has
1276            // already abandoned the batch; further publishes would be silently
1277            // dropped. The terminal PubAck (containing persisted-message
1278            // count) will arrive on the next drain/commit.
1279            if state.gap_mode == GapMode::Fail {
1280                *state.fatal = Some(FastPublishErrorKind::GapDetected);
1281            }
1282        }
1283        Classified::FlowErr(ferr) => {
1284            tracing::debug!(
1285                batch_sequence = ferr.sequence,
1286                err_code = ferr.error.error_code().0,
1287                "fast batch flow error"
1288            );
1289            let kind = FastPublishErrorKind::from_api_error(&ferr.error);
1290            if let Some(h) = state.on_error.as_deref_mut() {
1291                h(FastPublishError::new(kind));
1292            }
1293            if state.gap_mode == GapMode::Fail {
1294                *state.fatal = Some(kind);
1295            }
1296        }
1297        Classified::PubAck(pa) => {
1298            *state.pending_pub_ack = Some(pa);
1299        }
1300        Classified::InitError(err) => {
1301            *state.fatal = Some(FastPublishErrorKind::from_api_error(&err));
1302        }
1303    }
1304    Ok(())
1305}
1306
1307// ---------------------------------------------------------------------------
1308// Tests
1309// ---------------------------------------------------------------------------
1310
1311#[cfg(test)]
1312mod tests {
1313    use super::*;
1314
1315    // -- reply subject format ------------------------------------------------
1316
1317    #[test]
1318    fn reply_prefix_format_ok_mode() {
1319        let p = build_reply_prefix("_INBOX.abc123", 100, GapMode::Ok);
1320        assert_eq!(p, "_INBOX.abc123.100.ok.");
1321    }
1322
1323    #[test]
1324    fn reply_prefix_format_fail_mode() {
1325        let p = build_reply_prefix("_INBOX.abc123", 50, GapMode::Fail);
1326        assert_eq!(p, "_INBOX.abc123.50.fail.");
1327    }
1328
1329    #[test]
1330    fn reply_full_all_operations() {
1331        let prefix = build_reply_prefix("_INBOX.x", 10, GapMode::Fail);
1332        for (op, code) in [
1333            (Operation::Start, 0_u8),
1334            (Operation::Append, 1),
1335            (Operation::Commit, 2),
1336            (Operation::CommitEob, 3),
1337            (Operation::Ping, 4),
1338        ] {
1339            let r = build_reply(&prefix, 42, op);
1340            assert_eq!(r.as_str(), format!("_INBOX.x.10.fail.42.{code}.$FI"));
1341        }
1342    }
1343
1344    #[test]
1345    fn reply_full_both_gap_modes() {
1346        for (mode, tag) in [(GapMode::Ok, "ok"), (GapMode::Fail, "fail")] {
1347            let prefix = build_reply_prefix("_INBOX.abc", 25, mode);
1348            let r = build_reply(&prefix, 1, Operation::Start);
1349            assert_eq!(r.as_str(), format!("_INBOX.abc.25.{tag}.1.0.$FI"));
1350        }
1351    }
1352
1353    // -- inbox shape validation ----------------------------------------------
1354
1355    #[test]
1356    fn inbox_shape_accepts_two_tokens() {
1357        assert!(validate_inbox_shape("_INBOX.abc123").is_ok());
1358        assert!(validate_inbox_shape("X.Y").is_ok());
1359    }
1360
1361    #[test]
1362    fn inbox_shape_rejects_zero_dots() {
1363        assert!(matches!(
1364            validate_inbox_shape("INBOX").unwrap_err().kind(),
1365            FastPublishErrorKind::InvalidInboxShape
1366        ));
1367    }
1368
1369    #[test]
1370    fn inbox_shape_rejects_three_or_more_tokens() {
1371        assert!(matches!(
1372            validate_inbox_shape("_INBOX.myapp.abc123")
1373                .unwrap_err()
1374                .kind(),
1375            FastPublishErrorKind::InvalidInboxShape
1376        ));
1377        assert!(matches!(
1378            validate_inbox_shape("a.b.c.d").unwrap_err().kind(),
1379            FastPublishErrorKind::InvalidInboxShape
1380        ));
1381    }
1382
1383    #[test]
1384    fn inbox_shape_rejects_empty_tokens() {
1385        assert!(validate_inbox_shape(".abc").is_err());
1386        assert!(validate_inbox_shape("abc.").is_err());
1387        assert!(validate_inbox_shape("").is_err());
1388    }
1389
1390    // -- JSON parsing --------------------------------------------------------
1391
1392    #[test]
1393    fn parse_batch_flow_ack() {
1394        let payload = br#"{"type":"ack","seq":10,"msgs":15}"#;
1395        match classify(payload).unwrap() {
1396            Classified::FlowAck(a) => {
1397                assert_eq!(a.sequence, 10);
1398                assert_eq!(a.messages, 15);
1399            }
1400            other => panic!("expected FlowAck, got {other:?}"),
1401        }
1402    }
1403
1404    #[test]
1405    fn parse_batch_flow_gap() {
1406        let payload = br#"{"type":"gap","last_seq":10,"seq":15}"#;
1407        match classify(payload).unwrap() {
1408            Classified::FlowGap(g) => {
1409                assert_eq!(g.expected_last_sequence, 10);
1410                assert_eq!(g.current_sequence, 15);
1411            }
1412            other => panic!("expected FlowGap, got {other:?}"),
1413        }
1414    }
1415
1416    #[test]
1417    fn parse_batch_flow_err() {
1418        let payload = br#"{"type":"err","seq":7,"error":{"code":400,"err_code":10071,"description":"wrong last sequence: 1"}}"#;
1419        match classify(payload).unwrap() {
1420            Classified::FlowErr(e) => {
1421                assert_eq!(e.sequence, 7);
1422                assert_eq!(e.error.error_code().0, 10071);
1423            }
1424            other => panic!("expected FlowErr, got {other:?}"),
1425        }
1426    }
1427
1428    #[test]
1429    fn parse_terminal_pub_ack() {
1430        let payload = br#"{"stream":"TEST","seq":42,"batch":"inbox-id","count":10}"#;
1431        match classify(payload).unwrap() {
1432            Classified::PubAck(pa) => {
1433                assert_eq!(pa.stream, "TEST");
1434                assert_eq!(pa.sequence, 42);
1435                assert_eq!(pa.batch_id, "inbox-id");
1436                assert_eq!(pa.batch_size, 10);
1437            }
1438            other => panic!("expected PubAck, got {other:?}"),
1439        }
1440    }
1441
1442    #[test]
1443    fn parse_init_error_response() {
1444        // Server sends init errors as plain JSPubAckResponse with no `type`
1445        // field — just `{"error":{...}}`. The classifier falls through the
1446        // tagged-enum parse and hits `Response::Err`.
1447        let payload =
1448            br#"{"error":{"code":400,"err_code":10205,"description":"fast batch publish not enabled"}}"#;
1449        match classify(payload).unwrap() {
1450            Classified::InitError(err) => {
1451                assert_eq!(err.error_code().0, 10205);
1452                assert_eq!(
1453                    FastPublishErrorKind::from_api_error(&err),
1454                    FastPublishErrorKind::NotEnabled
1455                );
1456            }
1457            other => panic!("expected InitError, got {other:?}"),
1458        }
1459    }
1460
1461    #[test]
1462    fn classify_malformed_json_returns_error() {
1463        let payload = b"not json at all";
1464        let err = classify(payload).unwrap_err();
1465        assert!(matches!(err.kind(), FastPublishErrorKind::Serialization));
1466    }
1467
1468    // -- stall formula -------------------------------------------------------
1469
1470    #[test]
1471    fn stall_no_wait_when_window_is_strictly_greater() {
1472        // seq=19, ack=0, flow=10, max=2 → window=20, 20 > 19 → no wait
1473        assert!(!should_stall(0, 10, 2, 19));
1474    }
1475
1476    #[test]
1477    fn stall_waits_at_exact_boundary() {
1478        // seq=20, ack=0, flow=10, max=2 → window=20, 20 <= 20 → wait
1479        // This matches ADR-50's `<=` formulation.
1480        assert!(should_stall(0, 10, 2, 20));
1481    }
1482
1483    #[test]
1484    fn stall_waits_past_boundary() {
1485        // seq=21 → window=20 <= 21 → wait
1486        assert!(should_stall(0, 10, 2, 21));
1487    }
1488
1489    #[test]
1490    fn stall_honors_last_ack() {
1491        // ack=10, flow=10, max=2, seq=30 → window=30, 30 <= 30 → wait
1492        assert!(should_stall(10, 10, 2, 30));
1493        // ack=10, flow=10, max=2, seq=29 → window=30, 30 > 29 → no wait
1494        assert!(!should_stall(10, 10, 2, 29));
1495    }
1496
1497    #[test]
1498    fn stall_with_single_outstanding_ack() {
1499        // max=1 matches "ack every N" throttling
1500        assert!(!should_stall(0, 10, 1, 9));
1501        assert!(should_stall(0, 10, 1, 10));
1502        assert!(should_stall(0, 10, 1, 11));
1503    }
1504
1505    #[test]
1506    fn stall_with_max_outstanding_three() {
1507        // max=3 matches higher-RTT throughput mode
1508        assert!(!should_stall(0, 10, 3, 29));
1509        assert!(should_stall(0, 10, 3, 30));
1510    }
1511
1512    #[test]
1513    fn stall_saturates_on_pathological_inputs() {
1514        // Ensure u64 overflow is saturated — should never panic.
1515        let waited = should_stall(u64::MAX - 5, u16::MAX, u16::MAX, u64::MAX);
1516        // Window saturates to u64::MAX, which is > u64::MAX is false, so should stall.
1517        assert!(waited);
1518    }
1519
1520    // -- error code mapping --------------------------------------------------
1521
1522    fn api_err(code: u64) -> async_nats::jetstream::Error {
1523        // Build a minimal jetstream::Error via JSON round-trip so we don't
1524        // depend on any constructor that may not exist in the public API.
1525        let json = format!(r#"{{"code":400,"err_code":{code},"description":"test"}}"#);
1526        serde_json::from_str(&json).expect("synthetic api error parses")
1527    }
1528
1529    #[test]
1530    fn error_code_mapping_verified_against_server() {
1531        assert_eq!(
1532            FastPublishErrorKind::from_api_error(&api_err(10205)),
1533            FastPublishErrorKind::NotEnabled
1534        );
1535        assert_eq!(
1536            FastPublishErrorKind::from_api_error(&api_err(10206)),
1537            FastPublishErrorKind::InvalidPattern
1538        );
1539        assert_eq!(
1540            FastPublishErrorKind::from_api_error(&api_err(10207)),
1541            FastPublishErrorKind::InvalidBatchId
1542        );
1543        assert_eq!(
1544            FastPublishErrorKind::from_api_error(&api_err(10208)),
1545            FastPublishErrorKind::UnknownBatchId
1546        );
1547        assert_eq!(
1548            FastPublishErrorKind::from_api_error(&api_err(10211)),
1549            FastPublishErrorKind::TooManyInflight
1550        );
1551    }
1552
1553    #[test]
1554    fn error_code_mapping_unknown_is_flow_error() {
1555        assert_eq!(
1556            FastPublishErrorKind::from_api_error(&api_err(10071)),
1557            FastPublishErrorKind::FlowError
1558        );
1559        assert_eq!(
1560            FastPublishErrorKind::from_api_error(&api_err(99999)),
1561            FastPublishErrorKind::FlowError
1562        );
1563    }
1564
1565    // -- display impl --------------------------------------------------------
1566
1567    #[test]
1568    fn error_kind_display_non_empty() {
1569        for kind in [
1570            FastPublishErrorKind::NotEnabled,
1571            FastPublishErrorKind::InvalidPattern,
1572            FastPublishErrorKind::InvalidBatchId,
1573            FastPublishErrorKind::UnknownBatchId,
1574            FastPublishErrorKind::TooManyInflight,
1575            FastPublishErrorKind::GapDetected,
1576            FastPublishErrorKind::FlowError,
1577            FastPublishErrorKind::EmptyBatch,
1578            FastPublishErrorKind::InvalidInboxShape,
1579            FastPublishErrorKind::InvalidConfig,
1580            FastPublishErrorKind::Closed,
1581            FastPublishErrorKind::Timeout,
1582            FastPublishErrorKind::Subscribe,
1583            FastPublishErrorKind::Publish,
1584            FastPublishErrorKind::Serialization,
1585            FastPublishErrorKind::InvalidState,
1586            FastPublishErrorKind::Other,
1587        ] {
1588            let s = format!("{kind}");
1589            assert!(!s.is_empty(), "empty Display for {kind:?}");
1590        }
1591    }
1592
1593    // -- default impls -------------------------------------------------------
1594
1595    #[test]
1596    fn gap_mode_default_is_fail() {
1597        assert_eq!(GapMode::default(), GapMode::Fail);
1598    }
1599
    // -- builder validation --------------------------------------------------
    //
    // These tests exercise `FastPublisherBuilder::build()` validation and
    // defaults. An `async_nats::Client` cannot be synthesized without a real
    // connection, so `dummy_builder()` starts a local nats-server from
    // `tests/configs/jetstream.conf` and connects a real client to it. The
    // tests use `#[tokio::test]` because connecting is async.
    //
    // Before validating, `build()` only calls `new_inbox()` on the client and
    // does not subscribe (`builder_default_values` asserts that `subscriber`
    // is `None` after `build()`).
1610
1611    async fn dummy_builder() -> (nats_server::Server, FastPublisherBuilder) {
1612        let server = nats_server::run_server("tests/configs/jetstream.conf");
1613        let client = async_nats::connect(server.client_url()).await.unwrap();
1614        (
1615            server,
1616            FastPublisherBuilder::new(client, Duration::from_secs(5)),
1617        )
1618    }
1619
1620    #[tokio::test]
1621    async fn builder_rejects_max_outstanding_zero() {
1622        let (_s, b) = dummy_builder().await;
1623        let err = b.max_outstanding_acks(0).build().unwrap_err();
1624        assert!(matches!(err.kind(), FastPublishErrorKind::InvalidConfig));
1625    }
1626
1627    #[tokio::test]
1628    async fn builder_rejects_max_outstanding_four() {
1629        let (_s, b) = dummy_builder().await;
1630        let err = b.max_outstanding_acks(4).build().unwrap_err();
1631        assert!(matches!(err.kind(), FastPublishErrorKind::InvalidConfig));
1632    }
1633
1634    #[tokio::test]
1635    async fn builder_accepts_all_valid_max_outstanding() {
1636        for n in 1..=3 {
1637            let (_s, b) = dummy_builder().await;
1638            let fp = b.max_outstanding_acks(n).build().expect("valid config");
1639            assert_eq!(fp.max_outstanding_acks, n);
1640        }
1641    }
1642
1643    #[tokio::test]
1644    async fn builder_clamps_flow_zero_to_one() {
1645        let (_s, b) = dummy_builder().await;
1646        let fp = b.flow(0).build().expect("flow clamped to 1");
1647        assert_eq!(fp.flow, 1);
1648    }
1649
1650    #[tokio::test]
1651    async fn builder_default_values() {
1652        let (_s, b) = dummy_builder().await;
1653        let fp = b.build().expect("defaults build ok");
1654        assert_eq!(fp.flow, DEFAULT_FLOW);
1655        assert_eq!(fp.effective_flow, DEFAULT_FLOW);
1656        assert_eq!(fp.max_outstanding_acks, DEFAULT_MAX_OUTSTANDING_ACKS);
1657        assert_eq!(fp.gap_mode, GapMode::Fail);
1658        assert_eq!(fp.sequence, 0);
1659        assert_eq!(fp.last_ack_sequence, 0);
1660        assert!(fp.subscriber.is_none());
1661        assert!(!fp.is_closed());
1662        assert_eq!(fp.size(), 0);
1663    }
1664
1665    #[tokio::test]
1666    async fn builder_produces_cached_reply_prefix() {
1667        let (_s, b) = dummy_builder().await;
1668        let fp = b.flow(42).gap_mode(GapMode::Ok).build().unwrap();
1669        assert!(fp.reply_prefix.starts_with(&fp.inbox));
1670        assert!(fp.reply_prefix.ends_with(".42.ok."));
1671    }
1672
1673    #[tokio::test]
1674    async fn builder_batch_id_is_the_inbox() {
1675        let (_s, b) = dummy_builder().await;
1676        let fp = b.build().unwrap();
1677        assert_eq!(fp.batch_id(), fp.inbox);
1678        assert_eq!(fp.inbox.matches('.').count(), 1);
1679    }
1680}