async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{
41        ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42        StreamsErrorKind,
43    },
44    errors::ErrorCode,
45    is_valid_name,
46    message::{StreamMessage, StreamMessageError},
47    response::Response,
48    Context,
49};
50
51pub type InfoError = RequestError;
52
53#[derive(Clone, Debug, PartialEq)]
54pub enum DirectGetErrorKind {
55    NotFound,
56    InvalidSubject,
57    TimedOut,
58    Request,
59    ErrorResponse(StatusCode, String),
60    Other,
61}
62
63impl Display for DirectGetErrorKind {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        match self {
66            Self::InvalidSubject => write!(f, "invalid subject"),
67            Self::NotFound => write!(f, "message not found"),
68            Self::ErrorResponse(status, description) => {
69                write!(f, "unable to get message: {status} {description}")
70            }
71            Self::Other => write!(f, "error getting message"),
72            Self::TimedOut => write!(f, "timed out"),
73            Self::Request => write!(f, "request failed"),
74        }
75    }
76}
77
78pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81    fn from(err: crate::RequestError) -> Self {
82        match err.kind() {
83            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84            crate::RequestErrorKind::NoResponders => {
85                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86                    StatusCode::NO_RESPONDERS,
87                    "no responders".to_string(),
88                ))
89            }
90            crate::RequestErrorKind::Other => {
91                DirectGetError::with_source(DirectGetErrorKind::Other, err)
92            }
93        }
94    }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98    fn from(err: serde_json::Error) -> Self {
99        DirectGetError::with_source(DirectGetErrorKind::Other, err)
100    }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104    fn from(err: StreamMessageError) -> Self {
105        DirectGetError::with_source(DirectGetErrorKind::Other, err)
106    }
107}
108
109#[derive(Clone, Debug, PartialEq)]
110pub enum DeleteMessageErrorKind {
111    Request,
112    TimedOut,
113    JetStream(super::errors::Error),
114}
115
116impl Display for DeleteMessageErrorKind {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        match self {
119            Self::Request => write!(f, "request failed"),
120            Self::TimedOut => write!(f, "timed out"),
121            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
122        }
123    }
124}
125
126pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
128/// Handle to operations that can be performed on a `Stream`.
129/// It's generic over the type of `info` field to allow `Stream` with or without
130/// info contents.
131#[derive(Debug, Clone)]
132pub struct Stream<T = Info> {
133    pub(crate) info: T,
134    pub(crate) context: Context,
135    pub(crate) name: String,
136}
137
138impl Stream<Info> {
139    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
140    /// [Stream] and returns it.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// # #[tokio::main]
146    /// # async fn main() -> Result<(), async_nats::Error> {
147    /// let client = async_nats::connect("localhost:4222").await?;
148    /// let jetstream = async_nats::jetstream::new(client);
149    ///
150    /// let mut stream = jetstream.get_stream("events").await?;
151    ///
152    /// let info = stream.info().await?;
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub async fn info(&mut self) -> Result<&Info, InfoError> {
157        let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159        match self.context.request(subject, &json!({})).await? {
160            Response::Ok::<Info>(info) => {
161                self.info = info;
162                Ok(&self.info)
163            }
164            Response::Err { error } => Err(error.into()),
165        }
166    }
167
168    /// Returns cached [Info] for the [Stream].
169    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
170    /// [Stream::info].
171    ///
172    /// # Examples
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn main() -> Result<(), async_nats::Error> {
177    /// let client = async_nats::connect("localhost:4222").await?;
178    /// let jetstream = async_nats::jetstream::new(client);
179    ///
180    /// let stream = jetstream.get_stream("events").await?;
181    ///
182    /// let info = stream.cached_info();
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub fn cached_info(&self) -> &Info {
187        &self.info
188    }
189}
190
191impl<I> Stream<I> {
192    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
193    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
194    pub async fn get_info(&self) -> Result<Info, InfoError> {
195        let subject = format!("STREAM.INFO.{}", self.name);
196
197        match self.context.request(subject, &json!({})).await? {
198            Response::Ok::<Info>(info) => Ok(info),
199            Response::Err { error } => Err(error.into()),
200        }
201    }
202
203    /// Retrieves [[Info]] from the server and returns a [[futures_util::Stream]] that allows
204    /// iterating over all subjects in the stream fetched via paged API.
205    ///
206    /// # Examples
207    ///
208    /// ```no_run
209    /// # #[tokio::main]
210    /// # async fn main() -> Result<(), async_nats::Error> {
211    /// use futures_util::TryStreamExt;
212    /// let client = async_nats::connect("localhost:4222").await?;
213    /// let jetstream = async_nats::jetstream::new(client);
214    ///
215    /// let mut stream = jetstream.get_stream("events").await?;
216    ///
217    /// let mut info = stream.info_with_subjects("events.>").await?;
218    ///
219    /// while let Some((subject, count)) = info.try_next().await? {
220    ///     println!("Subject: {} count: {}", subject, count);
221    /// }
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub async fn info_with_subjects<F: AsRef<str>>(
226        &self,
227        subjects_filter: F,
228    ) -> Result<InfoWithSubjects, InfoError> {
229        let subjects_filter = subjects_filter.as_ref().to_string();
230        // TODO: validate the subject and decide if this should be a `Subject`
231        let info = stream_info_with_details(
232            self.context.clone(),
233            self.name.clone(),
234            0,
235            false,
236            subjects_filter.clone(),
237        )
238        .await?;
239
240        Ok(InfoWithSubjects::new(
241            self.context.clone(),
242            info,
243            subjects_filter,
244        ))
245    }
246
247    /// Creates a builder that allows to customize `Stream::Info`.
248    ///
249    /// # Examples
250    /// ```no_run
251    /// # #[tokio::main]
252    /// # async fn main() -> Result<(), async_nats::Error> {
253    /// use futures_util::TryStreamExt;
254    /// let client = async_nats::connect("localhost:4222").await?;
255    /// let jetstream = async_nats::jetstream::new(client);
256    ///
257    /// let mut stream = jetstream.get_stream("events").await?;
258    ///
259    /// let mut info = stream
260    ///     .info_builder()
261    ///     .with_deleted(true)
262    ///     .subjects("events.>")
263    ///     .fetch()
264    ///     .await?;
265    ///
266    /// while let Some((subject, count)) = info.try_next().await? {
267    ///     println!("Subject: {} count: {}", subject, count);
268    /// }
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub fn info_builder(&self) -> StreamInfoBuilder {
273        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
274    }
275
276    /// Creates a builder for direct get operations.
277    ///
278    /// Allows for more control over direct get requests.
279    ///
280    /// # Examples
281    ///
282    /// ```no_run
283    /// # #[tokio::main]
284    /// # async fn main() -> Result<(), async_nats::Error> {
285    /// let client = async_nats::connect("demo.nats.io").await?;
286    /// let jetstream = async_nats::jetstream::new(client);
287    ///
288    /// let stream = jetstream.get_stream("events").await?;
289    ///
290    /// // Get message without headers
291    /// let message = stream.direct_get_builder().sequence(100).send().await?;
292    /// # Ok(())
293    /// # }
294    /// ```
295    pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
296        DirectGetBuilder::new(self.context.clone(), self.name.clone())
297    }
298
299    /// Gets next message for a [Stream].
300    ///
301    /// Requires a [Stream] with `allow_direct` set to `true`.
302    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
303    /// from any replica member. This means read after write is possible,
304    /// as that given replica might not yet catch up with the leader.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// # #[tokio::main]
310    /// # async fn main() -> Result<(), async_nats::Error> {
311    /// let client = async_nats::connect("demo.nats.io").await?;
312    /// let jetstream = async_nats::jetstream::new(client);
313    ///
314    /// let stream = jetstream
315    ///     .create_stream(async_nats::jetstream::stream::Config {
316    ///         name: "events".to_string(),
317    ///         subjects: vec!["events.>".to_string()],
318    ///         allow_direct: true,
319    ///         ..Default::default()
320    ///     })
321    ///     .await?;
322    ///
323    /// jetstream.publish("events.data", "data".into()).await?;
324    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
325    ///
326    /// let message = stream
327    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
328    ///     .await?;
329    ///
330    /// # Ok(())
331    /// # }
332    /// ```
333    pub async fn direct_get_next_for_subject<T: Into<String>>(
334        &self,
335        subject: T,
336        sequence: Option<u64>,
337    ) -> Result<StreamMessage, DirectGetError> {
338        let subject_str = subject.into();
339        if !is_valid_subject(&subject_str) {
340            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
341        }
342
343        let mut builder = self.direct_get_builder().next_by_subject(subject_str);
344        if let Some(seq) = sequence {
345            builder = builder.sequence(seq);
346        }
347
348        builder.send().await
349    }
350
351    /// Gets first message from [Stream].
352    ///
353    /// Requires a [Stream] with `allow_direct` set to `true`.
354    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
355    /// from any replica member. This means read after write is possible,
356    /// as that given replica might not yet catch up with the leader.
357    ///
358    /// # Examples
359    ///
360    /// ```no_run
361    /// # #[tokio::main]
362    /// # async fn main() -> Result<(), async_nats::Error> {
363    /// let client = async_nats::connect("demo.nats.io").await?;
364    /// let jetstream = async_nats::jetstream::new(client);
365    ///
366    /// let stream = jetstream
367    ///     .create_stream(async_nats::jetstream::stream::Config {
368    ///         name: "events".to_string(),
369    ///         subjects: vec!["events.>".to_string()],
370    ///         allow_direct: true,
371    ///         ..Default::default()
372    ///     })
373    ///     .await?;
374    ///
375    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
376    ///
377    /// let message = stream.direct_get_first_for_subject("events.data").await?;
378    ///
379    /// # Ok(())
380    /// # }
381    /// ```
382    pub async fn direct_get_first_for_subject<T: Into<String>>(
383        &self,
384        subject: T,
385    ) -> Result<StreamMessage, DirectGetError> {
386        let subject_str = subject.into();
387        if !is_valid_subject(&subject_str) {
388            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
389        }
390
391        self.direct_get_builder()
392            .next_by_subject(subject_str)
393            .send()
394            .await
395    }
396
397    /// Gets message from [Stream] with given `sequence id`.
398    ///
399    /// Requires a [Stream] with `allow_direct` set to `true`.
400    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
401    /// from any replica member. This means read after write is possible,
402    /// as that given replica might not yet catch up with the leader.
403    ///
404    /// # Examples
405    ///
406    /// ```no_run
407    /// # #[tokio::main]
408    /// # async fn main() -> Result<(), async_nats::Error> {
409    /// let client = async_nats::connect("demo.nats.io").await?;
410    /// let jetstream = async_nats::jetstream::new(client);
411    ///
412    /// let stream = jetstream
413    ///     .create_stream(async_nats::jetstream::stream::Config {
414    ///         name: "events".to_string(),
415    ///         subjects: vec!["events.>".to_string()],
416    ///         allow_direct: true,
417    ///         ..Default::default()
418    ///     })
419    ///     .await?;
420    ///
421    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
422    ///
423    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
424    ///
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
429        self.direct_get_builder().sequence(sequence).send().await
430    }
431
432    /// Gets last message for a given `subject`.
433    ///
434    /// Requires a [Stream] with `allow_direct` set to `true`.
435    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
436    /// from any replica member. This means read after write is possible,
437    /// as that given replica might not yet catch up with the leader.
438    ///
439    /// # Examples
440    ///
441    /// ```no_run
442    /// # #[tokio::main]
443    /// # async fn main() -> Result<(), async_nats::Error> {
444    /// let client = async_nats::connect("demo.nats.io").await?;
445    /// let jetstream = async_nats::jetstream::new(client);
446    ///
447    /// let stream = jetstream
448    ///     .create_stream(async_nats::jetstream::stream::Config {
449    ///         name: "events".to_string(),
450    ///         subjects: vec!["events.>".to_string()],
451    ///         allow_direct: true,
452    ///         ..Default::default()
453    ///     })
454    ///     .await?;
455    ///
456    /// jetstream.publish("events.data", "data".into()).await?;
457    ///
458    /// let message = stream.direct_get_last_for_subject("events.data").await?;
459    ///
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub async fn direct_get_last_for_subject<T: Into<String>>(
464        &self,
465        subject: T,
466    ) -> Result<StreamMessage, DirectGetError> {
467        self.direct_get_builder()
468            .last_by_subject(subject)
469            .send()
470            .await
471    }
472    /// Creates a builder for retrieving messages from the stream with flexible options.
473    /// Raw message methods retrieve messages directly from the stream leader instead
474    /// of a replica. This should be used with care, as it can put additional load on the
475    /// stream leader.
476    ///
477    /// # Examples
478    ///
479    /// ```no_run
480    /// # #[tokio::main]
481    /// # async fn main() -> Result<(), async_nats::Error> {
482    /// let client = async_nats::connect("localhost:4222").await?;
483    /// let context = async_nats::jetstream::new(client);
484    /// let stream = context.get_stream("events").await?;
485    ///
486    /// // Get message without headers
487    /// let value = stream.raw_message_builder().sequence(100).send().await?;
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
492        RawMessageBuilder::new(self.context.clone(), self.name.clone())
493    }
494
495    /// Get a raw message from the stream for a given stream sequence.
496    /// This low-level API always reaches stream leader.
497    /// This should be discouraged in favor of using [Stream::direct_get].
498    ///
499    /// # Examples
500    ///
501    /// ```no_run
502    /// #[tokio::main]
503    /// # async fn main() -> Result<(), async_nats::Error> {
504    /// use futures_util::StreamExt;
505    /// use futures_util::TryStreamExt;
506    ///
507    /// let client = async_nats::connect("localhost:4222").await?;
508    /// let context = async_nats::jetstream::new(client);
509    ///
510    /// let stream = context
511    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
512    ///         name: "events".to_string(),
513    ///         max_messages: 10_000,
514    ///         ..Default::default()
515    ///     })
516    ///     .await?;
517    ///
518    /// let publish_ack = context.publish("events", "data".into()).await?;
519    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
520    /// println!("Retrieved raw message {:?}", raw_message);
521    /// # Ok(())
522    /// # }
523    /// ```
524    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
525        self.raw_message_builder().sequence(sequence).send().await
526    }
527
528    /// Get a first message from the stream for a given subject starting from provided sequence.
529    /// This low-level API always reaches stream leader.
530    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
531    ///
532    /// # Examples
533    ///
534    /// ```no_run
535    /// #[tokio::main]
536    /// # async fn main() -> Result<(), async_nats::Error> {
537    /// use futures_util::StreamExt;
538    /// use futures_util::TryStreamExt;
539    ///
540    /// let client = async_nats::connect("localhost:4222").await?;
541    /// let context = async_nats::jetstream::new(client);
542    /// let stream = context.get_stream_no_info("events").await?;
543    ///
544    /// let raw_message = stream
545    ///     .get_first_raw_message_by_subject("events.created", 10)
546    ///     .await?;
547    /// println!("Retrieved raw message {:?}", raw_message);
548    /// # Ok(())
549    /// # }
550    /// ```
551    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
552        &self,
553        subject: T,
554        sequence: u64,
555    ) -> Result<StreamMessage, RawMessageError> {
556        self.raw_message_builder()
557            .sequence(sequence)
558            .next_by_subject(subject.as_ref().to_string())
559            .send()
560            .await
561    }
562
563    /// Get a next message from the stream for a given subject.
564    /// This low-level API always reaches stream leader.
565    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
570    /// #[tokio::main]
571    /// # async fn main() -> Result<(), async_nats::Error> {
572    /// use futures_util::StreamExt;
573    /// use futures_util::TryStreamExt;
574    ///
575    /// let client = async_nats::connect("localhost:4222").await?;
576    /// let context = async_nats::jetstream::new(client);
577    /// let stream = context.get_stream_no_info("events").await?;
578    ///
579    /// let raw_message = stream
580    ///     .get_next_raw_message_by_subject("events.created")
581    ///     .await?;
582    /// println!("Retrieved raw message {:?}", raw_message);
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
587        &self,
588        subject: T,
589    ) -> Result<StreamMessage, RawMessageError> {
590        self.raw_message_builder()
591            .next_by_subject(subject.as_ref().to_string())
592            .send()
593            .await
594    }
595
596    /// Get a last message from the stream for a given subject.
597    /// This low-level API always reaches stream leader.
598    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
599    ///
600    /// # Examples
601    ///
602    /// ```no_run
603    /// #[tokio::main]
604    /// # async fn main() -> Result<(), async_nats::Error> {
605    /// use futures_util::StreamExt;
606    /// use futures_util::TryStreamExt;
607    ///
608    /// let client = async_nats::connect("localhost:4222").await?;
609    /// let context = async_nats::jetstream::new(client);
610    /// let stream = context.get_stream_no_info("events").await?;
611    ///
612    /// let raw_message = stream
613    ///     .get_last_raw_message_by_subject("events.created")
614    ///     .await?;
615    /// println!("Retrieved raw message {:?}", raw_message);
616    /// # Ok(())
617    /// # }
618    /// ```
619    pub async fn get_last_raw_message_by_subject(
620        &self,
621        stream_subject: &str,
622    ) -> Result<StreamMessage, LastRawMessageError> {
623        self.raw_message_builder()
624            .last_by_subject(stream_subject.to_string())
625            .send()
626            .await
627    }
628
629    /// Delete a message from the stream.
630    ///
631    /// # Examples
632    ///
633    /// ```no_run
634    /// # #[tokio::main]
635    /// # async fn main() -> Result<(), async_nats::Error> {
636    /// let client = async_nats::connect("localhost:4222").await?;
637    /// let context = async_nats::jetstream::new(client);
638    ///
639    /// let stream = context
640    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
641    ///         name: "events".to_string(),
642    ///         max_messages: 10_000,
643    ///         ..Default::default()
644    ///     })
645    ///     .await?;
646    ///
647    /// let publish_ack = context.publish("events", "data".into()).await?;
648    /// stream.delete_message(publish_ack.await?.sequence).await?;
649    /// # Ok(())
650    /// # }
651    /// ```
652    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
653        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
654        let payload = json!({
655            "seq": sequence,
656        });
657
658        let response: Response<DeleteStatus> = self
659            .context
660            .request(subject, &payload)
661            .map_err(|err| match err.kind() {
662                RequestErrorKind::TimedOut => {
663                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
664                }
665                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
666            })
667            .await?;
668
669        match response {
670            Response::Err { error } => Err(DeleteMessageError::new(
671                DeleteMessageErrorKind::JetStream(error),
672            )),
673            Response::Ok(value) => Ok(value.success),
674        }
675    }
676
677    /// Purge `Stream` messages.
678    ///
679    /// # Examples
680    ///
681    /// ```no_run
682    /// # #[tokio::main]
683    /// # async fn main() -> Result<(), async_nats::Error> {
684    /// let client = async_nats::connect("demo.nats.io").await?;
685    /// let jetstream = async_nats::jetstream::new(client);
686    ///
687    /// let stream = jetstream.get_stream("events").await?;
688    /// stream.purge().await?;
689    /// # Ok(())
690    /// # }
691    /// ```
692    pub fn purge(&self) -> Purge<No, No> {
693        Purge::build(self)
694    }
695
696    /// Purge `Stream` messages for a matching subject.
697    ///
698    /// # Examples
699    ///
700    /// ```no_run
701    /// # #[tokio::main]
702    /// # #[allow(deprecated)]
703    /// # async fn main() -> Result<(), async_nats::Error> {
704    /// let client = async_nats::connect("demo.nats.io").await?;
705    /// let jetstream = async_nats::jetstream::new(client);
706    ///
707    /// let stream = jetstream.get_stream("events").await?;
708    /// stream.purge_subject("data").await?;
709    /// # Ok(())
710    /// # }
711    /// ```
712    #[deprecated(
713        since = "0.25.0",
714        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
715    )]
716    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
717    where
718        T: Into<String>,
719    {
720        self.purge().filter(subject).await
721    }
722
723    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
724    /// returns the info from the server about created [Consumer]
725    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
726    ///
727    /// # Examples
728    ///
729    /// ```no_run
730    /// # #[tokio::main]
731    /// # async fn main() -> Result<(), async_nats::Error> {
732    /// use async_nats::jetstream::consumer;
733    /// let client = async_nats::connect("localhost:4222").await?;
734    /// let jetstream = async_nats::jetstream::new(client);
735    ///
736    /// let stream = jetstream.get_stream("events").await?;
737    /// let info = stream
738    ///     .create_consumer(consumer::pull::Config {
739    ///         durable_name: Some("pull".to_string()),
740    ///         ..Default::default()
741    ///     })
742    ///     .await?;
743    /// # Ok(())
744    /// # }
745    /// ```
746    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
747        &self,
748        config: C,
749    ) -> Result<Consumer<C>, ConsumerError> {
750        self.context
751            .create_consumer_on_stream(config, self.name.clone())
752            .await
753    }
754
755    /// Update an existing consumer.
756    /// This call will fail if the consumer does not exist.
757    /// returns the info from the server about updated [Consumer].
758    ///
759    /// # Examples
760    ///
761    /// ```no_run
762    /// # #[tokio::main]
763    /// # async fn main() -> Result<(), async_nats::Error> {
764    /// use async_nats::jetstream::consumer;
765    /// let client = async_nats::connect("localhost:4222").await?;
766    /// let jetstream = async_nats::jetstream::new(client);
767    ///
768    /// let stream = jetstream.get_stream("events").await?;
769    /// let info = stream
770    ///     .update_consumer(consumer::pull::Config {
771    ///         durable_name: Some("pull".to_string()),
772    ///         ..Default::default()
773    ///     })
774    ///     .await?;
775    /// # Ok(())
776    /// # }
777    /// ```
778    #[cfg(feature = "server_2_10")]
779    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
780        &self,
781        config: C,
782    ) -> Result<Consumer<C>, ConsumerUpdateError> {
783        self.context
784            .update_consumer_on_stream(config, self.name.clone())
785            .await
786    }
787
788    /// Create consumer, but only if it does not exist or the existing config is exactly
789    /// the same.
790    /// This method will fail if consumer is already present with different config.
791    /// returns the info from the server about created [Consumer].
792    ///
793    /// # Examples
794    ///
795    /// ```no_run
796    /// # #[tokio::main]
797    /// # async fn main() -> Result<(), async_nats::Error> {
798    /// use async_nats::jetstream::consumer;
799    /// let client = async_nats::connect("localhost:4222").await?;
800    /// let jetstream = async_nats::jetstream::new(client);
801    ///
802    /// let stream = jetstream.get_stream("events").await?;
803    /// let info = stream
804    ///     .create_consumer_strict(consumer::pull::Config {
805    ///         durable_name: Some("pull".to_string()),
806    ///         ..Default::default()
807    ///     })
808    ///     .await?;
809    /// # Ok(())
810    /// # }
811    /// ```
812    #[cfg(feature = "server_2_10")]
813    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
814        &self,
815        config: C,
816    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
817        self.context
818            .create_consumer_strict_on_stream(config, self.name.clone())
819            .await
820    }
821
822    /// Retrieve [Info] about [Consumer] from the server.
823    ///
824    /// # Examples
825    ///
826    /// ```no_run
827    /// # #[tokio::main]
828    /// # async fn main() -> Result<(), async_nats::Error> {
829    /// use async_nats::jetstream::consumer;
830    /// let client = async_nats::connect("localhost:4222").await?;
831    /// let jetstream = async_nats::jetstream::new(client);
832    ///
833    /// let stream = jetstream.get_stream("events").await?;
834    /// let info = stream.consumer_info("pull").await?;
835    /// # Ok(())
836    /// # }
837    /// ```
838    pub async fn consumer_info<T: AsRef<str>>(
839        &self,
840        name: T,
841    ) -> Result<consumer::Info, ConsumerInfoError> {
842        let name = name.as_ref();
843
844        if !is_valid_name(name) {
845            return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
846        }
847
848        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
849
850        match self.context.request(subject, &json!({})).await? {
851            Response::Ok(info) => Ok(info),
852            Response::Err { error } => Err(error.into()),
853        }
854    }
855
856    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
857    /// [Messages][crate::jetstream::Message] for a given [Consumer].
858    ///
859    /// # Examples
860    ///
861    /// ```no_run
862    /// # #[tokio::main]
863    /// # async fn main() -> Result<(), async_nats::Error> {
864    /// use async_nats::jetstream::consumer;
865    /// use futures_util::StreamExt;
866    /// let client = async_nats::connect("localhost:4222").await?;
867    /// let jetstream = async_nats::jetstream::new(client);
868    ///
869    /// let stream = jetstream.get_stream("events").await?;
870    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
871    /// # Ok(())
872    /// # }
873    /// ```
874    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
875        &self,
876        name: &str,
877    ) -> Result<Consumer<T>, crate::Error> {
878        let info = self.consumer_info(name).await?;
879
880        Ok(Consumer::new(
881            T::try_from_consumer_config(info.config.clone())?,
882            info,
883            self.context.clone(),
884        ))
885    }
886
887    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
888    ///
889    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
890    ///
891    /// # Examples
892    ///
893    /// ```no_run
894    /// # #[tokio::main]
895    /// # async fn main() -> Result<(), async_nats::Error> {
896    /// use async_nats::jetstream::consumer;
897    /// use futures_util::StreamExt;
898    /// let client = async_nats::connect("localhost:4222").await?;
899    /// let jetstream = async_nats::jetstream::new(client);
900    ///
901    /// let stream = jetstream.get_stream("events").await?;
902    /// let consumer = stream
903    ///     .get_or_create_consumer(
904    ///         "pull",
905    ///         consumer::pull::Config {
906    ///             durable_name: Some("pull".to_string()),
907    ///             ..Default::default()
908    ///         },
909    ///     )
910    ///     .await?;
911    /// # Ok(())
912    /// # }
913    /// ```
914    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
915        &self,
916        name: &str,
917        config: T,
918    ) -> Result<Consumer<T>, ConsumerError> {
919        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
920
921        match self.context.request(subject, &json!({})).await? {
922            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
923            Response::Err { error } => Err(error.into()),
924            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
925                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
926                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
927                })?,
928                info,
929                self.context.clone(),
930            )),
931        }
932    }
933
934    /// Delete a [Consumer] from the server.
935    ///
936    /// # Examples
937    ///
938    /// ```no_run
939    /// # #[tokio::main]
940    /// # async fn main() -> Result<(), async_nats::Error> {
941    /// use async_nats::jetstream::consumer;
942    /// use futures_util::StreamExt;
943    /// let client = async_nats::connect("localhost:4222").await?;
944    /// let jetstream = async_nats::jetstream::new(client);
945    ///
946    /// jetstream
947    ///     .get_stream("events")
948    ///     .await?
949    ///     .delete_consumer("pull")
950    ///     .await?;
951    /// # Ok(())
952    /// # }
953    /// ```
954    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
955        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
956
957        match self.context.request(subject, &json!({})).await? {
958            Response::Ok(delete_status) => Ok(delete_status),
959            Response::Err { error } => Err(error.into()),
960        }
961    }
962
963    /// Pause a [Consumer] until the given time.
964    /// It will not deliver any messages to clients during that time.
965    ///
966    /// # Examples
967    ///
968    /// ```no_run
969    /// # #[tokio::main]
970    /// # async fn main() -> Result<(), async_nats::Error> {
971    /// use async_nats::jetstream::consumer;
972    /// use futures_util::StreamExt;
973    /// let client = async_nats::connect("localhost:4222").await?;
974    /// let jetstream = async_nats::jetstream::new(client);
975    /// let pause_until =
976    ///     time::OffsetDateTime::now_utc().saturating_add(time::Duration::seconds_f32(10.0));
977    ///
978    /// jetstream
979    ///     .get_stream("events")
980    ///     .await?
981    ///     .pause_consumer("my_consumer", pause_until)
982    ///     .await?;
983    /// # Ok(())
984    /// # }
985    /// ```
986    #[cfg(feature = "server_2_11")]
987    pub async fn pause_consumer(
988        &self,
989        name: &str,
990        pause_until: OffsetDateTime,
991    ) -> Result<PauseResponse, ConsumerError> {
992        self.request_pause_consumer(name, Some(pause_until)).await
993    }
994
995    /// Resume a paused [Consumer].
996    ///
997    /// # Examples
998    ///
999    /// ```no_run
1000    /// # #[tokio::main]
1001    /// # async fn main() -> Result<(), async_nats::Error> {
1002    /// use async_nats::jetstream::consumer;
1003    /// use futures_util::StreamExt;
1004    /// let client = async_nats::connect("localhost:4222").await?;
1005    /// let jetstream = async_nats::jetstream::new(client);
1006    ///
1007    /// jetstream
1008    ///     .get_stream("events")
1009    ///     .await?
1010    ///     .resume_consumer("my_consumer")
1011    ///     .await?;
1012    /// # Ok(())
1013    /// # }
1014    /// ```
1015    #[cfg(feature = "server_2_11")]
1016    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1017        self.request_pause_consumer(name, None).await
1018    }
1019
1020    #[cfg(feature = "server_2_11")]
1021    async fn request_pause_consumer(
1022        &self,
1023        name: &str,
1024        pause_until: Option<OffsetDateTime>,
1025    ) -> Result<PauseResponse, ConsumerError> {
1026        let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1027        let payload = &PauseResumeConsumerRequest { pause_until };
1028
1029        match self.context.request(subject, payload).await? {
1030            Response::Ok::<PauseResponse>(resp) => Ok(resp),
1031            Response::Err { error } => Err(error.into()),
1032        }
1033    }
1034
1035    /// Lists names of all consumers for current stream.
1036    ///
1037    /// # Examples
1038    ///
1039    /// ```no_run
1040    /// # #[tokio::main]
1041    /// # async fn main() -> Result<(), async_nats::Error> {
1042    /// use futures_util::TryStreamExt;
1043    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1044    /// let jetstream = async_nats::jetstream::new(client);
1045    /// let stream = jetstream.get_stream("stream").await?;
1046    /// let mut names = stream.consumer_names();
1047    /// while let Some(consumer) = names.try_next().await? {
1048    ///     println!("consumer: {stream:?}");
1049    /// }
1050    /// # Ok(())
1051    /// # }
1052    /// ```
1053    pub fn consumer_names(&self) -> ConsumerNames {
1054        ConsumerNames {
1055            context: self.context.clone(),
1056            stream: self.name.clone(),
1057            offset: 0,
1058            page_request: None,
1059            consumers: Vec::new(),
1060            done: false,
1061        }
1062    }
1063
1064    /// Lists all consumers info for current stream.
1065    ///
1066    /// # Examples
1067    ///
1068    /// ```no_run
1069    /// # #[tokio::main]
1070    /// # async fn main() -> Result<(), async_nats::Error> {
1071    /// use futures_util::TryStreamExt;
1072    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1073    /// let jetstream = async_nats::jetstream::new(client);
1074    /// let stream = jetstream.get_stream("stream").await?;
1075    /// let mut consumers = stream.consumers();
1076    /// while let Some(consumer) = consumers.try_next().await? {
1077    ///     println!("consumer: {consumer:?}");
1078    /// }
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    pub fn consumers(&self) -> Consumers {
1083        Consumers {
1084            context: self.context.clone(),
1085            stream: self.name.clone(),
1086            offset: 0,
1087            page_request: None,
1088            consumers: Vec::new(),
1089            done: false,
1090        }
1091    }
1092}
1093
1094pub struct StreamInfoBuilder {
1095    pub(crate) context: Context,
1096    pub(crate) name: String,
1097    pub(crate) deleted: bool,
1098    pub(crate) subject: String,
1099}
1100
1101impl StreamInfoBuilder {
1102    fn new(context: Context, name: String) -> Self {
1103        Self {
1104            context,
1105            name,
1106            deleted: false,
1107            subject: "".to_string(),
1108        }
1109    }
1110
1111    pub fn with_deleted(mut self, deleted: bool) -> Self {
1112        self.deleted = deleted;
1113        self
1114    }
1115
1116    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1117        self.subject = subject.into();
1118        self
1119    }
1120
1121    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1122        let info = stream_info_with_details(
1123            self.context.clone(),
1124            self.name.clone(),
1125            0,
1126            self.deleted,
1127            self.subject.clone(),
1128        )
1129        .await?;
1130
1131        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1132    }
1133}
1134
1135/// `StreamConfig` determines the properties for a stream.
1136/// There are sensible defaults for most. If no subjects are
1137/// given the name will be used as the only subject.
1138#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1139pub struct Config {
1140    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
1141    pub name: String,
1142    /// How large the Stream may become in total bytes before the configured discard policy kicks in
1143    pub max_bytes: i64,
1144    /// How large the Stream may become in total messages before the configured discard policy kicks in
1145    #[serde(rename = "max_msgs")]
1146    pub max_messages: i64,
1147    /// Maximum amount of messages to keep per subject
1148    #[serde(rename = "max_msgs_per_subject")]
1149    pub max_messages_per_subject: i64,
1150    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
1151    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
1152    pub discard: DiscardPolicy,
1153    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
1154    #[serde(default, skip_serializing_if = "is_default")]
1155    pub discard_new_per_subject: bool,
1156    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
1157    /// configured stream `name`.
1158    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1159    pub subjects: Vec<String>,
1160    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
1161    pub retention: RetentionPolicy,
1162    /// How many Consumers can be defined for a given Stream, -1 for unlimited
1163    pub max_consumers: i32,
1164    /// Maximum age of any message in the stream, expressed in nanoseconds
1165    #[serde(with = "serde_nanos")]
1166    pub max_age: Duration,
1167    /// The largest message that will be accepted by the Stream
1168    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1169    pub max_message_size: i32,
1170    /// The type of storage backend, `File` (default) and `Memory`
1171    pub storage: StorageType,
1172    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
1173    pub num_replicas: usize,
1174    /// Disables acknowledging messages that are received by the Stream
1175    #[serde(default, skip_serializing_if = "is_default")]
1176    pub no_ack: bool,
1177    /// The window within which to track duplicate messages.
1178    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1179    pub duplicate_window: Duration,
1180    /// The owner of the template associated with this stream.
1181    #[serde(default, skip_serializing_if = "is_default")]
1182    pub template_owner: String,
1183    /// Indicates the stream is sealed and cannot be modified in any way
1184    #[serde(default, skip_serializing_if = "is_default")]
1185    pub sealed: bool,
1186    /// A short description of the purpose of this stream.
1187    #[serde(default, skip_serializing_if = "is_default")]
1188    pub description: Option<String>,
1189    #[serde(
1190        default,
1191        rename = "allow_rollup_hdrs",
1192        skip_serializing_if = "is_default"
1193    )]
1194    /// Indicates if rollups will be allowed or not.
1195    pub allow_rollup: bool,
1196    #[serde(default, skip_serializing_if = "is_default")]
1197    /// Indicates deletes will be denied or not.
1198    pub deny_delete: bool,
1199    /// Indicates if purges will be denied or not.
1200    #[serde(default, skip_serializing_if = "is_default")]
1201    pub deny_purge: bool,
1202
1203    /// Optional republish config.
1204    #[serde(default, skip_serializing_if = "is_default")]
1205    pub republish: Option<Republish>,
1206
1207    /// Enables direct get, which would get messages from
1208    /// non-leader.
1209    #[serde(default, skip_serializing_if = "is_default")]
1210    pub allow_direct: bool,
1211
1212    /// Enable direct access also for mirrors.
1213    #[serde(default, skip_serializing_if = "is_default")]
1214    pub mirror_direct: bool,
1215
1216    /// Stream mirror configuration.
1217    #[serde(default, skip_serializing_if = "Option::is_none")]
1218    pub mirror: Option<Source>,
1219
1220    /// Sources configuration.
1221    #[serde(default, skip_serializing_if = "Option::is_none")]
1222    pub sources: Option<Vec<Source>>,
1223
1224    #[cfg(feature = "server_2_10")]
1225    /// Additional stream metadata.
1226    #[serde(default, skip_serializing_if = "is_default")]
1227    pub metadata: HashMap<String, String>,
1228
1229    #[cfg(feature = "server_2_10")]
1230    /// Allow applying a subject transform to incoming messages
1231    #[serde(default, skip_serializing_if = "Option::is_none")]
1232    pub subject_transform: Option<SubjectTransform>,
1233
1234    #[cfg(feature = "server_2_10")]
1235    /// Override compression config for this stream.
1236    /// Wrapping enum that has `None` type with [Option] is there
1237    /// because [Stream] can override global compression set to [Compression::S2]
1238    /// to [Compression::None], which is different from not overriding global config with anything.
1239    #[serde(default, skip_serializing_if = "Option::is_none")]
1240    pub compression: Option<Compression>,
1241    #[cfg(feature = "server_2_10")]
1242    /// Set limits on consumers that are created on this stream.
1243    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1244    pub consumer_limits: Option<ConsumerLimits>,
1245
1246    #[cfg(feature = "server_2_10")]
1247    /// Sets the first sequence for the stream.
1248    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1249    pub first_sequence: Option<u64>,
1250
1251    /// Placement configuration for clusters and tags.
1252    #[serde(default, skip_serializing_if = "Option::is_none")]
1253    pub placement: Option<Placement>,
1254
1255    /// Persistence mode for the stream.
1256    #[serde(default, skip_serializing_if = "Option::is_none")]
1257    pub persist_mode: Option<PersistenceMode>,
1258
1259    /// For suspending the consumer until the deadline.
1260    #[cfg(feature = "server_2_11")]
1261    #[serde(
1262        default,
1263        with = "rfc3339::option",
1264        skip_serializing_if = "Option::is_none"
1265    )]
1266    pub pause_until: Option<OffsetDateTime>,
1267
1268    /// Allows setting a TTL for a message in the stream.
1269    #[cfg(feature = "server_2_11")]
1270    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
1271    pub allow_message_ttl: bool,
1272
1273    /// Enables delete markers for messages deleted from the stream and sets the TTL
1274    /// for how long the marker should be kept.
1275    #[cfg(feature = "server_2_11")]
1276    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1277    pub subject_delete_marker_ttl: Option<Duration>,
1278
1279    /// Allows atomic publish operations.
1280    #[cfg(feature = "server_2_12")]
1281    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
1282    pub allow_atomic_publish: bool,
1283
1284    /// Enables the scheduling of messages
1285    #[cfg(feature = "server_2_12")]
1286    #[serde(
1287        default,
1288        skip_serializing_if = "is_default",
1289        rename = "allow_msg_schedules"
1290    )]
1291    pub allow_message_schedules: bool,
1292
1293    /// Enables counters for the stream
1294    #[cfg(feature = "server_2_12")]
1295    #[serde(
1296        default,
1297        skip_serializing_if = "is_default",
1298        rename = "allow_msg_counter"
1299    )]
1300    pub allow_message_counter: bool,
1301}
1302
1303impl From<&Config> for Config {
1304    fn from(sc: &Config) -> Config {
1305        sc.clone()
1306    }
1307}
1308
1309impl From<&str> for Config {
1310    fn from(s: &str) -> Config {
1311        Config {
1312            name: s.to_string(),
1313            ..Default::default()
1314        }
1315    }
1316}
1317
1318#[cfg(feature = "server_2_10")]
1319fn default_consumer_limits_as_none<'de, D>(
1320    deserializer: D,
1321) -> Result<Option<ConsumerLimits>, D::Error>
1322where
1323    D: Deserializer<'de>,
1324{
1325    let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1326    if let Some(cl) = consumer_limits {
1327        if cl == ConsumerLimits::default() {
1328            Ok(None)
1329        } else {
1330            Ok(Some(cl))
1331        }
1332    } else {
1333        Ok(None)
1334    }
1335}
1336#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1337pub struct ConsumerLimits {
1338    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
1339    #[serde(default, with = "serde_nanos")]
1340    pub inactive_threshold: std::time::Duration,
1341    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
1342    #[serde(default)]
1343    pub max_ack_pending: i64,
1344}
1345
1346#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1347pub enum Compression {
1348    #[serde(rename = "s2")]
1349    S2,
1350    #[serde(rename = "none")]
1351    None,
1352}
1353
1354// SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
1355#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1356pub struct SubjectTransform {
1357    #[serde(rename = "src")]
1358    pub source: String,
1359
1360    #[serde(rename = "dest")]
1361    pub destination: String,
1362}
1363
1364// Republish is for republishing messages once committed to a stream.
1365#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1366pub struct Republish {
1367    /// Subject that should be republished.
1368    #[serde(rename = "src")]
1369    pub source: String,
1370    /// Subject where messages will be republished.
1371    #[serde(rename = "dest")]
1372    pub destination: String,
1373    /// If true, only headers should be republished.
1374    #[serde(default)]
1375    pub headers_only: bool,
1376}
1377
1378/// Placement describes on which cluster or tags the stream should be placed.
1379#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1380pub struct Placement {
1381    // Cluster where the stream should be placed.
1382    #[serde(default, skip_serializing_if = "is_default")]
1383    pub cluster: Option<String>,
1384    // Matching tags for stream placement.
1385    #[serde(default, skip_serializing_if = "is_default")]
1386    pub tags: Vec<String>,
1387}
1388
1389/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
1390/// remove older messages. `New` will fail to store the new message.
1391#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1392#[repr(u8)]
1393pub enum DiscardPolicy {
1394    /// will remove older messages when limits are hit.
1395    #[default]
1396    #[serde(rename = "old")]
1397    Old = 0,
1398    /// will error on a StoreMsg call when limits are hit
1399    #[serde(rename = "new")]
1400    New = 1,
1401}
1402
1403/// `RetentionPolicy` determines how messages in a set are retained.
1404#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1405#[repr(u8)]
1406pub enum RetentionPolicy {
1407    /// `Limits` (default) means that messages are retained until any given limit is reached.
1408    /// This could be one of messages, bytes, or age.
1409    #[default]
1410    #[serde(rename = "limits")]
1411    Limits = 0,
1412    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
1413    #[serde(rename = "interest")]
1414    Interest = 1,
1415    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
1416    #[serde(rename = "workqueue")]
1417    WorkQueue = 2,
1418}
1419
1420/// determines how messages are stored for retention.
1421#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1422#[repr(u8)]
1423pub enum StorageType {
1424    /// Stream data is kept in files. This is the default.
1425    #[default]
1426    #[serde(rename = "file")]
1427    File = 0,
1428    /// Stream data is kept only in memory.
1429    #[serde(rename = "memory")]
1430    Memory = 1,
1431}
1432
1433/// Determines the persistence mode for stream data.
1434#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1435#[repr(u8)]
1436pub enum PersistenceMode {
1437    /// Writes are immediately flushed, acknowledgement sent after message is stored.
1438    #[default]
1439    #[serde(rename = "default")]
1440    Default = 0,
1441    /// Writes are flushed asynchronously, acknowledgement may be sent before message is stored.
1442    #[serde(rename = "async")]
1443    Async = 1,
1444}
1445
1446async fn stream_info_with_details(
1447    context: Context,
1448    stream: String,
1449    offset: usize,
1450    deleted_details: bool,
1451    subjects_filter: String,
1452) -> Result<Info, InfoError> {
1453    let subject = format!("STREAM.INFO.{stream}");
1454
1455    let payload = StreamInfoRequest {
1456        offset,
1457        deleted_details,
1458        subjects_filter,
1459    };
1460
1461    let response: Response<Info> = context.request(subject, &payload).await?;
1462
1463    match response {
1464        Response::Ok(info) => Ok(info),
1465        Response::Err { error } => Err(error.into()),
1466    }
1467}
1468
1469type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1470
1471#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1472pub struct StreamInfoRequest {
1473    offset: usize,
1474    deleted_details: bool,
1475    subjects_filter: String,
1476}
1477
1478pub struct InfoWithSubjects {
1479    stream: String,
1480    context: Context,
1481    pub info: Info,
1482    offset: usize,
1483    subjects: collections::hash_map::IntoIter<String, usize>,
1484    info_request: Option<InfoRequest>,
1485    subjects_filter: String,
1486    pages_done: bool,
1487}
1488
1489impl InfoWithSubjects {
1490    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1491        let subjects = info.state.subjects.take().unwrap_or_default();
1492        let name = info.config.name.clone();
1493        InfoWithSubjects {
1494            context,
1495            info,
1496            pages_done: subjects.is_empty(),
1497            offset: subjects.len(),
1498            subjects: subjects.into_iter(),
1499            subjects_filter: subject,
1500            stream: name,
1501            info_request: None,
1502        }
1503    }
1504}
1505
1506impl futures_util::Stream for InfoWithSubjects {
1507    type Item = Result<(String, usize), InfoError>;
1508
1509    fn poll_next(
1510        mut self: Pin<&mut Self>,
1511        cx: &mut std::task::Context<'_>,
1512    ) -> Poll<Option<Self::Item>> {
1513        match self.subjects.next() {
1514            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1515            None => {
1516                // If we have already requested all pages, stop the iterator.
1517                if self.pages_done {
1518                    return Poll::Ready(None);
1519                }
1520                let stream = self.stream.clone();
1521                let context = self.context.clone();
1522                let subjects_filter = self.subjects_filter.clone();
1523                let offset = self.offset;
1524                match self
1525                    .info_request
1526                    .get_or_insert_with(|| {
1527                        Box::pin(stream_info_with_details(
1528                            context,
1529                            stream,
1530                            offset,
1531                            false,
1532                            subjects_filter,
1533                        ))
1534                    })
1535                    .poll_unpin(cx)
1536                {
1537                    Poll::Ready(resp) => match resp {
1538                        Ok(info) => {
1539                            let subjects = info.state.subjects.clone();
1540                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1541                            self.info_request = None;
1542                            let subjects = subjects.unwrap_or_default();
1543                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1544                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1545                            if total <= self.offset || subjects.is_empty() {
1546                                self.pages_done = true;
1547                            }
1548                            match self.subjects.next() {
1549                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1550                                None => Poll::Ready(None),
1551                            }
1552                        }
1553                        Err(err) => Poll::Ready(Some(Err(err))),
1554                    },
1555                    Poll::Pending => Poll::Pending,
1556                }
1557            }
1558        }
1559    }
1560}
1561
1562/// Shows config and current state for this stream.
1563#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1564pub struct Info {
1565    /// The configuration associated with this stream.
1566    pub config: Config,
1567    /// The time that this stream was created.
1568    #[serde(with = "rfc3339")]
1569    pub created: time::OffsetDateTime,
1570    /// Various metrics associated with this stream.
1571    pub state: State,
1572    /// Information about leader and replicas.
1573    pub cluster: Option<ClusterInfo>,
1574    /// Information about mirror config if present.
1575    #[serde(default)]
1576    pub mirror: Option<SourceInfo>,
1577    /// Information about sources configs if present.
1578    #[serde(default)]
1579    pub sources: Vec<SourceInfo>,
1580    #[serde(flatten)]
1581    paged_info: Option<PagedInfo>,
1582}
1583
1584#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1585pub struct PagedInfo {
1586    offset: usize,
1587    total: usize,
1588    limit: usize,
1589}
1590
1591#[derive(Deserialize)]
1592pub struct DeleteStatus {
1593    pub success: bool,
1594}
1595
1596#[cfg(feature = "server_2_11")]
1597#[derive(Deserialize)]
1598pub struct PauseResponse {
1599    pub paused: bool,
1600    #[serde(with = "rfc3339")]
1601    pub pause_until: OffsetDateTime,
1602    #[serde(default, with = "serde_nanos")]
1603    pub pause_remaining: Option<Duration>,
1604}
1605
1606#[cfg(feature = "server_2_11")]
1607#[derive(Serialize, Debug)]
1608struct PauseResumeConsumerRequest {
1609    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
1610    pause_until: Option<OffsetDateTime>,
1611}
1612
1613/// information about the given stream.
1614#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1615pub struct State {
1616    /// The number of messages contained in this stream
1617    pub messages: u64,
1618    /// The number of bytes of all messages contained in this stream
1619    pub bytes: u64,
1620    /// The lowest sequence number still present in this stream
1621    #[serde(rename = "first_seq")]
1622    pub first_sequence: u64,
1623    /// The time associated with the oldest message still present in this stream
1624    #[serde(with = "rfc3339", rename = "first_ts")]
1625    pub first_timestamp: time::OffsetDateTime,
1626    /// The last sequence number assigned to a message in this stream
1627    #[serde(rename = "last_seq")]
1628    pub last_sequence: u64,
1629    /// The time that the last message was received by this stream
1630    #[serde(with = "rfc3339", rename = "last_ts")]
1631    pub last_timestamp: time::OffsetDateTime,
1632    /// The number of consumers configured to consume this stream
1633    pub consumer_count: usize,
1634    /// The number of subjects in the stream
1635    #[serde(default, rename = "num_subjects")]
1636    pub subjects_count: u64,
1637    /// The number of deleted messages in the stream
1638    #[serde(default, rename = "num_deleted")]
1639    pub deleted_count: Option<u64>,
1640    /// The list of deleted subjects from the Stream.
1641    /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
1642    #[serde(default)]
1643    pub deleted: Option<Vec<u64>>,
1644
1645    pub(crate) subjects: Option<HashMap<String, usize>>,
1646}
1647
1648/// A raw stream message in the representation it is stored.
1649#[derive(Debug, Serialize, Deserialize, Clone)]
1650pub struct RawMessage {
1651    /// Subject of the message.
1652    #[serde(rename = "subject")]
1653    pub subject: String,
1654
1655    /// Sequence of the message.
1656    #[serde(rename = "seq")]
1657    pub sequence: u64,
1658
1659    /// Raw payload of the message as a base64 encoded string.
1660    #[serde(default, rename = "data")]
1661    pub payload: String,
1662
1663    /// Raw header string, if any.
1664    #[serde(default, rename = "hdrs")]
1665    pub headers: Option<String>,
1666
1667    /// The time the message was published.
1668    #[serde(rename = "time", with = "rfc3339")]
1669    pub time: time::OffsetDateTime,
1670}
1671
1672impl TryFrom<RawMessage> for StreamMessage {
1673    type Error = crate::Error;
1674
1675    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1676        let decoded_payload = STANDARD
1677            .decode(value.payload)
1678            .map_err(|err| Box::new(std::io::Error::other(err)))?;
1679        let decoded_headers = value
1680            .headers
1681            .map(|header| STANDARD.decode(header))
1682            .map_or(Ok(None), |v| v.map(Some))?;
1683
1684        let (headers, _, _) = decoded_headers
1685            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1686
1687        Ok(StreamMessage {
1688            subject: value.subject.into(),
1689            payload: decoded_payload.into(),
1690            headers,
1691            sequence: value.sequence,
1692            time: value.time,
1693        })
1694    }
1695}
1696
1697fn is_continuation(c: char) -> bool {
1698    c == ' ' || c == '\t'
1699}
1700const HEADER_LINE: &str = "NATS/1.0";
1701
1702#[allow(clippy::type_complexity)]
1703fn parse_headers(
1704    buf: &[u8],
1705) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1706    let mut headers = HeaderMap::new();
1707    let mut maybe_status: Option<StatusCode> = None;
1708    let mut maybe_description: Option<String> = None;
1709    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1710        line.lines().peekable()
1711    } else {
1712        return Err(Box::new(std::io::Error::other("invalid header")));
1713    };
1714
1715    if let Some(line) = lines.next() {
1716        let line = line
1717            .strip_prefix(HEADER_LINE)
1718            .ok_or_else(|| {
1719                Box::new(std::io::Error::other(
1720                    "version line does not start with NATS/1.0",
1721                ))
1722            })?
1723            .trim();
1724
1725        match line.split_once(' ') {
1726            Some((status, description)) => {
1727                if !status.is_empty() {
1728                    maybe_status = Some(status.parse()?);
1729                }
1730
1731                if !description.is_empty() {
1732                    maybe_description = Some(description.trim().to_string());
1733                }
1734            }
1735            None => {
1736                if !line.is_empty() {
1737                    maybe_status = Some(line.parse()?);
1738                }
1739            }
1740        }
1741    } else {
1742        return Err(Box::new(std::io::Error::other(
1743            "expected header information not found",
1744        )));
1745    };
1746
1747    while let Some(line) = lines.next() {
1748        if line.is_empty() {
1749            continue;
1750        }
1751
1752        if let Some((k, v)) = line.split_once(':').to_owned() {
1753            let mut s = String::from(v.trim());
1754            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1755                s.push(' ');
1756                s.push_str(v.trim());
1757            }
1758
1759            headers.insert(
1760                HeaderName::from_str(k)?,
1761                HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1762            );
1763        } else {
1764            return Err(Box::new(std::io::Error::other("malformed header line")));
1765        }
1766    }
1767
1768    if headers.is_empty() {
1769        Ok((HeaderMap::new(), maybe_status, maybe_description))
1770    } else {
1771        Ok((headers, maybe_status, maybe_description))
1772    }
1773}
1774
1775#[derive(Debug, Serialize, Deserialize, Clone)]
1776struct GetRawMessage {
1777    pub(crate) message: RawMessage,
1778}
1779
1780fn is_default<T: Default + Eq>(t: &T) -> bool {
1781    t == &T::default()
1782}
1783/// Information about the stream's, consumer's associated `JetStream` cluster
1784#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1785pub struct ClusterInfo {
1786    /// The cluster name.
1787    #[serde(default)]
1788    pub name: Option<String>,
1789    /// The RAFT group name.
1790    #[serde(default)]
1791    pub raft_group: Option<String>,
1792    /// The server name of the RAFT leader.
1793    #[serde(default)]
1794    pub leader: Option<String>,
1795    /// The time since this server has been the leader.
1796    #[serde(default, with = "rfc3339::option")]
1797    pub leader_since: Option<OffsetDateTime>,
1798    /// Indicates if this account is a system account.
1799    #[cfg(feature = "server_2_12")]
1800    #[serde(default)]
1801    /// Indicates if `traffic_account` is set a system account.
1802    pub system_account: bool,
1803    #[cfg(feature = "server_2_12")]
1804    /// Name of the traffic (replication) account.
1805    #[serde(default)]
1806    pub traffic_account: Option<String>,
1807    /// The members of the RAFT cluster.
1808    #[serde(default)]
1809    pub replicas: Vec<PeerInfo>,
1810}
1811
1812/// The members of the RAFT cluster
1813#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1814pub struct PeerInfo {
1815    /// The server name of the peer.
1816    pub name: String,
1817    /// Indicates if the server is up to date and synchronized.
1818    pub current: bool,
1819    /// Nanoseconds since this peer was last seen.
1820    #[serde(with = "serde_nanos")]
1821    pub active: Duration,
1822    /// Indicates the node is considered offline by the group.
1823    #[serde(default)]
1824    pub offline: bool,
1825    /// How many uncommitted operations this peer is behind the leader.
1826    pub lag: Option<u64>,
1827}
1828
1829#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1830pub struct SourceInfo {
1831    /// Source name.
1832    pub name: String,
1833    /// Number of messages this source is lagging behind.
1834    pub lag: u64,
1835    /// Last time the source was seen active.
1836    #[serde(deserialize_with = "negative_duration_as_none")]
1837    pub active: Option<std::time::Duration>,
1838    /// Filtering for the source.
1839    #[serde(default)]
1840    pub filter_subject: Option<String>,
1841    /// Source destination subject.
1842    #[serde(default)]
1843    pub subject_transform_dest: Option<String>,
1844    /// List of transforms.
1845    #[serde(default)]
1846    pub subject_transforms: Vec<SubjectTransform>,
1847}
1848
1849fn negative_duration_as_none<'de, D>(
1850    deserializer: D,
1851) -> Result<Option<std::time::Duration>, D::Error>
1852where
1853    D: Deserializer<'de>,
1854{
1855    let n = i64::deserialize(deserializer)?;
1856    if n.is_negative() {
1857        Ok(None)
1858    } else {
1859        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1860    }
1861}
1862
1863/// The response generated by trying to purge a stream.
1864#[derive(Debug, Deserialize, Clone, Copy)]
1865pub struct PurgeResponse {
1866    /// Whether the purge request was successful.
1867    pub success: bool,
1868    /// The number of purged messages in a stream.
1869    pub purged: u64,
1870}
1871/// The payload used to generate a purge request.
1872#[derive(Default, Debug, Serialize, Clone)]
1873pub struct PurgeRequest {
1874    /// Purge up to but not including sequence.
1875    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1876    pub sequence: Option<u64>,
1877
1878    /// Subject to match against messages for the purge command.
1879    #[serde(default, skip_serializing_if = "is_default")]
1880    pub filter: Option<String>,
1881
1882    /// Number of messages to keep.
1883    #[serde(default, skip_serializing_if = "is_default")]
1884    pub keep: Option<u64>,
1885}
1886
1887#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1888pub struct Source {
1889    /// Name of the stream source.
1890    pub name: String,
1891    /// Optional source start sequence.
1892    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1893    pub start_sequence: Option<u64>,
1894    #[serde(
1895        default,
1896        rename = "opt_start_time",
1897        skip_serializing_if = "is_default",
1898        with = "rfc3339::option"
1899    )]
1900    /// Optional source start time.
1901    pub start_time: Option<OffsetDateTime>,
1902    /// Optional additional filter subject.
1903    #[serde(default, skip_serializing_if = "is_default")]
1904    pub filter_subject: Option<String>,
1905    /// Optional config for sourcing streams from another prefix, used for cross-account.
1906    #[serde(default, skip_serializing_if = "Option::is_none")]
1907    pub external: Option<External>,
1908    /// Optional config to set a domain, if source is residing in different one.
1909    #[serde(default, skip_serializing_if = "is_default")]
1910    pub domain: Option<String>,
1911    /// Subject transforms for Stream.
1912    #[cfg(feature = "server_2_10")]
1913    #[serde(default, skip_serializing_if = "is_default")]
1914    pub subject_transforms: Vec<SubjectTransform>,
1915}
1916
1917#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1918pub struct External {
1919    /// Api prefix of external source.
1920    #[serde(rename = "api")]
1921    pub api_prefix: String,
1922    /// Optional configuration of delivery prefix.
1923    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1924    pub delivery_prefix: Option<String>,
1925}
1926
1927use std::marker::PhantomData;
1928
1929#[derive(Debug, Default)]
1930pub struct Yes;
1931#[derive(Debug, Default)]
1932pub struct No;
1933
1934pub trait ToAssign: Debug {}
1935
1936impl ToAssign for Yes {}
1937impl ToAssign for No {}
1938
1939#[derive(Debug)]
1940pub struct Purge<SEQUENCE, KEEP>
1941where
1942    SEQUENCE: ToAssign,
1943    KEEP: ToAssign,
1944{
1945    inner: PurgeRequest,
1946    sequence_set: PhantomData<SEQUENCE>,
1947    keep_set: PhantomData<KEEP>,
1948    context: Context,
1949    stream_name: String,
1950}
1951
1952impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1953where
1954    SEQUENCE: ToAssign,
1955    KEEP: ToAssign,
1956{
1957    /// Adds subject filter to [PurgeRequest]
1958    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1959        self.inner.filter = Some(filter.into());
1960        self
1961    }
1962}
1963
1964impl Purge<No, No> {
1965    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1966        Purge {
1967            context: stream.context.clone(),
1968            stream_name: stream.name.clone(),
1969            inner: Default::default(),
1970            sequence_set: PhantomData {},
1971            keep_set: PhantomData {},
1972        }
1973    }
1974}
1975
1976impl<KEEP> Purge<No, KEEP>
1977where
1978    KEEP: ToAssign,
1979{
1980    /// Creates a new [PurgeRequest].
1981    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
1982    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1983        Purge {
1984            context: self.context.clone(),
1985            stream_name: self.stream_name.clone(),
1986            sequence_set: PhantomData {},
1987            keep_set: PhantomData {},
1988            inner: PurgeRequest {
1989                keep: Some(keep),
1990                ..self.inner
1991            },
1992        }
1993    }
1994}
1995impl<SEQUENCE> Purge<SEQUENCE, No>
1996where
1997    SEQUENCE: ToAssign,
1998{
1999    /// Creates a new [PurgeRequest].
2000    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
2001    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2002        Purge {
2003            context: self.context.clone(),
2004            stream_name: self.stream_name.clone(),
2005            sequence_set: PhantomData {},
2006            keep_set: PhantomData {},
2007            inner: PurgeRequest {
2008                sequence: Some(sequence),
2009                ..self.inner
2010            },
2011        }
2012    }
2013}
2014
2015#[derive(Clone, Debug, PartialEq)]
2016pub enum PurgeErrorKind {
2017    Request,
2018    TimedOut,
2019    JetStream(super::errors::Error),
2020}
2021
2022impl Display for PurgeErrorKind {
2023    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2024        match self {
2025            Self::Request => write!(f, "request failed"),
2026            Self::TimedOut => write!(f, "timed out"),
2027            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2028        }
2029    }
2030}
2031
2032pub type PurgeError = Error<PurgeErrorKind>;
2033
2034impl<S, K> IntoFuture for Purge<S, K>
2035where
2036    S: ToAssign + std::marker::Send,
2037    K: ToAssign + std::marker::Send,
2038{
2039    type Output = Result<PurgeResponse, PurgeError>;
2040
2041    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2042
2043    fn into_future(self) -> Self::IntoFuture {
2044        Box::pin(std::future::IntoFuture::into_future(async move {
2045            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2046            let response: Response<PurgeResponse> = self
2047                .context
2048                .request(request_subject, &self.inner)
2049                .map_err(|err| match err.kind() {
2050                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2051                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2052                })
2053                .await?;
2054
2055            match response {
2056                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2057                Response::Ok(response) => Ok(response),
2058            }
2059        }))
2060    }
2061}
2062
2063#[derive(Deserialize, Debug)]
2064struct ConsumerPage {
2065    total: usize,
2066    consumers: Option<Vec<String>>,
2067}
2068
2069#[derive(Deserialize, Debug)]
2070struct ConsumerInfoPage {
2071    total: usize,
2072    consumers: Option<Vec<super::consumer::Info>>,
2073}
2074
2075type ConsumerNamesErrorKind = StreamsErrorKind;
2076type ConsumerNamesError = StreamsError;
2077type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2078
2079pub struct ConsumerNames {
2080    context: Context,
2081    stream: String,
2082    offset: usize,
2083    page_request: Option<PageRequest>,
2084    consumers: Vec<String>,
2085    done: bool,
2086}
2087
2088impl futures_util::Stream for ConsumerNames {
2089    type Item = Result<String, ConsumerNamesError>;
2090
2091    fn poll_next(
2092        mut self: Pin<&mut Self>,
2093        cx: &mut std::task::Context<'_>,
2094    ) -> std::task::Poll<Option<Self::Item>> {
2095        match self.page_request.as_mut() {
2096            Some(page) => match page.try_poll_unpin(cx) {
2097                std::task::Poll::Ready(page) => {
2098                    self.page_request = None;
2099                    let page = page.map_err(|err| {
2100                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2101                    })?;
2102
2103                    if let Some(consumers) = page.consumers {
2104                        self.offset += consumers.len();
2105                        self.consumers = consumers;
2106                        if self.offset >= page.total {
2107                            self.done = true;
2108                        }
2109                        match self.consumers.pop() {
2110                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2111                            None => Poll::Ready(None),
2112                        }
2113                    } else {
2114                        Poll::Ready(None)
2115                    }
2116                }
2117                std::task::Poll::Pending => std::task::Poll::Pending,
2118            },
2119            None => {
2120                if let Some(stream) = self.consumers.pop() {
2121                    Poll::Ready(Some(Ok(stream)))
2122                } else {
2123                    if self.done {
2124                        return Poll::Ready(None);
2125                    }
2126                    let context = self.context.clone();
2127                    let offset = self.offset;
2128                    let stream = self.stream.clone();
2129                    self.page_request = Some(Box::pin(async move {
2130                        match context
2131                            .request(
2132                                format!("CONSUMER.NAMES.{stream}"),
2133                                &json!({
2134                                    "offset": offset,
2135                                }),
2136                            )
2137                            .await?
2138                        {
2139                            Response::Err { error } => Err(RequestError::with_source(
2140                                super::context::RequestErrorKind::Other,
2141                                error,
2142                            )),
2143                            Response::Ok(page) => Ok(page),
2144                        }
2145                    }));
2146                    self.poll_next(cx)
2147                }
2148            }
2149        }
2150    }
2151}
2152
2153pub type ConsumersErrorKind = StreamsErrorKind;
2154pub type ConsumersError = StreamsError;
2155type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2156
2157pub struct Consumers {
2158    context: Context,
2159    stream: String,
2160    offset: usize,
2161    page_request: Option<PageInfoRequest>,
2162    consumers: Vec<super::consumer::Info>,
2163    done: bool,
2164}
2165
2166impl futures_util::Stream for Consumers {
2167    type Item = Result<super::consumer::Info, ConsumersError>;
2168
2169    fn poll_next(
2170        mut self: Pin<&mut Self>,
2171        cx: &mut std::task::Context<'_>,
2172    ) -> std::task::Poll<Option<Self::Item>> {
2173        match self.page_request.as_mut() {
2174            Some(page) => match page.try_poll_unpin(cx) {
2175                std::task::Poll::Ready(page) => {
2176                    self.page_request = None;
2177                    let page = page.map_err(|err| {
2178                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2179                    })?;
2180                    if let Some(consumers) = page.consumers {
2181                        self.offset += consumers.len();
2182                        self.consumers = consumers;
2183                        if self.offset >= page.total {
2184                            self.done = true;
2185                        }
2186                        match self.consumers.pop() {
2187                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2188                            None => Poll::Ready(None),
2189                        }
2190                    } else {
2191                        Poll::Ready(None)
2192                    }
2193                }
2194                std::task::Poll::Pending => std::task::Poll::Pending,
2195            },
2196            None => {
2197                if let Some(stream) = self.consumers.pop() {
2198                    Poll::Ready(Some(Ok(stream)))
2199                } else {
2200                    if self.done {
2201                        return Poll::Ready(None);
2202                    }
2203                    let context = self.context.clone();
2204                    let offset = self.offset;
2205                    let stream = self.stream.clone();
2206                    self.page_request = Some(Box::pin(async move {
2207                        match context
2208                            .request(
2209                                format!("CONSUMER.LIST.{stream}"),
2210                                &json!({
2211                                    "offset": offset,
2212                                }),
2213                            )
2214                            .await?
2215                        {
2216                            Response::Err { error } => Err(RequestError::with_source(
2217                                super::context::RequestErrorKind::Other,
2218                                error,
2219                            )),
2220                            Response::Ok(page) => Ok(page),
2221                        }
2222                    }));
2223                    self.poll_next(cx)
2224                }
2225            }
2226        }
2227    }
2228}
2229
2230#[derive(Clone, Debug, PartialEq)]
2231pub enum LastRawMessageErrorKind {
2232    NoMessageFound,
2233    InvalidSubject,
2234    JetStream(super::errors::Error),
2235    Other,
2236}
2237
2238impl Display for LastRawMessageErrorKind {
2239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2240        match self {
2241            Self::NoMessageFound => write!(f, "no message found"),
2242            Self::InvalidSubject => write!(f, "invalid subject"),
2243            Self::Other => write!(f, "failed to get last raw message"),
2244            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2245        }
2246    }
2247}
2248
2249pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2250pub type RawMessageErrorKind = LastRawMessageErrorKind;
2251pub type RawMessageError = LastRawMessageError;
2252
2253#[derive(Clone, Debug, PartialEq)]
2254pub enum ConsumerErrorKind {
2255    //TODO: get last should have timeout, which should be mapped here.
2256    TimedOut,
2257    Request,
2258    InvalidConsumerType,
2259    InvalidName,
2260    JetStream(super::errors::Error),
2261    Other,
2262}
2263
2264impl Display for ConsumerErrorKind {
2265    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2266        match self {
2267            Self::TimedOut => write!(f, "timed out"),
2268            Self::Request => write!(f, "request failed"),
2269            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2270            Self::Other => write!(f, "consumer error"),
2271            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2272            Self::InvalidName => write!(f, "invalid consumer name"),
2273        }
2274    }
2275}
2276
2277pub type ConsumerError = Error<ConsumerErrorKind>;
2278
2279#[derive(Clone, Debug, PartialEq)]
2280pub enum ConsumerCreateStrictErrorKind {
2281    //TODO: get last should have timeout, which should be mapped here.
2282    TimedOut,
2283    Request,
2284    InvalidConsumerType,
2285    InvalidName,
2286    AlreadyExists,
2287    JetStream(super::errors::Error),
2288    Other,
2289}
2290
2291impl Display for ConsumerCreateStrictErrorKind {
2292    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2293        match self {
2294            Self::TimedOut => write!(f, "timed out"),
2295            Self::Request => write!(f, "request failed"),
2296            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2297            Self::Other => write!(f, "consumer error"),
2298            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2299            Self::InvalidName => write!(f, "invalid consumer name"),
2300            Self::AlreadyExists => write!(f, "consumer already exists"),
2301        }
2302    }
2303}
2304
2305pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2306
2307#[derive(Clone, Debug, PartialEq)]
2308pub enum ConsumerUpdateErrorKind {
2309    //TODO: get last should have timeout, which should be mapped here.
2310    TimedOut,
2311    Request,
2312    InvalidConsumerType,
2313    InvalidName,
2314    DoesNotExist,
2315    JetStream(super::errors::Error),
2316    Other,
2317}
2318
2319impl Display for ConsumerUpdateErrorKind {
2320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2321        match self {
2322            Self::TimedOut => write!(f, "timed out"),
2323            Self::Request => write!(f, "request failed"),
2324            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2325            Self::Other => write!(f, "consumer error"),
2326            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2327            Self::InvalidName => write!(f, "invalid consumer name"),
2328            Self::DoesNotExist => write!(f, "consumer does not exist"),
2329        }
2330    }
2331}
2332
2333pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2334
2335impl From<super::errors::Error> for ConsumerError {
2336    fn from(err: super::errors::Error) -> Self {
2337        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2338    }
2339}
2340impl From<super::errors::Error> for ConsumerCreateStrictError {
2341    fn from(err: super::errors::Error) -> Self {
2342        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2343            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2344        } else {
2345            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2346        }
2347    }
2348}
2349impl From<super::errors::Error> for ConsumerUpdateError {
2350    fn from(err: super::errors::Error) -> Self {
2351        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2352            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2353        } else {
2354            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2355        }
2356    }
2357}
2358impl From<ConsumerError> for ConsumerUpdateError {
2359    fn from(err: ConsumerError) -> Self {
2360        match err.kind() {
2361            ConsumerErrorKind::JetStream(err) => {
2362                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2363                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2364                } else {
2365                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2366                }
2367            }
2368            ConsumerErrorKind::Request => {
2369                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2370            }
2371            ConsumerErrorKind::TimedOut => {
2372                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2373            }
2374            ConsumerErrorKind::InvalidConsumerType => {
2375                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2376            }
2377            ConsumerErrorKind::InvalidName => {
2378                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2379            }
2380            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2381        }
2382    }
2383}
2384
2385impl From<ConsumerError> for ConsumerCreateStrictError {
2386    fn from(err: ConsumerError) -> Self {
2387        match err.kind() {
2388            ConsumerErrorKind::JetStream(err) => {
2389                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2390                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2391                } else {
2392                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2393                }
2394            }
2395            ConsumerErrorKind::Request => {
2396                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2397            }
2398            ConsumerErrorKind::TimedOut => {
2399                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2400            }
2401            ConsumerErrorKind::InvalidConsumerType => {
2402                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2403            }
2404            ConsumerErrorKind::InvalidName => {
2405                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2406            }
2407            ConsumerErrorKind::Other => {
2408                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2409            }
2410        }
2411    }
2412}
2413
2414impl From<super::context::RequestError> for ConsumerError {
2415    fn from(err: super::context::RequestError) -> Self {
2416        match err.kind() {
2417            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2418            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2419        }
2420    }
2421}
2422impl From<super::context::RequestError> for ConsumerUpdateError {
2423    fn from(err: super::context::RequestError) -> Self {
2424        match err.kind() {
2425            RequestErrorKind::TimedOut => {
2426                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2427            }
2428            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2429        }
2430    }
2431}
2432impl From<super::context::RequestError> for ConsumerCreateStrictError {
2433    fn from(err: super::context::RequestError) -> Self {
2434        match err.kind() {
2435            RequestErrorKind::TimedOut => {
2436                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2437            }
2438            _ => {
2439                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2440            }
2441        }
2442    }
2443}
2444
2445#[derive(Debug, Serialize, Default)]
2446pub struct DirectGetRequest {
2447    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2448    sequence: Option<u64>,
2449    #[serde(rename = "last_by_subj", skip_serializing)]
2450    last_by_subject: Option<String>,
2451    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2452    next_by_subject: Option<String>,
2453}
2454
2455/// Marker type indicating that headers should be included in the response.
2456pub struct WithHeaders;
2457
2458/// Marker type indicating that headers should be excluded from the response.
2459pub struct WithoutHeaders;
2460
2461/// Trait for converting a Message response into the appropriate return type.
2462trait DirectGetResponse: Sized {
2463    fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
2464}
2465
2466impl DirectGetResponse for StreamMessage {
2467    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2468        StreamMessage::try_from(message).map_err(Into::into)
2469    }
2470}
2471
2472impl DirectGetResponse for StreamValue {
2473    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2474        Ok(StreamValue {
2475            data: message.payload,
2476        })
2477    }
2478}
2479
2480pub struct DirectGetBuilder<T = WithHeaders> {
2481    context: Context,
2482    stream_name: String,
2483    request: DirectGetRequest,
2484    _phantom: std::marker::PhantomData<T>,
2485}
2486
2487impl DirectGetBuilder<WithHeaders> {
2488    fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2489        DirectGetBuilder {
2490            context,
2491            stream_name,
2492            request: DirectGetRequest::default(),
2493            _phantom: std::marker::PhantomData,
2494        }
2495    }
2496}
2497
2498impl<T> DirectGetBuilder<T> {
2499    /// Internal method to send the direct get request and convert to the appropriate type.
2500    async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2501        // When last_by_subject is used, the subject is embedded in the URL and we should send an empty payload
2502        // to match the NATS protocol expectation (similar to nats.go implementation)
2503        let payload = if self.request.last_by_subject.is_some() {
2504            Bytes::new()
2505        } else {
2506            serde_json::to_vec(&self.request).map(Bytes::from)?
2507        };
2508
2509        let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2510            format!(
2511                "{}.DIRECT.GET.{}.{}",
2512                &self.context.prefix, &self.stream_name, subject
2513            )
2514        } else {
2515            format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2516        };
2517
2518        let response = self
2519            .context
2520            .client
2521            .request(request_subject, payload)
2522            .await?;
2523
2524        // Check for error status
2525        if let Some(status) = response.status {
2526            if let Some(ref description) = response.description {
2527                match status {
2528                    StatusCode::NOT_FOUND => {
2529                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2530                    }
2531                    // 408 is used in Direct Message for bad/empty payload.
2532                    StatusCode::TIMEOUT => {
2533                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2534                    }
2535                    _ => {
2536                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2537                            status,
2538                            description.to_string(),
2539                        )));
2540                    }
2541                }
2542            }
2543        }
2544
2545        R::from_message(response)
2546    }
2547
2548    /// Sets the sequence for the direct get request.
2549    pub fn sequence(mut self, seq: u64) -> Self {
2550        self.request.sequence = Some(seq);
2551        self
2552    }
2553
2554    /// Sets the last_by_subject for the direct get request.
2555    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2556        self.request.last_by_subject = Some(subject.into());
2557        self
2558    }
2559
2560    /// Sets the next_by_subject for the direct get request.
2561    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2562        self.request.next_by_subject = Some(subject.into());
2563        self
2564    }
2565}
2566
2567impl DirectGetBuilder<WithHeaders> {
2568    /// Sends the get request and returns a StreamMessage with headers.
2569    pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2570        self.send_internal::<StreamMessage>().await
2571    }
2572}
2573
2574impl DirectGetBuilder<WithoutHeaders> {
2575    /// Sends the get request and returns only the payload bytes without headers.
2576    pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2577        self.send_internal::<StreamValue>().await
2578    }
2579}
2580
2581pub struct StreamValue {
2582    pub data: Bytes,
2583}
2584
2585#[derive(Debug, Serialize, Default)]
2586pub struct RawMessageRequest {
2587    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2588    sequence: Option<u64>,
2589    #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2590    last_by_subject: Option<String>,
2591    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2592    next_by_subject: Option<String>,
2593}
2594
2595/// Trait for converting a RawMessage response into the appropriate return type.
2596trait RawMessageResponse: Sized {
2597    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2598}
2599
2600impl RawMessageResponse for StreamMessage {
2601    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2602        StreamMessage::try_from(message)
2603            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2604    }
2605}
2606
2607impl RawMessageResponse for StreamValue {
2608    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2609        use base64::engine::general_purpose::STANDARD;
2610        use base64::Engine;
2611
2612        let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2613            RawMessageError::with_source(
2614                RawMessageErrorKind::Other,
2615                Box::new(std::io::Error::other(err)),
2616            )
2617        })?;
2618
2619        Ok(StreamValue {
2620            data: decoded_payload.into(),
2621        })
2622    }
2623}
2624
2625pub struct RawMessageBuilder<T = WithHeaders> {
2626    context: Context,
2627    stream_name: String,
2628    request: RawMessageRequest,
2629    _phantom: std::marker::PhantomData<T>,
2630}
2631
2632impl RawMessageBuilder<WithHeaders> {
2633    fn new(context: Context, stream_name: String) -> Self {
2634        RawMessageBuilder {
2635            context,
2636            stream_name,
2637            request: RawMessageRequest::default(),
2638            _phantom: std::marker::PhantomData,
2639        }
2640    }
2641}
2642
2643impl<T> RawMessageBuilder<T> {
2644    /// Internal method to send the raw message request and convert to the appropriate type.
2645    async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2646        // Validate subjects
2647        for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2648            .into_iter()
2649            .flatten()
2650        {
2651            if !is_valid_subject(subject) {
2652                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2653            }
2654        }
2655
2656        let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2657
2658        let response: Response<GetRawMessage> = self
2659            .context
2660            .request(subject, &self.request)
2661            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2662            .await?;
2663
2664        match response {
2665            Response::Err { error } => {
2666                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2667                    Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2668                } else {
2669                    Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2670                }
2671            }
2672            Response::Ok(value) => R::from_raw_message(value.message),
2673        }
2674    }
2675
2676    /// Sets the sequence for the raw message request.
2677    pub fn sequence(mut self, seq: u64) -> Self {
2678        self.request.sequence = Some(seq);
2679        self
2680    }
2681
2682    /// Sets the last_by_subject for the raw message request.
2683    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2684        self.request.last_by_subject = Some(subject.into());
2685        self
2686    }
2687
2688    /// Sets the next_by_subject for the raw message request.
2689    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2690        self.request.next_by_subject = Some(subject.into());
2691        self
2692    }
2693}
2694
2695impl RawMessageBuilder<WithHeaders> {
2696    /// Sends the raw message request and returns a StreamMessage with headers.
2697    pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2698        self.send_internal::<StreamMessage>().await
2699    }
2700}
2701
2702impl RawMessageBuilder<WithoutHeaders> {
2703    /// Sends the raw message request and returns only the payload as StreamValue.
2704    pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2705        self.send_internal::<StreamValue>().await
2706    }
2707}
2708
2709#[cfg(test)]
2710mod tests {
2711    use super::*;
2712
2713    #[test]
2714    fn consumer_limits_de() {
2715        let config = Config {
2716            ..Default::default()
2717        };
2718
2719        let roundtrip: Config = {
2720            let ser = serde_json::to_string(&config).unwrap();
2721            serde_json::from_str(&ser).unwrap()
2722        };
2723        assert_eq!(config, roundtrip);
2724    }
2725}