async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{
41        ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42        StreamsErrorKind,
43    },
44    errors::ErrorCode,
45    is_valid_name,
46    message::{StreamMessage, StreamMessageError},
47    response::Response,
48    Context,
49};
50
/// Error returned by operations that fetch stream [Info]; an alias of [RequestError].
pub type InfoError = RequestError;
52
/// Kinds of failures that a direct get operation can report.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The requested message was not found.
    NotFound,
    /// The provided subject is not a valid subject.
    InvalidSubject,
    /// The request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// An error response was received; carries its status code and description.
    ErrorResponse(StatusCode, String),
    /// Any other failure while getting the message.
    Other,
}
62
63impl Display for DirectGetErrorKind {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        match self {
66            Self::InvalidSubject => write!(f, "invalid subject"),
67            Self::NotFound => write!(f, "message not found"),
68            Self::ErrorResponse(status, description) => {
69                write!(f, "unable to get message: {status} {description}")
70            }
71            Self::Other => write!(f, "error getting message"),
72            Self::TimedOut => write!(f, "timed out"),
73            Self::Request => write!(f, "request failed"),
74        }
75    }
76}
77
/// Error returned by direct get operations, classified by [DirectGetErrorKind].
pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81    fn from(err: crate::RequestError) -> Self {
82        match err.kind() {
83            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84            crate::RequestErrorKind::NoResponders => {
85                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86                    StatusCode::NO_RESPONDERS,
87                    "no responders".to_string(),
88                ))
89            }
90            crate::RequestErrorKind::Other => {
91                DirectGetError::with_source(DirectGetErrorKind::Other, err)
92            }
93        }
94    }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98    fn from(err: serde_json::Error) -> Self {
99        DirectGetError::with_source(DirectGetErrorKind::Other, err)
100    }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104    fn from(err: StreamMessageError) -> Self {
105        DirectGetError::with_source(DirectGetErrorKind::Other, err)
106    }
107}
108
/// Kinds of failures that deleting a message from a stream can report.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error, carried here.
    JetStream(super::errors::Error),
}
115
116impl Display for DeleteMessageErrorKind {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        match self {
119            Self::Request => write!(f, "request failed"),
120            Self::TimedOut => write!(f, "timed out"),
121            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
122        }
123    }
124}
125
/// Error returned when deleting a message, classified by [DeleteMessageErrorKind].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
/// Handle to operations that can be performed on a `Stream`.
/// It's generic over the type of `info` field to allow `Stream` with or without
/// info contents.
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Info held by this handle; for `Stream<Info>` this is the cached [Info].
    pub(crate) info: T,
    /// JetStream context through which this handle issues its requests.
    pub(crate) context: Context,
    /// Name of the stream, used when building request subjects.
    pub(crate) name: String,
}
137
138impl Stream<Info> {
139    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
140    /// [Stream] and returns it.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// # #[tokio::main]
146    /// # async fn main() -> Result<(), async_nats::Error> {
147    /// let client = async_nats::connect("localhost:4222").await?;
148    /// let jetstream = async_nats::jetstream::new(client);
149    ///
150    /// let mut stream = jetstream.get_stream("events").await?;
151    ///
152    /// let info = stream.info().await?;
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub async fn info(&mut self) -> Result<&Info, InfoError> {
157        let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159        match self.context.request(subject, &json!({})).await? {
160            Response::Ok::<Info>(info) => {
161                self.info = info;
162                Ok(&self.info)
163            }
164            Response::Err { error } => Err(error.into()),
165        }
166    }
167
168    /// Returns cached [Info] for the [Stream].
169    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
170    /// [Stream::info].
171    ///
172    /// # Examples
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn main() -> Result<(), async_nats::Error> {
177    /// let client = async_nats::connect("localhost:4222").await?;
178    /// let jetstream = async_nats::jetstream::new(client);
179    ///
180    /// let stream = jetstream.get_stream("events").await?;
181    ///
182    /// let info = stream.cached_info();
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub fn cached_info(&self) -> &Info {
187        &self.info
188    }
189}
190
191impl<I> Stream<I> {
192    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
193    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
194    pub async fn get_info(&self) -> Result<Info, InfoError> {
195        let subject = format!("STREAM.INFO.{}", self.name);
196
197        match self.context.request(subject, &json!({})).await? {
198            Response::Ok::<Info>(info) => Ok(info),
199            Response::Err { error } => Err(error.into()),
200        }
201    }
202
    /// Retrieves [Info] from the server and returns a [futures_util::Stream] that allows
    /// iterating over all subjects in the stream fetched via paged API.
205    ///
206    /// # Examples
207    ///
208    /// ```no_run
209    /// # #[tokio::main]
210    /// # async fn main() -> Result<(), async_nats::Error> {
211    /// use futures_util::TryStreamExt;
212    /// let client = async_nats::connect("localhost:4222").await?;
213    /// let jetstream = async_nats::jetstream::new(client);
214    ///
215    /// let mut stream = jetstream.get_stream("events").await?;
216    ///
217    /// let mut info = stream.info_with_subjects("events.>").await?;
218    ///
219    /// while let Some((subject, count)) = info.try_next().await? {
220    ///     println!("Subject: {} count: {}", subject, count);
221    /// }
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub async fn info_with_subjects<F: AsRef<str>>(
226        &self,
227        subjects_filter: F,
228    ) -> Result<InfoWithSubjects, InfoError> {
229        let subjects_filter = subjects_filter.as_ref().to_string();
230        // TODO: validate the subject and decide if this should be a `Subject`
231        let info = stream_info_with_details(
232            self.context.clone(),
233            self.name.clone(),
234            0,
235            false,
236            subjects_filter.clone(),
237        )
238        .await?;
239
240        Ok(InfoWithSubjects::new(
241            self.context.clone(),
242            info,
243            subjects_filter,
244        ))
245    }
246
247    /// Creates a builder that allows to customize `Stream::Info`.
248    ///
249    /// # Examples
250    /// ```no_run
251    /// # #[tokio::main]
252    /// # async fn main() -> Result<(), async_nats::Error> {
253    /// use futures_util::TryStreamExt;
254    /// let client = async_nats::connect("localhost:4222").await?;
255    /// let jetstream = async_nats::jetstream::new(client);
256    ///
257    /// let mut stream = jetstream.get_stream("events").await?;
258    ///
259    /// let mut info = stream
260    ///     .info_builder()
261    ///     .with_deleted(true)
262    ///     .subjects("events.>")
263    ///     .fetch()
264    ///     .await?;
265    ///
266    /// while let Some((subject, count)) = info.try_next().await? {
267    ///     println!("Subject: {} count: {}", subject, count);
268    /// }
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub fn info_builder(&self) -> StreamInfoBuilder {
273        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
274    }
275
276    /// Creates a builder for direct get operations.
277    ///
278    /// Allows for more control over direct get requests.
279    ///
280    /// # Examples
281    ///
282    /// ```no_run
283    /// # #[tokio::main]
284    /// # async fn main() -> Result<(), async_nats::Error> {
285    /// let client = async_nats::connect("demo.nats.io").await?;
286    /// let jetstream = async_nats::jetstream::new(client);
287    ///
288    /// let stream = jetstream.get_stream("events").await?;
289    ///
290    /// // Get message without headers
291    /// let message = stream.direct_get_builder().sequence(100).send().await?;
292    /// # Ok(())
293    /// # }
294    /// ```
295    pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
296        DirectGetBuilder::new(self.context.clone(), self.name.clone())
297    }
298
299    /// Gets next message for a [Stream].
300    ///
301    /// Requires a [Stream] with `allow_direct` set to `true`.
    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the given replica might not have caught up with the leader yet.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// # #[tokio::main]
310    /// # async fn main() -> Result<(), async_nats::Error> {
311    /// let client = async_nats::connect("demo.nats.io").await?;
312    /// let jetstream = async_nats::jetstream::new(client);
313    ///
314    /// let stream = jetstream
315    ///     .create_stream(async_nats::jetstream::stream::Config {
316    ///         name: "events".to_string(),
317    ///         subjects: vec!["events.>".to_string()],
318    ///         allow_direct: true,
319    ///         ..Default::default()
320    ///     })
321    ///     .await?;
322    ///
323    /// jetstream.publish("events.data", "data".into()).await?;
324    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
325    ///
326    /// let message = stream
327    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
328    ///     .await?;
329    ///
330    /// # Ok(())
331    /// # }
332    /// ```
333    pub async fn direct_get_next_for_subject<T: Into<String>>(
334        &self,
335        subject: T,
336        sequence: Option<u64>,
337    ) -> Result<StreamMessage, DirectGetError> {
338        let subject_str = subject.into();
339        if !is_valid_subject(&subject_str) {
340            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
341        }
342
343        let mut builder = self.direct_get_builder().next_by_subject(subject_str);
344        if let Some(seq) = sequence {
345            builder = builder.sequence(seq);
346        }
347
348        builder.send().await
349    }
350
351    /// Gets first message from [Stream].
352    ///
353    /// Requires a [Stream] with `allow_direct` set to `true`.
    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the given replica might not have caught up with the leader yet.
357    ///
358    /// # Examples
359    ///
360    /// ```no_run
361    /// # #[tokio::main]
362    /// # async fn main() -> Result<(), async_nats::Error> {
363    /// let client = async_nats::connect("demo.nats.io").await?;
364    /// let jetstream = async_nats::jetstream::new(client);
365    ///
366    /// let stream = jetstream
367    ///     .create_stream(async_nats::jetstream::stream::Config {
368    ///         name: "events".to_string(),
369    ///         subjects: vec!["events.>".to_string()],
370    ///         allow_direct: true,
371    ///         ..Default::default()
372    ///     })
373    ///     .await?;
374    ///
375    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
376    ///
377    /// let message = stream.direct_get_first_for_subject("events.data").await?;
378    ///
379    /// # Ok(())
380    /// # }
381    /// ```
382    pub async fn direct_get_first_for_subject<T: Into<String>>(
383        &self,
384        subject: T,
385    ) -> Result<StreamMessage, DirectGetError> {
386        let subject_str = subject.into();
387        if !is_valid_subject(&subject_str) {
388            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
389        }
390
391        self.direct_get_builder()
392            .next_by_subject(subject_str)
393            .send()
394            .await
395    }
396
397    /// Gets message from [Stream] with given `sequence id`.
398    ///
399    /// Requires a [Stream] with `allow_direct` set to `true`.
    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the given replica might not have caught up with the leader yet.
403    ///
404    /// # Examples
405    ///
406    /// ```no_run
407    /// # #[tokio::main]
408    /// # async fn main() -> Result<(), async_nats::Error> {
409    /// let client = async_nats::connect("demo.nats.io").await?;
410    /// let jetstream = async_nats::jetstream::new(client);
411    ///
412    /// let stream = jetstream
413    ///     .create_stream(async_nats::jetstream::stream::Config {
414    ///         name: "events".to_string(),
415    ///         subjects: vec!["events.>".to_string()],
416    ///         allow_direct: true,
417    ///         ..Default::default()
418    ///     })
419    ///     .await?;
420    ///
421    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
422    ///
423    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
424    ///
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
429        self.direct_get_builder().sequence(sequence).send().await
430    }
431
432    /// Gets last message for a given `subject`.
433    ///
434    /// Requires a [Stream] with `allow_direct` set to `true`.
435    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
436    /// from any replica member. This means read after write is possible,
437    /// as that given replica might not yet catch up with the leader.
438    ///
439    /// # Examples
440    ///
441    /// ```no_run
442    /// # #[tokio::main]
443    /// # async fn main() -> Result<(), async_nats::Error> {
444    /// let client = async_nats::connect("demo.nats.io").await?;
445    /// let jetstream = async_nats::jetstream::new(client);
446    ///
447    /// let stream = jetstream
448    ///     .create_stream(async_nats::jetstream::stream::Config {
449    ///         name: "events".to_string(),
450    ///         subjects: vec!["events.>".to_string()],
451    ///         allow_direct: true,
452    ///         ..Default::default()
453    ///     })
454    ///     .await?;
455    ///
456    /// jetstream.publish("events.data", "data".into()).await?;
457    ///
458    /// let message = stream.direct_get_last_for_subject("events.data").await?;
459    ///
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub async fn direct_get_last_for_subject<T: Into<String>>(
464        &self,
465        subject: T,
466    ) -> Result<StreamMessage, DirectGetError> {
467        self.direct_get_builder()
468            .last_by_subject(subject)
469            .send()
470            .await
471    }
472    /// Creates a builder for retrieving messages from the stream with flexible options.
473    /// Raw message methods retrieve messages directly from the stream leader instead
474    /// of a replica. This should be used with care, as it can put additional load on the
475    /// stream leader.
476    ///
477    /// # Examples
478    ///
479    /// ```no_run
480    /// # #[tokio::main]
481    /// # async fn main() -> Result<(), async_nats::Error> {
482    /// let client = async_nats::connect("localhost:4222").await?;
483    /// let context = async_nats::jetstream::new(client);
484    /// let stream = context.get_stream("events").await?;
485    ///
486    /// // Get message without headers
487    /// let value = stream.raw_message_builder().sequence(100).send().await?;
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
492        RawMessageBuilder::new(self.context.clone(), self.name.clone())
493    }
494
495    /// Get a raw message from the stream for a given stream sequence.
496    /// This low-level API always reaches stream leader.
497    /// This should be discouraged in favor of using [Stream::direct_get].
498    ///
499    /// # Examples
500    ///
501    /// ```no_run
    /// # #[tokio::main]
