jetstream_extra/
batch_publish.rs

1// Copyright 2025 Synadia Communications Inc.
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Batch publishing support for NATS JetStream.
15//!
16//! This module provides functionality for publishing multiple messages as an atomic batch
17//! to a JetStream stream. Batch publishing ensures that either all messages in a batch
18//! are stored or none are.
19//!
20//! # Overview
21//!
22//! Batch publishing works by:
23//! 1. Assigning internally a unique batch ID to all messages in a batch
24//! 2. Publishing messages with special headers indicating batch membership
25//! 3. Committing the batch with a final message containing a commit marker
26//!
27//! # Thread Safety
28//!
29//! The batch publisher types in this module are designed to be used from a single task/thread.
30//! They do not implement `Send` or `Sync` as they maintain internal mutable state during
31//! the batch publishing process.
32//!
33//! If you need to share a batch publisher across threads, you should:
34//! - Use separate `BatchPublish` instances per thread
35//! - Clone the underlying JetStream context (which is `Send + Sync + Clone`)
36//! - Coordinate batch IDs externally if needed
37//!
38//! The underlying NATS client connection is thread-safe and can be shared across threads.
39//!
40//! # Usage Patterns
41//!
42//! ## Standard API
43//!
44//! Use [BatchPublish] when you need control over individual message publishing:
45//!
46//! ```no_run
47//! # use jetstream_extra::batch_publish::BatchPublishExt;
48//! # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
49//! let mut batch = client.batch_publish().build();
50//! batch.add("events.1", "data1".into()).await?;
51//! batch.add("events.2", "data2".into()).await?;
52//! let ack = batch.commit("events.3", "final".into()).await?;
53//! # Ok(())
54//! # }
55//! ```
56//!
57//! ## Convenience API - Bulk Publishing
58//!
59//! Use [BatchPublishAllBuilder] when you have all messages ready:
60//!
61//! ```no_run
62//! # use jetstream_extra::batch_publish::BatchPublishExt;
63//! # use futures_util::stream;
64//! # use async_nats::jetstream::message::OutboundMessage;
65//! # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
66//! let messages = vec![/* ... */];
67//! let ack = client.batch_publish_all()
68//!     .publish(stream::iter(messages))
69//!     .await?;
70//! # Ok(())
71//! # }
72//! ```
73//!
74//! # Flow Control
75//!
76//! Both APIs support flow control through acknowledgments:
77//!
78//! - `ack_first()` - Wait for server acknowledgment after the first message
79//! - `ack_every(n)` - Wait for acknowledgment every N messages
80//! - `timeout(duration)` - Set timeout for acknowledgment requests
81//!
82//! # Error Handling
83//!
84//! All operations return [BatchPublishError] with specific error kinds:
85//!
86//! - `BatchPublishNotEnabled` - Stream doesn't have `allow_atomic_publish` enabled
87//! - `BatchPublishIncomplete` - Too many outstanding batches (server limit: 50)
88//! - `BatchPublishUnsupportedHeader` - Message contains `Nats-Msg-Id` or `Nats-Expected-Last-Msg-Id`
89//! - `MaxMessagesExceeded` - Batch exceeds 1000 message limit
90//! - `EmptyBatch` - Attempting to commit an empty batch
91//!
92//! Server errors are automatically mapped to the appropriate error kind based on the error code.
93//! Errors during `add` with flow control may indicate transient issues or configuration problems.
94
95use futures_util::{Stream, StreamExt};
96use std::{
97    fmt::{Debug, Display},
98    time::Duration,
99};
100
101use async_nats::{
102    Request, client,
103    jetstream::{self, message::OutboundMessage, response::Response},
104    subject::ToSubject,
105};
106use serde::{Deserialize, de::DeserializeOwned};
107
108/// Maximum number of messages allowed in a single batch (server limit)
109const MAX_BATCH_SIZE: u64 = 1000;
110
111pub trait BatchPublishExt:
112    client::traits::Requester
113    + client::traits::Publisher
114    + jetstream::context::traits::TimeoutProvider
115    + Clone
116{
117    fn batch_publish(&self) -> BatchPublishBuilder<Self>;
118    fn batch_publish_all(&self) -> BatchPublishAllBuilder<Self>;
119}
120
121impl<C> BatchPublishExt for C
122where
123    C: client::traits::Requester
124        + client::traits::Publisher
125        + jetstream::context::traits::TimeoutProvider
126        + Clone,
127{
128    fn batch_publish(&self) -> BatchPublishBuilder<Self> {
129        BatchPublishBuilder::new(self.clone())
130    }
131
132    fn batch_publish_all(&self) -> BatchPublishAllBuilder<Self> {
133        BatchPublishAllBuilder::new(self.clone())
134    }
135}
136
137pub struct BatchPublishBuilder<C> {
138    client: C,
139    timeout: Duration,
140    ack_first: bool,
141    ack_every: Option<u64>,
142}
143
144impl<C> BatchPublishBuilder<C>
145where
146    C: client::traits::Requester
147        + client::traits::Publisher
148        + jetstream::context::traits::TimeoutProvider
149        + Clone,
150{
151    pub fn new(context: C) -> Self {
152        Self {
153            client: context.clone(),
154            ack_first: true,
155            timeout: context.timeout(),
156            ack_every: None,
157        }
158    }
159
160    /// Configures acknowledgment for every N messages.
161    /// That acknowledgement is used for flow-control purposes.
162    /// It does not provide any actual acknowledgment data back.
163    /// Those are only provided on the final commit message ack.
164    pub fn ack_every(mut self, count: u64) -> Self {
165        self.ack_every = Some(count);
166        self
167    }
168
169    /// Enables acknowledgment for the first message in the batch.
170    /// That acknowledgement is used for flow-control purposes.
171    /// It does not provide any actual acknowledgment data back.
172    /// Those are only provided on the final commit message ack.
173    pub fn ack_first(mut self, ack_first: bool) -> Self {
174        self.ack_first = ack_first;
175        self
176    }
177
178    /// Sets the timeout for acknowledgment requests.
179    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
180        self.timeout = duration;
181        self
182    }
183
184    pub fn build(self) -> BatchPublish<C> {
185        BatchPublish {
186            context: self.client,
187            sequence: 0,
188            batch_id: nuid::next().to_string(),
189            ack_every: self.ack_every,
190            ack_first: self.ack_first,
191            timeout: self.timeout,
192        }
193    }
194}
195
196pub struct BatchPublish<C> {
197    pub context: C,
198    pub sequence: u64,
199    pub batch_id: String,
200    ack_every: Option<u64>,
201    ack_first: bool,
202    timeout: Duration,
203}
204
205impl<C> BatchPublish<C>
206where
207    C: client::traits::Requester
208        + client::traits::Publisher
209        + jetstream::context::traits::TimeoutProvider
210        + Clone,
211{
212    pub fn new(context: C, sequence: u64, batch_id: String) -> Self {
213        Self {
214            sequence,
215            batch_id,
216            timeout: context.timeout(),
217            context,
218            ack_first: true,
219            ack_every: None,
220        }
221    }
222
223    /// Get the current number of messages in the batch.
224    ///
225    /// This includes messages that have been added but excludes the final commit message
226    /// until it is sent.
227    pub fn size(&self) -> u64 {
228        self.sequence
229    }
230
231    /// Add a message to the batch with the specified subject and payload.
232    ///
233    /// The message is sent immediately with batch headers. If flow control is configured
234    /// (via `ack_first` or `ack_every`), this method may wait for acknowledgment from
235    /// the server.
236    ///
237    /// # Errors
238    ///
239    /// Returns `MaxMessagesExceeded` if adding this message would exceed the server's
240    /// batch size limit of 1000 messages.
241    ///
242    /// # Examples
243    ///
244    /// ```no_run
245    /// # use jetstream_extra::batch_publish::BatchPublishExt;
246    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
247    /// let mut batch = client.batch_publish().build();
248    /// batch.add("events.user.created", r#"{"id":123}"#.into()).await?;
249    /// batch.add("events.user.updated", r#"{"id":123,"name":"Alice"}"#.into()).await?;
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub async fn add<S: ToSubject>(
254        &mut self,
255        subject: S,
256        payload: bytes::Bytes,
257    ) -> Result<(), BatchPublishError> {
258        self.add_message(OutboundMessage {
259            subject: subject.to_subject(),
260            payload,
261            headers: None,
262        })
263        .await
264    }
265
266    /// Add a pre-constructed message to the batch.
267    ///
268    /// This is useful when you need to include headers or have already constructed
269    /// the [OutboundMessage].
270    ///
271    /// # Errors
272    ///
273    /// Returns `MaxMessagesExceeded` if adding this message would exceed the server's
274    /// batch size limit of 1000 messages.
275    ///
276    /// Returns `BatchPublishUnsupportedHeader` if the message contains unsupported
277    /// headers like `Nats-Msg-Id` or `Nats-Expected-Last-Msg-Id`.
278    ///
279    /// # Examples
280    ///
281    /// ```no_run
282    /// # use jetstream_extra::batch_publish::BatchPublishExt;
283    /// # use async_nats::jetstream::message::OutboundMessage;
284    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
285    /// let mut batch = client.batch_publish().build();
286    ///
287    /// let message = OutboundMessage {
288    ///     subject: "events.important".into(),
289    ///     payload: "critical data".into(),
290    ///     headers: None,
291    /// };
292    ///
293    /// batch.add_message(message).await?;
294    /// # Ok(())
295    /// # }
296    /// ```
297    pub async fn add_message(
298        &mut self,
299        mut message: jetstream::message::OutboundMessage,
300    ) -> Result<(), BatchPublishError> {
301        // Check for unsupported headers
302        if let Some(headers) = &message.headers
303            && (headers.get("Nats-Msg-Id").is_some()
304                || headers.get("Nats-Expected-Last-Msg-Id").is_some())
305        {
306            return Err(BatchPublishError::new(
307                BatchPublishErrorKind::BatchPublishUnsupportedHeader,
308            ));
309        }
310
311        self.sequence += 1;
312
313        if self.sequence > MAX_BATCH_SIZE {
314            return Err(BatchPublishError::new(
315                BatchPublishErrorKind::MaxMessagesExceeded,
316            ));
317        }
318        self.add_header(&mut message);
319
320        if let Some(ack_every) = self.ack_every
321            && self.sequence.is_multiple_of(ack_every)
322        {
323            self.add_request(message).await?;
324        } else if self.ack_first && self.sequence == 1 {
325            self.add_request(message).await?;
326        } else {
327            self.context
328                .publish_message(message.into())
329                .await
330                .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Publish, e))?;
331        }
332        Ok(())
333    }
334
335    /// Commit the batch with a final message.
336    ///
337    /// This sends the final message with batch headers and a commit marker,
338    /// completing the batch operation. The batch cannot be used after committing.
339    ///
340    /// # Examples
341    ///
342    /// ```no_run
343    /// # use jetstream_extra::batch_publish::BatchPublishExt;
344    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
345    /// let mut batch = client.batch_publish().build();
346    ///
347    /// batch.add("events.1", "data1".into()).await?;
348    /// batch.add("events.2", "data2".into()).await?;
349    ///
350    /// // Commit with final message
351    /// let ack = batch.commit("events.3", "data3".into()).await?;
352    ///
353    /// println!("Batch {} committed with {} messages", ack.batch_id, ack.batch_size);
354    /// # Ok(())
355    /// # }
356    /// ```
357    pub async fn commit<S: ToSubject>(
358        self,
359        subject: S,
360        payload: bytes::Bytes,
361    ) -> Result<BatchPubAck, BatchPublishError> {
362        self.commit_message(OutboundMessage {
363            subject: subject.to_subject(),
364            payload,
365            headers: None,
366        })
367        .await
368    }
369
370    /// Commit the batch with a pre-constructed final message.
371    ///
372    /// Like [commit](Self::commit), but accepts a pre-constructed [OutboundMessage].
373    ///
374    /// # Examples
375    ///
376    /// ```no_run
377    /// # use jetstream_extra::batch_publish::BatchPublishExt;
378    /// # use async_nats::jetstream::message::OutboundMessage;
379    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
380    /// let mut batch = client.batch_publish().build();
381    ///
382    /// let final_message = OutboundMessage {
383    ///     subject: "events.complete".into(),
384    ///     payload: "batch done".into(),
385    ///     headers: None,
386    /// };
387    ///
388    /// let ack = batch.commit_message(final_message).await?;
389    /// # Ok(())
390    /// # }
391    /// ```
392    pub async fn commit_message(
393        mut self,
394        mut message: jetstream::message::OutboundMessage,
395    ) -> Result<BatchPubAck, BatchPublishError> {
396        // Check for unsupported headers
397        if let Some(headers) = &message.headers
398            && (headers.get("Nats-Msg-Id").is_some()
399                || headers.get("Nats-Expected-Last-Msg-Id").is_some())
400        {
401            return Err(BatchPublishError::new(
402                BatchPublishErrorKind::BatchPublishUnsupportedHeader,
403            ));
404        }
405
406        self.sequence += 1;
407
408        if self.sequence > MAX_BATCH_SIZE {
409            return Err(BatchPublishError::new(
410                BatchPublishErrorKind::MaxMessagesExceeded,
411            ));
412        }
413
414        self.add_header(&mut message);
415        // Headers are guaranteed to exist after add_header
416        let headers = message
417            .headers
418            .get_or_insert_with(async_nats::HeaderMap::new);
419        headers.insert("Nats-Batch-Commit", "1");
420
421        self.commit_request(message).await
422    }
423
424    /// Discard the batch without committing.
425    ///
426    /// This consumes the batch without sending a commit message. The server will
427    /// eventually abandon the batch after a timeout.
428    ///
429    /// # Examples
430    ///
431    /// ```no_run
432    /// # use jetstream_extra::batch_publish::BatchPublishExt;
433    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
434    /// let mut batch = client.batch_publish().build();
435    ///
436    /// batch.add("events.1", "data".into()).await?;
437    ///
438    /// // Decide to abandon the batch
439    /// batch.discard();
440    /// # Ok(())
441    /// # }
442    /// ```
443    pub fn discard(self) {
444        // Dropping the batch without committing
445    }
446
447    fn add_header(&self, message: &mut jetstream::message::OutboundMessage) {
448        let headers = message
449            .headers
450            .get_or_insert_with(async_nats::HeaderMap::new);
451        headers.insert("Nats-Batch-Id", self.batch_id.clone());
452        headers.insert("Nats-Batch-Sequence", self.sequence.to_string());
453    }
454
455    async fn add_request(&self, message: OutboundMessage) -> Result<(), BatchPublishError> {
456        let request = Request {
457            payload: Some(message.payload),
458            headers: message.headers,
459            timeout: Some(Some(self.timeout)),
460            inbox: None,
461        };
462        let response = self
463            .context
464            .send_request(message.subject, request)
465            .await
466            .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Request, e))?;
467
468        if response.payload.is_empty() {
469            return Ok(());
470        }
471
472        let resp: Response<()> = serde_json::from_slice(response.payload.as_ref())
473            .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Serialization, e))?;
474
475        match resp {
476            Response::Err { error } => {
477                let kind = BatchPublishErrorKind::from_api_error(&error);
478                Err(BatchPublishError::with_source(kind, error))
479            }
480            Response::Ok(()) => Ok(()),
481        }
482    }
483
484    async fn commit_request<T: DeserializeOwned + Debug>(
485        &self,
486        message: OutboundMessage,
487    ) -> Result<T, BatchPublishError> {
488        let request = Request {
489            payload: Some(message.payload),
490            headers: message.headers,
491            timeout: Some(Some(self.timeout)),
492            inbox: None,
493        };
494        let response = self
495            .context
496            .send_request(message.subject, request)
497            .await
498            .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Request, e))?;
499
500        let resp: Response<T> = serde_json::from_slice(response.payload.as_ref())
501            .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Serialization, e))?;
502
503        match resp {
504            Response::Err { error } => {
505                let kind = BatchPublishErrorKind::from_api_error(&error);
506                Err(BatchPublishError::with_source(kind, error))
507            }
508            Response::Ok(ack) => Ok(ack),
509        }
510    }
511}
512
513/// Acknowledgment returned after successfully committing a batch.
514///
515/// Contains information about the committed batch including the stream it was
516/// published to, the sequence number, and batch metadata.
517#[derive(Debug, Deserialize)]
518pub struct BatchPubAck {
519    /// The stream the batch was published to.
520    pub stream: String,
521    /// The stream sequence number of the last message in the batch.
522    #[serde(rename = "seq")]
523    pub sequence: u64,
524    /// The domain the stream belongs to, if any.
525    #[serde(default)]
526    pub domain: Option<String>,
527    /// The unique identifier for the batch.
528    #[serde(rename = "batch")]
529    pub batch_id: String,
530    /// The number of messages in the committed batch.
531    #[serde(rename = "count")]
532    pub batch_size: u64,
533}
534
535/// Builder for bulk publishing multiple messages at once
536pub struct BatchPublishAllBuilder<C> {
537    client: C,
538    timeout: Duration,
539    ack_first: bool,
540    ack_every: Option<u64>,
541}
542
543impl<C> BatchPublishAllBuilder<C>
544where
545    C: client::traits::Requester
546        + client::traits::Publisher
547        + jetstream::context::traits::TimeoutProvider
548        + Clone,
549{
550    pub fn new(client: C) -> Self {
551        Self {
552            client: client.clone(),
553            ack_first: true,
554            timeout: client.timeout(),
555            ack_every: None,
556        }
557    }
558
559    /// Configure acknowledgment for every N messages.
560    ///
561    /// See [BatchPublishBuilder::ack_every] for details.
562    pub fn ack_every(mut self, count: u64) -> Self {
563        self.ack_every = Some(count);
564        self
565    }
566
567    /// Enable acknowledgment for the first message in the batch.
568    ///
569    /// See [BatchPublishBuilder::ack_first] for details.
570    pub fn ack_first(mut self, ack_first: bool) -> Self {
571        self.ack_first = ack_first;
572        self
573    }
574
575    /// Set the timeout for acknowledgment requests.
576    ///
577    /// See [BatchPublishBuilder::timeout] for details.
578    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
579        self.timeout = duration;
580        self
581    }
582
583    /// Publish all messages from a stream and commit with the last message.
584    ///
585    /// This is a convenience method that internally uses `BatchPublish::add_message`
586    /// for all messages except the last, and `BatchPublish::commit_message` for
587    /// the last message.
588    ///
589    /// # Examples
590    ///
591    /// ## From a Vec
592    /// ```no_run
593    /// # use futures_util::stream;
594    /// # use async_nats::jetstream::message::OutboundMessage;
595    /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
596    /// let messages = vec![
597    ///     OutboundMessage {
598    ///         subject: "test.1".into(),
599    ///         payload: "msg1".into(),
600    ///         headers: None,
601    ///     },
602    ///     OutboundMessage {
603    ///         subject: "test.2".into(),
604    ///         payload: "msg2".into(),
605    ///         headers: None,
606    ///     },
607    /// ];
608    /// let ack = client.batch_publish_all()
609    ///     .ack_first(true)
610    ///     .publish(stream::iter(messages))
611    ///     .await?;
612    /// # Ok(())
613    /// # }
614    /// ```
615    ///
616    /// ## From an async channel
617    /// ```no_run
618    /// # use tokio_stream::wrappers::ReceiverStream;
619    /// # use async_nats::jetstream::message::OutboundMessage;
620    /// # use jetstream_extra::batch_publish::BatchPublishExt;
621    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
622    /// let (tx, rx) = tokio::sync::mpsc::channel(100);
623    ///
624    /// tokio::spawn(async move {
625    ///     for i in 0..10 {
626    ///         let msg = OutboundMessage {
627    ///             subject: format!("test.{}", i).into(),
628    ///             payload: format!("Message {}", i).into(),
629    ///             headers: None,
630    ///         };
631    ///         tx.send(msg).await.unwrap();
632    ///     }
633    /// });
634    ///
635    /// let ack = client.batch_publish_all()
636    ///     .publish(ReceiverStream::new(rx))
637    ///     .await?;
638    /// # Ok(())
639    /// # }
640    /// ```
641    ///
642    /// ## With stream transformations
643    /// ```no_run
644    /// # use futures_util::{stream, StreamExt};
645    /// # use async_nats::jetstream::message::OutboundMessage;
646    /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
647    /// let data = vec!["apple", "banana", "cherry"];
648    /// let message_stream = stream::iter(data)
649    ///     .enumerate()
650    ///     .map(|(i, fruit)| OutboundMessage {
651    ///         subject: format!("fruits.{}", i).into(),
652    ///         payload: fruit.into(),
653    ///         headers: None,
654    ///     });
655    ///
656    /// let ack = client.batch_publish_all()
657    ///     .publish(message_stream)
658    ///     .await?;
659    /// # Ok(())
660    /// # }
661    /// ```
662    ///
663    /// ## From async file reading
664    /// ```no_run
665    /// # use tokio::io::{AsyncBufReadExt, BufReader};
666    /// # use futures_util::{stream, StreamExt};
667    /// # use async_nats::jetstream::message::OutboundMessage;
668    /// # use jetstream_extra::batch_publish::BatchPublishExt;
669    /// # use std::time::Duration;
670    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
671    /// // Read file lines into a vector first
672    /// let file = tokio::fs::File::open("data.txt").await?;
673    /// let reader = BufReader::new(file);
674    /// let mut lines = reader.lines();
675    /// let mut messages = Vec::new();
676    ///
677    /// while let Some(line) = lines.next_line().await? {
678    ///     messages.push(OutboundMessage {
679    ///         subject: "file.line".into(),
680    ///         payload: line.into(),
681    ///         headers: None,
682    ///     });
683    /// }
684    ///
685    /// let ack = client.batch_publish_all()
686    ///     .timeout(Duration::from_secs(30))
687    ///     .publish(stream::iter(messages))
688    ///     .await?;
689    /// # Ok(())
690    /// # }
691    /// ```
692    ///
693    /// ## Rate-limited publishing
694    /// ```no_run
695    /// # use futures_util::stream;
696    /// # use async_nats::jetstream::message::OutboundMessage;
697    /// # use jetstream_extra::batch_publish::BatchPublishExt;
698    /// # use std::time::Duration;
699    /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
700    /// let messages = vec![
701    ///     OutboundMessage {
702    ///         subject: "test.1".into(),
703    ///         payload: "msg1".into(),
704    ///         headers: None,
705    ///     },
706    ///     // ... more messages
707    /// ];
708    ///
709    /// // Note: For actual rate limiting, you would use tokio_stream::StreamExt::throttle
710    /// // For now, showing the pattern with regular stream
711    /// let message_stream = stream::iter(messages);
712    ///
713    /// let ack = client.batch_publish_all()
714    ///     .timeout(Duration::from_secs(10))
715    ///     .publish(message_stream)
716    ///     .await?;
717    /// # Ok(())
718    /// # }
719    /// ```
720    ///
721    /// ## From multiple sources merged
722    /// ```no_run
723    /// # use futures_util::{stream, StreamExt};
724    /// # use async_nats::jetstream::message::OutboundMessage;
725    /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
726    /// let source1 = stream::iter(vec![
727    ///     OutboundMessage {
728    ///         subject: "test.1".into(),
729    ///         payload: "msg1".into(),
730    ///         headers: None,
731    ///     },
732    /// ]);
733    /// let source2 = stream::iter(vec![
734    ///     OutboundMessage {
735    ///         subject: "test.2".into(),
736    ///         payload: "msg2".into(),
737    ///         headers: None,
738    ///     },
739    /// ]);
740    ///
741    /// let merged = source1.chain(source2);
742    ///
743    /// let ack = client.batch_publish_all()
744    ///     .publish(merged)
745    ///     .await?;
746    /// # Ok(())
747    /// # }
748    /// ```
749    ///
750    /// ## With error handling in stream
751    /// ```no_run
752    /// # use futures_util::{stream, StreamExt};
753    /// # use async_nats::jetstream::message::OutboundMessage;
754    /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
755    /// // First filter errors, then create stream from good messages
756    /// let results: Vec<Result<OutboundMessage, &str>> = vec![
757    ///     Ok(OutboundMessage {
758    ///         subject: "test.1".into(),
759    ///         payload: "msg1".into(),
760    ///         headers: None,
761    ///     }),
762    ///     Err("simulated error"),
763    ///     Ok(OutboundMessage {
764    ///         subject: "test.2".into(),
765    ///         payload: "msg2".into(),
766    ///         headers: None,
767    ///     }),
768    /// ];
769    ///
770    /// // Filter out errors synchronously
771    /// let good_messages: Vec<_> = results
772    ///     .into_iter()
773    ///     .filter_map(|result| match result {
774    ///         Ok(msg) => Some(msg),
775    ///         Err(e) => {
776    ///             eprintln!("Skipping message: {}", e);
777    ///             None
778    ///         }
779    ///     })
780    ///     .collect();
781    ///
782    /// let ack = client.batch_publish_all()
783    ///     .publish(stream::iter(good_messages))
784    ///     .await?;
785    /// # Ok(())
786    /// # }
787    /// ```
788    ///
789    /// ## From arrays
790    /// ```no_run
791    /// # use futures_util::stream;
792    /// # use async_nats::jetstream::message::OutboundMessage;
793    /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
794    /// let ack = client.batch_publish_all()
795    ///     .publish(stream::iter([
796    ///         OutboundMessage {
797    ///             subject: "test.1".into(),
798    ///             payload: "First".into(),
799    ///             headers: None,
800    ///         },
801    ///         OutboundMessage {
802    ///             subject: "test.2".into(),
803    ///             payload: "Second".into(),
804    ///             headers: None,
805    ///         },
806    ///     ]))
807    ///     .await?;
808    /// # Ok(())
809    /// # }
810    /// ```
811    pub async fn publish<S>(self, messages: S) -> Result<BatchPubAck, BatchPublishError>
812    where
813        S: Stream<Item = OutboundMessage> + Unpin,
814    {
815        // Build regular batch with same configuration
816        let mut batch = BatchPublish {
817            context: self.client,
818            sequence: 0,
819            batch_id: nuid::next().to_string(),
820            ack_every: self.ack_every,
821            ack_first: self.ack_first,
822            timeout: self.timeout,
823        };
824
825        // Buffer one message to identify the last
826        let mut last_msg = None;
827        futures_util::pin_mut!(messages);
828
829        while let Some(msg) = messages.next().await {
830            if let Some(prev) = last_msg.replace(msg) {
831                batch.add_message(prev).await?;
832            }
833        }
834
835        // Commit with the last message
836        match last_msg {
837            Some(msg) => batch.commit_message(msg).await,
838            None => Err(BatchPublishError::new(BatchPublishErrorKind::EmptyBatch)),
839        }
840    }
841}
842
843/// Error type for batch publish operations
844pub type BatchPublishError = async_nats::error::Error<BatchPublishErrorKind>;
845
846/// Kinds of errors that can occur during batch publish operations
847#[derive(Debug, Clone, Copy, PartialEq)]
848pub enum BatchPublishErrorKind {
849    /// Failed to send request to the server
850    Request,
851    /// Failed to publish message
852    Publish,
853    /// Failed to serialize or deserialize data
854    Serialization,
855    /// Batch is in an invalid state for the operation
856    BatchFull,
857    /// Exceeded maximum allowed messages in batch (server limit: 1000)
858    MaxMessagesExceeded,
859    /// Empty batch cannot be committed
860    EmptyBatch,
861    /// Batch publishing is not enabled on the stream (allow_atomic_publish must be true)
862    BatchPublishNotEnabled,
863    /// Batch publish is incomplete and was abandoned
864    BatchPublishIncomplete,
865    /// Batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)
866    BatchPublishUnsupportedHeader,
867    /// Other unspecified error
868    Other,
869}
870
871impl BatchPublishErrorKind {
872    /// Map a JetStream API error to the appropriate batch publish error kind
873    fn from_api_error(error: &async_nats::jetstream::Error) -> Self {
874        use async_nats::jetstream::ErrorCode;
875        match error.error_code() {
876            ErrorCode::ATOMIC_PUBLISH_DISABLED => Self::BatchPublishNotEnabled,
877            ErrorCode::ATOMIC_PUBLISH_INCOMPLETE_BATCH => Self::BatchPublishIncomplete,
878            ErrorCode::ATOMIC_PUBLISH_UNSUPPORTED_HEADER => Self::BatchPublishUnsupportedHeader,
879            ErrorCode::ATOMIC_PUBLISH_TOO_LARGE_BATCH => Self::MaxMessagesExceeded,
880            _ => Self::Other,
881        }
882    }
883}
884
885impl Display for BatchPublishErrorKind {
886    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887        match self {
888            Self::Request => write!(f, "request failed"),
889            Self::Publish => write!(f, "publish failed"),
890            Self::Serialization => write!(f, "serialization/deserialization error"),
891            Self::BatchFull => write!(f, "batch is full"),
892            Self::MaxMessagesExceeded => write!(f, "batch exceeds server limit (1000 messages)"),
893            Self::EmptyBatch => write!(f, "empty batch cannot be committed"),
894            Self::BatchPublishNotEnabled => write!(f, "batch publishing not enabled on stream"),
895            Self::BatchPublishIncomplete => {
896                write!(f, "batch publish is incomplete and was abandoned")
897            }
898            Self::BatchPublishUnsupportedHeader => write!(
899                f,
900                "batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)"
901            ),
902            Self::Other => write!(f, "other error"),
903        }
904    }
905}