jetstream_extra/batch_publish.rs
1// Copyright 2025 Synadia Communications Inc.
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Batch publishing support for NATS JetStream.
15//!
16//! This module provides functionality for publishing multiple messages as an atomic batch
17//! to a JetStream stream. Batch publishing ensures that either all messages in a batch
18//! are stored or none are.
19//!
20//! # Overview
21//!
22//! Batch publishing works by:
//! 1. Internally assigning a unique batch ID to all messages in a batch
24//! 2. Publishing messages with special headers indicating batch membership
25//! 3. Committing the batch with a final message containing a commit marker
26//!
27//! # Thread Safety
28//!
//! A batch publisher keeps per-batch mutable state (its sequence counter), and adding
//! messages requires `&mut self`, so a single batch is meant to be driven from one
//! task/thread at a time. The types hold no thread-bound state of their own; whether they
//! are `Send` or `Sync` follows from the client type they wrap.
32//!
33//! If you need to share a batch publisher across threads, you should:
34//! - Use separate `BatchPublish` instances per thread
35//! - Clone the underlying JetStream context (which is `Send + Sync + Clone`)
36//! - Coordinate batch IDs externally if needed
37//!
38//! The underlying NATS client connection is thread-safe and can be shared across threads.
39//!
40//! # Usage Patterns
41//!
42//! ## Standard API
43//!
44//! Use [BatchPublish] when you need control over individual message publishing:
45//!
46//! ```no_run
47//! # use jetstream_extra::batch_publish::BatchPublishExt;
48//! # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
49//! let mut batch = client.batch_publish().build();
50//! batch.add("events.1", "data1".into()).await?;
51//! batch.add("events.2", "data2".into()).await?;
52//! let ack = batch.commit("events.3", "final".into()).await?;
53//! # Ok(())
54//! # }
55//! ```
56//!
57//! ## Convenience API - Bulk Publishing
58//!
59//! Use [BatchPublishAllBuilder] when you have all messages ready:
60//!
61//! ```no_run
62//! # use jetstream_extra::batch_publish::BatchPublishExt;
63//! # use futures_util::stream;
64//! # use async_nats::jetstream::message::OutboundMessage;
65//! # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
66//! let messages = vec![/* ... */];
67//! let ack = client.batch_publish_all()
68//! .publish(stream::iter(messages))
69//! .await?;
70//! # Ok(())
71//! # }
72//! ```
73//!
74//! # Flow Control
75//!
76//! Both APIs support flow control through acknowledgments:
77//!
//! - `ack_first(bool)` - Wait for server acknowledgment after the first message (enabled by default)
79//! - `ack_every(n)` - Wait for acknowledgment every N messages
80//! - `timeout(duration)` - Set timeout for acknowledgment requests
81//!
82//! # Error Handling
83//!
84//! All operations return [BatchPublishError] with specific error kinds:
85//!
86//! - `BatchPublishNotEnabled` - Stream doesn't have `allow_atomic_publish` enabled
87//! - `BatchPublishIncomplete` - Too many outstanding batches (server limit: 50)
88//! - `BatchPublishUnsupportedHeader` - Message contains `Nats-Msg-Id` or `Nats-Expected-Last-Msg-Id`
89//! - `MaxMessagesExceeded` - Batch exceeds 1000 message limit
90//! - `EmptyBatch` - Attempting to commit an empty batch
91//!
92//! Server errors are automatically mapped to the appropriate error kind based on the error code.
93//! Errors during `add` with flow control may indicate transient issues or configuration problems.
94
95use futures_util::{Stream, StreamExt};
96use std::{
97 fmt::{Debug, Display},
98 time::Duration,
99};
100
101use async_nats::{
102 Request, client,
103 jetstream::{self, message::OutboundMessage, response::Response},
104 subject::ToSubject,
105};
106use serde::{Deserialize, de::DeserializeOwned};
107
108/// Maximum number of messages allowed in a single batch (server limit)
109const MAX_BATCH_SIZE: u64 = 1000;
110
111pub trait BatchPublishExt:
112 client::traits::Requester
113 + client::traits::Publisher
114 + jetstream::context::traits::TimeoutProvider
115 + Clone
116{
117 fn batch_publish(&self) -> BatchPublishBuilder<Self>;
118 fn batch_publish_all(&self) -> BatchPublishAllBuilder<Self>;
119}
120
121impl<C> BatchPublishExt for C
122where
123 C: client::traits::Requester
124 + client::traits::Publisher
125 + jetstream::context::traits::TimeoutProvider
126 + Clone,
127{
128 fn batch_publish(&self) -> BatchPublishBuilder<Self> {
129 BatchPublishBuilder::new(self.clone())
130 }
131
132 fn batch_publish_all(&self) -> BatchPublishAllBuilder<Self> {
133 BatchPublishAllBuilder::new(self.clone())
134 }
135}
136
/// Builder that configures flow control for a [`BatchPublish`].
///
/// Obtained from [`BatchPublishExt::batch_publish`] or [`BatchPublishBuilder::new`].
pub struct BatchPublishBuilder<C> {
    /// Client handed to the batch once it is built.
    client: C,
    /// Timeout applied to messages that are sent as requests.
    timeout: Duration,
    /// Whether the first message of the batch is sent as a request.
    ack_first: bool,
    /// When set, every N-th message of the batch is sent as a request.
    ack_every: Option<u64>,
}
143
144impl<C> BatchPublishBuilder<C>
145where
146 C: client::traits::Requester
147 + client::traits::Publisher
148 + jetstream::context::traits::TimeoutProvider
149 + Clone,
150{
151 pub fn new(context: C) -> Self {
152 Self {
153 client: context.clone(),
154 ack_first: true,
155 timeout: context.timeout(),
156 ack_every: None,
157 }
158 }
159
160 /// Configures acknowledgment for every N messages.
161 /// That acknowledgement is used for flow-control purposes.
162 /// It does not provide any actual acknowledgment data back.
163 /// Those are only provided on the final commit message ack.
164 pub fn ack_every(mut self, count: u64) -> Self {
165 self.ack_every = Some(count);
166 self
167 }
168
169 /// Enables acknowledgment for the first message in the batch.
170 /// That acknowledgement is used for flow-control purposes.
171 /// It does not provide any actual acknowledgment data back.
172 /// Those are only provided on the final commit message ack.
173 pub fn ack_first(mut self, ack_first: bool) -> Self {
174 self.ack_first = ack_first;
175 self
176 }
177
178 /// Sets the timeout for acknowledgment requests.
179 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
180 self.timeout = duration;
181 self
182 }
183
184 pub fn build(self) -> BatchPublish<C> {
185 BatchPublish {
186 context: self.client,
187 sequence: 0,
188 batch_id: nuid::next().to_string(),
189 ack_every: self.ack_every,
190 ack_first: self.ack_first,
191 timeout: self.timeout,
192 }
193 }
194}
195
/// An in-progress atomic batch.
///
/// Messages are sent as they are added; the batch is finished by one of the
/// commit methods or abandoned with `discard`.
pub struct BatchPublish<C> {
    /// Client used to send the messages of this batch.
    pub context: C,
    /// Number of messages counted so far; also written to the
    /// `Nats-Batch-Sequence` header of each message.
    pub sequence: u64,
    /// Identifier written to the `Nats-Batch-Id` header of each message.
    pub batch_id: String,
    /// When set, every N-th message is sent as a request.
    ack_every: Option<u64>,
    /// Whether the first message is sent as a request.
    ack_first: bool,
    /// Timeout applied to messages that are sent as requests.
    timeout: Duration,
}
204
205impl<C> BatchPublish<C>
206where
207 C: client::traits::Requester
208 + client::traits::Publisher
209 + jetstream::context::traits::TimeoutProvider
210 + Clone,
211{
212 pub fn new(context: C, sequence: u64, batch_id: String) -> Self {
213 Self {
214 sequence,
215 batch_id,
216 timeout: context.timeout(),
217 context,
218 ack_first: true,
219 ack_every: None,
220 }
221 }
222
223 /// Get the current number of messages in the batch.
224 ///
225 /// This includes messages that have been added but excludes the final commit message
226 /// until it is sent.
227 pub fn size(&self) -> u64 {
228 self.sequence
229 }
230
231 /// Add a message to the batch with the specified subject and payload.
232 ///
233 /// The message is sent immediately with batch headers. If flow control is configured
234 /// (via `ack_first` or `ack_every`), this method may wait for acknowledgment from
235 /// the server.
236 ///
237 /// # Errors
238 ///
239 /// Returns `MaxMessagesExceeded` if adding this message would exceed the server's
240 /// batch size limit of 1000 messages.
241 ///
242 /// # Examples
243 ///
244 /// ```no_run
245 /// # use jetstream_extra::batch_publish::BatchPublishExt;
246 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
247 /// let mut batch = client.batch_publish().build();
248 /// batch.add("events.user.created", r#"{"id":123}"#.into()).await?;
249 /// batch.add("events.user.updated", r#"{"id":123,"name":"Alice"}"#.into()).await?;
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub async fn add<S: ToSubject>(
254 &mut self,
255 subject: S,
256 payload: bytes::Bytes,
257 ) -> Result<(), BatchPublishError> {
258 self.add_message(OutboundMessage {
259 subject: subject.to_subject(),
260 payload,
261 headers: None,
262 })
263 .await
264 }
265
266 /// Add a pre-constructed message to the batch.
267 ///
268 /// This is useful when you need to include headers or have already constructed
269 /// the [OutboundMessage].
270 ///
271 /// # Errors
272 ///
273 /// Returns `MaxMessagesExceeded` if adding this message would exceed the server's
274 /// batch size limit of 1000 messages.
275 ///
276 /// Returns `BatchPublishUnsupportedHeader` if the message contains unsupported
277 /// headers like `Nats-Msg-Id` or `Nats-Expected-Last-Msg-Id`.
278 ///
279 /// # Examples
280 ///
281 /// ```no_run
282 /// # use jetstream_extra::batch_publish::BatchPublishExt;
283 /// # use async_nats::jetstream::message::OutboundMessage;
284 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
285 /// let mut batch = client.batch_publish().build();
286 ///
287 /// let message = OutboundMessage {
288 /// subject: "events.important".into(),
289 /// payload: "critical data".into(),
290 /// headers: None,
291 /// };
292 ///
293 /// batch.add_message(message).await?;
294 /// # Ok(())
295 /// # }
296 /// ```
297 pub async fn add_message(
298 &mut self,
299 mut message: jetstream::message::OutboundMessage,
300 ) -> Result<(), BatchPublishError> {
301 // Check for unsupported headers
302 if let Some(headers) = &message.headers
303 && (headers.get("Nats-Msg-Id").is_some()
304 || headers.get("Nats-Expected-Last-Msg-Id").is_some())
305 {
306 return Err(BatchPublishError::new(
307 BatchPublishErrorKind::BatchPublishUnsupportedHeader,
308 ));
309 }
310
311 self.sequence += 1;
312
313 if self.sequence > MAX_BATCH_SIZE {
314 return Err(BatchPublishError::new(
315 BatchPublishErrorKind::MaxMessagesExceeded,
316 ));
317 }
318 self.add_header(&mut message);
319
320 if let Some(ack_every) = self.ack_every
321 && self.sequence.is_multiple_of(ack_every)
322 {
323 self.add_request(message).await?;
324 } else if self.ack_first && self.sequence == 1 {
325 self.add_request(message).await?;
326 } else {
327 self.context
328 .publish_message(message.into())
329 .await
330 .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Publish, e))?;
331 }
332 Ok(())
333 }
334
335 /// Commit the batch with a final message.
336 ///
337 /// This sends the final message with batch headers and a commit marker,
338 /// completing the batch operation. The batch cannot be used after committing.
339 ///
340 /// # Examples
341 ///
342 /// ```no_run
343 /// # use jetstream_extra::batch_publish::BatchPublishExt;
344 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
345 /// let mut batch = client.batch_publish().build();
346 ///
347 /// batch.add("events.1", "data1".into()).await?;
348 /// batch.add("events.2", "data2".into()).await?;
349 ///
350 /// // Commit with final message
351 /// let ack = batch.commit("events.3", "data3".into()).await?;
352 ///
353 /// println!("Batch {} committed with {} messages", ack.batch_id, ack.batch_size);
354 /// # Ok(())
355 /// # }
356 /// ```
357 pub async fn commit<S: ToSubject>(
358 self,
359 subject: S,
360 payload: bytes::Bytes,
361 ) -> Result<BatchPubAck, BatchPublishError> {
362 self.commit_message(OutboundMessage {
363 subject: subject.to_subject(),
364 payload,
365 headers: None,
366 })
367 .await
368 }
369
370 /// Commit the batch with a pre-constructed final message.
371 ///
372 /// Like [commit](Self::commit), but accepts a pre-constructed [OutboundMessage].
373 ///
374 /// # Examples
375 ///
376 /// ```no_run
377 /// # use jetstream_extra::batch_publish::BatchPublishExt;
378 /// # use async_nats::jetstream::message::OutboundMessage;
379 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
380 /// let mut batch = client.batch_publish().build();
381 ///
382 /// let final_message = OutboundMessage {
383 /// subject: "events.complete".into(),
384 /// payload: "batch done".into(),
385 /// headers: None,
386 /// };
387 ///
388 /// let ack = batch.commit_message(final_message).await?;
389 /// # Ok(())
390 /// # }
391 /// ```
392 pub async fn commit_message(
393 mut self,
394 mut message: jetstream::message::OutboundMessage,
395 ) -> Result<BatchPubAck, BatchPublishError> {
396 // Check for unsupported headers
397 if let Some(headers) = &message.headers
398 && (headers.get("Nats-Msg-Id").is_some()
399 || headers.get("Nats-Expected-Last-Msg-Id").is_some())
400 {
401 return Err(BatchPublishError::new(
402 BatchPublishErrorKind::BatchPublishUnsupportedHeader,
403 ));
404 }
405
406 self.sequence += 1;
407
408 if self.sequence > MAX_BATCH_SIZE {
409 return Err(BatchPublishError::new(
410 BatchPublishErrorKind::MaxMessagesExceeded,
411 ));
412 }
413
414 self.add_header(&mut message);
415 // Headers are guaranteed to exist after add_header
416 let headers = message
417 .headers
418 .get_or_insert_with(async_nats::HeaderMap::new);
419 headers.insert("Nats-Batch-Commit", "1");
420
421 self.commit_request(message).await
422 }
423
424 /// Discard the batch without committing.
425 ///
426 /// This consumes the batch without sending a commit message. The server will
427 /// eventually abandon the batch after a timeout.
428 ///
429 /// # Examples
430 ///
431 /// ```no_run
432 /// # use jetstream_extra::batch_publish::BatchPublishExt;
433 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
434 /// let mut batch = client.batch_publish().build();
435 ///
436 /// batch.add("events.1", "data".into()).await?;
437 ///
438 /// // Decide to abandon the batch
439 /// batch.discard();
440 /// # Ok(())
441 /// # }
442 /// ```
443 pub fn discard(self) {
444 // Dropping the batch without committing
445 }
446
447 fn add_header(&self, message: &mut jetstream::message::OutboundMessage) {
448 let headers = message
449 .headers
450 .get_or_insert_with(async_nats::HeaderMap::new);
451 headers.insert("Nats-Batch-Id", self.batch_id.clone());
452 headers.insert("Nats-Batch-Sequence", self.sequence.to_string());
453 }
454
455 async fn add_request(&self, message: OutboundMessage) -> Result<(), BatchPublishError> {
456 let request = Request {
457 payload: Some(message.payload),
458 headers: message.headers,
459 timeout: Some(Some(self.timeout)),
460 inbox: None,
461 };
462 let response = self
463 .context
464 .send_request(message.subject, request)
465 .await
466 .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Request, e))?;
467
468 if response.payload.is_empty() {
469 return Ok(());
470 }
471
472 let resp: Response<()> = serde_json::from_slice(response.payload.as_ref())
473 .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Serialization, e))?;
474
475 match resp {
476 Response::Err { error } => {
477 let kind = BatchPublishErrorKind::from_api_error(&error);
478 Err(BatchPublishError::with_source(kind, error))
479 }
480 Response::Ok(()) => Ok(()),
481 }
482 }
483
484 async fn commit_request<T: DeserializeOwned + Debug>(
485 &self,
486 message: OutboundMessage,
487 ) -> Result<T, BatchPublishError> {
488 let request = Request {
489 payload: Some(message.payload),
490 headers: message.headers,
491 timeout: Some(Some(self.timeout)),
492 inbox: None,
493 };
494 let response = self
495 .context
496 .send_request(message.subject, request)
497 .await
498 .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Request, e))?;
499
500 let resp: Response<T> = serde_json::from_slice(response.payload.as_ref())
501 .map_err(|e| BatchPublishError::with_source(BatchPublishErrorKind::Serialization, e))?;
502
503 match resp {
504 Response::Err { error } => {
505 let kind = BatchPublishErrorKind::from_api_error(&error);
506 Err(BatchPublishError::with_source(kind, error))
507 }
508 Response::Ok(ack) => Ok(ack),
509 }
510 }
511}
512
513/// Acknowledgment returned after successfully committing a batch.
514///
515/// Contains information about the committed batch including the stream it was
516/// published to, the sequence number, and batch metadata.
517#[derive(Debug, Deserialize)]
518pub struct BatchPubAck {
519 /// The stream the batch was published to.
520 pub stream: String,
521 /// The stream sequence number of the last message in the batch.
522 #[serde(rename = "seq")]
523 pub sequence: u64,
524 /// The domain the stream belongs to, if any.
525 #[serde(default)]
526 pub domain: Option<String>,
527 /// The unique identifier for the batch.
528 #[serde(rename = "batch")]
529 pub batch_id: String,
530 /// The number of messages in the committed batch.
531 #[serde(rename = "count")]
532 pub batch_size: u64,
533}
534
/// Builder that publishes a whole stream of messages as a single batch.
///
/// Obtained from [`BatchPublishExt::batch_publish_all`] or
/// [`BatchPublishAllBuilder::new`].
pub struct BatchPublishAllBuilder<C> {
    /// Client used to send every message of the batch.
    client: C,
    /// Timeout applied to messages that are sent as requests.
    timeout: Duration,
    /// Whether the first message of the batch is sent as a request.
    ack_first: bool,
    /// When set, every N-th message of the batch is sent as a request.
    ack_every: Option<u64>,
}
542
543impl<C> BatchPublishAllBuilder<C>
544where
545 C: client::traits::Requester
546 + client::traits::Publisher
547 + jetstream::context::traits::TimeoutProvider
548 + Clone,
549{
550 pub fn new(client: C) -> Self {
551 Self {
552 client: client.clone(),
553 ack_first: true,
554 timeout: client.timeout(),
555 ack_every: None,
556 }
557 }
558
559 /// Configure acknowledgment for every N messages.
560 ///
561 /// See [BatchPublishBuilder::ack_every] for details.
562 pub fn ack_every(mut self, count: u64) -> Self {
563 self.ack_every = Some(count);
564 self
565 }
566
567 /// Enable acknowledgment for the first message in the batch.
568 ///
569 /// See [BatchPublishBuilder::ack_first] for details.
570 pub fn ack_first(mut self, ack_first: bool) -> Self {
571 self.ack_first = ack_first;
572 self
573 }
574
575 /// Set the timeout for acknowledgment requests.
576 ///
577 /// See [BatchPublishBuilder::timeout] for details.
578 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
579 self.timeout = duration;
580 self
581 }
582
583 /// Publish all messages from a stream and commit with the last message.
584 ///
585 /// This is a convenience method that internally uses `BatchPublish::add_message`
586 /// for all messages except the last, and `BatchPublish::commit_message` for
587 /// the last message.
588 ///
589 /// # Examples
590 ///
591 /// ## From a Vec
592 /// ```no_run
593 /// # use futures_util::stream;
594 /// # use async_nats::jetstream::message::OutboundMessage;
595 /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
596 /// let messages = vec![
597 /// OutboundMessage {
598 /// subject: "test.1".into(),
599 /// payload: "msg1".into(),
600 /// headers: None,
601 /// },
602 /// OutboundMessage {
603 /// subject: "test.2".into(),
604 /// payload: "msg2".into(),
605 /// headers: None,
606 /// },
607 /// ];
608 /// let ack = client.batch_publish_all()
609 /// .ack_first(true)
610 /// .publish(stream::iter(messages))
611 /// .await?;
612 /// # Ok(())
613 /// # }
614 /// ```
615 ///
616 /// ## From an async channel
617 /// ```no_run
618 /// # use tokio_stream::wrappers::ReceiverStream;
619 /// # use async_nats::jetstream::message::OutboundMessage;
620 /// # use jetstream_extra::batch_publish::BatchPublishExt;
621 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
622 /// let (tx, rx) = tokio::sync::mpsc::channel(100);
623 ///
624 /// tokio::spawn(async move {
625 /// for i in 0..10 {
626 /// let msg = OutboundMessage {
627 /// subject: format!("test.{}", i).into(),
628 /// payload: format!("Message {}", i).into(),
629 /// headers: None,
630 /// };
631 /// tx.send(msg).await.unwrap();
632 /// }
633 /// });
634 ///
635 /// let ack = client.batch_publish_all()
636 /// .publish(ReceiverStream::new(rx))
637 /// .await?;
638 /// # Ok(())
639 /// # }
640 /// ```
641 ///
642 /// ## With stream transformations
643 /// ```no_run
644 /// # use futures_util::{stream, StreamExt};
645 /// # use async_nats::jetstream::message::OutboundMessage;
646 /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
647 /// let data = vec!["apple", "banana", "cherry"];
648 /// let message_stream = stream::iter(data)
649 /// .enumerate()
650 /// .map(|(i, fruit)| OutboundMessage {
651 /// subject: format!("fruits.{}", i).into(),
652 /// payload: fruit.into(),
653 /// headers: None,
654 /// });
655 ///
656 /// let ack = client.batch_publish_all()
657 /// .publish(message_stream)
658 /// .await?;
659 /// # Ok(())
660 /// # }
661 /// ```
662 ///
663 /// ## From async file reading
664 /// ```no_run
665 /// # use tokio::io::{AsyncBufReadExt, BufReader};
666 /// # use futures_util::{stream, StreamExt};
667 /// # use async_nats::jetstream::message::OutboundMessage;
668 /// # use jetstream_extra::batch_publish::BatchPublishExt;
669 /// # use std::time::Duration;
670 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
671 /// // Read file lines into a vector first
672 /// let file = tokio::fs::File::open("data.txt").await?;
673 /// let reader = BufReader::new(file);
674 /// let mut lines = reader.lines();
675 /// let mut messages = Vec::new();
676 ///
677 /// while let Some(line) = lines.next_line().await? {
678 /// messages.push(OutboundMessage {
679 /// subject: "file.line".into(),
680 /// payload: line.into(),
681 /// headers: None,
682 /// });
683 /// }
684 ///
685 /// let ack = client.batch_publish_all()
686 /// .timeout(Duration::from_secs(30))
687 /// .publish(stream::iter(messages))
688 /// .await?;
689 /// # Ok(())
690 /// # }
691 /// ```
692 ///
693 /// ## Rate-limited publishing
694 /// ```no_run
695 /// # use futures_util::stream;
696 /// # use async_nats::jetstream::message::OutboundMessage;
697 /// # use jetstream_extra::batch_publish::BatchPublishExt;
698 /// # use std::time::Duration;
699 /// # async fn example(client: impl BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
700 /// let messages = vec![
701 /// OutboundMessage {
702 /// subject: "test.1".into(),
703 /// payload: "msg1".into(),
704 /// headers: None,
705 /// },
706 /// // ... more messages
707 /// ];
708 ///
709 /// // Note: For actual rate limiting, you would use tokio_stream::StreamExt::throttle
710 /// // For now, showing the pattern with regular stream
711 /// let message_stream = stream::iter(messages);
712 ///
713 /// let ack = client.batch_publish_all()
714 /// .timeout(Duration::from_secs(10))
715 /// .publish(message_stream)
716 /// .await?;
717 /// # Ok(())
718 /// # }
719 /// ```
720 ///
721 /// ## From multiple sources merged
722 /// ```no_run
723 /// # use futures_util::{stream, StreamExt};
724 /// # use async_nats::jetstream::message::OutboundMessage;
725 /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
726 /// let source1 = stream::iter(vec![
727 /// OutboundMessage {
728 /// subject: "test.1".into(),
729 /// payload: "msg1".into(),
730 /// headers: None,
731 /// },
732 /// ]);
733 /// let source2 = stream::iter(vec![
734 /// OutboundMessage {
735 /// subject: "test.2".into(),
736 /// payload: "msg2".into(),
737 /// headers: None,
738 /// },
739 /// ]);
740 ///
741 /// let merged = source1.chain(source2);
742 ///
743 /// let ack = client.batch_publish_all()
744 /// .publish(merged)
745 /// .await?;
746 /// # Ok(())
747 /// # }
748 /// ```
749 ///
750 /// ## With error handling in stream
751 /// ```no_run
752 /// # use futures_util::{stream, StreamExt};
753 /// # use async_nats::jetstream::message::OutboundMessage;
754 /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
755 /// // First filter errors, then create stream from good messages
756 /// let results: Vec<Result<OutboundMessage, &str>> = vec![
757 /// Ok(OutboundMessage {
758 /// subject: "test.1".into(),
759 /// payload: "msg1".into(),
760 /// headers: None,
761 /// }),
762 /// Err("simulated error"),
763 /// Ok(OutboundMessage {
764 /// subject: "test.2".into(),
765 /// payload: "msg2".into(),
766 /// headers: None,
767 /// }),
768 /// ];
769 ///
770 /// // Filter out errors synchronously
771 /// let good_messages: Vec<_> = results
772 /// .into_iter()
773 /// .filter_map(|result| match result {
774 /// Ok(msg) => Some(msg),
775 /// Err(e) => {
776 /// eprintln!("Skipping message: {}", e);
777 /// None
778 /// }
779 /// })
780 /// .collect();
781 ///
782 /// let ack = client.batch_publish_all()
783 /// .publish(stream::iter(good_messages))
784 /// .await?;
785 /// # Ok(())
786 /// # }
787 /// ```
788 ///
789 /// ## From arrays
790 /// ```no_run
791 /// # use futures_util::stream;
792 /// # use async_nats::jetstream::message::OutboundMessage;
793 /// # async fn example(client: impl jetstream_extra::batch_publish::BatchPublishExt) -> Result<(), Box<dyn std::error::Error>> {
794 /// let ack = client.batch_publish_all()
795 /// .publish(stream::iter([
796 /// OutboundMessage {
797 /// subject: "test.1".into(),
798 /// payload: "First".into(),
799 /// headers: None,
800 /// },
801 /// OutboundMessage {
802 /// subject: "test.2".into(),
803 /// payload: "Second".into(),
804 /// headers: None,
805 /// },
806 /// ]))
807 /// .await?;
808 /// # Ok(())
809 /// # }
810 /// ```
811 pub async fn publish<S>(self, messages: S) -> Result<BatchPubAck, BatchPublishError>
812 where
813 S: Stream<Item = OutboundMessage> + Unpin,
814 {
815 // Build regular batch with same configuration
816 let mut batch = BatchPublish {
817 context: self.client,
818 sequence: 0,
819 batch_id: nuid::next().to_string(),
820 ack_every: self.ack_every,
821 ack_first: self.ack_first,
822 timeout: self.timeout,
823 };
824
825 // Buffer one message to identify the last
826 let mut last_msg = None;
827 futures_util::pin_mut!(messages);
828
829 while let Some(msg) = messages.next().await {
830 if let Some(prev) = last_msg.replace(msg) {
831 batch.add_message(prev).await?;
832 }
833 }
834
835 // Commit with the last message
836 match last_msg {
837 Some(msg) => batch.commit_message(msg).await,
838 None => Err(BatchPublishError::new(BatchPublishErrorKind::EmptyBatch)),
839 }
840 }
841}
842
843/// Error type for batch publish operations
844pub type BatchPublishError = async_nats::error::Error<BatchPublishErrorKind>;
845
/// Kinds of errors that can occur during batch publish operations.
///
/// The enum carries no data, so it derives `Eq` alongside `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchPublishErrorKind {
    /// Failed to send a request to the server or to receive its reply
    Request,
    /// Failed to publish a message
    Publish,
    /// Failed to deserialize a server reply
    Serialization,
    /// Batch is full.
    ///
    /// NOTE(review): not constructed by the code visible in this module; the
    /// size limit is reported as `MaxMessagesExceeded` instead.
    BatchFull,
    /// Exceeded maximum allowed messages in batch (server limit: 1000)
    MaxMessagesExceeded,
    /// Empty batch cannot be committed
    EmptyBatch,
    /// Batch publishing is not enabled on the stream (allow_atomic_publish must be true)
    BatchPublishNotEnabled,
    /// Server reported the batch as incomplete; the batch was abandoned
    BatchPublishIncomplete,
    /// Batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)
    BatchPublishUnsupportedHeader,
    /// Other unspecified error
    Other,
}
870
871impl BatchPublishErrorKind {
872 /// Map a JetStream API error to the appropriate batch publish error kind
873 fn from_api_error(error: &async_nats::jetstream::Error) -> Self {
874 use async_nats::jetstream::ErrorCode;
875 match error.error_code() {
876 ErrorCode::ATOMIC_PUBLISH_DISABLED => Self::BatchPublishNotEnabled,
877 ErrorCode::ATOMIC_PUBLISH_INCOMPLETE_BATCH => Self::BatchPublishIncomplete,
878 ErrorCode::ATOMIC_PUBLISH_UNSUPPORTED_HEADER => Self::BatchPublishUnsupportedHeader,
879 ErrorCode::ATOMIC_PUBLISH_TOO_LARGE_BATCH => Self::MaxMessagesExceeded,
880 _ => Self::Other,
881 }
882 }
883}
884
885impl Display for BatchPublishErrorKind {
886 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887 match self {
888 Self::Request => write!(f, "request failed"),
889 Self::Publish => write!(f, "publish failed"),
890 Self::Serialization => write!(f, "serialization/deserialization error"),
891 Self::BatchFull => write!(f, "batch is full"),
892 Self::MaxMessagesExceeded => write!(f, "batch exceeds server limit (1000 messages)"),
893 Self::EmptyBatch => write!(f, "empty batch cannot be committed"),
894 Self::BatchPublishNotEnabled => write!(f, "batch publishing not enabled on stream"),
895 Self::BatchPublishIncomplete => {
896 write!(f, "batch publish is incomplete and was abandoned")
897 }
898 Self::BatchPublishUnsupportedHeader => write!(
899 f,
900 "batch uses unsupported headers (Nats-Msg-Id or Nats-Expected-Last-Msg-Id)"
901 ),
902 Self::Other => write!(f, "other error"),
903 }
904 }
905}