503    /// # async fn main() -> Result<(), async_nats::Error> {
504    /// use futures_util::StreamExt;
505    /// use futures_util::TryStreamExt;
506    ///
507    /// let client = async_nats::connect("localhost:4222").await?;
508    /// let context = async_nats::jetstream::new(client);
509    ///
510    /// let stream = context
511    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
512    ///         name: "events".to_string(),
513    ///         max_messages: 10_000,
514    ///         ..Default::default()
515    ///     })
516    ///     .await?;
517    ///
518    /// let publish_ack = context.publish("events", "data".into()).await?;
519    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
520    /// println!("Retrieved raw message {:?}", raw_message);
521    /// # Ok(())
522    /// # }
523    /// ```
524    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
525        self.raw_message_builder().sequence(sequence).send().await
526    }
527
528    /// Get a first message from the stream for a given subject starting from provided sequence.
529    /// This low-level API always reaches stream leader.
530    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
531    ///
532    /// # Examples
533    ///
534    /// ```no_run
    /// # #[tokio::main]
536    /// # async fn main() -> Result<(), async_nats::Error> {
537    /// use futures_util::StreamExt;
538    /// use futures_util::TryStreamExt;
539    ///
540    /// let client = async_nats::connect("localhost:4222").await?;
541    /// let context = async_nats::jetstream::new(client);
542    /// let stream = context.get_stream_no_info("events").await?;
543    ///
544    /// let raw_message = stream
545    ///     .get_first_raw_message_by_subject("events.created", 10)
546    ///     .await?;
547    /// println!("Retrieved raw message {:?}", raw_message);
548    /// # Ok(())
549    /// # }
550    /// ```
551    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
552        &self,
553        subject: T,
554        sequence: u64,
555    ) -> Result<StreamMessage, RawMessageError> {
556        self.raw_message_builder()
557            .sequence(sequence)
558            .next_by_subject(subject.as_ref().to_string())
559            .send()
560            .await
561    }
562
563    /// Get a next message from the stream for a given subject.
564    /// This low-level API always reaches stream leader.
565    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
    /// # #[tokio::main]
571    /// # async fn main() -> Result<(), async_nats::Error> {
572    /// use futures_util::StreamExt;
573    /// use futures_util::TryStreamExt;
574    ///
575    /// let client = async_nats::connect("localhost:4222").await?;
576    /// let context = async_nats::jetstream::new(client);
577    /// let stream = context.get_stream_no_info("events").await?;
578    ///
579    /// let raw_message = stream
580    ///     .get_next_raw_message_by_subject("events.created")
581    ///     .await?;
582    /// println!("Retrieved raw message {:?}", raw_message);
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
587        &self,
588        subject: T,
589    ) -> Result<StreamMessage, RawMessageError> {
590        self.raw_message_builder()
591            .next_by_subject(subject.as_ref().to_string())
592            .send()
593            .await
594    }
595
596    /// Get a last message from the stream for a given subject.
597    /// This low-level API always reaches stream leader.
598    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
599    ///
600    /// # Examples
601    ///
602    /// ```no_run
    /// # #[tokio::main]
604    /// # async fn main() -> Result<(), async_nats::Error> {
605    /// use futures_util::StreamExt;
606    /// use futures_util::TryStreamExt;
607    ///
608    /// let client = async_nats::connect("localhost:4222").await?;
609    /// let context = async_nats::jetstream::new(client);
610    /// let stream = context.get_stream_no_info("events").await?;
611    ///
612    /// let raw_message = stream
613    ///     .get_last_raw_message_by_subject("events.created")
614    ///     .await?;
615    /// println!("Retrieved raw message {:?}", raw_message);
616    /// # Ok(())
617    /// # }
618    /// ```
619    pub async fn get_last_raw_message_by_subject(
620        &self,
621        stream_subject: &str,
622    ) -> Result<StreamMessage, LastRawMessageError> {
623        self.raw_message_builder()
624            .last_by_subject(stream_subject.to_string())
625            .send()
626            .await
627    }
628
629    /// Delete a message from the stream.
630    ///
631    /// # Examples
632    ///
633    /// ```no_run
634    /// # #[tokio::main]
635    /// # async fn main() -> Result<(), async_nats::Error> {
636    /// let client = async_nats::connect("localhost:4222").await?;
637    /// let context = async_nats::jetstream::new(client);
638    ///
639    /// let stream = context
640    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
641    ///         name: "events".to_string(),
642    ///         max_messages: 10_000,
643    ///         ..Default::default()
644    ///     })
645    ///     .await?;
646    ///
647    /// let publish_ack = context.publish("events", "data".into()).await?;
648    /// stream.delete_message(publish_ack.await?.sequence).await?;
649    /// # Ok(())
650    /// # }
651    /// ```
652    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
653        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
654        let payload = json!({
655            "seq": sequence,
656        });
657
658        let response: Response<DeleteStatus> = self
659            .context
660            .request(subject, &payload)
661            .map_err(|err| match err.kind() {
662                RequestErrorKind::TimedOut => {
663                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
664                }
665                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
666            })
667            .await?;
668
669        match response {
670            Response::Err { error } => Err(DeleteMessageError::new(
671                DeleteMessageErrorKind::JetStream(error),
672            )),
673            Response::Ok(value) => Ok(value.success),
674        }
675    }
676
677    /// Purge `Stream` messages.
678    ///
679    /// # Examples
680    ///
681    /// ```no_run
682    /// # #[tokio::main]
683    /// # async fn main() -> Result<(), async_nats::Error> {
684    /// let client = async_nats::connect("demo.nats.io").await?;
685    /// let jetstream = async_nats::jetstream::new(client);
686    ///
687    /// let stream = jetstream.get_stream("events").await?;
688    /// stream.purge().await?;
689    /// # Ok(())
690    /// # }
691    /// ```
    pub fn purge(&self) -> Purge<No, No> {
        // Only constructs the purge builder here; the documented usage awaits the
        // returned value to run the purge.
        Purge::build(self)
    }
