Skip to main content

async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{
41        ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42        StreamsErrorKind,
43    },
44    errors::ErrorCode,
45    is_valid_name,
46    message::{StreamMessage, StreamMessageError},
47    response::Response,
48    Context,
49};
50
51pub type InfoError = RequestError;
52
53#[derive(Clone, Debug, PartialEq)]
54pub enum DirectGetErrorKind {
55    NotFound,
56    InvalidSubject,
57    TimedOut,
58    Request,
59    ErrorResponse(StatusCode, String),
60    Other,
61}
62
63impl Display for DirectGetErrorKind {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        match self {
66            Self::InvalidSubject => write!(f, "invalid subject"),
67            Self::NotFound => write!(f, "message not found"),
68            Self::ErrorResponse(status, description) => {
69                write!(f, "unable to get message: {status} {description}")
70            }
71            Self::Other => write!(f, "error getting message"),
72            Self::TimedOut => write!(f, "timed out"),
73            Self::Request => write!(f, "request failed"),
74        }
75    }
76}
77
78pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81    fn from(err: crate::RequestError) -> Self {
82        match err.kind() {
83            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84            crate::RequestErrorKind::NoResponders => {
85                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86                    StatusCode::NO_RESPONDERS,
87                    "no responders".to_string(),
88                ))
89            }
90            crate::RequestErrorKind::InvalidSubject
91            | crate::RequestErrorKind::MaxPayloadExceeded
92            | crate::RequestErrorKind::Other => {
93                DirectGetError::with_source(DirectGetErrorKind::Other, err)
94            }
95        }
96    }
97}
98
99impl From<serde_json::Error> for DirectGetError {
100    fn from(err: serde_json::Error) -> Self {
101        DirectGetError::with_source(DirectGetErrorKind::Other, err)
102    }
103}
104
105impl From<StreamMessageError> for DirectGetError {
106    fn from(err: StreamMessageError) -> Self {
107        DirectGetError::with_source(DirectGetErrorKind::Other, err)
108    }
109}
110
111#[derive(Clone, Debug, PartialEq)]
112pub enum DeleteMessageErrorKind {
113    Request,
114    TimedOut,
115    JetStream(super::errors::Error),
116}
117
118impl Display for DeleteMessageErrorKind {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        match self {
121            Self::Request => write!(f, "request failed"),
122            Self::TimedOut => write!(f, "timed out"),
123            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
124        }
125    }
126}
127
128pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
129
130/// Handle to operations that can be performed on a `Stream`.
131/// It's generic over the type of `info` field to allow `Stream` with or without
132/// info contents.
133#[derive(Debug, Clone)]
134pub struct Stream<T = Info> {
135    pub(crate) info: T,
136    pub(crate) context: Context,
137    pub(crate) name: String,
138}
139
140impl Stream<Info> {
141    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
142    /// [Stream] and returns it.
143    ///
144    /// # Examples
145    ///
146    /// ```no_run
147    /// # #[tokio::main]
148    /// # async fn main() -> Result<(), async_nats::Error> {
149    /// let client = async_nats::connect("localhost:4222").await?;
150    /// let jetstream = async_nats::jetstream::new(client);
151    ///
152    /// let mut stream = jetstream.get_stream("events").await?;
153    ///
154    /// let info = stream.info().await?;
155    /// # Ok(())
156    /// # }
157    /// ```
158    pub async fn info(&mut self) -> Result<&Info, InfoError> {
159        let subject = format!("STREAM.INFO.{}", self.info.config.name);
160
161        match self.context.request(subject, &json!({})).await? {
162            Response::Ok::<Info>(info) => {
163                self.info = info;
164                Ok(&self.info)
165            }
166            Response::Err { error } => Err(error.into()),
167        }
168    }
169
170    /// Returns cached [Info] for the [Stream].
171    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
172    /// [Stream::info].
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// # #[tokio::main]
178    /// # async fn main() -> Result<(), async_nats::Error> {
179    /// let client = async_nats::connect("localhost:4222").await?;
180    /// let jetstream = async_nats::jetstream::new(client);
181    ///
182    /// let stream = jetstream.get_stream("events").await?;
183    ///
184    /// let info = stream.cached_info();
185    /// # Ok(())
186    /// # }
187    /// ```
188    pub fn cached_info(&self) -> &Info {
189        &self.info
190    }
191}
192
193impl<I> Stream<I> {
194    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
195    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
196    pub async fn get_info(&self) -> Result<Info, InfoError> {
197        let subject = format!("STREAM.INFO.{}", self.name);
198
199        match self.context.request(subject, &json!({})).await? {
200            Response::Ok::<Info>(info) => Ok(info),
201            Response::Err { error } => Err(error.into()),
202        }
203    }
204
205    /// Retrieves [[Info]] from the server and returns a [[futures_util::Stream]] that allows
206    /// iterating over all subjects in the stream fetched via paged API.
207    ///
208    /// # Examples
209    ///
210    /// ```no_run
211    /// # #[tokio::main]
212    /// # async fn main() -> Result<(), async_nats::Error> {
213    /// use futures_util::TryStreamExt;
214    /// let client = async_nats::connect("localhost:4222").await?;
215    /// let jetstream = async_nats::jetstream::new(client);
216    ///
217    /// let mut stream = jetstream.get_stream("events").await?;
218    ///
219    /// let mut info = stream.info_with_subjects("events.>").await?;
220    ///
221    /// while let Some((subject, count)) = info.try_next().await? {
222    ///     println!("Subject: {} count: {}", subject, count);
223    /// }
224    /// # Ok(())
225    /// # }
226    /// ```
227    pub async fn info_with_subjects<F: AsRef<str>>(
228        &self,
229        subjects_filter: F,
230    ) -> Result<InfoWithSubjects, InfoError> {
231        let subjects_filter = subjects_filter.as_ref().to_string();
232        // TODO: validate the subject and decide if this should be a `Subject`
233        let info = stream_info_with_details(
234            self.context.clone(),
235            self.name.clone(),
236            0,
237            false,
238            subjects_filter.clone(),
239        )
240        .await?;
241
242        Ok(InfoWithSubjects::new(
243            self.context.clone(),
244            info,
245            subjects_filter,
246        ))
247    }
248
249    /// Creates a builder that allows to customize `Stream::Info`.
250    ///
251    /// # Examples
252    /// ```no_run
253    /// # #[tokio::main]
254    /// # async fn main() -> Result<(), async_nats::Error> {
255    /// use futures_util::TryStreamExt;
256    /// let client = async_nats::connect("localhost:4222").await?;
257    /// let jetstream = async_nats::jetstream::new(client);
258    ///
259    /// let mut stream = jetstream.get_stream("events").await?;
260    ///
261    /// let mut info = stream
262    ///     .info_builder()
263    ///     .with_deleted(true)
264    ///     .subjects("events.>")
265    ///     .fetch()
266    ///     .await?;
267    ///
268    /// while let Some((subject, count)) = info.try_next().await? {
269    ///     println!("Subject: {} count: {}", subject, count);
270    /// }
271    /// # Ok(())
272    /// # }
273    /// ```
274    pub fn info_builder(&self) -> StreamInfoBuilder {
275        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
276    }
277
278    /// Creates a builder for direct get operations.
279    ///
280    /// Allows for more control over direct get requests.
281    ///
282    /// # Examples
283    ///
284    /// ```no_run
285    /// # #[tokio::main]
286    /// # async fn main() -> Result<(), async_nats::Error> {
287    /// let client = async_nats::connect("demo.nats.io").await?;
288    /// let jetstream = async_nats::jetstream::new(client);
289    ///
290    /// let stream = jetstream.get_stream("events").await?;
291    ///
292    /// // Get message without headers
293    /// let message = stream.direct_get_builder().sequence(100).send().await?;
294    /// # Ok(())
295    /// # }
296    /// ```
297    pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
298        DirectGetBuilder::new(self.context.clone(), self.name.clone())
299    }
300
301    /// Gets next message for a [Stream].
302    ///
303    /// Requires a [Stream] with `allow_direct` set to `true`.
304    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
305    /// from any replica member. This means read after write is possible,
306    /// as that given replica might not yet catch up with the leader.
307    ///
308    /// # Examples
309    ///
310    /// ```no_run
311    /// # #[tokio::main]
312    /// # async fn main() -> Result<(), async_nats::Error> {
313    /// let client = async_nats::connect("demo.nats.io").await?;
314    /// let jetstream = async_nats::jetstream::new(client);
315    ///
316    /// let stream = jetstream
317    ///     .create_stream(async_nats::jetstream::stream::Config {
318    ///         name: "events".to_string(),
319    ///         subjects: vec!["events.>".to_string()],
320    ///         allow_direct: true,
321    ///         ..Default::default()
322    ///     })
323    ///     .await?;
324    ///
325    /// jetstream.publish("events.data", "data".into()).await?;
326    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
327    ///
328    /// let message = stream
329    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
330    ///     .await?;
331    ///
332    /// # Ok(())
333    /// # }
334    /// ```
335    pub async fn direct_get_next_for_subject<T: Into<String>>(
336        &self,
337        subject: T,
338        sequence: Option<u64>,
339    ) -> Result<StreamMessage, DirectGetError> {
340        let subject_str = subject.into();
341        if !is_valid_subject(&subject_str) {
342            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
343        }
344
345        let mut builder = self.direct_get_builder().next_by_subject(subject_str);
346        if let Some(seq) = sequence {
347            builder = builder.sequence(seq);
348        }
349
350        builder.send().await
351    }
352
353    /// Gets first message from [Stream].
354    ///
355    /// Requires a [Stream] with `allow_direct` set to `true`.
356    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
357    /// from any replica member. This means read after write is possible,
358    /// as that given replica might not yet catch up with the leader.
359    ///
360    /// # Examples
361    ///
362    /// ```no_run
363    /// # #[tokio::main]
364    /// # async fn main() -> Result<(), async_nats::Error> {
365    /// let client = async_nats::connect("demo.nats.io").await?;
366    /// let jetstream = async_nats::jetstream::new(client);
367    ///
368    /// let stream = jetstream
369    ///     .create_stream(async_nats::jetstream::stream::Config {
370    ///         name: "events".to_string(),
371    ///         subjects: vec!["events.>".to_string()],
372    ///         allow_direct: true,
373    ///         ..Default::default()
374    ///     })
375    ///     .await?;
376    ///
377    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
378    ///
379    /// let message = stream.direct_get_first_for_subject("events.data").await?;
380    ///
381    /// # Ok(())
382    /// # }
383    /// ```
384    pub async fn direct_get_first_for_subject<T: Into<String>>(
385        &self,
386        subject: T,
387    ) -> Result<StreamMessage, DirectGetError> {
388        let subject_str = subject.into();
389        if !is_valid_subject(&subject_str) {
390            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
391        }
392
393        self.direct_get_builder()
394            .next_by_subject(subject_str)
395            .send()
396            .await
397    }
398
399    /// Gets message from [Stream] with given `sequence id`.
400    ///
401    /// Requires a [Stream] with `allow_direct` set to `true`.
402    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
403    /// from any replica member. This means read after write is possible,
404    /// as that given replica might not yet catch up with the leader.
405    ///
406    /// # Examples
407    ///
408    /// ```no_run
409    /// # #[tokio::main]
410    /// # async fn main() -> Result<(), async_nats::Error> {
411    /// let client = async_nats::connect("demo.nats.io").await?;
412    /// let jetstream = async_nats::jetstream::new(client);
413    ///
414    /// let stream = jetstream
415    ///     .create_stream(async_nats::jetstream::stream::Config {
416    ///         name: "events".to_string(),
417    ///         subjects: vec!["events.>".to_string()],
418    ///         allow_direct: true,
419    ///         ..Default::default()
420    ///     })
421    ///     .await?;
422    ///
423    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
424    ///
425    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
426    ///
427    /// # Ok(())
428    /// # }
429    /// ```
430    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
431        self.direct_get_builder().sequence(sequence).send().await
432    }
433
434    /// Gets last message for a given `subject`.
435    ///
436    /// Requires a [Stream] with `allow_direct` set to `true`.
437    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
438    /// from any replica member. This means read after write is possible,
439    /// as that given replica might not yet catch up with the leader.
440    ///
441    /// # Examples
442    ///
443    /// ```no_run
444    /// # #[tokio::main]
445    /// # async fn main() -> Result<(), async_nats::Error> {
446    /// let client = async_nats::connect("demo.nats.io").await?;
447    /// let jetstream = async_nats::jetstream::new(client);
448    ///
449    /// let stream = jetstream
450    ///     .create_stream(async_nats::jetstream::stream::Config {
451    ///         name: "events".to_string(),
452    ///         subjects: vec!["events.>".to_string()],
453    ///         allow_direct: true,
454    ///         ..Default::default()
455    ///     })
456    ///     .await?;
457    ///
458    /// jetstream.publish("events.data", "data".into()).await?;
459    ///
460    /// let message = stream.direct_get_last_for_subject("events.data").await?;
461    ///
462    /// # Ok(())
463    /// # }
464    /// ```
465    pub async fn direct_get_last_for_subject<T: Into<String>>(
466        &self,
467        subject: T,
468    ) -> Result<StreamMessage, DirectGetError> {
469        self.direct_get_builder()
470            .last_by_subject(subject)
471            .send()
472            .await
473    }
474    /// Creates a builder for retrieving messages from the stream with flexible options.
475    /// Raw message methods retrieve messages directly from the stream leader instead
476    /// of a replica. This should be used with care, as it can put additional load on the
477    /// stream leader.
478    ///
479    /// # Examples
480    ///
481    /// ```no_run
482    /// # #[tokio::main]
483    /// # async fn main() -> Result<(), async_nats::Error> {
484    /// let client = async_nats::connect("localhost:4222").await?;
485    /// let context = async_nats::jetstream::new(client);
486    /// let stream = context.get_stream("events").await?;
487    ///
488    /// // Get message without headers
489    /// let value = stream.raw_message_builder().sequence(100).send().await?;
490    /// # Ok(())
491    /// # }
492    /// ```
493    pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
494        RawMessageBuilder::new(self.context.clone(), self.name.clone())
495    }
496
497    /// Get a raw message from the stream for a given stream sequence.
498    /// This low-level API always reaches stream leader.
499    /// This should be discouraged in favor of using [Stream::direct_get].
500    ///
501    /// # Examples
502    ///
503    /// ```no_run
504    /// #[tokio::main]
505    /// # async fn main() -> Result<(), async_nats::Error> {
506    /// use futures_util::StreamExt;
507    /// use futures_util::TryStreamExt;
508    ///
509    /// let client = async_nats::connect("localhost:4222").await?;
510    /// let context = async_nats::jetstream::new(client);
511    ///
512    /// let stream = context
513    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
514    ///         name: "events".to_string(),
515    ///         max_messages: 10_000,
516    ///         ..Default::default()
517    ///     })
518    ///     .await?;
519    ///
520    /// let publish_ack = context.publish("events", "data".into()).await?;
521    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
522    /// println!("Retrieved raw message {:?}", raw_message);
523    /// # Ok(())
524    /// # }
525    /// ```
526    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
527        self.raw_message_builder().sequence(sequence).send().await
528    }
529
530    /// Get a first message from the stream for a given subject starting from provided sequence.
531    /// This low-level API always reaches stream leader.
532    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
533    ///
534    /// # Examples
535    ///
536    /// ```no_run
537    /// #[tokio::main]
538    /// # async fn main() -> Result<(), async_nats::Error> {
539    /// use futures_util::StreamExt;
540    /// use futures_util::TryStreamExt;
541    ///
542    /// let client = async_nats::connect("localhost:4222").await?;
543    /// let context = async_nats::jetstream::new(client);
544    /// let stream = context.get_stream_no_info("events").await?;
545    ///
546    /// let raw_message = stream
547    ///     .get_first_raw_message_by_subject("events.created", 10)
548    ///     .await?;
549    /// println!("Retrieved raw message {:?}", raw_message);
550    /// # Ok(())
551    /// # }
552    /// ```
553    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
554        &self,
555        subject: T,
556        sequence: u64,
557    ) -> Result<StreamMessage, RawMessageError> {
558        self.raw_message_builder()
559            .sequence(sequence)
560            .next_by_subject(subject.as_ref().to_string())
561            .send()
562            .await
563    }
564
565    /// Get a next message from the stream for a given subject.
566    /// This low-level API always reaches stream leader.
567    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
568    ///
569    /// # Examples
570    ///
571    /// ```no_run
572    /// #[tokio::main]
573    /// # async fn main() -> Result<(), async_nats::Error> {
574    /// use futures_util::StreamExt;
575    /// use futures_util::TryStreamExt;
576    ///
577    /// let client = async_nats::connect("localhost:4222").await?;
578    /// let context = async_nats::jetstream::new(client);
579    /// let stream = context.get_stream_no_info("events").await?;
580    ///
581    /// let raw_message = stream
582    ///     .get_next_raw_message_by_subject("events.created")
583    ///     .await?;
584    /// println!("Retrieved raw message {:?}", raw_message);
585    /// # Ok(())
586    /// # }
587    /// ```
588    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
589        &self,
590        subject: T,
591    ) -> Result<StreamMessage, RawMessageError> {
592        self.raw_message_builder()
593            .next_by_subject(subject.as_ref().to_string())
594            .send()
595            .await
596    }
597
598    /// Get a last message from the stream for a given subject.
599    /// This low-level API always reaches stream leader.
600    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
601    ///
602    /// # Examples
603    ///
604    /// ```no_run
605    /// #[tokio::main]
606    /// # async fn main() -> Result<(), async_nats::Error> {
607    /// use futures_util::StreamExt;
608    /// use futures_util::TryStreamExt;
609    ///
610    /// let client = async_nats::connect("localhost:4222").await?;
611    /// let context = async_nats::jetstream::new(client);
612    /// let stream = context.get_stream_no_info("events").await?;
613    ///
614    /// let raw_message = stream
615    ///     .get_last_raw_message_by_subject("events.created")
616    ///     .await?;
617    /// println!("Retrieved raw message {:?}", raw_message);
618    /// # Ok(())
619    /// # }
620    /// ```
621    pub async fn get_last_raw_message_by_subject(
622        &self,
623        stream_subject: &str,
624    ) -> Result<StreamMessage, LastRawMessageError> {
625        self.raw_message_builder()
626            .last_by_subject(stream_subject.to_string())
627            .send()
628            .await
629    }
630
631    /// Delete a message from the stream.
632    ///
633    /// # Examples
634    ///
635    /// ```no_run
636    /// # #[tokio::main]
637    /// # async fn main() -> Result<(), async_nats::Error> {
638    /// let client = async_nats::connect("localhost:4222").await?;
639    /// let context = async_nats::jetstream::new(client);
640    ///
641    /// let stream = context
642    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
643    ///         name: "events".to_string(),
644    ///         max_messages: 10_000,
645    ///         ..Default::default()
646    ///     })
647    ///     .await?;
648    ///
649    /// let publish_ack = context.publish("events", "data".into()).await?;
650    /// stream.delete_message(publish_ack.await?.sequence).await?;
651    /// # Ok(())
652    /// # }
653    /// ```
654    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
655        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
656        let payload = json!({
657            "seq": sequence,
658        });
659
660        let response: Response<DeleteStatus> = self
661            .context
662            .request(subject, &payload)
663            .map_err(|err| match err.kind() {
664                RequestErrorKind::TimedOut => {
665                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
666                }
667                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
668            })
669            .await?;
670
671        match response {
672            Response::Err { error } => Err(DeleteMessageError::new(
673                DeleteMessageErrorKind::JetStream(error),
674            )),
675            Response::Ok(value) => Ok(value.success),
676        }
677    }
678
679    /// Purge `Stream` messages.
680    ///
681    /// # Examples
682    ///
683    /// ```no_run
684    /// # #[tokio::main]
685    /// # async fn main() -> Result<(), async_nats::Error> {
686    /// let client = async_nats::connect("demo.nats.io").await?;
687    /// let jetstream = async_nats::jetstream::new(client);
688    ///
689    /// let stream = jetstream.get_stream("events").await?;
690    /// stream.purge().await?;
691    /// # Ok(())
692    /// # }
693    /// ```
694    pub fn purge(&self) -> Purge<No, No> {
695        Purge::build(self)
696    }
697
698    /// Purge `Stream` messages for a matching subject.
699    ///
700    /// # Examples
701    ///
702    /// ```no_run
703    /// # #[tokio::main]
704    /// # #[allow(deprecated)]
705    /// # async fn main() -> Result<(), async_nats::Error> {
706    /// let client = async_nats::connect("demo.nats.io").await?;
707    /// let jetstream = async_nats::jetstream::new(client);
708    ///
709    /// let stream = jetstream.get_stream("events").await?;
710    /// stream.purge_subject("data").await?;
711    /// # Ok(())
712    /// # }
713    /// ```
714    #[deprecated(
715        since = "0.25.0",
716        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
717    )]
718    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
719    where
720        T: Into<String>,
721    {
722        self.purge().filter(subject).await
723    }
724
725    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
726    /// returns the info from the server about created [Consumer]
727    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
728    ///
729    /// # Examples
730    ///
731    /// ```no_run
732    /// # #[tokio::main]
733    /// # async fn main() -> Result<(), async_nats::Error> {
734    /// use async_nats::jetstream::consumer;
735    /// let client = async_nats::connect("localhost:4222").await?;
736    /// let jetstream = async_nats::jetstream::new(client);
737    ///
738    /// let stream = jetstream.get_stream("events").await?;
739    /// let info = stream
740    ///     .create_consumer(consumer::pull::Config {
741    ///         durable_name: Some("pull".to_string()),
742    ///         ..Default::default()
743    ///     })
744    ///     .await?;
745    /// # Ok(())
746    /// # }
747    /// ```
748    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
749        &self,
750        config: C,
751    ) -> Result<Consumer<C>, ConsumerError> {
752        self.context
753            .create_consumer_on_stream(config, self.name.clone())
754            .await
755    }
756
757    /// Update an existing consumer.
758    /// This call will fail if the consumer does not exist.
759    /// returns the info from the server about updated [Consumer].
760    ///
761    /// # Examples
762    ///
763    /// ```no_run
764    /// # #[tokio::main]
765    /// # async fn main() -> Result<(), async_nats::Error> {
766    /// use async_nats::jetstream::consumer;
767    /// let client = async_nats::connect("localhost:4222").await?;
768    /// let jetstream = async_nats::jetstream::new(client);
769    ///
770    /// let stream = jetstream.get_stream("events").await?;
771    /// let info = stream
772    ///     .update_consumer(consumer::pull::Config {
773    ///         durable_name: Some("pull".to_string()),
774    ///         ..Default::default()
775    ///     })
776    ///     .await?;
777    /// # Ok(())
778    /// # }
779    /// ```
780    #[cfg(feature = "server_2_10")]
781    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
782        &self,
783        config: C,
784    ) -> Result<Consumer<C>, ConsumerUpdateError> {
785        self.context
786            .update_consumer_on_stream(config, self.name.clone())
787            .await
788    }
789
790    /// Create consumer, but only if it does not exist or the existing config is exactly
791    /// the same.
792    /// This method will fail if consumer is already present with different config.
793    /// returns the info from the server about created [Consumer].
794    ///
795    /// # Examples
796    ///
797    /// ```no_run
798    /// # #[tokio::main]
799    /// # async fn main() -> Result<(), async_nats::Error> {
800    /// use async_nats::jetstream::consumer;
801    /// let client = async_nats::connect("localhost:4222").await?;
802    /// let jetstream = async_nats::jetstream::new(client);
803    ///
804    /// let stream = jetstream.get_stream("events").await?;
805    /// let info = stream
806    ///     .create_consumer_strict(consumer::pull::Config {
807    ///         durable_name: Some("pull".to_string()),
808    ///         ..Default::default()
809    ///     })
810    ///     .await?;
811    /// # Ok(())
812    /// # }
813    /// ```
814    #[cfg(feature = "server_2_10")]
815    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
816        &self,
817        config: C,
818    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
819        self.context
820            .create_consumer_strict_on_stream(config, self.name.clone())
821            .await
822    }
823
824    /// Retrieve [Info] about [Consumer] from the server.
825    ///
826    /// # Examples
827    ///
828    /// ```no_run
829    /// # #[tokio::main]
830    /// # async fn main() -> Result<(), async_nats::Error> {
831    /// use async_nats::jetstream::consumer;
832    /// let client = async_nats::connect("localhost:4222").await?;
833    /// let jetstream = async_nats::jetstream::new(client);
834    ///
835    /// let stream = jetstream.get_stream("events").await?;
836    /// let info = stream.consumer_info("pull").await?;
837    /// # Ok(())
838    /// # }
839    /// ```
840    pub async fn consumer_info<T: AsRef<str>>(
841        &self,
842        name: T,
843    ) -> Result<consumer::Info, ConsumerInfoError> {
844        let name = name.as_ref();
845
846        if !is_valid_name(name) {
847            return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
848        }
849
850        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
851
852        match self.context.request(subject, &json!({})).await? {
853            Response::Ok(info) => Ok(info),
854            Response::Err { error } => Err(error.into()),
855        }
856    }
857
858    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
859    /// [Messages][crate::jetstream::Message] for a given [Consumer].
860    ///
861    /// # Examples
862    ///
863    /// ```no_run
864    /// # #[tokio::main]
865    /// # async fn main() -> Result<(), async_nats::Error> {
866    /// use async_nats::jetstream::consumer;
867    /// use futures_util::StreamExt;
868    /// let client = async_nats::connect("localhost:4222").await?;
869    /// let jetstream = async_nats::jetstream::new(client);
870    ///
871    /// let stream = jetstream.get_stream("events").await?;
872    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
873    /// # Ok(())
874    /// # }
875    /// ```
876    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
877        &self,
878        name: &str,
879    ) -> Result<Consumer<T>, crate::Error> {
880        let info = self.consumer_info(name).await?;
881
882        Ok(Consumer::new(
883            T::try_from_consumer_config(info.config.clone())?,
884            info,
885            self.context.clone(),
886        ))
887    }
888
889    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
890    ///
891    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
892    ///
893    /// # Examples
894    ///
895    /// ```no_run
896    /// # #[tokio::main]
897    /// # async fn main() -> Result<(), async_nats::Error> {
898    /// use async_nats::jetstream::consumer;
899    /// use futures_util::StreamExt;
900    /// let client = async_nats::connect("localhost:4222").await?;
901    /// let jetstream = async_nats::jetstream::new(client);
902    ///
903    /// let stream = jetstream.get_stream("events").await?;
904    /// let consumer = stream
905    ///     .get_or_create_consumer(
906    ///         "pull",
907    ///         consumer::pull::Config {
908    ///             durable_name: Some("pull".to_string()),
909    ///             ..Default::default()
910    ///         },
911    ///     )
912    ///     .await?;
913    /// # Ok(())
914    /// # }
915    /// ```
916    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
917        &self,
918        name: &str,
919        config: T,
920    ) -> Result<Consumer<T>, ConsumerError> {
921        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
922
923        match self.context.request(subject, &json!({})).await? {
924            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
925            Response::Err { error } => Err(error.into()),
926            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
927                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
928                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
929                })?,
930                info,
931                self.context.clone(),
932            )),
933        }
934    }
935
936    /// Delete a [Consumer] from the server.
937    ///
938    /// # Examples
939    ///
940    /// ```no_run
941    /// # #[tokio::main]
942    /// # async fn main() -> Result<(), async_nats::Error> {
943    /// use async_nats::jetstream::consumer;
944    /// use futures_util::StreamExt;
945    /// let client = async_nats::connect("localhost:4222").await?;
946    /// let jetstream = async_nats::jetstream::new(client);
947    ///
948    /// jetstream
949    ///     .get_stream("events")
950    ///     .await?
951    ///     .delete_consumer("pull")
952    ///     .await?;
953    /// # Ok(())
954    /// # }
955    /// ```
956    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
957        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
958
959        match self.context.request(subject, &json!({})).await? {
960            Response::Ok(delete_status) => Ok(delete_status),
961            Response::Err { error } => Err(error.into()),
962        }
963    }
964
965    /// Pause a [Consumer] until the given time.
966    /// It will not deliver any messages to clients during that time.
967    ///
968    /// # Examples
969    ///
970    /// ```no_run
971    /// # #[tokio::main]
972    /// # async fn main() -> Result<(), async_nats::Error> {
973    /// use async_nats::jetstream::consumer;
974    /// use futures_util::StreamExt;
975    /// let client = async_nats::connect("localhost:4222").await?;
976    /// let jetstream = async_nats::jetstream::new(client);
977    /// let pause_until =
978    ///     time::OffsetDateTime::now_utc().saturating_add(time::Duration::seconds_f32(10.0));
979    ///
980    /// jetstream
981    ///     .get_stream("events")
982    ///     .await?
983    ///     .pause_consumer("my_consumer", pause_until)
984    ///     .await?;
985    /// # Ok(())
986    /// # }
987    /// ```
988    #[cfg(feature = "server_2_11")]
989    pub async fn pause_consumer(
990        &self,
991        name: &str,
992        pause_until: OffsetDateTime,
993    ) -> Result<PauseResponse, ConsumerError> {
994        self.request_pause_consumer(name, Some(pause_until)).await
995    }
996
997    /// Resume a paused [Consumer].
998    ///
999    /// # Examples
1000    ///
1001    /// ```no_run
1002    /// # #[tokio::main]
1003    /// # async fn main() -> Result<(), async_nats::Error> {
1004    /// use async_nats::jetstream::consumer;
1005    /// use futures_util::StreamExt;
1006    /// let client = async_nats::connect("localhost:4222").await?;
1007    /// let jetstream = async_nats::jetstream::new(client);
1008    ///
1009    /// jetstream
1010    ///     .get_stream("events")
1011    ///     .await?
1012    ///     .resume_consumer("my_consumer")
1013    ///     .await?;
1014    /// # Ok(())
1015    /// # }
1016    /// ```
1017    #[cfg(feature = "server_2_11")]
1018    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1019        self.request_pause_consumer(name, None).await
1020    }
1021
1022    #[cfg(feature = "server_2_11")]
1023    async fn request_pause_consumer(
1024        &self,
1025        name: &str,
1026        pause_until: Option<OffsetDateTime>,
1027    ) -> Result<PauseResponse, ConsumerError> {
1028        let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1029        let payload = &PauseResumeConsumerRequest { pause_until };
1030
1031        match self.context.request(subject, payload).await? {
1032            Response::Ok::<PauseResponse>(resp) => Ok(resp),
1033            Response::Err { error } => Err(error.into()),
1034        }
1035    }
1036
1037    /// Reset a [Consumer]'s delivery state (ADR-60).
1038    ///
1039    /// `seq` semantics:
1040    /// - `None` (or `Some(0)`): reset back to the consumer's ack floor.
1041    ///   Pending and redelivered messages are cleared; the ack-floor stream
1042    ///   sequence is left where it was.
1043    /// - `Some(n)` with `n > 0`: ack-floor stream sequence is set to one
1044    ///   below `n`; the next delivered message will have a stream sequence
1045    ///   of at least `n`.
1046    ///
1047    /// Only valid on consumers with
1048    /// [`DeliverPolicy::All`][crate::jetstream::consumer::DeliverPolicy::All],
1049    /// [`DeliverPolicy::ByStartSequence`][crate::jetstream::consumer::DeliverPolicy::ByStartSequence],
1050    /// or
1051    /// [`DeliverPolicy::ByStartTime`][crate::jetstream::consumer::DeliverPolicy::ByStartTime].
1052    /// For policies with a configured starting sequence/time, resets below
1053    /// the configured start are rejected by the server.
1054    ///
1055    /// # Examples
1056    ///
1057    /// ```no_run
1058    /// # #[tokio::main]
1059    /// # async fn main() -> Result<(), async_nats::Error> {
1060    /// let client = async_nats::connect("localhost:4222").await?;
1061    /// let jetstream = async_nats::jetstream::new(client);
1062    ///
1063    /// let stream = jetstream.get_stream("events").await?;
1064    /// stream.reset_consumer("processor", Some(42)).await?;
1065    /// # Ok(())
1066    /// # }
1067    /// ```
1068    #[cfg(feature = "server_2_14")]
1069    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
1070    pub async fn reset_consumer(
1071        &self,
1072        name: &str,
1073        seq: Option<u64>,
1074    ) -> Result<ConsumerResetResponse, ConsumerResetError> {
1075        let subject = format!("CONSUMER.RESET.{}.{}", self.name, name);
1076        let payload = ConsumerResetRequest {
1077            seq: seq.unwrap_or(0),
1078        };
1079
1080        match self.context.request(subject, &payload).await? {
1081            Response::Ok::<ConsumerResetResponse>(resp) => Ok(resp),
1082            Response::Err { error } => Err(error.into()),
1083        }
1084    }
1085
1086    /// Lists names of all consumers for current stream.
1087    ///
1088    /// # Examples
1089    ///
1090    /// ```no_run
1091    /// # #[tokio::main]
1092    /// # async fn main() -> Result<(), async_nats::Error> {
1093    /// use futures_util::TryStreamExt;
1094    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1095    /// let jetstream = async_nats::jetstream::new(client);
1096    /// let stream = jetstream.get_stream("stream").await?;
1097    /// let mut names = stream.consumer_names();
1098    /// while let Some(consumer) = names.try_next().await? {
1099    ///     println!("consumer: {stream:?}");
1100    /// }
1101    /// # Ok(())
1102    /// # }
1103    /// ```
1104    pub fn consumer_names(&self) -> ConsumerNames {
1105        ConsumerNames {
1106            context: self.context.clone(),
1107            stream: self.name.clone(),
1108            offset: 0,
1109            page_request: None,
1110            consumers: Vec::new(),
1111            done: false,
1112        }
1113    }
1114
1115    /// Lists all consumers info for current stream.
1116    ///
1117    /// # Examples
1118    ///
1119    /// ```no_run
1120    /// # #[tokio::main]
1121    /// # async fn main() -> Result<(), async_nats::Error> {
1122    /// use futures_util::TryStreamExt;
1123    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1124    /// let jetstream = async_nats::jetstream::new(client);
1125    /// let stream = jetstream.get_stream("stream").await?;
1126    /// let mut consumers = stream.consumers();
1127    /// while let Some(consumer) = consumers.try_next().await? {
1128    ///     println!("consumer: {consumer:?}");
1129    /// }
1130    /// # Ok(())
1131    /// # }
1132    /// ```
1133    pub fn consumers(&self) -> Consumers {
1134        Consumers {
1135            context: self.context.clone(),
1136            stream: self.name.clone(),
1137            offset: 0,
1138            page_request: None,
1139            consumers: Vec::new(),
1140            done: false,
1141        }
1142    }
1143}
1144
1145pub struct StreamInfoBuilder {
1146    pub(crate) context: Context,
1147    pub(crate) name: String,
1148    pub(crate) deleted: bool,
1149    pub(crate) subject: String,
1150}
1151
1152impl StreamInfoBuilder {
1153    fn new(context: Context, name: String) -> Self {
1154        Self {
1155            context,
1156            name,
1157            deleted: false,
1158            subject: "".to_string(),
1159        }
1160    }
1161
1162    pub fn with_deleted(mut self, deleted: bool) -> Self {
1163        self.deleted = deleted;
1164        self
1165    }
1166
1167    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1168        self.subject = subject.into();
1169        self
1170    }
1171
1172    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1173        let info = stream_info_with_details(
1174            self.context.clone(),
1175            self.name.clone(),
1176            0,
1177            self.deleted,
1178            self.subject.clone(),
1179        )
1180        .await?;
1181
1182        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1183    }
1184}
1185
1186/// `StreamConfig` determines the properties for a stream.
1187/// There are sensible defaults for most. If no subjects are
1188/// given the name will be used as the only subject.
1189#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1190pub struct Config {
1191    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
1192    pub name: String,
1193    /// How large the Stream may become in total bytes before the configured discard policy kicks in
1194    #[serde(default)]
1195    pub max_bytes: i64,
1196    /// How large the Stream may become in total messages before the configured discard policy kicks in
1197    #[serde(default, rename = "max_msgs")]
1198    pub max_messages: i64,
1199    /// Maximum amount of messages to keep per subject
1200    #[serde(default, rename = "max_msgs_per_subject")]
1201    pub max_messages_per_subject: i64,
1202    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
1203    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
1204    pub discard: DiscardPolicy,
1205    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
1206    #[serde(default, skip_serializing_if = "is_default")]
1207    pub discard_new_per_subject: bool,
1208    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
1209    /// configured stream `name`.
1210    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1211    pub subjects: Vec<String>,
1212    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
1213    pub retention: RetentionPolicy,
1214    /// How many Consumers can be defined for a given Stream, -1 for unlimited
1215    #[serde(default)]
1216    pub max_consumers: i32,
1217    /// Maximum age of any message in the stream, expressed in nanoseconds
1218    #[serde(default, with = "serde_nanos")]
1219    pub max_age: Duration,
1220    /// The largest message that will be accepted by the Stream
1221    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1222    pub max_message_size: i32,
1223    /// The type of storage backend, `File` (default) and `Memory`
1224    pub storage: StorageType,
1225    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
1226    pub num_replicas: usize,
1227    /// Disables acknowledging messages that are received by the Stream
1228    #[serde(default, skip_serializing_if = "is_default")]
1229    pub no_ack: bool,
1230    /// The window within which to track duplicate messages.
1231    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1232    pub duplicate_window: Duration,
1233    /// The owner of the template associated with this stream.
1234    #[serde(default, skip_serializing_if = "is_default")]
1235    pub template_owner: String,
1236    /// Indicates the stream is sealed and cannot be modified in any way
1237    #[serde(default, skip_serializing_if = "is_default")]
1238    pub sealed: bool,
1239    /// A short description of the purpose of this stream.
1240    #[serde(default, skip_serializing_if = "is_default")]
1241    pub description: Option<String>,
1242    #[serde(
1243        default,
1244        rename = "allow_rollup_hdrs",
1245        skip_serializing_if = "is_default"
1246    )]
1247    /// Indicates if rollups will be allowed or not.
1248    pub allow_rollup: bool,
1249    #[serde(default, skip_serializing_if = "is_default")]
1250    /// Indicates deletes will be denied or not.
1251    pub deny_delete: bool,
1252    /// Indicates if purges will be denied or not.
1253    #[serde(default, skip_serializing_if = "is_default")]
1254    pub deny_purge: bool,
1255
1256    /// Optional republish config.
1257    #[serde(default, skip_serializing_if = "is_default")]
1258    pub republish: Option<Republish>,
1259
1260    /// Enables direct get, which would get messages from
1261    /// non-leader.
1262    #[serde(default, skip_serializing_if = "is_default")]
1263    pub allow_direct: bool,
1264
1265    /// Enable direct access also for mirrors.
1266    #[serde(default, skip_serializing_if = "is_default")]
1267    pub mirror_direct: bool,
1268
1269    /// Stream mirror configuration.
1270    #[serde(default, skip_serializing_if = "Option::is_none")]
1271    pub mirror: Option<Source>,
1272
1273    /// Sources configuration.
1274    #[serde(default, skip_serializing_if = "Option::is_none")]
1275    pub sources: Option<Vec<Source>>,
1276
1277    #[cfg(feature = "server_2_10")]
1278    /// Additional stream metadata.
1279    #[serde(default, skip_serializing_if = "is_default")]
1280    pub metadata: HashMap<String, String>,
1281
1282    #[cfg(feature = "server_2_10")]
1283    /// Allow applying a subject transform to incoming messages
1284    #[serde(default, skip_serializing_if = "Option::is_none")]
1285    pub subject_transform: Option<SubjectTransform>,
1286
1287    #[cfg(feature = "server_2_10")]
1288    /// Override compression config for this stream.
1289    /// Wrapping enum that has `None` type with [Option] is there
1290    /// because [Stream] can override global compression set to [Compression::S2]
1291    /// to [Compression::None], which is different from not overriding global config with anything.
1292    #[serde(default, skip_serializing_if = "Option::is_none")]
1293    pub compression: Option<Compression>,
1294    #[cfg(feature = "server_2_10")]
1295    /// Set limits on consumers that are created on this stream.
1296    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1297    pub consumer_limits: Option<ConsumerLimits>,
1298
1299    #[cfg(feature = "server_2_10")]
1300    /// Sets the first sequence for the stream.
1301    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1302    pub first_sequence: Option<u64>,
1303
1304    /// Placement configuration for clusters and tags.
1305    #[serde(default, skip_serializing_if = "Option::is_none")]
1306    pub placement: Option<Placement>,
1307
1308    /// Persistence mode for the stream.
1309    #[serde(default, skip_serializing_if = "Option::is_none")]
1310    pub persist_mode: Option<PersistenceMode>,
1311
1312    /// For suspending the consumer until the deadline.
1313    #[cfg(feature = "server_2_11")]
1314    #[serde(
1315        default,
1316        with = "rfc3339::option",
1317        skip_serializing_if = "Option::is_none"
1318    )]
1319    pub pause_until: Option<OffsetDateTime>,
1320
1321    /// Allows setting a TTL for a message in the stream.
1322    #[cfg(feature = "server_2_11")]
1323    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
1324    pub allow_message_ttl: bool,
1325
1326    /// Enables delete markers for messages deleted from the stream and sets the TTL
1327    /// for how long the marker should be kept.
1328    #[cfg(feature = "server_2_11")]
1329    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1330    pub subject_delete_marker_ttl: Option<Duration>,
1331
1332    /// Allows atomic publish operations.
1333    #[cfg(feature = "server_2_12")]
1334    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
1335    pub allow_atomic_publish: bool,
1336
1337    /// Enables the scheduling of messages
1338    #[cfg(feature = "server_2_12")]
1339    #[serde(
1340        default,
1341        skip_serializing_if = "is_default",
1342        rename = "allow_msg_schedules"
1343    )]
1344    pub allow_message_schedules: bool,
1345
1346    /// Enables counters for the stream
1347    #[cfg(feature = "server_2_12")]
1348    #[serde(
1349        default,
1350        skip_serializing_if = "is_default",
1351        rename = "allow_msg_counter"
1352    )]
1353    pub allow_message_counter: bool,
1354
1355    /// Allows fast-ingest batch publishing on the stream (ADR-50).
1356    #[cfg(feature = "server_2_14")]
1357    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
1358    #[serde(default, skip_serializing_if = "is_default", rename = "allow_batched")]
1359    pub allow_batch_publish: bool,
1360}
1361
1362impl From<&Config> for Config {
1363    fn from(sc: &Config) -> Config {
1364        sc.clone()
1365    }
1366}
1367
1368impl From<&str> for Config {
1369    fn from(s: &str) -> Config {
1370        Config {
1371            name: s.to_string(),
1372            ..Default::default()
1373        }
1374    }
1375}
1376
1377#[cfg(feature = "server_2_10")]
1378fn default_consumer_limits_as_none<'de, D>(
1379    deserializer: D,
1380) -> Result<Option<ConsumerLimits>, D::Error>
1381where
1382    D: Deserializer<'de>,
1383{
1384    let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1385    if let Some(cl) = consumer_limits {
1386        if cl == ConsumerLimits::default() {
1387            Ok(None)
1388        } else {
1389            Ok(Some(cl))
1390        }
1391    } else {
1392        Ok(None)
1393    }
1394}
1395#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1396pub struct ConsumerLimits {
1397    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
1398    #[serde(default, with = "serde_nanos")]
1399    pub inactive_threshold: std::time::Duration,
1400    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
1401    #[serde(default)]
1402    pub max_ack_pending: i64,
1403}
1404
1405#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1406pub enum Compression {
1407    #[serde(rename = "s2")]
1408    S2,
1409    #[serde(rename = "none")]
1410    None,
1411}
1412
1413// SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
1414#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1415pub struct SubjectTransform {
1416    #[serde(rename = "src")]
1417    pub source: String,
1418
1419    #[serde(rename = "dest")]
1420    pub destination: String,
1421}
1422
1423// Republish is for republishing messages once committed to a stream.
1424#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1425pub struct Republish {
1426    /// Subject that should be republished.
1427    #[serde(rename = "src")]
1428    pub source: String,
1429    /// Subject where messages will be republished.
1430    #[serde(rename = "dest")]
1431    pub destination: String,
1432    /// If true, only headers should be republished.
1433    #[serde(default)]
1434    pub headers_only: bool,
1435}
1436
1437/// Placement describes on which cluster or tags the stream should be placed.
1438#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1439pub struct Placement {
1440    // Cluster where the stream should be placed.
1441    #[serde(default, skip_serializing_if = "is_default")]
1442    pub cluster: Option<String>,
1443    // Matching tags for stream placement.
1444    #[serde(default, skip_serializing_if = "is_default")]
1445    pub tags: Vec<String>,
1446}
1447
1448/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
1449/// remove older messages. `New` will fail to store the new message.
1450#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1451#[repr(u8)]
1452pub enum DiscardPolicy {
1453    /// will remove older messages when limits are hit.
1454    #[default]
1455    #[serde(rename = "old")]
1456    Old = 0,
1457    /// will error on a StoreMsg call when limits are hit
1458    #[serde(rename = "new")]
1459    New = 1,
1460}
1461
1462/// `RetentionPolicy` determines how messages in a set are retained.
1463#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1464#[repr(u8)]
1465pub enum RetentionPolicy {
1466    /// `Limits` (default) means that messages are retained until any given limit is reached.
1467    /// This could be one of messages, bytes, or age.
1468    #[default]
1469    #[serde(rename = "limits")]
1470    Limits = 0,
1471    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
1472    #[serde(rename = "interest")]
1473    Interest = 1,
1474    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
1475    #[serde(rename = "workqueue")]
1476    WorkQueue = 2,
1477}
1478
1479/// determines how messages are stored for retention.
1480#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1481#[repr(u8)]
1482pub enum StorageType {
1483    /// Stream data is kept in files. This is the default.
1484    #[default]
1485    #[serde(rename = "file")]
1486    File = 0,
1487    /// Stream data is kept only in memory.
1488    #[serde(rename = "memory")]
1489    Memory = 1,
1490}
1491
1492/// Determines the persistence mode for stream data.
1493#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1494#[repr(u8)]
1495pub enum PersistenceMode {
1496    /// Writes are immediately flushed, acknowledgement sent after message is stored.
1497    #[default]
1498    #[serde(rename = "default")]
1499    Default = 0,
1500    /// Writes are flushed asynchronously, acknowledgement may be sent before message is stored.
1501    #[serde(rename = "async")]
1502    Async = 1,
1503}
1504
1505async fn stream_info_with_details(
1506    context: Context,
1507    stream: String,
1508    offset: usize,
1509    deleted_details: bool,
1510    subjects_filter: String,
1511) -> Result<Info, InfoError> {
1512    let subject = format!("STREAM.INFO.{stream}");
1513
1514    let payload = StreamInfoRequest {
1515        offset,
1516        deleted_details,
1517        subjects_filter,
1518    };
1519
1520    let response: Response<Info> = context.request(subject, &payload).await?;
1521
1522    match response {
1523        Response::Ok(info) => Ok(info),
1524        Response::Err { error } => Err(error.into()),
1525    }
1526}
1527
1528type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1529
1530#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1531pub struct StreamInfoRequest {
1532    offset: usize,
1533    deleted_details: bool,
1534    subjects_filter: String,
1535}
1536
1537pub struct InfoWithSubjects {
1538    stream: String,
1539    context: Context,
1540    pub info: Info,
1541    offset: usize,
1542    subjects: collections::hash_map::IntoIter<String, usize>,
1543    info_request: Option<InfoRequest>,
1544    subjects_filter: String,
1545    pages_done: bool,
1546}
1547
1548impl InfoWithSubjects {
1549    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1550        let subjects = info.state.subjects.take().unwrap_or_default();
1551        let name = info.config.name.clone();
1552        InfoWithSubjects {
1553            context,
1554            info,
1555            pages_done: subjects.is_empty(),
1556            offset: subjects.len(),
1557            subjects: subjects.into_iter(),
1558            subjects_filter: subject,
1559            stream: name,
1560            info_request: None,
1561        }
1562    }
1563}
1564
1565impl futures_util::Stream for InfoWithSubjects {
1566    type Item = Result<(String, usize), InfoError>;
1567
1568    fn poll_next(
1569        mut self: Pin<&mut Self>,
1570        cx: &mut std::task::Context<'_>,
1571    ) -> Poll<Option<Self::Item>> {
1572        match self.subjects.next() {
1573            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1574            None => {
1575                // If we have already requested all pages, stop the iterator.
1576                if self.pages_done {
1577                    return Poll::Ready(None);
1578                }
1579                let stream = self.stream.clone();
1580                let context = self.context.clone();
1581                let subjects_filter = self.subjects_filter.clone();
1582                let offset = self.offset;
1583                match self
1584                    .info_request
1585                    .get_or_insert_with(|| {
1586                        Box::pin(stream_info_with_details(
1587                            context,
1588                            stream,
1589                            offset,
1590                            false,
1591                            subjects_filter,
1592                        ))
1593                    })
1594                    .poll_unpin(cx)
1595                {
1596                    Poll::Ready(resp) => match resp {
1597                        Ok(info) => {
1598                            let subjects = info.state.subjects.clone();
1599                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1600                            self.info_request = None;
1601                            let subjects = subjects.unwrap_or_default();
1602                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1603                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1604                            if total <= self.offset || subjects.is_empty() {
1605                                self.pages_done = true;
1606                            }
1607                            match self.subjects.next() {
1608                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1609                                None => Poll::Ready(None),
1610                            }
1611                        }
1612                        Err(err) => Poll::Ready(Some(Err(err))),
1613                    },
1614                    Poll::Pending => Poll::Pending,
1615                }
1616            }
1617        }
1618    }
1619}
1620
1621/// Shows config and current state for this stream.
1622#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1623pub struct Info {
1624    /// The configuration associated with this stream.
1625    pub config: Config,
1626    /// The time that this stream was created.
1627    #[serde(with = "rfc3339")]
1628    pub created: time::OffsetDateTime,
1629    /// Various metrics associated with this stream.
1630    pub state: State,
1631    /// Information about leader and replicas.
1632    pub cluster: Option<ClusterInfo>,
1633    /// Information about mirror config if present.
1634    #[serde(default)]
1635    pub mirror: Option<SourceInfo>,
1636    /// Information about sources configs if present.
1637    #[serde(default)]
1638    pub sources: Vec<SourceInfo>,
1639    #[serde(flatten)]
1640    paged_info: Option<PagedInfo>,
1641}
1642
1643#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1644pub struct PagedInfo {
1645    offset: usize,
1646    total: usize,
1647    limit: usize,
1648}
1649
1650#[derive(Deserialize)]
1651pub struct DeleteStatus {
1652    pub success: bool,
1653}
1654
1655#[cfg(feature = "server_2_11")]
1656#[derive(Deserialize)]
1657pub struct PauseResponse {
1658    pub paused: bool,
1659    #[serde(with = "rfc3339")]
1660    pub pause_until: OffsetDateTime,
1661    #[serde(default, with = "serde_nanos")]
1662    pub pause_remaining: Option<Duration>,
1663}
1664
1665#[cfg(feature = "server_2_11")]
1666#[derive(Serialize, Debug)]
1667struct PauseResumeConsumerRequest {
1668    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
1669    pause_until: Option<OffsetDateTime>,
1670}
1671
1672#[cfg(feature = "server_2_14")]
1673#[derive(Serialize, Debug)]
1674pub(crate) struct ConsumerResetRequest {
1675    #[serde(default, skip_serializing_if = "is_default")]
1676    pub(crate) seq: u64,
1677}
1678
1679/// Response from a [`Stream::reset_consumer`] / [`crate::jetstream::consumer::Consumer::reset`] call.
1680#[cfg(feature = "server_2_14")]
1681#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
1682#[derive(Debug, Deserialize, Clone)]
1683pub struct ConsumerResetResponse {
1684    /// Refreshed [Info][crate::jetstream::consumer::Info] for the consumer
1685    /// after the reset has been applied.
1686    #[serde(flatten)]
1687    pub info: super::consumer::Info,
1688    /// Stream sequence the consumer's ack floor is now sitting at after the
1689    /// reset. For an empty / `None` request this echoes the previously held
1690    /// ack-floor seq.
1691    pub reset_seq: u64,
1692}
1693
1694/// information about the given stream.
1695#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1696pub struct State {
1697    /// The number of messages contained in this stream
1698    pub messages: u64,
1699    /// The number of bytes of all messages contained in this stream
1700    pub bytes: u64,
1701    /// The lowest sequence number still present in this stream
1702    #[serde(rename = "first_seq")]
1703    pub first_sequence: u64,
1704    /// The time associated with the oldest message still present in this stream
1705    #[serde(with = "rfc3339", rename = "first_ts")]
1706    pub first_timestamp: time::OffsetDateTime,
1707    /// The last sequence number assigned to a message in this stream
1708    #[serde(rename = "last_seq")]
1709    pub last_sequence: u64,
1710    /// The time that the last message was received by this stream
1711    #[serde(with = "rfc3339", rename = "last_ts")]
1712    pub last_timestamp: time::OffsetDateTime,
1713    /// The number of consumers configured to consume this stream
1714    pub consumer_count: usize,
1715    /// The number of subjects in the stream
1716    #[serde(default, rename = "num_subjects")]
1717    pub subjects_count: u64,
1718    /// The number of deleted messages in the stream
1719    #[serde(default, rename = "num_deleted")]
1720    pub deleted_count: Option<u64>,
1721    /// The list of deleted subjects from the Stream.
1722    /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
1723    #[serde(default)]
1724    pub deleted: Option<Vec<u64>>,
1725
1726    pub(crate) subjects: Option<HashMap<String, usize>>,
1727}
1728
1729/// A raw stream message in the representation it is stored.
1730#[derive(Debug, Serialize, Deserialize, Clone)]
1731pub struct RawMessage {
1732    /// Subject of the message.
1733    #[serde(rename = "subject")]
1734    pub subject: String,
1735
1736    /// Sequence of the message.
1737    #[serde(rename = "seq")]
1738    pub sequence: u64,
1739
1740    /// Raw payload of the message as a base64 encoded string.
1741    #[serde(default, rename = "data")]
1742    pub payload: String,
1743
1744    /// Raw header string, if any.
1745    #[serde(default, rename = "hdrs")]
1746    pub headers: Option<String>,
1747
1748    /// The time the message was published.
1749    #[serde(rename = "time", with = "rfc3339")]
1750    pub time: time::OffsetDateTime,
1751}
1752
1753impl TryFrom<RawMessage> for StreamMessage {
1754    type Error = crate::Error;
1755
1756    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1757        let decoded_payload = STANDARD
1758            .decode(value.payload)
1759            .map_err(|err| Box::new(std::io::Error::other(err)))?;
1760        let decoded_headers = value
1761            .headers
1762            .map(|header| STANDARD.decode(header))
1763            .map_or(Ok(None), |v| v.map(Some))?;
1764
1765        let (headers, _, _) = decoded_headers
1766            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1767
1768        Ok(StreamMessage {
1769            subject: value.subject.into(),
1770            payload: decoded_payload.into(),
1771            headers,
1772            sequence: value.sequence,
1773            time: value.time,
1774        })
1775    }
1776}
1777
1778fn is_continuation(c: char) -> bool {
1779    c == ' ' || c == '\t'
1780}
1781const HEADER_LINE: &str = "NATS/1.0";
1782
1783#[allow(clippy::type_complexity)]
1784fn parse_headers(
1785    buf: &[u8],
1786) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1787    let mut headers = HeaderMap::new();
1788    let mut maybe_status: Option<StatusCode> = None;
1789    let mut maybe_description: Option<String> = None;
1790    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1791        line.lines().peekable()
1792    } else {
1793        return Err(Box::new(std::io::Error::other("invalid header")));
1794    };
1795
1796    if let Some(line) = lines.next() {
1797        let line = line
1798            .strip_prefix(HEADER_LINE)
1799            .ok_or_else(|| {
1800                Box::new(std::io::Error::other(
1801                    "version line does not start with NATS/1.0",
1802                ))
1803            })?
1804            .trim();
1805
1806        match line.split_once(' ') {
1807            Some((status, description)) => {
1808                if !status.is_empty() {
1809                    maybe_status = Some(status.parse()?);
1810                }
1811
1812                if !description.is_empty() {
1813                    maybe_description = Some(description.trim().to_string());
1814                }
1815            }
1816            None => {
1817                if !line.is_empty() {
1818                    maybe_status = Some(line.parse()?);
1819                }
1820            }
1821        }
1822    } else {
1823        return Err(Box::new(std::io::Error::other(
1824            "expected header information not found",
1825        )));
1826    };
1827
1828    while let Some(line) = lines.next() {
1829        if line.is_empty() {
1830            continue;
1831        }
1832
1833        if let Some((k, v)) = line.split_once(':').to_owned() {
1834            let mut s = String::from(v.trim());
1835            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1836                s.push(' ');
1837                s.push_str(v.trim());
1838            }
1839
1840            headers.insert(
1841                HeaderName::from_str(k)?,
1842                HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1843            );
1844        } else {
1845            return Err(Box::new(std::io::Error::other("malformed header line")));
1846        }
1847    }
1848
1849    if headers.is_empty() {
1850        Ok((HeaderMap::new(), maybe_status, maybe_description))
1851    } else {
1852        Ok((headers, maybe_status, maybe_description))
1853    }
1854}
1855
1856#[derive(Debug, Serialize, Deserialize, Clone)]
1857struct GetRawMessage {
1858    pub(crate) message: RawMessage,
1859}
1860
1861fn is_default<T: Default + Eq>(t: &T) -> bool {
1862    t == &T::default()
1863}
1864/// Information about the stream's, consumer's associated `JetStream` cluster
1865#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1866pub struct ClusterInfo {
1867    /// The cluster name.
1868    #[serde(default)]
1869    pub name: Option<String>,
1870    /// The RAFT group name.
1871    #[serde(default)]
1872    pub raft_group: Option<String>,
1873    /// The server name of the RAFT leader.
1874    #[serde(default)]
1875    pub leader: Option<String>,
1876    /// The time since this server has been the leader.
1877    #[serde(default, with = "rfc3339::option")]
1878    pub leader_since: Option<OffsetDateTime>,
1879    /// Indicates if this account is a system account.
1880    #[cfg(feature = "server_2_12")]
1881    #[serde(default)]
1882    /// Indicates if `traffic_account` is set a system account.
1883    pub system_account: bool,
1884    #[cfg(feature = "server_2_12")]
1885    /// Name of the traffic (replication) account.
1886    #[serde(default)]
1887    pub traffic_account: Option<String>,
1888    /// The members of the RAFT cluster.
1889    #[serde(default)]
1890    pub replicas: Vec<PeerInfo>,
1891}
1892
1893/// The members of the RAFT cluster
1894#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1895pub struct PeerInfo {
1896    /// The server name of the peer.
1897    pub name: String,
1898    /// Indicates if the server is up to date and synchronized.
1899    pub current: bool,
1900    /// Nanoseconds since this peer was last seen.
1901    #[serde(with = "serde_nanos")]
1902    pub active: Duration,
1903    /// Indicates the node is considered offline by the group.
1904    #[serde(default)]
1905    pub offline: bool,
1906    /// How many uncommitted operations this peer is behind the leader.
1907    pub lag: Option<u64>,
1908}
1909
1910#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1911pub struct SourceInfo {
1912    /// Source name.
1913    pub name: String,
1914    /// Number of messages this source is lagging behind.
1915    pub lag: u64,
1916    /// Last time the source was seen active.
1917    #[serde(deserialize_with = "negative_duration_as_none")]
1918    pub active: Option<std::time::Duration>,
1919    /// Filtering for the source.
1920    #[serde(default)]
1921    pub filter_subject: Option<String>,
1922    /// Source destination subject.
1923    #[serde(default)]
1924    pub subject_transform_dest: Option<String>,
1925    /// List of transforms.
1926    #[serde(default)]
1927    pub subject_transforms: Vec<SubjectTransform>,
1928}
1929
1930fn negative_duration_as_none<'de, D>(
1931    deserializer: D,
1932) -> Result<Option<std::time::Duration>, D::Error>
1933where
1934    D: Deserializer<'de>,
1935{
1936    let n = i64::deserialize(deserializer)?;
1937    if n.is_negative() {
1938        Ok(None)
1939    } else {
1940        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1941    }
1942}
1943
1944/// The response generated by trying to purge a stream.
1945#[derive(Debug, Deserialize, Clone, Copy)]
1946pub struct PurgeResponse {
1947    /// Whether the purge request was successful.
1948    pub success: bool,
1949    /// The number of purged messages in a stream.
1950    pub purged: u64,
1951}
1952/// The payload used to generate a purge request.
1953#[derive(Default, Debug, Serialize, Clone)]
1954pub struct PurgeRequest {
1955    /// Purge up to but not including sequence.
1956    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1957    pub sequence: Option<u64>,
1958
1959    /// Subject to match against messages for the purge command.
1960    #[serde(default, skip_serializing_if = "is_default")]
1961    pub filter: Option<String>,
1962
1963    /// Number of messages to keep.
1964    #[serde(default, skip_serializing_if = "is_default")]
1965    pub keep: Option<u64>,
1966}
1967
1968#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1969pub struct Source {
1970    /// Name of the stream source.
1971    pub name: String,
1972    /// Optional source start sequence.
1973    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1974    pub start_sequence: Option<u64>,
1975    #[serde(
1976        default,
1977        rename = "opt_start_time",
1978        skip_serializing_if = "is_default",
1979        with = "rfc3339::option"
1980    )]
1981    /// Optional source start time.
1982    pub start_time: Option<OffsetDateTime>,
1983    /// Optional additional filter subject.
1984    #[serde(default, skip_serializing_if = "is_default")]
1985    pub filter_subject: Option<String>,
1986    /// Optional config for sourcing streams from another prefix, used for cross-account.
1987    #[serde(default, skip_serializing_if = "Option::is_none")]
1988    pub external: Option<External>,
1989    /// Optional config to set a domain, if source is residing in different one.
1990    #[serde(default, skip_serializing_if = "is_default")]
1991    pub domain: Option<String>,
1992    /// Subject transforms for Stream.
1993    #[cfg(feature = "server_2_10")]
1994    #[serde(default, skip_serializing_if = "is_default")]
1995    pub subject_transforms: Vec<SubjectTransform>,
1996
1997    /// Pre-created durable consumer to use for sourcing instead of the
1998    /// auto-managed ephemeral one. Required for full control over the
1999    /// consumer's lifecycle (security, advanced delivery/replay options)
2000    /// when sourcing/mirroring from WorkQueue/Interest streams. Both
2001    /// `name` and `deliver_subject` must be non-empty and valid; the
2002    /// server rejects the config otherwise. See ADR-60.
2003    #[cfg(feature = "server_2_14")]
2004    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
2005    #[serde(default, skip_serializing_if = "Option::is_none")]
2006    pub consumer: Option<StreamConsumerSource>,
2007}
2008
2009/// Configures a pre-created durable consumer used for stream
2010/// sourcing/mirroring. See ADR-60.
2011#[cfg(feature = "server_2_14")]
2012#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
2013#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
2014pub struct StreamConsumerSource {
2015    /// Name of the durable consumer to use for sourcing.
2016    #[serde(default, skip_serializing_if = "is_default")]
2017    pub name: String,
2018    /// Deliver subject of the (push) consumer used for sourcing.
2019    #[serde(default, skip_serializing_if = "is_default")]
2020    pub deliver_subject: String,
2021}
2022
2023#[cfg(feature = "server_2_14")]
2024impl StreamConsumerSource {
2025    /// Build a [`StreamConsumerSource`] with both required fields populated.
2026    /// The server rejects either field empty (`NewJSSourceDurableConsumerCfgInvalidError`).
2027    pub fn new(name: impl Into<String>, deliver_subject: impl Into<String>) -> Self {
2028        Self {
2029            name: name.into(),
2030            deliver_subject: deliver_subject.into(),
2031        }
2032    }
2033}
2034
2035#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
2036pub struct External {
2037    /// Api prefix of external source.
2038    #[serde(rename = "api")]
2039    pub api_prefix: String,
2040    /// Optional configuration of delivery prefix.
2041    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
2042    pub delivery_prefix: Option<String>,
2043}
2044
2045use std::marker::PhantomData;
2046
2047#[derive(Debug, Default)]
2048pub struct Yes;
2049#[derive(Debug, Default)]
2050pub struct No;
2051
2052pub trait ToAssign: Debug {}
2053
2054impl ToAssign for Yes {}
2055impl ToAssign for No {}
2056
2057#[derive(Debug)]
2058pub struct Purge<SEQUENCE, KEEP>
2059where
2060    SEQUENCE: ToAssign,
2061    KEEP: ToAssign,
2062{
2063    inner: PurgeRequest,
2064    sequence_set: PhantomData<SEQUENCE>,
2065    keep_set: PhantomData<KEEP>,
2066    context: Context,
2067    stream_name: String,
2068}
2069
2070impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2071where
2072    SEQUENCE: ToAssign,
2073    KEEP: ToAssign,
2074{
2075    /// Adds subject filter to [PurgeRequest]
2076    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2077        self.inner.filter = Some(filter.into());
2078        self
2079    }
2080}
2081
2082impl Purge<No, No> {
2083    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2084        Purge {
2085            context: stream.context.clone(),
2086            stream_name: stream.name.clone(),
2087            inner: Default::default(),
2088            sequence_set: PhantomData {},
2089            keep_set: PhantomData {},
2090        }
2091    }
2092}
2093
2094impl<KEEP> Purge<No, KEEP>
2095where
2096    KEEP: ToAssign,
2097{
2098    /// Creates a new [PurgeRequest].
2099    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
2100    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2101        Purge {
2102            context: self.context.clone(),
2103            stream_name: self.stream_name.clone(),
2104            sequence_set: PhantomData {},
2105            keep_set: PhantomData {},
2106            inner: PurgeRequest {
2107                keep: Some(keep),
2108                ..self.inner
2109            },
2110        }
2111    }
2112}
2113impl<SEQUENCE> Purge<SEQUENCE, No>
2114where
2115    SEQUENCE: ToAssign,
2116{
2117    /// Creates a new [PurgeRequest].
2118    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
2119    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2120        Purge {
2121            context: self.context.clone(),
2122            stream_name: self.stream_name.clone(),
2123            sequence_set: PhantomData {},
2124            keep_set: PhantomData {},
2125            inner: PurgeRequest {
2126                sequence: Some(sequence),
2127                ..self.inner
2128            },
2129        }
2130    }
2131}
2132
2133#[derive(Clone, Debug, PartialEq)]
2134pub enum PurgeErrorKind {
2135    Request,
2136    TimedOut,
2137    JetStream(super::errors::Error),
2138}
2139
2140impl Display for PurgeErrorKind {
2141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2142        match self {
2143            Self::Request => write!(f, "request failed"),
2144            Self::TimedOut => write!(f, "timed out"),
2145            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2146        }
2147    }
2148}
2149
2150pub type PurgeError = Error<PurgeErrorKind>;
2151
2152impl<S, K> IntoFuture for Purge<S, K>
2153where
2154    S: ToAssign + std::marker::Send,
2155    K: ToAssign + std::marker::Send,
2156{
2157    type Output = Result<PurgeResponse, PurgeError>;
2158
2159    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2160
2161    fn into_future(self) -> Self::IntoFuture {
2162        Box::pin(std::future::IntoFuture::into_future(async move {
2163            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2164            let response: Response<PurgeResponse> = self
2165                .context
2166                .request(request_subject, &self.inner)
2167                .map_err(|err| match err.kind() {
2168                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2169                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2170                })
2171                .await?;
2172
2173            match response {
2174                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2175                Response::Ok(response) => Ok(response),
2176            }
2177        }))
2178    }
2179}
2180
2181#[derive(Deserialize, Debug)]
2182struct ConsumerPage {
2183    total: usize,
2184    consumers: Option<Vec<String>>,
2185}
2186
2187#[derive(Deserialize, Debug)]
2188struct ConsumerInfoPage {
2189    total: usize,
2190    consumers: Option<Vec<super::consumer::Info>>,
2191}
2192
2193type ConsumerNamesErrorKind = StreamsErrorKind;
2194type ConsumerNamesError = StreamsError;
2195type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2196
2197pub struct ConsumerNames {
2198    context: Context,
2199    stream: String,
2200    offset: usize,
2201    page_request: Option<PageRequest>,
2202    consumers: Vec<String>,
2203    done: bool,
2204}
2205
2206impl futures_util::Stream for ConsumerNames {
2207    type Item = Result<String, ConsumerNamesError>;
2208
2209    fn poll_next(
2210        mut self: Pin<&mut Self>,
2211        cx: &mut std::task::Context<'_>,
2212    ) -> std::task::Poll<Option<Self::Item>> {
2213        match self.page_request.as_mut() {
2214            Some(page) => match page.try_poll_unpin(cx) {
2215                std::task::Poll::Ready(page) => {
2216                    self.page_request = None;
2217                    let page = page.map_err(|err| {
2218                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2219                    })?;
2220
2221                    if let Some(consumers) = page.consumers {
2222                        self.offset += consumers.len();
2223                        self.consumers = consumers;
2224                        if self.offset >= page.total {
2225                            self.done = true;
2226                        }
2227                        match self.consumers.pop() {
2228                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2229                            None => Poll::Ready(None),
2230                        }
2231                    } else {
2232                        Poll::Ready(None)
2233                    }
2234                }
2235                std::task::Poll::Pending => std::task::Poll::Pending,
2236            },
2237            None => {
2238                if let Some(stream) = self.consumers.pop() {
2239                    Poll::Ready(Some(Ok(stream)))
2240                } else {
2241                    if self.done {
2242                        return Poll::Ready(None);
2243                    }
2244                    let context = self.context.clone();
2245                    let offset = self.offset;
2246                    let stream = self.stream.clone();
2247                    self.page_request = Some(Box::pin(async move {
2248                        match context
2249                            .request(
2250                                format!("CONSUMER.NAMES.{stream}"),
2251                                &json!({
2252                                    "offset": offset,
2253                                }),
2254                            )
2255                            .await?
2256                        {
2257                            Response::Err { error } => Err(RequestError::with_source(
2258                                super::context::RequestErrorKind::Other,
2259                                error,
2260                            )),
2261                            Response::Ok(page) => Ok(page),
2262                        }
2263                    }));
2264                    self.poll_next(cx)
2265                }
2266            }
2267        }
2268    }
2269}
2270
2271pub type ConsumersErrorKind = StreamsErrorKind;
2272pub type ConsumersError = StreamsError;
2273type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2274
2275pub struct Consumers {
2276    context: Context,
2277    stream: String,
2278    offset: usize,
2279    page_request: Option<PageInfoRequest>,
2280    consumers: Vec<super::consumer::Info>,
2281    done: bool,
2282}
2283
2284impl futures_util::Stream for Consumers {
2285    type Item = Result<super::consumer::Info, ConsumersError>;
2286
2287    fn poll_next(
2288        mut self: Pin<&mut Self>,
2289        cx: &mut std::task::Context<'_>,
2290    ) -> std::task::Poll<Option<Self::Item>> {
2291        match self.page_request.as_mut() {
2292            Some(page) => match page.try_poll_unpin(cx) {
2293                std::task::Poll::Ready(page) => {
2294                    self.page_request = None;
2295                    let page = page.map_err(|err| {
2296                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2297                    })?;
2298                    if let Some(consumers) = page.consumers {
2299                        self.offset += consumers.len();
2300                        self.consumers = consumers;
2301                        if self.offset >= page.total {
2302                            self.done = true;
2303                        }
2304                        match self.consumers.pop() {
2305                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2306                            None => Poll::Ready(None),
2307                        }
2308                    } else {
2309                        Poll::Ready(None)
2310                    }
2311                }
2312                std::task::Poll::Pending => std::task::Poll::Pending,
2313            },
2314            None => {
2315                if let Some(stream) = self.consumers.pop() {
2316                    Poll::Ready(Some(Ok(stream)))
2317                } else {
2318                    if self.done {
2319                        return Poll::Ready(None);
2320                    }
2321                    let context = self.context.clone();
2322                    let offset = self.offset;
2323                    let stream = self.stream.clone();
2324                    self.page_request = Some(Box::pin(async move {
2325                        match context
2326                            .request(
2327                                format!("CONSUMER.LIST.{stream}"),
2328                                &json!({
2329                                    "offset": offset,
2330                                }),
2331                            )
2332                            .await?
2333                        {
2334                            Response::Err { error } => Err(RequestError::with_source(
2335                                super::context::RequestErrorKind::Other,
2336                                error,
2337                            )),
2338                            Response::Ok(page) => Ok(page),
2339                        }
2340                    }));
2341                    self.poll_next(cx)
2342                }
2343            }
2344        }
2345    }
2346}
2347
2348#[derive(Clone, Debug, PartialEq)]
2349pub enum LastRawMessageErrorKind {
2350    NoMessageFound,
2351    InvalidSubject,
2352    JetStream(super::errors::Error),
2353    Other,
2354}
2355
2356impl Display for LastRawMessageErrorKind {
2357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2358        match self {
2359            Self::NoMessageFound => write!(f, "no message found"),
2360            Self::InvalidSubject => write!(f, "invalid subject"),
2361            Self::Other => write!(f, "failed to get last raw message"),
2362            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2363        }
2364    }
2365}
2366
2367pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2368pub type RawMessageErrorKind = LastRawMessageErrorKind;
2369pub type RawMessageError = LastRawMessageError;
2370
2371#[derive(Clone, Debug, PartialEq)]
2372pub enum ConsumerErrorKind {
2373    //TODO: get last should have timeout, which should be mapped here.
2374    TimedOut,
2375    Request,
2376    InvalidConsumerType,
2377    InvalidName,
2378    JetStream(super::errors::Error),
2379    Other,
2380}
2381
2382impl Display for ConsumerErrorKind {
2383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2384        match self {
2385            Self::TimedOut => write!(f, "timed out"),
2386            Self::Request => write!(f, "request failed"),
2387            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2388            Self::Other => write!(f, "consumer error"),
2389            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2390            Self::InvalidName => write!(f, "invalid consumer name"),
2391        }
2392    }
2393}
2394
2395pub type ConsumerError = Error<ConsumerErrorKind>;
2396
2397#[cfg(feature = "server_2_14")]
2398#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
2399#[derive(Clone, Debug, PartialEq)]
2400pub enum ConsumerResetErrorKind {
2401    TimedOut,
2402    Request,
2403    /// Stream or consumer does not exist.
2404    ///
2405    /// Surfaced when the server replies with `CONSUMER_NOT_FOUND` /
2406    /// `STREAM_NOT_FOUND`, or via core NATS `NoResponders` if the JS API
2407    /// has no handler for the subject.
2408    NotFound,
2409    /// Reset request violates the consumer's `DeliverPolicy` constraints
2410    /// (e.g. seq below `OptStartSeq`, or non-zero seq with a `DeliverPolicy`
2411    /// other than `All` / `ByStartSequence` / `ByStartTime`).
2412    InvalidReset,
2413    JetStream(super::errors::Error),
2414}
2415
2416#[cfg(feature = "server_2_14")]
2417#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
2418impl Display for ConsumerResetErrorKind {
2419    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2420        match self {
2421            Self::TimedOut => write!(f, "timed out"),
2422            Self::Request => write!(f, "request failed"),
2423            Self::NotFound => write!(f, "stream or consumer not found"),
2424            Self::InvalidReset => write!(f, "invalid reset"),
2425            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2426        }
2427    }
2428}
2429
2430#[cfg(feature = "server_2_14")]
2431pub type ConsumerResetError = Error<ConsumerResetErrorKind>;
2432
2433#[cfg(feature = "server_2_14")]
2434impl From<super::errors::Error> for ConsumerResetError {
2435    fn from(err: super::errors::Error) -> Self {
2436        match err.error_code() {
2437            super::errors::ErrorCode::CONSUMER_INVALID_RESET => {
2438                ConsumerResetError::new(ConsumerResetErrorKind::InvalidReset)
2439            }
2440            super::errors::ErrorCode::CONSUMER_NOT_FOUND
2441            | super::errors::ErrorCode::STREAM_NOT_FOUND => {
2442                ConsumerResetError::new(ConsumerResetErrorKind::NotFound)
2443            }
2444            _ => ConsumerResetError::new(ConsumerResetErrorKind::JetStream(err)),
2445        }
2446    }
2447}
2448
2449#[cfg(feature = "server_2_14")]
2450impl From<super::context::RequestError> for ConsumerResetError {
2451    fn from(err: super::context::RequestError) -> Self {
2452        match err.kind() {
2453            RequestErrorKind::TimedOut => ConsumerResetError::new(ConsumerResetErrorKind::TimedOut),
2454            RequestErrorKind::NoResponders => {
2455                ConsumerResetError::new(ConsumerResetErrorKind::NotFound)
2456            }
2457            _ => ConsumerResetError::with_source(ConsumerResetErrorKind::Request, err),
2458        }
2459    }
2460}
2461
2462#[derive(Clone, Debug, PartialEq)]
2463pub enum ConsumerCreateStrictErrorKind {
2464    //TODO: get last should have timeout, which should be mapped here.
2465    TimedOut,
2466    Request,
2467    InvalidConsumerType,
2468    InvalidName,
2469    AlreadyExists,
2470    JetStream(super::errors::Error),
2471    Other,
2472}
2473
2474impl Display for ConsumerCreateStrictErrorKind {
2475    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2476        match self {
2477            Self::TimedOut => write!(f, "timed out"),
2478            Self::Request => write!(f, "request failed"),
2479            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2480            Self::Other => write!(f, "consumer error"),
2481            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2482            Self::InvalidName => write!(f, "invalid consumer name"),
2483            Self::AlreadyExists => write!(f, "consumer already exists"),
2484        }
2485    }
2486}
2487
2488pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2489
2490#[derive(Clone, Debug, PartialEq)]
2491pub enum ConsumerUpdateErrorKind {
2492    //TODO: get last should have timeout, which should be mapped here.
2493    TimedOut,
2494    Request,
2495    InvalidConsumerType,
2496    InvalidName,
2497    DoesNotExist,
2498    JetStream(super::errors::Error),
2499    Other,
2500}
2501
2502impl Display for ConsumerUpdateErrorKind {
2503    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2504        match self {
2505            Self::TimedOut => write!(f, "timed out"),
2506            Self::Request => write!(f, "request failed"),
2507            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2508            Self::Other => write!(f, "consumer error"),
2509            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2510            Self::InvalidName => write!(f, "invalid consumer name"),
2511            Self::DoesNotExist => write!(f, "consumer does not exist"),
2512        }
2513    }
2514}
2515
2516pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2517
2518impl From<super::errors::Error> for ConsumerError {
2519    fn from(err: super::errors::Error) -> Self {
2520        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2521    }
2522}
2523impl From<super::errors::Error> for ConsumerCreateStrictError {
2524    fn from(err: super::errors::Error) -> Self {
2525        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2526            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2527        } else {
2528            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2529        }
2530    }
2531}
2532impl From<super::errors::Error> for ConsumerUpdateError {
2533    fn from(err: super::errors::Error) -> Self {
2534        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2535            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2536        } else {
2537            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2538        }
2539    }
2540}
2541impl From<ConsumerError> for ConsumerUpdateError {
2542    fn from(err: ConsumerError) -> Self {
2543        match err.kind() {
2544            ConsumerErrorKind::JetStream(err) => {
2545                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2546                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2547                } else {
2548                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2549                }
2550            }
2551            ConsumerErrorKind::Request => {
2552                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2553            }
2554            ConsumerErrorKind::TimedOut => {
2555                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2556            }
2557            ConsumerErrorKind::InvalidConsumerType => {
2558                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2559            }
2560            ConsumerErrorKind::InvalidName => {
2561                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2562            }
2563            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2564        }
2565    }
2566}
2567
2568impl From<ConsumerError> for ConsumerCreateStrictError {
2569    fn from(err: ConsumerError) -> Self {
2570        match err.kind() {
2571            ConsumerErrorKind::JetStream(err) => {
2572                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2573                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2574                } else {
2575                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2576                }
2577            }
2578            ConsumerErrorKind::Request => {
2579                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2580            }
2581            ConsumerErrorKind::TimedOut => {
2582                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2583            }
2584            ConsumerErrorKind::InvalidConsumerType => {
2585                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2586            }
2587            ConsumerErrorKind::InvalidName => {
2588                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2589            }
2590            ConsumerErrorKind::Other => {
2591                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2592            }
2593        }
2594    }
2595}
2596
2597impl From<super::context::RequestError> for ConsumerError {
2598    fn from(err: super::context::RequestError) -> Self {
2599        match err.kind() {
2600            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2601            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2602        }
2603    }
2604}
2605impl From<super::context::RequestError> for ConsumerUpdateError {
2606    fn from(err: super::context::RequestError) -> Self {
2607        match err.kind() {
2608            RequestErrorKind::TimedOut => {
2609                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2610            }
2611            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2612        }
2613    }
2614}
2615impl From<super::context::RequestError> for ConsumerCreateStrictError {
2616    fn from(err: super::context::RequestError) -> Self {
2617        match err.kind() {
2618            RequestErrorKind::TimedOut => {
2619                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2620            }
2621            _ => {
2622                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2623            }
2624        }
2625    }
2626}
2627
2628#[derive(Debug, Serialize, Default)]
2629pub struct DirectGetRequest {
2630    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2631    sequence: Option<u64>,
2632    #[serde(rename = "last_by_subj", skip_serializing)]
2633    last_by_subject: Option<String>,
2634    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2635    next_by_subject: Option<String>,
2636}
2637
2638/// Marker type indicating that headers should be included in the response.
2639pub struct WithHeaders;
2640
2641/// Marker type indicating that headers should be excluded from the response.
2642pub struct WithoutHeaders;
2643
2644/// Trait for converting a Message response into the appropriate return type.
2645trait DirectGetResponse: Sized {
2646    fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
2647}
2648
2649impl DirectGetResponse for StreamMessage {
2650    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2651        StreamMessage::try_from(message).map_err(Into::into)
2652    }
2653}
2654
2655impl DirectGetResponse for StreamValue {
2656    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2657        Ok(StreamValue {
2658            data: message.payload,
2659        })
2660    }
2661}
2662
2663pub struct DirectGetBuilder<T = WithHeaders> {
2664    context: Context,
2665    stream_name: String,
2666    request: DirectGetRequest,
2667    _phantom: std::marker::PhantomData<T>,
2668}
2669
2670impl DirectGetBuilder<WithHeaders> {
2671    fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2672        DirectGetBuilder {
2673            context,
2674            stream_name,
2675            request: DirectGetRequest::default(),
2676            _phantom: std::marker::PhantomData,
2677        }
2678    }
2679}
2680
2681impl<T> DirectGetBuilder<T> {
2682    /// Internal method to send the direct get request and convert to the appropriate type.
2683    async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2684        // When last_by_subject is used, the subject is embedded in the URL and we should send an empty payload
2685        // to match the NATS protocol expectation (similar to nats.go implementation)
2686        let payload = if self.request.last_by_subject.is_some() {
2687            Bytes::new()
2688        } else {
2689            serde_json::to_vec(&self.request).map(Bytes::from)?
2690        };
2691
2692        let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2693            format!(
2694                "{}.DIRECT.GET.{}.{}",
2695                &self.context.prefix, &self.stream_name, subject
2696            )
2697        } else {
2698            format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2699        };
2700
2701        let response = self
2702            .context
2703            .client
2704            .request(request_subject, payload)
2705            .await?;
2706
2707        // Check for error status
2708        if let Some(status) = response.status {
2709            if let Some(ref description) = response.description {
2710                match status {
2711                    StatusCode::NOT_FOUND => {
2712                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2713                    }
2714                    // 408 is used in Direct Message for bad/empty payload.
2715                    StatusCode::TIMEOUT => {
2716                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2717                    }
2718                    _ => {
2719                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2720                            status,
2721                            description.to_string(),
2722                        )));
2723                    }
2724                }
2725            }
2726        }
2727
2728        R::from_message(response)
2729    }
2730
2731    /// Sets the sequence for the direct get request.
2732    pub fn sequence(mut self, seq: u64) -> Self {
2733        self.request.sequence = Some(seq);
2734        self
2735    }
2736
2737    /// Sets the last_by_subject for the direct get request.
2738    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2739        self.request.last_by_subject = Some(subject.into());
2740        self
2741    }
2742
2743    /// Sets the next_by_subject for the direct get request.
2744    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2745        self.request.next_by_subject = Some(subject.into());
2746        self
2747    }
2748}
2749
2750impl DirectGetBuilder<WithHeaders> {
2751    /// Sends the get request and returns a StreamMessage with headers.
2752    pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2753        self.send_internal::<StreamMessage>().await
2754    }
2755}
2756
2757impl DirectGetBuilder<WithoutHeaders> {
2758    /// Sends the get request and returns only the payload bytes without headers.
2759    pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2760        self.send_internal::<StreamValue>().await
2761    }
2762}
2763
2764pub struct StreamValue {
2765    pub data: Bytes,
2766}
2767
2768#[derive(Debug, Serialize, Default)]
2769pub struct RawMessageRequest {
2770    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2771    sequence: Option<u64>,
2772    #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2773    last_by_subject: Option<String>,
2774    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2775    next_by_subject: Option<String>,
2776}
2777
2778/// Trait for converting a RawMessage response into the appropriate return type.
2779trait RawMessageResponse: Sized {
2780    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2781}
2782
2783impl RawMessageResponse for StreamMessage {
2784    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2785        StreamMessage::try_from(message)
2786            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2787    }
2788}
2789
2790impl RawMessageResponse for StreamValue {
2791    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2792        use base64::engine::general_purpose::STANDARD;
2793        use base64::Engine;
2794
2795        let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2796            RawMessageError::with_source(
2797                RawMessageErrorKind::Other,
2798                Box::new(std::io::Error::other(err)),
2799            )
2800        })?;
2801
2802        Ok(StreamValue {
2803            data: decoded_payload.into(),
2804        })
2805    }
2806}
2807
2808pub struct RawMessageBuilder<T = WithHeaders> {
2809    context: Context,
2810    stream_name: String,
2811    request: RawMessageRequest,
2812    _phantom: std::marker::PhantomData<T>,
2813}
2814
2815impl RawMessageBuilder<WithHeaders> {
2816    fn new(context: Context, stream_name: String) -> Self {
2817        RawMessageBuilder {
2818            context,
2819            stream_name,
2820            request: RawMessageRequest::default(),
2821            _phantom: std::marker::PhantomData,
2822        }
2823    }
2824}
2825
2826impl<T> RawMessageBuilder<T> {
2827    /// Internal method to send the raw message request and convert to the appropriate type.
2828    async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2829        // Validate subjects
2830        for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2831            .into_iter()
2832            .flatten()
2833        {
2834            if !is_valid_subject(subject) {
2835                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2836            }
2837        }
2838
2839        let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2840
2841        let response: Response<GetRawMessage> = self
2842            .context
2843            .request(subject, &self.request)
2844            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2845            .await?;
2846
2847        match response {
2848            Response::Err { error } => {
2849                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2850                    Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2851                } else {
2852                    Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2853                }
2854            }
2855            Response::Ok(value) => R::from_raw_message(value.message),
2856        }
2857    }
2858
2859    /// Sets the sequence for the raw message request.
2860    pub fn sequence(mut self, seq: u64) -> Self {
2861        self.request.sequence = Some(seq);
2862        self
2863    }
2864
2865    /// Sets the last_by_subject for the raw message request.
2866    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2867        self.request.last_by_subject = Some(subject.into());
2868        self
2869    }
2870
2871    /// Sets the next_by_subject for the raw message request.
2872    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2873        self.request.next_by_subject = Some(subject.into());
2874        self
2875    }
2876}
2877
2878impl RawMessageBuilder<WithHeaders> {
2879    /// Sends the raw message request and returns a StreamMessage with headers.
2880    pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2881        self.send_internal::<StreamMessage>().await
2882    }
2883}
2884
2885impl RawMessageBuilder<WithoutHeaders> {
2886    /// Sends the raw message request and returns only the payload as StreamValue.
2887    pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2888        self.send_internal::<StreamValue>().await
2889    }
2890}
2891
2892#[cfg(test)]
2893mod tests {
2894    use super::*;
2895
2896    #[test]
2897    fn consumer_limits_de() {
2898        let config = Config {
2899            ..Default::default()
2900        };
2901
2902        let roundtrip: Config = {
2903            let ser = serde_json::to_string(&config).unwrap();
2904            serde_json::from_str(&ser).unwrap()
2905        };
2906        assert_eq!(config, roundtrip);
2907    }
2908}