695
696    /// Purge `Stream` messages for a matching subject.
697    ///
698    /// # Examples
699    ///
700    /// ```no_run
701    /// # #[tokio::main]
702    /// # #[allow(deprecated)]
703    /// # async fn main() -> Result<(), async_nats::Error> {
704    /// let client = async_nats::connect("demo.nats.io").await?;
705    /// let jetstream = async_nats::jetstream::new(client);
706    ///
707    /// let stream = jetstream.get_stream("events").await?;
708    /// stream.purge_subject("data").await?;
709    /// # Ok(())
710    /// # }
711    /// ```
712    #[deprecated(
713        since = "0.25.0",
714        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
715    )]
716    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
717    where
718        T: Into<String>,
719    {
720        self.purge().filter(subject).await
721    }
722
723    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
724    /// returns the info from the server about created [Consumer]
725    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
726    ///
727    /// # Examples
728    ///
729    /// ```no_run
730    /// # #[tokio::main]
731    /// # async fn main() -> Result<(), async_nats::Error> {
732    /// use async_nats::jetstream::consumer;
733    /// let client = async_nats::connect("localhost:4222").await?;
734    /// let jetstream = async_nats::jetstream::new(client);
735    ///
736    /// let stream = jetstream.get_stream("events").await?;
737    /// let info = stream
738    ///     .create_consumer(consumer::pull::Config {
739    ///         durable_name: Some("pull".to_string()),
740    ///         ..Default::default()
741    ///     })
742    ///     .await?;
743    /// # Ok(())
744    /// # }
745    /// ```
746    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
747        &self,
748        config: C,
749    ) -> Result<Consumer<C>, ConsumerError> {
750        self.context
751            .create_consumer_on_stream(config, self.name.clone())
752            .await
753    }
754
755    /// Update an existing consumer.
756    /// This call will fail if the consumer does not exist.
757    /// returns the info from the server about updated [Consumer].
758    ///
759    /// # Examples
760    ///
761    /// ```no_run
762    /// # #[tokio::main]
763    /// # async fn main() -> Result<(), async_nats::Error> {
764    /// use async_nats::jetstream::consumer;
765    /// let client = async_nats::connect("localhost:4222").await?;
766    /// let jetstream = async_nats::jetstream::new(client);
767    ///
768    /// let stream = jetstream.get_stream("events").await?;
769    /// let info = stream
770    ///     .update_consumer(consumer::pull::Config {
771    ///         durable_name: Some("pull".to_string()),
772    ///         ..Default::default()
773    ///     })
774    ///     .await?;
775    /// # Ok(())
776    /// # }
777    /// ```
778    #[cfg(feature = "server_2_10")]
779    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
780        &self,
781        config: C,
782    ) -> Result<Consumer<C>, ConsumerUpdateError> {
783        self.context
784            .update_consumer_on_stream(config, self.name.clone())
785            .await
786    }
787
788    /// Create consumer, but only if it does not exist or the existing config is exactly
789    /// the same.
790    /// This method will fail if consumer is already present with different config.
791    /// returns the info from the server about created [Consumer].
792    ///
793    /// # Examples
794    ///
795    /// ```no_run
796    /// # #[tokio::main]
797    /// # async fn main() -> Result<(), async_nats::Error> {
798    /// use async_nats::jetstream::consumer;
799    /// let client = async_nats::connect("localhost:4222").await?;
800    /// let jetstream = async_nats::jetstream::new(client);
801    ///
802    /// let stream = jetstream.get_stream("events").await?;
803    /// let info = stream
804    ///     .create_consumer_strict(consumer::pull::Config {
805    ///         durable_name: Some("pull".to_string()),
806    ///         ..Default::default()
807    ///     })
808    ///     .await?;
809    /// # Ok(())
810    /// # }
811    /// ```
812    #[cfg(feature = "server_2_10")]
813    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
814        &self,
815        config: C,
816    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
817        self.context
818            .create_consumer_strict_on_stream(config, self.name.clone())
819            .await
820    }
821
822    /// Retrieve [Info] about [Consumer] from the server.
823    ///
824    /// # Examples
825    ///
826    /// ```no_run
827    /// # #[tokio::main]
828    /// # async fn main() -> Result<(), async_nats::Error> {
829    /// use async_nats::jetstream::consumer;
830    /// let client = async_nats::connect("localhost:4222").await?;
831    /// let jetstream = async_nats::jetstream::new(client);
832    ///
833    /// let stream = jetstream.get_stream("events").await?;
834    /// let info = stream.consumer_info("pull").await?;
835    /// # Ok(())
836    /// # }
837    /// ```
838    pub async fn consumer_info<T: AsRef<str>>(
839        &self,
840        name: T,
841    ) -> Result<consumer::Info, ConsumerInfoError> {
842        let name = name.as_ref();
843
844        if !is_valid_name(name) {
845            return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
846        }
847
848        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
849
850        match self.context.request(subject, &json!({})).await? {
851            Response::Ok(info) => Ok(info),
852            Response::Err { error } => Err(error.into()),
853        }
854    }
855
    /// Get [Consumer] from the server. [Consumer] iterators can be used to retrieve
857    /// [Messages][crate::jetstream::Message] for a given [Consumer].
858    ///
859    /// # Examples
860    ///
861    /// ```no_run
862    /// # #[tokio::main]
863    /// # async fn main() -> Result<(), async_nats::Error> {
864    /// use async_nats::jetstream::consumer;
865    /// use futures_util::StreamExt;
866    /// let client = async_nats::connect("localhost:4222").await?;
867    /// let jetstream = async_nats::jetstream::new(client);
868    ///
869    /// let stream = jetstream.get_stream("events").await?;
870    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
871    /// # Ok(())
872    /// # }
873    /// ```
874    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
875        &self,
876        name: &str,
877    ) -> Result<Consumer<T>, crate::Error> {
878        let info = self.consumer_info(name).await?;
879
880        Ok(Consumer::new(
881            T::try_from_consumer_config(info.config.clone())?,
882            info,
883            self.context.clone(),
884        ))
885    }
886
887    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
888    ///
889    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
890    ///
891    /// # Examples
892    ///
893    /// ```no_run
894    /// # #[tokio::main]
895    /// # async fn main() -> Result<(), async_nats::Error> {
896    /// use async_nats::jetstream::consumer;
897    /// use futures_util::StreamExt;
898    /// let client = async_nats::connect("localhost:4222").await?;
899    /// let jetstream = async_nats::jetstream::new(client);
900    ///
901    /// let stream = jetstream.get_stream("events").await?;
902    /// let consumer = stream
903    ///     .get_or_create_consumer(
904    ///         "pull",
905    ///         consumer::pull::Config {
906    ///             durable_name: Some("pull".to_string()),
907    ///             ..Default::default()
908    ///         },
909    ///     )
910    ///     .await?;
911    /// # Ok(())
912    /// # }
913    /// ```
914    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
915        &self,
916        name: &str,
917        config: T,
918    ) -> Result<Consumer<T>, ConsumerError> {
919        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
920
921        match self.context.request(subject, &json!({})).await? {
922            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
923            Response::Err { error } => Err(error.into()),
924            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
925                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
926                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
927                })?,
928                info,
929                self.context.clone(),
930            )),
931        }
932    }
933
934    /// Delete a [Consumer] from the server.
935    ///
936    /// # Examples
937    ///
938    /// ```no_run
939    /// # #[tokio::main]
940    /// # async fn main() -> Result<(), async_nats::Error> {
941    /// use async_nats::jetstream::consumer;
942    /// use futures_util::StreamExt;
943    /// let client = async_nats::connect("localhost:4222").await?;
944    /// let jetstream = async_nats::jetstream::new(client);
945    ///
946    /// jetstream
947    ///     .get_stream("events")
948    ///     .await?
949    ///     .delete_consumer("pull")
950    ///     .await?;
951    /// # Ok(())
952    /// # }
953    /// ```
954    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
955        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
956
957        match self.context.request(subject, &json!({})).await? {
958            Response::Ok(delete_status) => Ok(delete_status),
959            Response::Err { error } => Err(error.into()),
960        }
961    }
962
963    /// Pause a [Consumer] until the given time.
964    /// It will not deliver any messages to clients during that time.
965    ///
966    /// # Examples
967    ///
968    /// ```no_run
969    /// # #[tokio::main]
970    /// # async fn main() -> Result<(), async_nats::Error> {
971    /// use async_nats::jetstream::consumer;
972    /// use futures_util::StreamExt;
973    /// let client = async_nats::connect("localhost:4222").await?;
974    /// let jetstream = async_nats::jetstream::new(client);
975    /// let pause_until =
976    ///     time::OffsetDateTime::now_utc().saturating_add(time::Duration::seconds_f32(10.0));
977    ///
978    /// jetstream
979    ///     .get_stream("events")
980    ///     .await?
981    ///     .pause_consumer("my_consumer", pause_until)
982    ///     .await?;
983    /// # Ok(())
984    /// # }
985    /// ```
986    #[cfg(feature = "server_2_11")]
987    pub async fn pause_consumer(
988        &self,
989        name: &str,
990        pause_until: OffsetDateTime,
991    ) -> Result<PauseResponse, ConsumerError> {
992        self.request_pause_consumer(name, Some(pause_until)).await
993    }
994
995    /// Resume a paused [Consumer].
996    ///
997    /// # Examples
998    ///
999    /// ```no_run
1000    /// # #[tokio::main]
1001    /// # async fn main() -> Result<(), async_nats::Error> {
1002    /// use async_nats::jetstream::consumer;
1003    /// use futures_util::StreamExt;
1004    /// let client = async_nats::connect("localhost:4222").await?;
1005    /// let jetstream = async_nats::jetstream::new(client);
1006    ///
1007    /// jetstream
1008    ///     .get_stream("events")
1009    ///     .await?
1010    ///     .resume_consumer("my_consumer")
1011    ///     .await?;
1012    /// # Ok(())
1013    /// # }
1014    /// ```
1015    #[cfg(feature = "server_2_11")]
1016    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1017        self.request_pause_consumer(name, None).await
1018    }
1019
1020    #[cfg(feature = "server_2_11")]
1021    async fn request_pause_consumer(
1022        &self,
1023        name: &str,
1024        pause_until: Option<OffsetDateTime>,
1025    ) -> Result<PauseResponse, ConsumerError> {
1026        let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1027        let payload = &PauseResumeConsumerRequest { pause_until };
1028
1029        match self.context.request(subject, payload).await? {
1030            Response::Ok::<PauseResponse>(resp) => Ok(resp),
1031            Response::Err { error } => Err(error.into()),
1032        }
1033    }
1034
1035    /// Lists names of all consumers for current stream.
1036    ///
1037    /// # Examples
1038    ///
1039    /// ```no_run
1040    /// # #[tokio::main]
1041    /// # async fn main() -> Result<(), async_nats::Error> {
1042    /// use futures_util::TryStreamExt;
1043    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1044    /// let jetstream = async_nats::jetstream::new(client);
1045    /// let stream = jetstream.get_stream("stream").await?;
1046    /// let mut names = stream.consumer_names();
1047    /// while let Some(consumer) = names.try_next().await? {
    ///     println!("consumer: {consumer:?}");
1049    /// }
1050    /// # Ok(())
1051    /// # }
1052    /// ```
1053    pub fn consumer_names(&self) -> ConsumerNames {
1054        ConsumerNames {
1055            context: self.context.clone(),
1056            stream: self.name.clone(),
1057            offset: 0,
1058            page_request: None,
1059            consumers: Vec::new(),
1060            done: false,
1061        }
1062    }
1063
1064    /// Lists all consumers info for current stream.
1065    ///
1066    /// # Examples
1067    ///
1068    /// ```no_run
1069    /// # #[tokio::main]
1070    /// # async fn main() -> Result<(), async_nats::Error> {
1071    /// use futures_util::TryStreamExt;
1072    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1073    /// let jetstream = async_nats::jetstream::new(client);
1074    /// let stream = jetstream.get_stream("stream").await?;
1075    /// let mut consumers = stream.consumers();
1076    /// while let Some(consumer) = consumers.try_next().await? {
1077    ///     println!("consumer: {consumer:?}");
1078    /// }
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    pub fn consumers(&self) -> Consumers {
1083        Consumers {
1084            context: self.context.clone(),
1085            stream: self.name.clone(),
1086            offset: 0,
1087            page_request: None,
1088            consumers: Vec::new(),
1089            done: false,
1090        }
1091    }
1092}
1093
/// Builder for a stream info request that carries extra options.
///
/// Options are collected on the builder; the request itself is sent by
/// [`StreamInfoBuilder::fetch`].
pub struct StreamInfoBuilder {
    /// JetStream context used to send the request.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Forwarded as the `deleted_details` flag of the info request.
    pub(crate) deleted: bool,
    /// Forwarded as the `subjects_filter` of the info request; empty by default.
    pub(crate) subject: String,
}
1100
1101impl StreamInfoBuilder {
1102    fn new(context: Context, name: String) -> Self {
1103        Self {
1104            context,
1105            name,
1106            deleted: false,
1107            subject: "".to_string(),
1108        }
1109    }
1110
1111    pub fn with_deleted(mut self, deleted: bool) -> Self {
1112        self.deleted = deleted;
1113        self
1114    }
1115
1116    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1117        self.subject = subject.into();
1118        self
1119    }
1120
1121    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1122        let info = stream_info_with_details(
1123            self.context.clone(),
1124            self.name.clone(),
1125            0,
1126            self.deleted,
1127            self.subject.clone(),
1128        )
1129        .await?;
1130
1131        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1132    }
1133}
1134
/// `Config` determines the properties for a stream.
/// There are sensible defaults for most. If no subjects are
/// given the name will be used as the only subject.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
    pub name: String,
    /// How large the Stream may become in total bytes before the configured discard policy kicks in
    #[serde(default)]
    pub max_bytes: i64,
    /// How large the Stream may become in total messages before the configured discard policy kicks in
    #[serde(default, rename = "max_msgs")]
    pub max_messages: i64,
    /// Maximum amount of messages to keep per subject
    #[serde(default, rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
    pub discard: DiscardPolicy,
    /// Prevents a message from being added to a stream if the `max_msgs_per_subject` limit for the subject has been reached
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
    /// configured stream `name`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
    pub retention: RetentionPolicy,
    /// How many Consumers can be defined for a given Stream, -1 for unlimited
    #[serde(default)]
    pub max_consumers: i32,
    /// Maximum age of any message in the stream, expressed in nanoseconds
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// The largest message that will be accepted by the Stream
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
    pub num_replicas: usize,
    /// Disables acknowledging messages that are received by the Stream
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// The window within which to track duplicate messages.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// The owner of the template associated with this stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Indicates the stream is sealed and cannot be modified in any way
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// A short description of the purpose of this stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// Indicates if rollups will be allowed or not.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Indicates if deletes will be denied or not.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Indicates if purges will be denied or not.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish config.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Enables direct get, which would get messages from
    /// non-leader.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Enable direct access also for mirrors.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Stream mirror configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Sources configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// Additional stream metadata.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Allow applying a subject transform to incoming messages
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Override compression config for this stream.
    /// Wrapping enum that has `None` type with [Option] is there
    /// because [Stream] can override global compression set to [Compression::S2]
    /// to [Compression::None], which is different from not overriding global config with anything.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Set limits on consumers that are created on this stream.
    /// On deserialization, limits equal to `ConsumerLimits::default()` are read as `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// Sets the first sequence for the stream.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Placement configuration for clusters and tags.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,

    /// Persistence mode for the stream.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub persist_mode: Option<PersistenceMode>,

    /// For suspending the consumer until the deadline.
    // NOTE(review): the wording above refers to a consumer although this field
    // lives on the stream config — confirm the intended semantics.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Allows setting a TTL for a message in the stream.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Enables delete markers for messages deleted from the stream and sets the TTL
    /// for how long the marker should be kept.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,

    /// Allows atomic publish operations.
    #[cfg(feature = "server_2_12")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
    pub allow_atomic_publish: bool,

    /// Enables the scheduling of messages
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_schedules"
    )]
    pub allow_message_schedules: bool,

    /// Enables counters for the stream
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_counter"
    )]
    pub allow_message_counter: bool,
}
1304
1305impl From<&Config> for Config {
1306    fn from(sc: &Config) -> Config {
1307        sc.clone()
1308    }
1309}
1310
1311impl From<&str> for Config {
1312    fn from(s: &str) -> Config {
1313        Config {
1314            name: s.to_string(),
1315            ..Default::default()
1316        }
1317    }
1318}
1319
/// Deserializes optional [`ConsumerLimits`], mapping limits that equal
/// `ConsumerLimits::default()` to `None`, just like an absent value.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let parsed = Option::<ConsumerLimits>::deserialize(deserializer)?;

    // Keep only limits that differ from the all-default value.
    Ok(parsed.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Limits that apply to consumers created on a stream.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
    /// (De)serialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1347
/// Compression setting for a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// S2 compression; serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// No compression; serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1355
/// `SubjectTransform` is for applying a subject transform (to matching messages) when a new message is received.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Source side of the transform; serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// Destination side of the transform; serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1365
/// `Republish` is for republishing messages once committed to a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Subject that should be republished. Serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// Subject where messages will be republished. Serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// If true, only headers should be republished. Defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1379
/// Placement describes on which cluster or tags the stream should be placed.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Cluster where the stream should be placed.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Matching tags for stream placement.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1390
/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old`, will
/// remove older messages. `New` will fail to store the new message.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Will remove older messages when limits are hit. Serialized as `"old"`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Will error on a StoreMsg call when limits are hit. Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1404
/// `RetentionPolicy` determines how messages in a set are retained.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// `Limits` (default) means that messages are retained until any given limit is reached.
    /// This could be one of messages, bytes, or age. Serialized as `"limits"`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1421
/// `StorageType` determines how messages are stored for retention.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Stream data is kept in files. This is the default. Serialized as `"file"`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Stream data is kept only in memory. Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1434
/// Determines the persistence mode for stream data.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PersistenceMode {
    /// Writes are immediately flushed, acknowledgement sent after message is stored.
    /// This is the default variant; serialized as `"default"`.
    #[default]
    #[serde(rename = "default")]
    Default = 0,
    /// Writes are flushed asynchronously, acknowledgement may be sent before message is stored.
    /// Serialized as `"async"`.
    #[serde(rename = "async")]
    Async = 1,
}
1447
1448async fn stream_info_with_details(
1449    context: Context,
1450    stream: String,
1451    offset: usize,
1452    deleted_details: bool,
1453    subjects_filter: String,
1454) -> Result<Info, InfoError> {
1455    let subject = format!("STREAM.INFO.{stream}");
1456
1457    let payload = StreamInfoRequest {
1458        offset,
1459        deleted_details,
1460        subjects_filter,
1461    };
1462
1463    let response: Response<Info> = context.request(subject, &payload).await?;
1464
1465    match response {
1466        Response::Ok(info) => Ok(info),
1467        Response::Err { error } => Err(error.into()),
1468    }
1469}
1470
// Boxed, `'static` future of an in-flight stream info request.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1472
/// Payload serialized and sent with a stream info request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    // Paging offset; follow-up requests pass the number of subjects already received.
    offset: usize,
    // Whether deleted details are requested.
    deleted_details: bool,
    // Subjects filter; an empty string when the builder did not set one.
    subjects_filter: String,
}
1479
/// Stream [`Info`] together with a paged stream of `(subject, count)` entries.
///
/// Entries of the first response are buffered; further pages are requested as
/// the stream is polled.
pub struct InfoWithSubjects {
    // Name of the stream, used for follow-up page requests.
    stream: String,
    // Context used to send follow-up page requests.
    context: Context,
    /// Info from the first response; its subjects map has been moved into the iterator.
    pub info: Info,
    // Number of subject entries received so far; sent as the next request's offset.
    offset: usize,
    // Entries of the current page that have not been yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    // In-flight request for the next page, if any.
    info_request: Option<InfoRequest>,
    // Subjects filter repeated on every page request.
    subjects_filter: String,
    // Set once no further pages should be requested.
    pages_done: bool,
}
1490
1491impl InfoWithSubjects {
1492    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1493        let subjects = info.state.subjects.take().unwrap_or_default();
1494        let name = info.config.name.clone();
1495        InfoWithSubjects {
1496            context,
1497            info,
1498            pages_done: subjects.is_empty(),
1499            offset: subjects.len(),
1500            subjects: subjects.into_iter(),
1501            subjects_filter: subject,
1502            stream: name,
1503            info_request: None,
1504        }
1505    }
1506}
1507
1508impl futures_util::Stream for InfoWithSubjects {
1509    type Item = Result<(String, usize), InfoError>;
1510
1511    fn poll_next(
1512        mut self: Pin<&mut Self>,
1513        cx: &mut std::task::Context<'_>,
1514    ) -> Poll<Option<Self::Item>> {
1515        match self.subjects.next() {
1516            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1517            None => {
1518                // If we have already requested all pages, stop the iterator.
1519                if self.pages_done {
1520                    return Poll::Ready(None);
1521                }
1522                let stream = self.stream.clone();
1523                let context = self.context.clone();
1524                let subjects_filter = self.subjects_filter.clone();
1525                let offset = self.offset;
1526                match self
1527                    .info_request
1528                    .get_or_insert_with(|| {
1529                        Box::pin(stream_info_with_details(
1530                            context,
1531                            stream,
1532                            offset,
1533                            false,
1534                            subjects_filter,
1535                        ))
1536                    })
1537                    .poll_unpin(cx)
1538                {
1539                    Poll::Ready(resp) => match resp {
1540                        Ok(info) => {
1541                            let subjects = info.state.subjects.clone();
1542                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1543                            self.info_request = None;
1544                            let subjects = subjects.unwrap_or_default();
1545                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1546                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1547                            if total <= self.offset || subjects.is_empty() {
1548                                self.pages_done = true;
1549                            }
1550                            match self.subjects.next() {
1551                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1552                                None => Poll::Ready(None),
1553                            }
1554                        }
1555                        Err(err) => Poll::Ready(Some(Err(err))),
1556                    },
1557                    Poll::Pending => Poll::Pending,
1558                }
1559            }
1560        }
1561    }
1562}
1563
/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// The configuration associated with this stream.
    pub config: Config,
    /// The time that this stream was created.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// Various metrics associated with this stream.
    pub state: State,
    /// Information about leader and replicas.
    pub cluster: Option<ClusterInfo>,
    /// Information about mirror config if present.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Information about sources configs if present.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    // Paging details, flattened from the top level of the response when present.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1585
/// Paging details that accompany a paged info response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    // Offset reported by the server for this page.
    offset: usize,
    // Total reported by the server; paging stops once the received count reaches it.
    total: usize,
    // Limit reported by the server (presumably the page size — TODO confirm).
    limit: usize,
}
1592
/// Response returned by the server for a delete request.
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// The `success` flag reported by the server.
    pub success: bool,
}
1597
/// Response returned by the server for a consumer pause or resume request.
#[cfg(feature = "server_2_11")]
#[derive(Deserialize)]
pub struct PauseResponse {
    /// The `paused` flag reported by the server.
    pub paused: bool,
    /// The `pause_until` time reported by the server, parsed as RFC 3339.
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// The `pause_remaining` duration reported by the server, if any
    /// (deserialized through `serde_nanos`).
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1607
// Payload of a `CONSUMER.PAUSE` request; shared by pause and resume.
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    // Deadline in RFC 3339 form; the field is omitted entirely when `None`,
    // which is how a resume request is expressed.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1614
/// Information about the given stream.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// The number of messages contained in this stream
    pub messages: u64,
    /// The number of bytes of all messages contained in this stream
    pub bytes: u64,
    /// The lowest sequence number still present in this stream
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// The time associated with the oldest message still present in this stream
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// The last sequence number assigned to a message in this stream
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// The time that the last message was received by this stream
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// The number of consumers configured to consume this stream
    pub consumer_count: usize,
    /// The number of subjects in the stream
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// The number of deleted messages in the stream
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// The list of deleted entries reported for the Stream.
    /// This field will be filled only if the [StreamInfoBuilder::with_deleted] option is set.
    // NOTE(review): the values are `u64`, so these are presumably message
    // sequences rather than subjects — confirm.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    // Per-subject `usize` values from the response; `InfoWithSubjects::new` takes
    // this map out and yields its entries.
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1649
/// A raw stream message in the representation it is stored.
///
/// It can be converted into a `StreamMessage` with `TryFrom`, which base64-decodes
/// the payload and headers.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Sequence of the message. Serialized as `seq`.
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Raw payload of the message as a base64 encoded string. Serialized as `data`;
    /// empty when absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Raw header string, if any. Serialized as `hdrs`; it is base64-decoded
    /// before the headers are parsed.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// The time the message was published, in RFC 3339 form.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1673
1674impl TryFrom<RawMessage> for StreamMessage {
1675    type Error = crate::Error;
1676
1677    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1678        let decoded_payload = STANDARD
1679            .decode(value.payload)
1680            .map_err(|err| Box::new(std::io::Error::other(err)))?;
1681        let decoded_headers = value
1682            .headers
1683            .map(|header| STANDARD.decode(header))
1684            .map_or(Ok(None), |v| v.map(Some))?;
1685
1686        let (headers, _, _) = decoded_headers
1687            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1688
1689        Ok(StreamMessage {
1690            subject: value.subject.into(),
1691            payload: decoded_payload.into(),
1692            headers,
1693            sequence: value.sequence,
1694            time: value.time,
1695        })
1696    }
1697}
1698
/// Returns `true` when `c` is a space or a horizontal tab, the characters
/// that mark a header line as the continuation of the previous one.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Prefix that the first line of a raw header block must start with;
/// `parse_headers` rejects header blocks whose first line lacks it.
const HEADER_LINE: &str = "NATS/1.0";
1703
1704#[allow(clippy::type_complexity)]
1705fn parse_headers(
1706    buf: &[u8],
1707) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1708    let mut headers = HeaderMap::new();
1709    let mut maybe_status: Option<StatusCode> = None;
1710    let mut maybe_description: Option<String> = None;
1711    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1712        line.lines().peekable()
1713    } else {
1714        return Err(Box::new(std::io::Error::other("invalid header")));
1715    };
1716
1717    if let Some(line) = lines.next() {
1718        let line = line
1719            .strip_prefix(HEADER_LINE)
1720            .ok_or_else(|| {
1721                Box::new(std::io::Error::other(
1722                    "version line does not start with NATS/1.0",
1723                ))
1724            })?
1725            .trim();
1726
1727        match line.split_once(' ') {
1728            Some((status, description)) => {
1729                if !status.is_empty() {
1730                    maybe_status = Some(status.parse()?);
1731                }
1732
1733                if !description.is_empty() {
1734                    maybe_description = Some(description.trim().to_string());
1735                }
1736            }
1737            None => {
1738                if !line.is_empty() {
1739                    maybe_status = Some(line.parse()?);
1740                }
1741            }
1742        }
1743    } else {
1744        return Err(Box::new(std::io::Error::other(
1745            "expected header information not found",
1746        )));
1747    };
1748
1749    while let Some(line) = lines.next() {
1750        if line.is_empty() {
1751            continue;
1752        }
1753
1754        if let Some((k, v)) = line.split_once(':').to_owned() {
1755            let mut s = String::from(v.trim());
1756            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1757                s.push(' ');
1758                s.push_str(v.trim());
1759            }
1760
1761            headers.insert(
1762                HeaderName::from_str(k)?,
1763                HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1764            );
1765        } else {
1766            return Err(Box::new(std::io::Error::other("malformed header line")));
1767        }
1768    }
1769
1770    if headers.is_empty() {
1771        Ok((HeaderMap::new(), maybe_status, maybe_description))
1772    } else {
1773        Ok((headers, maybe_status, maybe_description))
1774    }
1775}
1776
/// JSON envelope that carries a single [RawMessage] under the `message` key.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    /// The wrapped raw message.
    pub(crate) message: RawMessage,
}
1781
/// Returns `true` when `t` equals the default value of its type.
///
/// Used in this file as a `skip_serializing_if` predicate.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    T::default() == *t
}
/// Information about the stream's, consumer's associated `JetStream` cluster
///
/// Every field falls back to its default value when absent from the payload.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// The cluster name.
    #[serde(default)]
    pub name: Option<String>,
    /// The RAFT group name.
    #[serde(default)]
    pub raft_group: Option<String>,
    /// The server name of the RAFT leader.
    #[serde(default)]
    pub leader: Option<String>,
    /// The time since this server has been the leader.
    #[serde(default, with = "rfc3339::option")]
    pub leader_since: Option<OffsetDateTime>,
    /// Indicates if `traffic_account` is set to a system account.
    /// Only present when the `server_2_12` feature is enabled.
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub system_account: bool,
    /// Name of the traffic (replication) account.
    /// Only present when the `server_2_12` feature is enabled.
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub traffic_account: Option<String>,
    /// The members of the RAFT cluster.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1813
/// The members of the RAFT cluster
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// The server name of the peer.
    pub name: String,
    /// Indicates if the server is up to date and synchronized.
    pub current: bool,
    /// Time since this peer was last seen; transmitted as nanoseconds.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// Indicates the node is considered offline by the group.
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// How many uncommitted operations this peer is behind the leader.
    pub lag: Option<u64>,
}
1830
/// Status of a stream source: its name, lag, last activity and the subject
/// filter/transforms configured for it.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Number of messages this source is lagging behind.
    pub lag: u64,
    /// Last time the source was seen active.
    /// `None` when the reported value is negative.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// Filtering for the source.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// Source destination subject.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// List of transforms. Empty when absent from the payload.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1850
1851fn negative_duration_as_none<'de, D>(
1852    deserializer: D,
1853) -> Result<Option<std::time::Duration>, D::Error>
1854where
1855    D: Deserializer<'de>,
1856{
1857    let n = i64::deserialize(deserializer)?;
1858    if n.is_negative() {
1859        Ok(None)
1860    } else {
1861        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1862    }
1863}
1864
/// The response generated by trying to purge a stream.
///
/// This is the success value produced by awaiting a [Purge] builder.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
    /// Whether the purge request was successful.
    pub success: bool,
    /// The number of purged messages in a stream.
    pub purged: u64,
}
/// The payload used to generate a purge request.
///
/// Fields left at `None` are omitted from the serialized payload.
#[derive(Default, Debug, Serialize, Clone)]
pub struct PurgeRequest {
    /// Purge up to but not including sequence (serialized as `seq`).
    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
    pub sequence: Option<u64>,

    /// Subject to match against messages for the purge command.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter: Option<String>,

    /// Number of messages to keep.
    #[serde(default, skip_serializing_if = "is_default")]
    pub keep: Option<u64>,
}
1888
/// Configuration of a stream source.
///
/// Optional fields that hold their default value are omitted when serialized.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
    /// Name of the stream source.
    pub name: String,
    /// Optional source start sequence (serialized as `opt_start_seq`).
    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
    pub start_sequence: Option<u64>,
    /// Optional source start time (serialized as `opt_start_time`, RFC 3339).
    #[serde(
        default,
        rename = "opt_start_time",
        skip_serializing_if = "is_default",
        with = "rfc3339::option"
    )]
    pub start_time: Option<OffsetDateTime>,
    /// Optional additional filter subject.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: Option<String>,
    /// Optional config for sourcing streams from another prefix, used for cross-account.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external: Option<External>,
    /// Optional config to set a domain, if source is residing in different one.
    #[serde(default, skip_serializing_if = "is_default")]
    pub domain: Option<String>,
    /// Subject transforms for Stream.
    /// Only present when the `server_2_10` feature is enabled.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub subject_transforms: Vec<SubjectTransform>,
}
1918
/// Settings for a source that is reached through another API prefix
/// (see the `external` field of [Source]).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
    /// Api prefix of external source (serialized as `api`).
    #[serde(rename = "api")]
    pub api_prefix: String,
    /// Optional configuration of delivery prefix (serialized as `deliver`,
    /// omitted when `None`).
    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
    pub delivery_prefix: Option<String>,
}
1928
1929use std::marker::PhantomData;
1930
/// Type-level marker used as a type parameter of [Purge] to record that an
/// option has been set.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker used as a type parameter of [Purge] to record that an
/// option has not been set.
#[derive(Debug, Default)]
pub struct No;
1935
/// Bound for the marker type parameters of [Purge]; implemented by [Yes]
/// and [No].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
1940
/// Builder for a purge request against one stream.
///
/// `SEQUENCE` and `KEEP` are marker types ([Yes] / [No]) that record whether
/// `sequence` or `keep` has already been set, so that only one of the two can
/// be chosen. Awaiting the builder (it implements `IntoFuture`) sends the
/// request.
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
    SEQUENCE: ToAssign,
    KEEP: ToAssign,
{
    /// The request payload assembled so far.
    inner: PurgeRequest,
    /// Marker: has `sequence` been set.
    sequence_set: PhantomData<SEQUENCE>,
    /// Marker: has `keep` been set.
    keep_set: PhantomData<KEEP>,
    /// JetStream context used to send the request.
    context: Context,
    /// Name of the stream to purge.
    stream_name: String,
}
1953
1954impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1955where
1956    SEQUENCE: ToAssign,
1957    KEEP: ToAssign,
1958{
1959    /// Adds subject filter to [PurgeRequest]
1960    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1961        self.inner.filter = Some(filter.into());
1962        self
1963    }
1964}
1965
1966impl Purge<No, No> {
1967    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1968        Purge {
1969            context: stream.context.clone(),
1970            stream_name: stream.name.clone(),
1971            inner: Default::default(),
1972            sequence_set: PhantomData {},
1973            keep_set: PhantomData {},
1974        }
1975    }
1976}
1977
1978impl<KEEP> Purge<No, KEEP>
1979where
1980    KEEP: ToAssign,
1981{
1982    /// Creates a new [PurgeRequest].
1983    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
1984    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1985        Purge {
1986            context: self.context.clone(),
1987            stream_name: self.stream_name.clone(),
1988            sequence_set: PhantomData {},
1989            keep_set: PhantomData {},
1990            inner: PurgeRequest {
1991                keep: Some(keep),
1992                ..self.inner
1993            },
1994        }
1995    }
1996}
1997impl<SEQUENCE> Purge<SEQUENCE, No>
1998where
1999    SEQUENCE: ToAssign,
2000{
2001    /// Creates a new [PurgeRequest].
2002    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
2003    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2004        Purge {
2005            context: self.context.clone(),
2006            stream_name: self.stream_name.clone(),
2007            sequence_set: PhantomData {},
2008            keep_set: PhantomData {},
2009            inner: PurgeRequest {
2010                sequence: Some(sequence),
2011                ..self.inner
2012            },
2013        }
2014    }
2015}
2016
/// Kinds of failure reported when purging a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum PurgeErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
2023
2024impl Display for PurgeErrorKind {
2025    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2026        match self {
2027            Self::Request => write!(f, "request failed"),
2028            Self::TimedOut => write!(f, "timed out"),
2029            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2030        }
2031    }
2032}
2033
/// Error returned when awaiting a [Purge] builder fails.
pub type PurgeError = Error<PurgeErrorKind>;
2035
2036impl<S, K> IntoFuture for Purge<S, K>
2037where
2038    S: ToAssign + std::marker::Send,
2039    K: ToAssign + std::marker::Send,
2040{
2041    type Output = Result<PurgeResponse, PurgeError>;
2042
2043    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2044
2045    fn into_future(self) -> Self::IntoFuture {
2046        Box::pin(std::future::IntoFuture::into_future(async move {
2047            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2048            let response: Response<PurgeResponse> = self
2049                .context
2050                .request(request_subject, &self.inner)
2051                .map_err(|err| match err.kind() {
2052                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2053                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2054                })
2055                .await?;
2056
2057            match response {
2058                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2059                Response::Ok(response) => Ok(response),
2060            }
2061        }))
2062    }
2063}
2064
/// One page of the `CONSUMER.NAMES` listing.
#[derive(Deserialize, Debug)]
struct ConsumerPage {
    /// Total number of entries; paging stops once the offset reaches it.
    total: usize,
    /// Consumer names contained in this page, if any.
    consumers: Option<Vec<String>>,
}
2070
/// One page of the `CONSUMER.LIST` listing.
#[derive(Deserialize, Debug)]
struct ConsumerInfoPage {
    /// Total number of entries; paging stops once the offset reaches it.
    total: usize,
    /// Consumer infos contained in this page, if any.
    consumers: Option<Vec<super::consumer::Info>>,
}
2076
/// Error kind of the [ConsumerNames] stream; shared with the streams listing.
type ConsumerNamesErrorKind = StreamsErrorKind;
/// Error yielded by the [ConsumerNames] stream; shared with the streams listing.
type ConsumerNamesError = StreamsError;
/// In-flight request for one page of consumer names.
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2080
/// Stream of consumer names that fetches the listing page by page.
pub struct ConsumerNames {
    /// JetStream context used to request pages.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of entries received so far; sent as `offset` of the next page.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names of the current page that have not been yielded yet.
    consumers: Vec<String>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
2089
2090impl futures_util::Stream for ConsumerNames {
2091    type Item = Result<String, ConsumerNamesError>;
2092
2093    fn poll_next(
2094        mut self: Pin<&mut Self>,
2095        cx: &mut std::task::Context<'_>,
2096    ) -> std::task::Poll<Option<Self::Item>> {
2097        match self.page_request.as_mut() {
2098            Some(page) => match page.try_poll_unpin(cx) {
2099                std::task::Poll::Ready(page) => {
2100                    self.page_request = None;
2101                    let page = page.map_err(|err| {
2102                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2103                    })?;
2104
2105                    if let Some(consumers) = page.consumers {
2106                        self.offset += consumers.len();
2107                        self.consumers = consumers;
2108                        if self.offset >= page.total {
2109                            self.done = true;
2110                        }
2111                        match self.consumers.pop() {
2112                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2113                            None => Poll::Ready(None),
2114                        }
2115                    } else {
2116                        Poll::Ready(None)
2117                    }
2118                }
2119                std::task::Poll::Pending => std::task::Poll::Pending,
2120            },
2121            None => {
2122                if let Some(stream) = self.consumers.pop() {
2123                    Poll::Ready(Some(Ok(stream)))
2124                } else {
2125                    if self.done {
2126                        return Poll::Ready(None);
2127                    }
2128                    let context = self.context.clone();
2129                    let offset = self.offset;
2130                    let stream = self.stream.clone();
2131                    self.page_request = Some(Box::pin(async move {
2132                        match context
2133                            .request(
2134                                format!("CONSUMER.NAMES.{stream}"),
2135                                &json!({
2136                                    "offset": offset,
2137                                }),
2138                            )
2139                            .await?
2140                        {
2141                            Response::Err { error } => Err(RequestError::with_source(
2142                                super::context::RequestErrorKind::Other,
2143                                error,
2144                            )),
2145                            Response::Ok(page) => Ok(page),
2146                        }
2147                    }));
2148                    self.poll_next(cx)
2149                }
2150            }
2151        }
2152    }
2153}
2154
/// Error kind of the [Consumers] stream; shared with the streams listing.
pub type ConsumersErrorKind = StreamsErrorKind;
/// Error yielded by the [Consumers] stream; shared with the streams listing.
pub type ConsumersError = StreamsError;
/// In-flight request for one page of consumer infos.
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2158
/// Stream of consumer infos that fetches the listing page by page.
pub struct Consumers {
    /// JetStream context used to request pages.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of entries received so far; sent as `offset` of the next page.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    consumers: Vec<super::consumer::Info>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
2167
2168impl futures_util::Stream for Consumers {
2169    type Item = Result<super::consumer::Info, ConsumersError>;
2170
2171    fn poll_next(
2172        mut self: Pin<&mut Self>,
2173        cx: &mut std::task::Context<'_>,
2174    ) -> std::task::Poll<Option<Self::Item>> {
2175        match self.page_request.as_mut() {
2176            Some(page) => match page.try_poll_unpin(cx) {
2177                std::task::Poll::Ready(page) => {
2178                    self.page_request = None;
2179                    let page = page.map_err(|err| {
2180                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2181                    })?;
2182                    if let Some(consumers) = page.consumers {
2183                        self.offset += consumers.len();
2184                        self.consumers = consumers;
2185                        if self.offset >= page.total {
2186                            self.done = true;
2187                        }
2188                        match self.consumers.pop() {
2189                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2190                            None => Poll::Ready(None),
2191                        }
2192                    } else {
2193                        Poll::Ready(None)
2194                    }
2195                }
2196                std::task::Poll::Pending => std::task::Poll::Pending,
2197            },
2198            None => {
2199                if let Some(stream) = self.consumers.pop() {
2200                    Poll::Ready(Some(Ok(stream)))
2201                } else {
2202                    if self.done {
2203                        return Poll::Ready(None);
2204                    }
2205                    let context = self.context.clone();
2206                    let offset = self.offset;
2207                    let stream = self.stream.clone();
2208                    self.page_request = Some(Box::pin(async move {
2209                        match context
2210                            .request(
2211                                format!("CONSUMER.LIST.{stream}"),
2212                                &json!({
2213                                    "offset": offset,
2214                                }),
2215                            )
2216                            .await?
2217                        {
2218                            Response::Err { error } => Err(RequestError::with_source(
2219                                super::context::RequestErrorKind::Other,
2220                                error,
2221                            )),
2222                            Response::Ok(page) => Ok(page),
2223                        }
2224                    }));
2225                    self.poll_next(cx)
2226                }
2227            }
2228        }
2229    }
2230}
2231
/// Kinds of failure reported when fetching a raw message from a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
    /// No message was found.
    NoMessageFound,
    /// The subject is invalid.
    InvalidSubject,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2239
2240impl Display for LastRawMessageErrorKind {
2241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2242        match self {
2243            Self::NoMessageFound => write!(f, "no message found"),
2244            Self::InvalidSubject => write!(f, "invalid subject"),
2245            Self::Other => write!(f, "failed to get last raw message"),
2246            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2247        }
2248    }
2249}
2250
/// Error carrying a [LastRawMessageErrorKind].
pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
/// Alias of [LastRawMessageErrorKind].
pub type RawMessageErrorKind = LastRawMessageErrorKind;
/// Alias of [LastRawMessageError].
pub type RawMessageError = LastRawMessageError;
2254
/// Kinds of failure reported by consumer operations on a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerErrorKind {
    //TODO: get last should have timeout, which should be mapped here.
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2265
2266impl Display for ConsumerErrorKind {
2267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2268        match self {
2269            Self::TimedOut => write!(f, "timed out"),
2270            Self::Request => write!(f, "request failed"),
2271            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2272            Self::Other => write!(f, "consumer error"),
2273            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2274            Self::InvalidName => write!(f, "invalid consumer name"),
2275        }
2276    }
2277}
2278
/// Error carrying a [ConsumerErrorKind].
pub type ConsumerError = Error<ConsumerErrorKind>;
2280
/// Kinds of failure reported when strictly creating a consumer.
///
/// Mirrors [ConsumerErrorKind] and adds `AlreadyExists`.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerCreateStrictErrorKind {
    //TODO: get last should have timeout, which should be mapped here.
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer already exists.
    AlreadyExists,
    /// The server answered with another JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2292
2293impl Display for ConsumerCreateStrictErrorKind {
2294    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2295        match self {
2296            Self::TimedOut => write!(f, "timed out"),
2297            Self::Request => write!(f, "request failed"),
2298            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2299            Self::Other => write!(f, "consumer error"),
2300            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2301            Self::InvalidName => write!(f, "invalid consumer name"),
2302            Self::AlreadyExists => write!(f, "consumer already exists"),
2303        }
2304    }
2305}
2306
/// Error carrying a [ConsumerCreateStrictErrorKind].
pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2308
/// Kinds of failure reported when updating a consumer.
///
/// Mirrors [ConsumerErrorKind] and adds `DoesNotExist`.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerUpdateErrorKind {
    //TODO: get last should have timeout, which should be mapped here.
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer does not exist.
    DoesNotExist,
    /// The server answered with another JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2320
2321impl Display for ConsumerUpdateErrorKind {
2322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2323        match self {
2324            Self::TimedOut => write!(f, "timed out"),
2325            Self::Request => write!(f, "request failed"),
2326            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2327            Self::Other => write!(f, "consumer error"),
2328            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2329            Self::InvalidName => write!(f, "invalid consumer name"),
2330            Self::DoesNotExist => write!(f, "consumer does not exist"),
2331        }
2332    }
2333}
2334
/// Error carrying a [ConsumerUpdateErrorKind].
pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2336
2337impl From<super::errors::Error> for ConsumerError {
2338    fn from(err: super::errors::Error) -> Self {
2339        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2340    }
2341}
2342impl From<super::errors::Error> for ConsumerCreateStrictError {
2343    fn from(err: super::errors::Error) -> Self {
2344        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2345            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2346        } else {
2347            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2348        }
2349    }
2350}
2351impl From<super::errors::Error> for ConsumerUpdateError {
2352    fn from(err: super::errors::Error) -> Self {
2353        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2354            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2355        } else {
2356            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2357        }
2358    }
2359}
2360impl From<ConsumerError> for ConsumerUpdateError {
2361    fn from(err: ConsumerError) -> Self {
2362        match err.kind() {
2363            ConsumerErrorKind::JetStream(err) => {
2364                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2365                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2366                } else {
2367                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2368                }
2369            }
2370            ConsumerErrorKind::Request => {
2371                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2372            }
2373            ConsumerErrorKind::TimedOut => {
2374                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2375            }
2376            ConsumerErrorKind::InvalidConsumerType => {
2377                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2378            }
2379            ConsumerErrorKind::InvalidName => {
2380                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2381            }
2382            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2383        }
2384    }
2385}
2386
2387impl From<ConsumerError> for ConsumerCreateStrictError {
2388    fn from(err: ConsumerError) -> Self {
2389        match err.kind() {
2390            ConsumerErrorKind::JetStream(err) => {
2391                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2392                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2393                } else {
2394                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2395                }
2396            }
2397            ConsumerErrorKind::Request => {
2398                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2399            }
2400            ConsumerErrorKind::TimedOut => {
2401                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2402            }
2403            ConsumerErrorKind::InvalidConsumerType => {
2404                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2405            }
2406            ConsumerErrorKind::InvalidName => {
2407                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2408            }
2409            ConsumerErrorKind::Other => {
2410                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2411            }
2412        }
2413    }
2414}
2415
2416impl From<super::context::RequestError> for ConsumerError {
2417    fn from(err: super::context::RequestError) -> Self {
2418        match err.kind() {
2419            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2420            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2421        }
2422    }
2423}
2424impl From<super::context::RequestError> for ConsumerUpdateError {
2425    fn from(err: super::context::RequestError) -> Self {
2426        match err.kind() {
2427            RequestErrorKind::TimedOut => {
2428                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2429            }
2430            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2431        }
2432    }
2433}
2434impl From<super::context::RequestError> for ConsumerCreateStrictError {
2435    fn from(err: super::context::RequestError) -> Self {
2436        match err.kind() {
2437            RequestErrorKind::TimedOut => {
2438                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2439            }
2440            _ => {
2441                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2442            }
2443        }
2444    }
2445}
2446
/// Parameters of a direct get request, filled in by [DirectGetBuilder].
#[derive(Debug, Serialize, Default)]
pub struct DirectGetRequest {
    /// Sequence to fetch (serialized as `seq`, omitted when `None`).
    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
    sequence: Option<u64>,
    /// Subject whose last message is requested. Never serialized: the
    /// builder places it in the request subject instead of the payload.
    #[serde(rename = "last_by_subj", skip_serializing)]
    last_by_subject: Option<String>,
    /// Subject for a "next by subject" lookup (serialized as `next_by_subj`,
    /// omitted when `None`).
    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
    next_by_subject: Option<String>,
}
2456
/// Marker type indicating that headers should be included in the response.
///
/// This is the default type parameter of [DirectGetBuilder]; with it, `send`
/// returns a [StreamMessage].
pub struct WithHeaders;
2459
/// Marker type indicating that headers should be excluded from the response.
///
/// With this type parameter, `send` on [DirectGetBuilder] returns a
/// [StreamValue] that holds only the payload.
pub struct WithoutHeaders;
2462
/// Trait for converting a Message response into the appropriate return type.
///
/// Implemented for [StreamMessage] (full message) and [StreamValue]
/// (payload only).
trait DirectGetResponse: Sized {
    /// Builds the return value from the raw response message.
    fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
}
2467
2468impl DirectGetResponse for StreamMessage {
2469    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2470        StreamMessage::try_from(message).map_err(Into::into)
2471    }
2472}
2473
2474impl DirectGetResponse for StreamValue {
2475    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2476        Ok(StreamValue {
2477            data: message.payload,
2478        })
2479    }
2480}
2481
/// Builder for a direct get request against one stream.
///
/// The type parameter `T` ([WithHeaders] by default, or [WithoutHeaders])
/// selects what `send` returns.
pub struct DirectGetBuilder<T = WithHeaders> {
    /// JetStream context used to send the request.
    context: Context,
    /// Name of the stream to read from.
    stream_name: String,
    /// The request parameters assembled so far.
    request: DirectGetRequest,
    /// Carries the `T` marker without storing a value.
    _phantom: std::marker::PhantomData<T>,
}
2488
2489impl DirectGetBuilder<WithHeaders> {
2490    fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2491        DirectGetBuilder {
2492            context,
2493            stream_name,
2494            request: DirectGetRequest::default(),
2495            _phantom: std::marker::PhantomData,
2496        }
2497    }
2498}
2499
2500impl<T> DirectGetBuilder<T> {
2501    /// Internal method to send the direct get request and convert to the appropriate type.
2502    async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2503        // When last_by_subject is used, the subject is embedded in the URL and we should send an empty payload
2504        // to match the NATS protocol expectation (similar to nats.go implementation)
2505        let payload = if self.request.last_by_subject.is_some() {
2506            Bytes::new()
2507        } else {
2508            serde_json::to_vec(&self.request).map(Bytes::from)?
2509        };
2510
2511        let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2512            format!(
2513                "{}.DIRECT.GET.{}.{}",
2514                &self.context.prefix, &self.stream_name, subject
2515            )
2516        } else {
2517            format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2518        };
2519
2520        let response = self
2521            .context
2522            .client
2523            .request(request_subject, payload)
2524            .await?;
2525
2526        // Check for error status
2527        if let Some(status) = response.status {
2528            if let Some(ref description) = response.description {
2529                match status {
2530                    StatusCode::NOT_FOUND => {
2531                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2532                    }
2533                    // 408 is used in Direct Message for bad/empty payload.
2534                    StatusCode::TIMEOUT => {
2535                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2536                    }
2537                    _ => {
2538                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2539                            status,
2540                            description.to_string(),
2541                        )));
2542                    }
2543                }
2544            }
2545        }
2546
2547        R::from_message(response)
2548    }
2549
2550    /// Sets the sequence for the direct get request.
2551    pub fn sequence(mut self, seq: u64) -> Self {
2552        self.request.sequence = Some(seq);
2553        self
2554    }
2555
2556    /// Sets the last_by_subject for the direct get request.
2557    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2558        self.request.last_by_subject = Some(subject.into());
2559        self
2560    }
2561
2562    /// Sets the next_by_subject for the direct get request.
2563    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2564        self.request.next_by_subject = Some(subject.into());
2565        self
2566    }
2567}
2568
2569impl DirectGetBuilder<WithHeaders> {
2570    /// Sends the get request and returns a StreamMessage with headers.
2571    pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2572        self.send_internal::<StreamMessage>().await
2573    }
2574}
2575
2576impl DirectGetBuilder<WithoutHeaders> {
2577    /// Sends the get request and returns only the payload bytes without headers.
2578    pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2579        self.send_internal::<StreamValue>().await
2580    }
2581}
2582
2583pub struct StreamValue {
2584    pub data: Bytes,
2585}
2586
2587#[derive(Debug, Serialize, Default)]
2588pub struct RawMessageRequest {
2589    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2590    sequence: Option<u64>,
2591    #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2592    last_by_subject: Option<String>,
2593    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2594    next_by_subject: Option<String>,
2595}
2596
2597/// Trait for converting a RawMessage response into the appropriate return type.
2598trait RawMessageResponse: Sized {
2599    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2600}
2601
2602impl RawMessageResponse for StreamMessage {
2603    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2604        StreamMessage::try_from(message)
2605            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2606    }
2607}
2608
2609impl RawMessageResponse for StreamValue {
2610    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2611        use base64::engine::general_purpose::STANDARD;
2612        use base64::Engine;
2613
2614        let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2615            RawMessageError::with_source(
2616                RawMessageErrorKind::Other,
2617                Box::new(std::io::Error::other(err)),
2618            )
2619        })?;
2620
2621        Ok(StreamValue {
2622            data: decoded_payload.into(),
2623        })
2624    }
2625}
2626
2627pub struct RawMessageBuilder<T = WithHeaders> {
2628    context: Context,
2629    stream_name: String,
2630    request: RawMessageRequest,
2631    _phantom: std::marker::PhantomData<T>,
2632}
2633
2634impl RawMessageBuilder<WithHeaders> {
2635    fn new(context: Context, stream_name: String) -> Self {
2636        RawMessageBuilder {
2637            context,
2638            stream_name,
2639            request: RawMessageRequest::default(),
2640            _phantom: std::marker::PhantomData,
2641        }
2642    }
2643}
2644
2645impl<T> RawMessageBuilder<T> {
2646    /// Internal method to send the raw message request and convert to the appropriate type.
2647    async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2648        // Validate subjects
2649        for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2650            .into_iter()
2651            .flatten()
2652        {
2653            if !is_valid_subject(subject) {
2654                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2655            }
2656        }
2657
2658        let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2659
2660        let response: Response<GetRawMessage> = self
2661            .context
2662            .request(subject, &self.request)
2663            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2664            .await?;
2665
2666        match response {
2667            Response::Err { error } => {
2668                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2669                    Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2670                } else {
2671                    Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2672                }
2673            }
2674            Response::Ok(value) => R::from_raw_message(value.message),
2675        }
2676    }
2677
2678    /// Sets the sequence for the raw message request.
2679    pub fn sequence(mut self, seq: u64) -> Self {
2680        self.request.sequence = Some(seq);
2681        self
2682    }
2683
2684    /// Sets the last_by_subject for the raw message request.
2685    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2686        self.request.last_by_subject = Some(subject.into());
2687        self
2688    }
2689
2690    /// Sets the next_by_subject for the raw message request.
2691    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2692        self.request.next_by_subject = Some(subject.into());
2693        self
2694    }
2695}
2696
2697impl RawMessageBuilder<WithHeaders> {
2698    /// Sends the raw message request and returns a StreamMessage with headers.
2699    pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2700        self.send_internal::<StreamMessage>().await
2701    }
2702}
2703
2704impl RawMessageBuilder<WithoutHeaders> {
2705    /// Sends the raw message request and returns only the payload as StreamValue.
2706    pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2707        self.send_internal::<StreamValue>().await
2708    }
2709}
2710
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must come back unchanged from a JSON
    /// serialize/deserialize round trip.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}