Skip to main content

async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{
41        ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42        StreamsErrorKind,
43    },
44    errors::ErrorCode,
45    is_valid_name,
46    message::{StreamMessage, StreamMessageError},
47    response::Response,
48    Context,
49};
50
/// Error returned by stream info requests such as [Stream::info] and [Stream::get_info].
pub type InfoError = RequestError;
52
/// Kinds of errors that a direct get operation can fail with.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// Displayed as "message not found".
    NotFound,
    /// The provided subject did not pass subject validation.
    InvalidSubject,
    /// The underlying request timed out.
    TimedOut,
    /// Displayed as "request failed".
    Request,
    /// An error response, carrying its status code and a description.
    ErrorResponse(StatusCode, String),
    /// Any other failure, e.g. a JSON or stream message conversion error.
    Other,
}
62
63impl Display for DirectGetErrorKind {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        match self {
66            Self::InvalidSubject => write!(f, "invalid subject"),
67            Self::NotFound => write!(f, "message not found"),
68            Self::ErrorResponse(status, description) => {
69                write!(f, "unable to get message: {status} {description}")
70            }
71            Self::Other => write!(f, "error getting message"),
72            Self::TimedOut => write!(f, "timed out"),
73            Self::Request => write!(f, "request failed"),
74        }
75    }
76}
77
/// Error returned by direct get operations, e.g. [Stream::direct_get].
pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81    fn from(err: crate::RequestError) -> Self {
82        match err.kind() {
83            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84            crate::RequestErrorKind::NoResponders => {
85                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86                    StatusCode::NO_RESPONDERS,
87                    "no responders".to_string(),
88                ))
89            }
90            crate::RequestErrorKind::InvalidSubject | crate::RequestErrorKind::Other => {
91                DirectGetError::with_source(DirectGetErrorKind::Other, err)
92            }
93        }
94    }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98    fn from(err: serde_json::Error) -> Self {
99        DirectGetError::with_source(DirectGetErrorKind::Other, err)
100    }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104    fn from(err: StreamMessageError) -> Self {
105        DirectGetError::with_source(DirectGetErrorKind::Other, err)
106    }
107}
108
/// Kinds of errors that deleting a message from a stream can fail with.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server replied with a JetStream error.
    JetStream(super::errors::Error),
}
115
116impl Display for DeleteMessageErrorKind {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        match self {
119            Self::Request => write!(f, "request failed"),
120            Self::TimedOut => write!(f, "timed out"),
121            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
122        }
123    }
124}
125
/// Error returned by [Stream::delete_message].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
/// Handle to operations that can be performed on a `Stream`.
/// It's generic over the type of `info` field to allow `Stream` with or without
/// info contents.
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Info held by this handle; for `Stream<Info>` this is the cached [Info]
    /// returned by [Stream::cached_info] and refreshed by [Stream::info].
    pub(crate) info: T,
    /// JetStream context used to send the requests issued by this handle.
    pub(crate) context: Context,
    /// Stream name, used when building request subjects.
    pub(crate) name: String,
}
137
138impl Stream<Info> {
139    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
140    /// [Stream] and returns it.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// # #[tokio::main]
146    /// # async fn main() -> Result<(), async_nats::Error> {
147    /// let client = async_nats::connect("localhost:4222").await?;
148    /// let jetstream = async_nats::jetstream::new(client);
149    ///
150    /// let mut stream = jetstream.get_stream("events").await?;
151    ///
152    /// let info = stream.info().await?;
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub async fn info(&mut self) -> Result<&Info, InfoError> {
157        let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159        match self.context.request(subject, &json!({})).await? {
160            Response::Ok::<Info>(info) => {
161                self.info = info;
162                Ok(&self.info)
163            }
164            Response::Err { error } => Err(error.into()),
165        }
166    }
167
168    /// Returns cached [Info] for the [Stream].
169    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
170    /// [Stream::info].
171    ///
172    /// # Examples
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn main() -> Result<(), async_nats::Error> {
177    /// let client = async_nats::connect("localhost:4222").await?;
178    /// let jetstream = async_nats::jetstream::new(client);
179    ///
180    /// let stream = jetstream.get_stream("events").await?;
181    ///
182    /// let info = stream.cached_info();
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub fn cached_info(&self) -> &Info {
187        &self.info
188    }
189}
190
191impl<I> Stream<I> {
192    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
193    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
194    pub async fn get_info(&self) -> Result<Info, InfoError> {
195        let subject = format!("STREAM.INFO.{}", self.name);
196
197        match self.context.request(subject, &json!({})).await? {
198            Response::Ok::<Info>(info) => Ok(info),
199            Response::Err { error } => Err(error.into()),
200        }
201    }
202
    /// Retrieves [Info] from the server and returns a [futures_util::Stream] that allows
    /// iterating over all subjects in the stream fetched via paged API.
205    ///
206    /// # Examples
207    ///
208    /// ```no_run
209    /// # #[tokio::main]
210    /// # async fn main() -> Result<(), async_nats::Error> {
211    /// use futures_util::TryStreamExt;
212    /// let client = async_nats::connect("localhost:4222").await?;
213    /// let jetstream = async_nats::jetstream::new(client);
214    ///
215    /// let mut stream = jetstream.get_stream("events").await?;
216    ///
217    /// let mut info = stream.info_with_subjects("events.>").await?;
218    ///
219    /// while let Some((subject, count)) = info.try_next().await? {
220    ///     println!("Subject: {} count: {}", subject, count);
221    /// }
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub async fn info_with_subjects<F: AsRef<str>>(
226        &self,
227        subjects_filter: F,
228    ) -> Result<InfoWithSubjects, InfoError> {
229        let subjects_filter = subjects_filter.as_ref().to_string();
230        // TODO: validate the subject and decide if this should be a `Subject`
231        let info = stream_info_with_details(
232            self.context.clone(),
233            self.name.clone(),
234            0,
235            false,
236            subjects_filter.clone(),
237        )
238        .await?;
239
240        Ok(InfoWithSubjects::new(
241            self.context.clone(),
242            info,
243            subjects_filter,
244        ))
245    }
246
247    /// Creates a builder that allows to customize `Stream::Info`.
248    ///
249    /// # Examples
250    /// ```no_run
251    /// # #[tokio::main]
252    /// # async fn main() -> Result<(), async_nats::Error> {
253    /// use futures_util::TryStreamExt;
254    /// let client = async_nats::connect("localhost:4222").await?;
255    /// let jetstream = async_nats::jetstream::new(client);
256    ///
257    /// let mut stream = jetstream.get_stream("events").await?;
258    ///
259    /// let mut info = stream
260    ///     .info_builder()
261    ///     .with_deleted(true)
262    ///     .subjects("events.>")
263    ///     .fetch()
264    ///     .await?;
265    ///
266    /// while let Some((subject, count)) = info.try_next().await? {
267    ///     println!("Subject: {} count: {}", subject, count);
268    /// }
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub fn info_builder(&self) -> StreamInfoBuilder {
273        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
274    }
275
276    /// Creates a builder for direct get operations.
277    ///
278    /// Allows for more control over direct get requests.
279    ///
280    /// # Examples
281    ///
282    /// ```no_run
283    /// # #[tokio::main]
284    /// # async fn main() -> Result<(), async_nats::Error> {
285    /// let client = async_nats::connect("demo.nats.io").await?;
286    /// let jetstream = async_nats::jetstream::new(client);
287    ///
288    /// let stream = jetstream.get_stream("events").await?;
289    ///
290    /// // Get message without headers
291    /// let message = stream.direct_get_builder().sequence(100).send().await?;
292    /// # Ok(())
293    /// # }
294    /// ```
295    pub fn direct_get_builder(&self) -> DirectGetBuilder<WithHeaders> {
296        DirectGetBuilder::new(self.context.clone(), self.name.clone())
297    }
298
299    /// Gets next message for a [Stream].
300    ///
301    /// Requires a [Stream] with `allow_direct` set to `true`.
302    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the replica serving the request might not have caught up with the leader yet.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// # #[tokio::main]
310    /// # async fn main() -> Result<(), async_nats::Error> {
311    /// let client = async_nats::connect("demo.nats.io").await?;
312    /// let jetstream = async_nats::jetstream::new(client);
313    ///
314    /// let stream = jetstream
315    ///     .create_stream(async_nats::jetstream::stream::Config {
316    ///         name: "events".to_string(),
317    ///         subjects: vec!["events.>".to_string()],
318    ///         allow_direct: true,
319    ///         ..Default::default()
320    ///     })
321    ///     .await?;
322    ///
323    /// jetstream.publish("events.data", "data".into()).await?;
324    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
325    ///
326    /// let message = stream
327    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
328    ///     .await?;
329    ///
330    /// # Ok(())
331    /// # }
332    /// ```
333    pub async fn direct_get_next_for_subject<T: Into<String>>(
334        &self,
335        subject: T,
336        sequence: Option<u64>,
337    ) -> Result<StreamMessage, DirectGetError> {
338        let subject_str = subject.into();
339        if !is_valid_subject(&subject_str) {
340            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
341        }
342
343        let mut builder = self.direct_get_builder().next_by_subject(subject_str);
344        if let Some(seq) = sequence {
345            builder = builder.sequence(seq);
346        }
347
348        builder.send().await
349    }
350
351    /// Gets first message from [Stream].
352    ///
353    /// Requires a [Stream] with `allow_direct` set to `true`.
354    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the replica serving the request might not have caught up with the leader yet.
357    ///
358    /// # Examples
359    ///
360    /// ```no_run
361    /// # #[tokio::main]
362    /// # async fn main() -> Result<(), async_nats::Error> {
363    /// let client = async_nats::connect("demo.nats.io").await?;
364    /// let jetstream = async_nats::jetstream::new(client);
365    ///
366    /// let stream = jetstream
367    ///     .create_stream(async_nats::jetstream::stream::Config {
368    ///         name: "events".to_string(),
369    ///         subjects: vec!["events.>".to_string()],
370    ///         allow_direct: true,
371    ///         ..Default::default()
372    ///     })
373    ///     .await?;
374    ///
375    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
376    ///
377    /// let message = stream.direct_get_first_for_subject("events.data").await?;
378    ///
379    /// # Ok(())
380    /// # }
381    /// ```
382    pub async fn direct_get_first_for_subject<T: Into<String>>(
383        &self,
384        subject: T,
385    ) -> Result<StreamMessage, DirectGetError> {
386        let subject_str = subject.into();
387        if !is_valid_subject(&subject_str) {
388            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
389        }
390
391        self.direct_get_builder()
392            .next_by_subject(subject_str)
393            .send()
394            .await
395    }
396
397    /// Gets message from [Stream] with given `sequence id`.
398    ///
399    /// Requires a [Stream] with `allow_direct` set to `true`.
400    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the replica serving the request might not have caught up with the leader yet.
403    ///
404    /// # Examples
405    ///
406    /// ```no_run
407    /// # #[tokio::main]
408    /// # async fn main() -> Result<(), async_nats::Error> {
409    /// let client = async_nats::connect("demo.nats.io").await?;
410    /// let jetstream = async_nats::jetstream::new(client);
411    ///
412    /// let stream = jetstream
413    ///     .create_stream(async_nats::jetstream::stream::Config {
414    ///         name: "events".to_string(),
415    ///         subjects: vec!["events.>".to_string()],
416    ///         allow_direct: true,
417    ///         ..Default::default()
418    ///     })
419    ///     .await?;
420    ///
421    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
422    ///
423    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
424    ///
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
429        self.direct_get_builder().sequence(sequence).send().await
430    }
431
432    /// Gets last message for a given `subject`.
433    ///
434    /// Requires a [Stream] with `allow_direct` set to `true`.
435    /// This is different from [Stream::get_raw_message], as it can fetch [super::message::StreamMessage]
    /// from any replica member. This means read-after-write consistency is not guaranteed,
    /// as the replica serving the request might not have caught up with the leader yet.
438    ///
439    /// # Examples
440    ///
441    /// ```no_run
442    /// # #[tokio::main]
443    /// # async fn main() -> Result<(), async_nats::Error> {
444    /// let client = async_nats::connect("demo.nats.io").await?;
445    /// let jetstream = async_nats::jetstream::new(client);
446    ///
447    /// let stream = jetstream
448    ///     .create_stream(async_nats::jetstream::stream::Config {
449    ///         name: "events".to_string(),
450    ///         subjects: vec!["events.>".to_string()],
451    ///         allow_direct: true,
452    ///         ..Default::default()
453    ///     })
454    ///     .await?;
455    ///
456    /// jetstream.publish("events.data", "data".into()).await?;
457    ///
458    /// let message = stream.direct_get_last_for_subject("events.data").await?;
459    ///
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub async fn direct_get_last_for_subject<T: Into<String>>(
464        &self,
465        subject: T,
466    ) -> Result<StreamMessage, DirectGetError> {
467        self.direct_get_builder()
468            .last_by_subject(subject)
469            .send()
470            .await
471    }
472    /// Creates a builder for retrieving messages from the stream with flexible options.
473    /// Raw message methods retrieve messages directly from the stream leader instead
474    /// of a replica. This should be used with care, as it can put additional load on the
475    /// stream leader.
476    ///
477    /// # Examples
478    ///
479    /// ```no_run
480    /// # #[tokio::main]
481    /// # async fn main() -> Result<(), async_nats::Error> {
482    /// let client = async_nats::connect("localhost:4222").await?;
483    /// let context = async_nats::jetstream::new(client);
484    /// let stream = context.get_stream("events").await?;
485    ///
486    /// // Get message without headers
487    /// let value = stream.raw_message_builder().sequence(100).send().await?;
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub fn raw_message_builder(&self) -> RawMessageBuilder<WithHeaders> {
492        RawMessageBuilder::new(self.context.clone(), self.name.clone())
493    }
494
495    /// Get a raw message from the stream for a given stream sequence.
496    /// This low-level API always reaches stream leader.
497    /// This should be discouraged in favor of using [Stream::direct_get].
498    ///
499    /// # Examples
500    ///
501    /// ```no_run
    /// # #[tokio::main]
503    /// # async fn main() -> Result<(), async_nats::Error> {
504    /// use futures_util::StreamExt;
505    /// use futures_util::TryStreamExt;
506    ///
507    /// let client = async_nats::connect("localhost:4222").await?;
508    /// let context = async_nats::jetstream::new(client);
509    ///
510    /// let stream = context
511    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
512    ///         name: "events".to_string(),
513    ///         max_messages: 10_000,
514    ///         ..Default::default()
515    ///     })
516    ///     .await?;
517    ///
518    /// let publish_ack = context.publish("events", "data".into()).await?;
519    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
520    /// println!("Retrieved raw message {:?}", raw_message);
521    /// # Ok(())
522    /// # }
523    /// ```
524    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
525        self.raw_message_builder().sequence(sequence).send().await
526    }
527
528    /// Get a first message from the stream for a given subject starting from provided sequence.
529    /// This low-level API always reaches stream leader.
530    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
531    ///
532    /// # Examples
533    ///
534    /// ```no_run
    /// # #[tokio::main]
536    /// # async fn main() -> Result<(), async_nats::Error> {
537    /// use futures_util::StreamExt;
538    /// use futures_util::TryStreamExt;
539    ///
540    /// let client = async_nats::connect("localhost:4222").await?;
541    /// let context = async_nats::jetstream::new(client);
542    /// let stream = context.get_stream_no_info("events").await?;
543    ///
544    /// let raw_message = stream
545    ///     .get_first_raw_message_by_subject("events.created", 10)
546    ///     .await?;
547    /// println!("Retrieved raw message {:?}", raw_message);
548    /// # Ok(())
549    /// # }
550    /// ```
551    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
552        &self,
553        subject: T,
554        sequence: u64,
555    ) -> Result<StreamMessage, RawMessageError> {
556        self.raw_message_builder()
557            .sequence(sequence)
558            .next_by_subject(subject.as_ref().to_string())
559            .send()
560            .await
561    }
562
563    /// Get a next message from the stream for a given subject.
564    /// This low-level API always reaches stream leader.
565    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
    /// # #[tokio::main]
571    /// # async fn main() -> Result<(), async_nats::Error> {
572    /// use futures_util::StreamExt;
573    /// use futures_util::TryStreamExt;
574    ///
575    /// let client = async_nats::connect("localhost:4222").await?;
576    /// let context = async_nats::jetstream::new(client);
577    /// let stream = context.get_stream_no_info("events").await?;
578    ///
579    /// let raw_message = stream
580    ///     .get_next_raw_message_by_subject("events.created")
581    ///     .await?;
582    /// println!("Retrieved raw message {:?}", raw_message);
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
587        &self,
588        subject: T,
589    ) -> Result<StreamMessage, RawMessageError> {
590        self.raw_message_builder()
591            .next_by_subject(subject.as_ref().to_string())
592            .send()
593            .await
594    }
595
596    /// Get a last message from the stream for a given subject.
597    /// This low-level API always reaches stream leader.
598    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
599    ///
600    /// # Examples
601    ///
602    /// ```no_run
    /// # #[tokio::main]
604    /// # async fn main() -> Result<(), async_nats::Error> {
605    /// use futures_util::StreamExt;
606    /// use futures_util::TryStreamExt;
607    ///
608    /// let client = async_nats::connect("localhost:4222").await?;
609    /// let context = async_nats::jetstream::new(client);
610    /// let stream = context.get_stream_no_info("events").await?;
611    ///
612    /// let raw_message = stream
613    ///     .get_last_raw_message_by_subject("events.created")
614    ///     .await?;
615    /// println!("Retrieved raw message {:?}", raw_message);
616    /// # Ok(())
617    /// # }
618    /// ```
619    pub async fn get_last_raw_message_by_subject(
620        &self,
621        stream_subject: &str,
622    ) -> Result<StreamMessage, LastRawMessageError> {
623        self.raw_message_builder()
624            .last_by_subject(stream_subject.to_string())
625            .send()
626            .await
627    }
628
629    /// Delete a message from the stream.
630    ///
631    /// # Examples
632    ///
633    /// ```no_run
634    /// # #[tokio::main]
635    /// # async fn main() -> Result<(), async_nats::Error> {
636    /// let client = async_nats::connect("localhost:4222").await?;
637    /// let context = async_nats::jetstream::new(client);
638    ///
639    /// let stream = context
640    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
641    ///         name: "events".to_string(),
642    ///         max_messages: 10_000,
643    ///         ..Default::default()
644    ///     })
645    ///     .await?;
646    ///
647    /// let publish_ack = context.publish("events", "data".into()).await?;
648    /// stream.delete_message(publish_ack.await?.sequence).await?;
649    /// # Ok(())
650    /// # }
651    /// ```
652    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
653        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
654        let payload = json!({
655            "seq": sequence,
656        });
657
658        let response: Response<DeleteStatus> = self
659            .context
660            .request(subject, &payload)
661            .map_err(|err| match err.kind() {
662                RequestErrorKind::TimedOut => {
663                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
664                }
665                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
666            })
667            .await?;
668
669        match response {
670            Response::Err { error } => Err(DeleteMessageError::new(
671                DeleteMessageErrorKind::JetStream(error),
672            )),
673            Response::Ok(value) => Ok(value.success),
674        }
675    }
676
677    /// Purge `Stream` messages.
678    ///
679    /// # Examples
680    ///
681    /// ```no_run
682    /// # #[tokio::main]
683    /// # async fn main() -> Result<(), async_nats::Error> {
684    /// let client = async_nats::connect("demo.nats.io").await?;
685    /// let jetstream = async_nats::jetstream::new(client);
686    ///
687    /// let stream = jetstream.get_stream("events").await?;
688    /// stream.purge().await?;
689    /// # Ok(())
690    /// # }
691    /// ```
692    pub fn purge(&self) -> Purge<No, No> {
693        Purge::build(self)
694    }
695
696    /// Purge `Stream` messages for a matching subject.
697    ///
698    /// # Examples
699    ///
700    /// ```no_run
701    /// # #[tokio::main]
702    /// # #[allow(deprecated)]
703    /// # async fn main() -> Result<(), async_nats::Error> {
704    /// let client = async_nats::connect("demo.nats.io").await?;
705    /// let jetstream = async_nats::jetstream::new(client);
706    ///
707    /// let stream = jetstream.get_stream("events").await?;
708    /// stream.purge_subject("data").await?;
709    /// # Ok(())
710    /// # }
711    /// ```
712    #[deprecated(
713        since = "0.25.0",
714        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
715    )]
716    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
717    where
718        T: Into<String>,
719    {
720        self.purge().filter(subject).await
721    }
722
723    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
724    /// returns the info from the server about created [Consumer]
725    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
726    ///
727    /// # Examples
728    ///
729    /// ```no_run
730    /// # #[tokio::main]
731    /// # async fn main() -> Result<(), async_nats::Error> {
732    /// use async_nats::jetstream::consumer;
733    /// let client = async_nats::connect("localhost:4222").await?;
734    /// let jetstream = async_nats::jetstream::new(client);
735    ///
736    /// let stream = jetstream.get_stream("events").await?;
737    /// let info = stream
738    ///     .create_consumer(consumer::pull::Config {
739    ///         durable_name: Some("pull".to_string()),
740    ///         ..Default::default()
741    ///     })
742    ///     .await?;
743    /// # Ok(())
744    /// # }
745    /// ```
746    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
747        &self,
748        config: C,
749    ) -> Result<Consumer<C>, ConsumerError> {
750        self.context
751            .create_consumer_on_stream(config, self.name.clone())
752            .await
753    }
754
755    /// Update an existing consumer.
756    /// This call will fail if the consumer does not exist.
757    /// returns the info from the server about updated [Consumer].
758    ///
759    /// # Examples
760    ///
761    /// ```no_run
762    /// # #[tokio::main]
763    /// # async fn main() -> Result<(), async_nats::Error> {
764    /// use async_nats::jetstream::consumer;
765    /// let client = async_nats::connect("localhost:4222").await?;
766    /// let jetstream = async_nats::jetstream::new(client);
767    ///
768    /// let stream = jetstream.get_stream("events").await?;
769    /// let info = stream
770    ///     .update_consumer(consumer::pull::Config {
771    ///         durable_name: Some("pull".to_string()),
772    ///         ..Default::default()
773    ///     })
774    ///     .await?;
775    /// # Ok(())
776    /// # }
777    /// ```
778    #[cfg(feature = "server_2_10")]
779    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
780        &self,
781        config: C,
782    ) -> Result<Consumer<C>, ConsumerUpdateError> {
783        self.context
784            .update_consumer_on_stream(config, self.name.clone())
785            .await
786    }
787
788    /// Create consumer, but only if it does not exist or the existing config is exactly
789    /// the same.
790    /// This method will fail if consumer is already present with different config.
791    /// returns the info from the server about created [Consumer].
792    ///
793    /// # Examples
794    ///
795    /// ```no_run
796    /// # #[tokio::main]
797    /// # async fn main() -> Result<(), async_nats::Error> {
798    /// use async_nats::jetstream::consumer;
799    /// let client = async_nats::connect("localhost:4222").await?;
800    /// let jetstream = async_nats::jetstream::new(client);
801    ///
802    /// let stream = jetstream.get_stream("events").await?;
803    /// let info = stream
804    ///     .create_consumer_strict(consumer::pull::Config {
805    ///         durable_name: Some("pull".to_string()),
806    ///         ..Default::default()
807    ///     })
808    ///     .await?;
809    /// # Ok(())
810    /// # }
811    /// ```
812    #[cfg(feature = "server_2_10")]
813    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
814        &self,
815        config: C,
816    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
817        self.context
818            .create_consumer_strict_on_stream(config, self.name.clone())
819            .await
820    }
821
822    /// Retrieve [Info] about [Consumer] from the server.
823    ///
824    /// # Examples
825    ///
826    /// ```no_run
827    /// # #[tokio::main]
828    /// # async fn main() -> Result<(), async_nats::Error> {
829    /// use async_nats::jetstream::consumer;
830    /// let client = async_nats::connect("localhost:4222").await?;
831    /// let jetstream = async_nats::jetstream::new(client);
832    ///
833    /// let stream = jetstream.get_stream("events").await?;
834    /// let info = stream.consumer_info("pull").await?;
835    /// # Ok(())
836    /// # }
837    /// ```
838    pub async fn consumer_info<T: AsRef<str>>(
839        &self,
840        name: T,
841    ) -> Result<consumer::Info, ConsumerInfoError> {
842        let name = name.as_ref();
843
844        if !is_valid_name(name) {
845            return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
846        }
847
848        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
849
850        match self.context.request(subject, &json!({})).await? {
851            Response::Ok(info) => Ok(info),
852            Response::Err { error } => Err(error.into()),
853        }
854    }
855
    /// Get [Consumer] from the server. [Consumer] iterators can be used to retrieve
857    /// [Messages][crate::jetstream::Message] for a given [Consumer].
858    ///
859    /// # Examples
860    ///
861    /// ```no_run
862    /// # #[tokio::main]
863    /// # async fn main() -> Result<(), async_nats::Error> {
864    /// use async_nats::jetstream::consumer;
865    /// use futures_util::StreamExt;
866    /// let client = async_nats::connect("localhost:4222").await?;
867    /// let jetstream = async_nats::jetstream::new(client);
868    ///
869    /// let stream = jetstream.get_stream("events").await?;
870    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
871    /// # Ok(())
872    /// # }
873    /// ```
874    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
875        &self,
876        name: &str,
877    ) -> Result<Consumer<T>, crate::Error> {
878        let info = self.consumer_info(name).await?;
879
880        Ok(Consumer::new(
881            T::try_from_consumer_config(info.config.clone())?,
882            info,
883            self.context.clone(),
884        ))
885    }
886
887    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
888    ///
889    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
890    ///
891    /// # Examples
892    ///
893    /// ```no_run
894    /// # #[tokio::main]
895    /// # async fn main() -> Result<(), async_nats::Error> {
896    /// use async_nats::jetstream::consumer;
897    /// use futures_util::StreamExt;
898    /// let client = async_nats::connect("localhost:4222").await?;
899    /// let jetstream = async_nats::jetstream::new(client);
900    ///
901    /// let stream = jetstream.get_stream("events").await?;
902    /// let consumer = stream
903    ///     .get_or_create_consumer(
904    ///         "pull",
905    ///         consumer::pull::Config {
906    ///             durable_name: Some("pull".to_string()),
907    ///             ..Default::default()
908    ///         },
909    ///     )
910    ///     .await?;
911    /// # Ok(())
912    /// # }
913    /// ```
914    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
915        &self,
916        name: &str,
917        config: T,
918    ) -> Result<Consumer<T>, ConsumerError> {
919        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
920
921        match self.context.request(subject, &json!({})).await? {
922            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
923            Response::Err { error } => Err(error.into()),
924            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
925                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
926                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
927                })?,
928                info,
929                self.context.clone(),
930            )),
931        }
932    }
933
934    /// Delete a [Consumer] from the server.
935    ///
936    /// # Examples
937    ///
938    /// ```no_run
939    /// # #[tokio::main]
940    /// # async fn main() -> Result<(), async_nats::Error> {
941    /// use async_nats::jetstream::consumer;
942    /// use futures_util::StreamExt;
943    /// let client = async_nats::connect("localhost:4222").await?;
944    /// let jetstream = async_nats::jetstream::new(client);
945    ///
946    /// jetstream
947    ///     .get_stream("events")
948    ///     .await?
949    ///     .delete_consumer("pull")
950    ///     .await?;
951    /// # Ok(())
952    /// # }
953    /// ```
954    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
955        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
956
957        match self.context.request(subject, &json!({})).await? {
958            Response::Ok(delete_status) => Ok(delete_status),
959            Response::Err { error } => Err(error.into()),
960        }
961    }
962
963    /// Pause a [Consumer] until the given time.
964    /// It will not deliver any messages to clients during that time.
965    ///
966    /// # Examples
967    ///
968    /// ```no_run
969    /// # #[tokio::main]
970    /// # async fn main() -> Result<(), async_nats::Error> {
971    /// use async_nats::jetstream::consumer;
972    /// use futures_util::StreamExt;
973    /// let client = async_nats::connect("localhost:4222").await?;
974    /// let jetstream = async_nats::jetstream::new(client);
975    /// let pause_until =
976    ///     time::OffsetDateTime::now_utc().saturating_add(time::Duration::seconds_f32(10.0));
977    ///
978    /// jetstream
979    ///     .get_stream("events")
980    ///     .await?
981    ///     .pause_consumer("my_consumer", pause_until)
982    ///     .await?;
983    /// # Ok(())
984    /// # }
985    /// ```
986    #[cfg(feature = "server_2_11")]
987    pub async fn pause_consumer(
988        &self,
989        name: &str,
990        pause_until: OffsetDateTime,
991    ) -> Result<PauseResponse, ConsumerError> {
992        self.request_pause_consumer(name, Some(pause_until)).await
993    }
994
995    /// Resume a paused [Consumer].
996    ///
997    /// # Examples
998    ///
999    /// ```no_run
1000    /// # #[tokio::main]
1001    /// # async fn main() -> Result<(), async_nats::Error> {
1002    /// use async_nats::jetstream::consumer;
1003    /// use futures_util::StreamExt;
1004    /// let client = async_nats::connect("localhost:4222").await?;
1005    /// let jetstream = async_nats::jetstream::new(client);
1006    ///
1007    /// jetstream
1008    ///     .get_stream("events")
1009    ///     .await?
1010    ///     .resume_consumer("my_consumer")
1011    ///     .await?;
1012    /// # Ok(())
1013    /// # }
1014    /// ```
1015    #[cfg(feature = "server_2_11")]
1016    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1017        self.request_pause_consumer(name, None).await
1018    }
1019
1020    #[cfg(feature = "server_2_11")]
1021    async fn request_pause_consumer(
1022        &self,
1023        name: &str,
1024        pause_until: Option<OffsetDateTime>,
1025    ) -> Result<PauseResponse, ConsumerError> {
1026        let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1027        let payload = &PauseResumeConsumerRequest { pause_until };
1028
1029        match self.context.request(subject, payload).await? {
1030            Response::Ok::<PauseResponse>(resp) => Ok(resp),
1031            Response::Err { error } => Err(error.into()),
1032        }
1033    }
1034
1035    /// Reset a [Consumer]'s delivery state (ADR-60).
1036    ///
1037    /// `seq` semantics:
1038    /// - `None` (or `Some(0)`): reset back to the consumer's ack floor.
1039    ///   Pending and redelivered messages are cleared; the ack-floor stream
1040    ///   sequence is left where it was.
1041    /// - `Some(n)` with `n > 0`: ack-floor stream sequence is set to one
1042    ///   below `n`; the next delivered message will have a stream sequence
1043    ///   of at least `n`.
1044    ///
1045    /// Only valid on consumers with
1046    /// [`DeliverPolicy::All`][crate::jetstream::consumer::DeliverPolicy::All],
1047    /// [`DeliverPolicy::ByStartSequence`][crate::jetstream::consumer::DeliverPolicy::ByStartSequence],
1048    /// or
1049    /// [`DeliverPolicy::ByStartTime`][crate::jetstream::consumer::DeliverPolicy::ByStartTime].
1050    /// For policies with a configured starting sequence/time, resets below
1051    /// the configured start are rejected by the server.
1052    ///
1053    /// # Examples
1054    ///
1055    /// ```no_run
1056    /// # #[tokio::main]
1057    /// # async fn main() -> Result<(), async_nats::Error> {
1058    /// let client = async_nats::connect("localhost:4222").await?;
1059    /// let jetstream = async_nats::jetstream::new(client);
1060    ///
1061    /// let stream = jetstream.get_stream("events").await?;
1062    /// stream.reset_consumer("processor", Some(42)).await?;
1063    /// # Ok(())
1064    /// # }
1065    /// ```
1066    #[cfg(feature = "server_2_14")]
1067    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
1068    pub async fn reset_consumer(
1069        &self,
1070        name: &str,
1071        seq: Option<u64>,
1072    ) -> Result<ConsumerResetResponse, ConsumerResetError> {
1073        let subject = format!("CONSUMER.RESET.{}.{}", self.name, name);
1074        let payload = ConsumerResetRequest {
1075            seq: seq.unwrap_or(0),
1076        };
1077
1078        match self.context.request(subject, &payload).await? {
1079            Response::Ok::<ConsumerResetResponse>(resp) => Ok(resp),
1080            Response::Err { error } => Err(error.into()),
1081        }
1082    }
1083
1084    /// Lists names of all consumers for current stream.
1085    ///
1086    /// # Examples
1087    ///
1088    /// ```no_run
1089    /// # #[tokio::main]
1090    /// # async fn main() -> Result<(), async_nats::Error> {
1091    /// use futures_util::TryStreamExt;
1092    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1093    /// let jetstream = async_nats::jetstream::new(client);
1094    /// let stream = jetstream.get_stream("stream").await?;
1095    /// let mut names = stream.consumer_names();
1096    /// while let Some(consumer) = names.try_next().await? {
1097    ///     println!("consumer: {stream:?}");
1098    /// }
1099    /// # Ok(())
1100    /// # }
1101    /// ```
1102    pub fn consumer_names(&self) -> ConsumerNames {
1103        ConsumerNames {
1104            context: self.context.clone(),
1105            stream: self.name.clone(),
1106            offset: 0,
1107            page_request: None,
1108            consumers: Vec::new(),
1109            done: false,
1110        }
1111    }
1112
1113    /// Lists all consumers info for current stream.
1114    ///
1115    /// # Examples
1116    ///
1117    /// ```no_run
1118    /// # #[tokio::main]
1119    /// # async fn main() -> Result<(), async_nats::Error> {
1120    /// use futures_util::TryStreamExt;
1121    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1122    /// let jetstream = async_nats::jetstream::new(client);
1123    /// let stream = jetstream.get_stream("stream").await?;
1124    /// let mut consumers = stream.consumers();
1125    /// while let Some(consumer) = consumers.try_next().await? {
1126    ///     println!("consumer: {consumer:?}");
1127    /// }
1128    /// # Ok(())
1129    /// # }
1130    /// ```
1131    pub fn consumers(&self) -> Consumers {
1132        Consumers {
1133            context: self.context.clone(),
1134            stream: self.name.clone(),
1135            offset: 0,
1136            page_request: None,
1137            consumers: Vec::new(),
1138            done: false,
1139        }
1140    }
1141}
1142
/// Builder for a stream info request that can optionally ask for deleted-message
/// details and restrict the returned subjects with a filter.
pub struct StreamInfoBuilder {
    /// JetStream context the info request is issued through.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Whether the request asks for deleted details; `false` by default.
    pub(crate) deleted: bool,
    /// Subjects filter sent with the request; empty by default.
    pub(crate) subject: String,
}
1149
1150impl StreamInfoBuilder {
1151    fn new(context: Context, name: String) -> Self {
1152        Self {
1153            context,
1154            name,
1155            deleted: false,
1156            subject: "".to_string(),
1157        }
1158    }
1159
1160    pub fn with_deleted(mut self, deleted: bool) -> Self {
1161        self.deleted = deleted;
1162        self
1163    }
1164
1165    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1166        self.subject = subject.into();
1167        self
1168    }
1169
1170    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1171        let info = stream_info_with_details(
1172            self.context.clone(),
1173            self.name.clone(),
1174            0,
1175            self.deleted,
1176            self.subject.clone(),
1177        )
1178        .await?;
1179
1180        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1181    }
1182}
1183
/// `Config` (the stream configuration) determines the properties for a stream.
/// There are sensible defaults for most. If no subjects are
/// given the name will be used as the only subject.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// A name for the Stream. Must not have spaces, tabs or period `.` characters.
    pub name: String,
    /// How large the Stream may become in total bytes before the configured discard policy kicks in.
    #[serde(default)]
    pub max_bytes: i64,
    /// How large the Stream may become in total messages before the configured discard policy kicks in.
    /// Serialized as `max_msgs`.
    #[serde(default, rename = "max_msgs")]
    pub max_messages: i64,
    /// Maximum amount of messages to keep per subject. Serialized as `max_msgs_per_subject`.
    #[serde(default, rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space.
    pub discard: DiscardPolicy,
    /// Prevents a message from being added to a stream if the `max_msgs_per_subject` limit for the subject has been reached.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
    /// configured stream `name`. Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`.
    pub retention: RetentionPolicy,
    /// How many Consumers can be defined for a given Stream, -1 for unlimited.
    #[serde(default)]
    pub max_consumers: i32,
    /// Maximum age of any message in the stream, expressed in nanoseconds.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// The largest message that will be accepted by the Stream. Serialized as `max_msg_size`.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// The type of storage backend, `File` (default) and `Memory`.
    pub storage: StorageType,
    /// How many replicas to keep for each message in a clustered JetStream, maximum 5.
    pub num_replicas: usize,
    /// Disables acknowledging messages that are received by the Stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// The window within which to track duplicate messages.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// The owner of the template associated with this stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Indicates the stream is sealed and cannot be modified in any way.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// A short description of the purpose of this stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    /// Indicates if rollups will be allowed or not. Serialized as `allow_rollup_hdrs`.
    pub allow_rollup: bool,
    #[serde(default, skip_serializing_if = "is_default")]
    /// Indicates if deletes will be denied or not.
    pub deny_delete: bool,
    /// Indicates if purges will be denied or not.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish config.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Enables direct get, which would get messages from
    /// non-leader.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Enable direct access also for mirrors.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Stream mirror configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Sources configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    #[cfg(feature = "server_2_10")]
    /// Additional stream metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    #[cfg(feature = "server_2_10")]
    /// Allow applying a subject transform to incoming messages.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    #[cfg(feature = "server_2_10")]
    /// Override compression config for this stream.
    /// The [Compression] enum has its own `None` variant and is additionally wrapped in [Option]
    /// because a [Stream] can override global compression set to [Compression::S2]
    /// to [Compression::None], which is different from not overriding global config with anything.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    #[cfg(feature = "server_2_10")]
    /// Set limits on consumers that are created on this stream.
    /// When deserializing, limits equal to `ConsumerLimits::default()` are read back as `None`.
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    #[cfg(feature = "server_2_10")]
    /// Sets the first sequence for the stream. Serialized as `first_seq`.
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Placement configuration for clusters and tags.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,

    /// Persistence mode for the stream.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub persist_mode: Option<PersistenceMode>,

    /// For suspending the consumer until the deadline.
    // NOTE(review): this doc mentions a consumer although the field is part of the
    // stream configuration — confirm the intended semantics.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Allows setting a TTL for a message in the stream. Serialized as `allow_msg_ttl`.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Enables delete markers for messages deleted from the stream and sets the TTL
    /// for how long the marker should be kept.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,

    /// Allows atomic publish operations. Serialized as `allow_atomic`.
    #[cfg(feature = "server_2_12")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_atomic")]
    pub allow_atomic_publish: bool,

    /// Enables the scheduling of messages. Serialized as `allow_msg_schedules`.
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_schedules"
    )]
    pub allow_message_schedules: bool,

    /// Enables counters for the stream. Serialized as `allow_msg_counter`.
    #[cfg(feature = "server_2_12")]
    #[serde(
        default,
        skip_serializing_if = "is_default",
        rename = "allow_msg_counter"
    )]
    pub allow_message_counter: bool,

    /// Allows fast-ingest batch publishing on the stream (ADR-50). Serialized as `allow_batched`.
    #[cfg(feature = "server_2_14")]
    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_batched")]
    pub allow_batch_publish: bool,
}
1359
1360impl From<&Config> for Config {
1361    fn from(sc: &Config) -> Config {
1362        sc.clone()
1363    }
1364}
1365
1366impl From<&str> for Config {
1367    fn from(s: &str) -> Config {
1368        Config {
1369            name: s.to_string(),
1370            ..Default::default()
1371        }
1372    }
1373}
1374
/// Deserializes optional [ConsumerLimits], mapping limits that equal
/// `ConsumerLimits::default()` to `None` so that "no limits set" and
/// "all limits at their default" are read back the same way.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let parsed = Option::<ConsumerLimits>::deserialize(deserializer)?;

    // Keep only limits that differ from the all-default value.
    Ok(parsed.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Limits applied to consumers, used by the stream [Config]'s `consumer_limits` field.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
    /// (De)serialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1402
/// Compression setting for a stream, used by the stream [Config]'s `compression` field.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `s2`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `none`.
    #[serde(rename = "none")]
    None,
}
1410
/// `SubjectTransform` is for applying a subject transform (to matching messages) when a new message is received.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Source side of the transform. Serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// Destination side of the transform. Serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1420
/// `Republish` is for republishing messages once committed to a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Subject that should be republished. Serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// Subject where messages will be republished. Serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// If true, only headers should be republished.
    #[serde(default)]
    pub headers_only: bool,
}
1434
/// Placement describes on which cluster or tags the stream should be placed.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Cluster where the stream should be placed.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Matching tags for stream placement.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1445
/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old`, will
/// remove older messages. `New` will fail to store the new message.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Will remove older messages when limits are hit. Serialized as `old`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Will error on a StoreMsg call when limits are hit. Serialized as `new`.
    #[serde(rename = "new")]
    New = 1,
}
1459
/// `RetentionPolicy` determines how messages in a set are retained.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// `Limits` (default) means that messages are retained until any given limit is reached.
    /// This could be one of messages, bytes, or age. Serialized as `limits`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
    /// Serialized as `interest`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
    /// Serialized as `workqueue`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1476
/// `StorageType` determines how messages are stored for retention.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Stream data is kept in files. This is the default. Serialized as `file`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Stream data is kept only in memory. Serialized as `memory`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1489
/// `PersistenceMode` determines the persistence mode for stream data.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PersistenceMode {
    /// Writes are immediately flushed, acknowledgement sent after message is stored.
    /// This is the default. Serialized as `default`.
    #[default]
    #[serde(rename = "default")]
    Default = 0,
    /// Writes are flushed asynchronously, acknowledgement may be sent before message is stored.
    /// Serialized as `async`.
    #[serde(rename = "async")]
    Async = 1,
}
1502
1503async fn stream_info_with_details(
1504    context: Context,
1505    stream: String,
1506    offset: usize,
1507    deleted_details: bool,
1508    subjects_filter: String,
1509) -> Result<Info, InfoError> {
1510    let subject = format!("STREAM.INFO.{stream}");
1511
1512    let payload = StreamInfoRequest {
1513        offset,
1514        deleted_details,
1515        subjects_filter,
1516    };
1517
1518    let response: Response<Info> = context.request(subject, &payload).await?;
1519
1520    match response {
1521        Response::Ok(info) => Ok(info),
1522        Response::Err { error } => Err(error.into()),
1523    }
1524}
1525
/// Boxed, `'static` future resolving to a stream [Info] (or an [InfoError]);
/// stored by [InfoWithSubjects] while a page request is in flight.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1527
/// Payload serialized and sent with a `STREAM.INFO` request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset; `InfoWithSubjects` passes the number of subjects it has received so far.
    offset: usize,
    /// Whether deleted details are requested.
    deleted_details: bool,
    /// Filter for the subjects to report; may be empty.
    subjects_filter: String,
}
1534
/// Stream [Info] together with a paged listing of `(subject, count)` entries.
///
/// Implements `futures_util::Stream`: buffered entries are yielded first and further
/// pages are requested once the buffer is exhausted.
pub struct InfoWithSubjects {
    /// Name of the stream, taken from `info.config.name`.
    stream: String,
    /// JetStream context used to request further pages.
    context: Context,
    /// The stream info from the initial response (its subjects map is moved into `subjects`).
    pub info: Info,
    /// Number of subject entries received so far; sent as the offset of the next page request.
    offset: usize,
    /// Entries of the current page that have not been yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// In-flight request for the next page, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter repeated on every page request.
    subjects_filter: String,
    /// Set once no further pages should be requested.
    pages_done: bool,
}
1545
1546impl InfoWithSubjects {
1547    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1548        let subjects = info.state.subjects.take().unwrap_or_default();
1549        let name = info.config.name.clone();
1550        InfoWithSubjects {
1551            context,
1552            info,
1553            pages_done: subjects.is_empty(),
1554            offset: subjects.len(),
1555            subjects: subjects.into_iter(),
1556            subjects_filter: subject,
1557            stream: name,
1558            info_request: None,
1559        }
1560    }
1561}
1562
1563impl futures_util::Stream for InfoWithSubjects {
1564    type Item = Result<(String, usize), InfoError>;
1565
1566    fn poll_next(
1567        mut self: Pin<&mut Self>,
1568        cx: &mut std::task::Context<'_>,
1569    ) -> Poll<Option<Self::Item>> {
1570        match self.subjects.next() {
1571            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1572            None => {
1573                // If we have already requested all pages, stop the iterator.
1574                if self.pages_done {
1575                    return Poll::Ready(None);
1576                }
1577                let stream = self.stream.clone();
1578                let context = self.context.clone();
1579                let subjects_filter = self.subjects_filter.clone();
1580                let offset = self.offset;
1581                match self
1582                    .info_request
1583                    .get_or_insert_with(|| {
1584                        Box::pin(stream_info_with_details(
1585                            context,
1586                            stream,
1587                            offset,
1588                            false,
1589                            subjects_filter,
1590                        ))
1591                    })
1592                    .poll_unpin(cx)
1593                {
1594                    Poll::Ready(resp) => match resp {
1595                        Ok(info) => {
1596                            let subjects = info.state.subjects.clone();
1597                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1598                            self.info_request = None;
1599                            let subjects = subjects.unwrap_or_default();
1600                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1601                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1602                            if total <= self.offset || subjects.is_empty() {
1603                                self.pages_done = true;
1604                            }
1605                            match self.subjects.next() {
1606                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1607                                None => Poll::Ready(None),
1608                            }
1609                        }
1610                        Err(err) => Poll::Ready(Some(Err(err))),
1611                    },
1612                    Poll::Pending => Poll::Pending,
1613                }
1614            }
1615        }
1616    }
1617}
1618
/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// The configuration associated with this stream.
    pub config: Config,
    /// The time that this stream was created (RFC 3339 on the wire).
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// Various metrics associated with this stream.
    pub state: State,
    /// Information about leader and replicas.
    pub cluster: Option<ClusterInfo>,
    /// Information about mirror config if present.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Information about sources configs if present.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging fields (`offset`, `total`, `limit`) flattened from the top level of the
    /// response; `InfoWithSubjects` reads `total` to decide when paging is done.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1640
/// Paging metadata of a stream info response.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    /// Presumably the offset the returned page starts at — TODO confirm against the server API.
    offset: usize,
    /// Compared against the number of received subjects to detect the last page;
    /// presumably the total number of entries across all pages — TODO confirm.
    total: usize,
    /// Presumably the maximum number of entries per page — TODO confirm.
    limit: usize,
}
1647
/// Response of a delete request (e.g. `CONSUMER.DELETE`).
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// The `success` flag reported by the server.
    pub success: bool,
}
1652
/// Response of a `CONSUMER.PAUSE` request (used for both pausing and resuming).
#[cfg(feature = "server_2_11")]
#[derive(Deserialize)]
pub struct PauseResponse {
    /// The `paused` flag reported by the server.
    pub paused: bool,
    /// The `pause_until` timestamp reported by the server (RFC 3339 on the wire).
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// Optional `pause_remaining` duration, decoded through `serde_nanos`;
    /// `None` when absent from the response.
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1662
/// Payload of a `CONSUMER.PAUSE` request.
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    /// Deadline to pause until (RFC 3339); omitted from the payload when `None`,
    /// which is how a resume request is sent.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1669
/// Payload of a `CONSUMER.RESET` request.
#[cfg(feature = "server_2_14")]
#[derive(Serialize, Debug)]
pub(crate) struct ConsumerResetRequest {
    /// Stream sequence to reset to. Skipped during serialization when `is_default`
    /// holds — presumably when the value is 0; verify against the `is_default` helper.
    #[serde(default, skip_serializing_if = "is_default")]
    pub(crate) seq: u64,
}
1676
/// Response from a [`Stream::reset_consumer`] / [`crate::jetstream::consumer::Consumer::reset`] call.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Debug, Deserialize, Clone)]
pub struct ConsumerResetResponse {
    /// Refreshed [Info][crate::jetstream::consumer::Info] for the consumer
    /// after the reset has been applied. Its fields are flattened into the
    /// top level of the response.
    #[serde(flatten)]
    pub info: super::consumer::Info,
    /// Stream sequence the consumer's ack floor is now sitting at after the
    /// reset. For an empty / `None` request this echoes the previously held
    /// ack-floor seq.
    pub reset_seq: u64,
}
1691
/// Information about the given stream.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// The number of messages contained in this stream
    pub messages: u64,
    /// The number of bytes of all messages contained in this stream
    pub bytes: u64,
    /// The lowest sequence number still present in this stream
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// The time associated with the oldest message still present in this stream
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// The last sequence number assigned to a message in this stream
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// The time that the last message was received by this stream
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// The number of consumers configured to consume this stream
    pub consumer_count: usize,
    /// The number of subjects in the stream; 0 when the field is absent.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// The number of deleted messages in the stream
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// The list of deleted entries as `u64` values (presumably message
    /// sequence numbers — confirm against the server API).
    /// This field will be filled only if the `StreamInfoBuilder::with_deleted` option is set.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    /// Crate-internal map keyed by subject name (presumably the per-subject
    /// message count — TODO confirm).
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1726
/// A raw stream message in the representation it is stored.
///
/// `payload` and `headers` stay base64 encoded here; converting into a
/// [`StreamMessage`] decodes them.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Sequence of the message (`seq` on the wire).
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Raw payload of the message as a base64 encoded string (`data` on the
    /// wire); empty when the field is absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Raw header block as a base64 encoded string (`hdrs` on the wire), if any.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// The time the message was published, as an RFC 3339 timestamp.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1750
1751impl TryFrom<RawMessage> for StreamMessage {
1752    type Error = crate::Error;
1753
1754    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1755        let decoded_payload = STANDARD
1756            .decode(value.payload)
1757            .map_err(|err| Box::new(std::io::Error::other(err)))?;
1758        let decoded_headers = value
1759            .headers
1760            .map(|header| STANDARD.decode(header))
1761            .map_or(Ok(None), |v| v.map(Some))?;
1762
1763        let (headers, _, _) = decoded_headers
1764            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1765
1766        Ok(StreamMessage {
1767            subject: value.subject.into(),
1768            payload: decoded_payload.into(),
1769            headers,
1770            sequence: value.sequence,
1771            time: value.time,
1772        })
1773    }
1774}
1775
/// Returns `true` for the characters that mark a folded header line: a space
/// or a horizontal tab.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Version prefix that the first line of a header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1780
1781#[allow(clippy::type_complexity)]
1782fn parse_headers(
1783    buf: &[u8],
1784) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1785    let mut headers = HeaderMap::new();
1786    let mut maybe_status: Option<StatusCode> = None;
1787    let mut maybe_description: Option<String> = None;
1788    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1789        line.lines().peekable()
1790    } else {
1791        return Err(Box::new(std::io::Error::other("invalid header")));
1792    };
1793
1794    if let Some(line) = lines.next() {
1795        let line = line
1796            .strip_prefix(HEADER_LINE)
1797            .ok_or_else(|| {
1798                Box::new(std::io::Error::other(
1799                    "version line does not start with NATS/1.0",
1800                ))
1801            })?
1802            .trim();
1803
1804        match line.split_once(' ') {
1805            Some((status, description)) => {
1806                if !status.is_empty() {
1807                    maybe_status = Some(status.parse()?);
1808                }
1809
1810                if !description.is_empty() {
1811                    maybe_description = Some(description.trim().to_string());
1812                }
1813            }
1814            None => {
1815                if !line.is_empty() {
1816                    maybe_status = Some(line.parse()?);
1817                }
1818            }
1819        }
1820    } else {
1821        return Err(Box::new(std::io::Error::other(
1822            "expected header information not found",
1823        )));
1824    };
1825
1826    while let Some(line) = lines.next() {
1827        if line.is_empty() {
1828            continue;
1829        }
1830
1831        if let Some((k, v)) = line.split_once(':').to_owned() {
1832            let mut s = String::from(v.trim());
1833            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1834                s.push(' ');
1835                s.push_str(v.trim());
1836            }
1837
1838            headers.insert(
1839                HeaderName::from_str(k)?,
1840                HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1841            );
1842        } else {
1843            return Err(Box::new(std::io::Error::other("malformed header line")));
1844        }
1845    }
1846
1847    if headers.is_empty() {
1848        Ok((HeaderMap::new(), maybe_status, maybe_description))
1849    } else {
1850        Ok((headers, maybe_status, maybe_description))
1851    }
1852}
1853
/// Envelope with a single `message` field holding a [`RawMessage`].
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    /// The wrapped raw message.
    pub(crate) message: RawMessage,
}
1858
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a `skip_serializing_if` predicate in this file.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let default_value = T::default();
    *t == default_value
}
/// Information about the `JetStream` cluster associated with a stream or consumer.
///
/// Every field falls back to its default when absent from the response.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// The cluster name.
    #[serde(default)]
    pub name: Option<String>,
    /// The RAFT group name.
    #[serde(default)]
    pub raft_group: Option<String>,
    /// The server name of the RAFT leader.
    #[serde(default)]
    pub leader: Option<String>,
    /// The time since this server has been the leader, as an RFC 3339 timestamp.
    #[serde(default, with = "rfc3339::option")]
    pub leader_since: Option<OffsetDateTime>,
    /// Indicates if `traffic_account` is the system account.
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub system_account: bool,
    /// Name of the traffic (replication) account.
    #[cfg(feature = "server_2_12")]
    #[serde(default)]
    pub traffic_account: Option<String>,
    /// The members of the RAFT cluster.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1890
/// A single member of the RAFT cluster.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// The server name of the peer.
    pub name: String,
    /// Indicates if the server is up to date and synchronized.
    pub current: bool,
    /// Time since this peer was last seen; sent as nanoseconds and decoded
    /// through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// Indicates the node is considered offline by the group; `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// How many uncommitted operations this peer is behind the leader.
    pub lag: Option<u64>,
}
1907
/// Status of a mirror or source feeding a stream.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Number of messages this source is lagging behind.
    pub lag: u64,
    /// Last time the source was seen active. Sent as a signed nanosecond
    /// count; a negative value is mapped to `None`.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// Filtering for the source.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// Source destination subject.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// List of transforms; empty when absent.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1927
1928fn negative_duration_as_none<'de, D>(
1929    deserializer: D,
1930) -> Result<Option<std::time::Duration>, D::Error>
1931where
1932    D: Deserializer<'de>,
1933{
1934    let n = i64::deserialize(deserializer)?;
1935    if n.is_negative() {
1936        Ok(None)
1937    } else {
1938        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1939    }
1940}
1941
/// The response generated by trying to purge a stream.
///
/// This is the success value produced by awaiting a [`Purge`].
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
    /// Whether the purge request was successful.
    pub success: bool,
    /// The number of purged messages in a stream.
    pub purged: u64,
}
/// The payload used to generate a purge request.
///
/// Fields that are `None` are left out of the serialized payload.
#[derive(Default, Debug, Serialize, Clone)]
pub struct PurgeRequest {
    /// Purge up to but not including sequence (`seq` on the wire).
    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
    pub sequence: Option<u64>,

    /// Subject to match against messages for the purge command.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter: Option<String>,

    /// Number of messages to keep.
    #[serde(default, skip_serializing_if = "is_default")]
    pub keep: Option<u64>,
}
1965
/// Configuration of a stream source.
///
/// Optional fields that hold their default value are left out when serializing.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
    /// Name of the stream source.
    pub name: String,
    /// Optional source start sequence (`opt_start_seq` on the wire).
    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
    pub start_sequence: Option<u64>,
    /// Optional source start time (`opt_start_time` on the wire, RFC 3339).
    #[serde(
        default,
        rename = "opt_start_time",
        skip_serializing_if = "is_default",
        with = "rfc3339::option"
    )]
    pub start_time: Option<OffsetDateTime>,
    /// Optional additional filter subject.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: Option<String>,
    /// Optional config for sourcing streams from another prefix, used for cross-account.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external: Option<External>,
    /// Optional config to set a domain, if source is residing in different one.
    #[serde(default, skip_serializing_if = "is_default")]
    pub domain: Option<String>,
    /// Subject transforms for Stream.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub subject_transforms: Vec<SubjectTransform>,

    /// Pre-created durable consumer to use for sourcing instead of the
    /// auto-managed ephemeral one. Required for full control over the
    /// consumer's lifecycle (security, advanced delivery/replay options)
    /// when sourcing/mirroring from WorkQueue/Interest streams. Both
    /// `name` and `deliver_subject` must be non-empty and valid; the
    /// server rejects the config otherwise. See ADR-60.
    #[cfg(feature = "server_2_14")]
    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub consumer: Option<StreamConsumerSource>,
}
2006
/// Configures a pre-created durable consumer used for stream
/// sourcing/mirroring. See ADR-60.
///
/// An empty `name` or `deliver_subject` is left out of the serialized form.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct StreamConsumerSource {
    /// Name of the durable consumer to use for sourcing.
    #[serde(default, skip_serializing_if = "is_default")]
    pub name: String,
    /// Deliver subject of the (push) consumer used for sourcing.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deliver_subject: String,
}
2020
#[cfg(feature = "server_2_14")]
impl StreamConsumerSource {
    /// Creates a [`StreamConsumerSource`] from a consumer name and its
    /// deliver subject.
    ///
    /// No validation happens here; the server rejects a config in which
    /// either field is empty (`NewJSSourceDurableConsumerCfgInvalidError`).
    pub fn new(name: impl Into<String>, deliver_subject: impl Into<String>) -> Self {
        let name = name.into();
        let deliver_subject = deliver_subject.into();
        Self {
            name,
            deliver_subject,
        }
    }
}
2032
/// Prefixes used to reach a source that lives behind another API prefix.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
    /// Api prefix of external source (`api` on the wire).
    #[serde(rename = "api")]
    pub api_prefix: String,
    /// Optional configuration of delivery prefix (`deliver` on the wire);
    /// omitted from the serialized form when `None`.
    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
    pub delivery_prefix: Option<String>,
}
2042
2043use std::marker::PhantomData;
2044
/// Type-level marker meaning "this option has been set".
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker meaning "this option has not been set".
#[derive(Debug, Default)]
pub struct No;

/// Marker trait implemented by [`Yes`] and [`No`]; used as the bound on the
/// type parameters of [`Purge`].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
2054
/// Builder for a stream purge request; awaiting it sends the request.
///
/// `SEQUENCE` and `KEEP` record at the type level whether `sequence` or
/// `keep` has been set.
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
    SEQUENCE: ToAssign,
    KEEP: ToAssign,
{
    /// Request payload being built.
    inner: PurgeRequest,
    /// Type-level flag: has `sequence` been set.
    sequence_set: PhantomData<SEQUENCE>,
    /// Type-level flag: has `keep` been set.
    keep_set: PhantomData<KEEP>,
    /// Context used to send the request.
    context: Context,
    /// Name of the stream to purge.
    stream_name: String,
}
2067
2068impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2069where
2070    SEQUENCE: ToAssign,
2071    KEEP: ToAssign,
2072{
2073    /// Adds subject filter to [PurgeRequest]
2074    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2075        self.inner.filter = Some(filter.into());
2076        self
2077    }
2078}
2079
2080impl Purge<No, No> {
2081    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2082        Purge {
2083            context: stream.context.clone(),
2084            stream_name: stream.name.clone(),
2085            inner: Default::default(),
2086            sequence_set: PhantomData {},
2087            keep_set: PhantomData {},
2088        }
2089    }
2090}
2091
2092impl<KEEP> Purge<No, KEEP>
2093where
2094    KEEP: ToAssign,
2095{
2096    /// Creates a new [PurgeRequest].
2097    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
2098    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2099        Purge {
2100            context: self.context.clone(),
2101            stream_name: self.stream_name.clone(),
2102            sequence_set: PhantomData {},
2103            keep_set: PhantomData {},
2104            inner: PurgeRequest {
2105                keep: Some(keep),
2106                ..self.inner
2107            },
2108        }
2109    }
2110}
2111impl<SEQUENCE> Purge<SEQUENCE, No>
2112where
2113    SEQUENCE: ToAssign,
2114{
2115    /// Creates a new [PurgeRequest].
2116    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
2117    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2118        Purge {
2119            context: self.context.clone(),
2120            stream_name: self.stream_name.clone(),
2121            sequence_set: PhantomData {},
2122            keep_set: PhantomData {},
2123            inner: PurgeRequest {
2124                sequence: Some(sequence),
2125                ..self.inner
2126            },
2127        }
2128    }
2129}
2130
/// Kinds of errors produced when awaiting a [`Purge`].
#[derive(Clone, Debug, PartialEq)]
pub enum PurgeErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
2137
2138impl Display for PurgeErrorKind {
2139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2140        match self {
2141            Self::Request => write!(f, "request failed"),
2142            Self::TimedOut => write!(f, "timed out"),
2143            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2144        }
2145    }
2146}
2147
/// Error returned when awaiting a [`Purge`].
pub type PurgeError = Error<PurgeErrorKind>;
2149
2150impl<S, K> IntoFuture for Purge<S, K>
2151where
2152    S: ToAssign + std::marker::Send,
2153    K: ToAssign + std::marker::Send,
2154{
2155    type Output = Result<PurgeResponse, PurgeError>;
2156
2157    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2158
2159    fn into_future(self) -> Self::IntoFuture {
2160        Box::pin(std::future::IntoFuture::into_future(async move {
2161            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2162            let response: Response<PurgeResponse> = self
2163                .context
2164                .request(request_subject, &self.inner)
2165                .map_err(|err| match err.kind() {
2166                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2167                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2168                })
2169                .await?;
2170
2171            match response {
2172                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2173                Response::Ok(response) => Ok(response),
2174            }
2175        }))
2176    }
2177}
2178
/// One page of a `CONSUMER.NAMES` response.
#[derive(Deserialize, Debug)]
struct ConsumerPage {
    /// Total reported by the server; compared against the running offset to
    /// detect the last page.
    total: usize,
    /// Consumer names on this page, if any.
    consumers: Option<Vec<String>>,
}
2184
/// One page of a `CONSUMER.LIST` response.
#[derive(Deserialize, Debug)]
struct ConsumerInfoPage {
    /// Total reported by the server; compared against the running offset to
    /// detect the last page.
    total: usize,
    /// Consumer infos on this page, if any.
    consumers: Option<Vec<super::consumer::Info>>,
}
2190
/// Error kind used by [`ConsumerNames`]; the same type as `StreamsErrorKind`.
type ConsumerNamesErrorKind = StreamsErrorKind;
/// Error yielded by [`ConsumerNames`]; the same type as `StreamsError`.
type ConsumerNamesError = StreamsError;
/// In-flight request for one page of consumer names.
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2194
/// Stream of consumer names for one stream, fetched page by page.
pub struct ConsumerNames {
    /// Context used to send page requests.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of names received so far; sent as `offset` in the next request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names from the last page that have not been yielded yet.
    consumers: Vec<String>,
    /// Set once the last page has been received.
    done: bool,
}
2203
2204impl futures_util::Stream for ConsumerNames {
2205    type Item = Result<String, ConsumerNamesError>;
2206
2207    fn poll_next(
2208        mut self: Pin<&mut Self>,
2209        cx: &mut std::task::Context<'_>,
2210    ) -> std::task::Poll<Option<Self::Item>> {
2211        match self.page_request.as_mut() {
2212            Some(page) => match page.try_poll_unpin(cx) {
2213                std::task::Poll::Ready(page) => {
2214                    self.page_request = None;
2215                    let page = page.map_err(|err| {
2216                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2217                    })?;
2218
2219                    if let Some(consumers) = page.consumers {
2220                        self.offset += consumers.len();
2221                        self.consumers = consumers;
2222                        if self.offset >= page.total {
2223                            self.done = true;
2224                        }
2225                        match self.consumers.pop() {
2226                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2227                            None => Poll::Ready(None),
2228                        }
2229                    } else {
2230                        Poll::Ready(None)
2231                    }
2232                }
2233                std::task::Poll::Pending => std::task::Poll::Pending,
2234            },
2235            None => {
2236                if let Some(stream) = self.consumers.pop() {
2237                    Poll::Ready(Some(Ok(stream)))
2238                } else {
2239                    if self.done {
2240                        return Poll::Ready(None);
2241                    }
2242                    let context = self.context.clone();
2243                    let offset = self.offset;
2244                    let stream = self.stream.clone();
2245                    self.page_request = Some(Box::pin(async move {
2246                        match context
2247                            .request(
2248                                format!("CONSUMER.NAMES.{stream}"),
2249                                &json!({
2250                                    "offset": offset,
2251                                }),
2252                            )
2253                            .await?
2254                        {
2255                            Response::Err { error } => Err(RequestError::with_source(
2256                                super::context::RequestErrorKind::Other,
2257                                error,
2258                            )),
2259                            Response::Ok(page) => Ok(page),
2260                        }
2261                    }));
2262                    self.poll_next(cx)
2263                }
2264            }
2265        }
2266    }
2267}
2268
/// Error kind used by [`Consumers`]; the same type as `StreamsErrorKind`.
pub type ConsumersErrorKind = StreamsErrorKind;
/// Error yielded by [`Consumers`]; the same type as `StreamsError`.
pub type ConsumersError = StreamsError;
/// In-flight request for one page of consumer infos.
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2272
/// Stream of consumer infos for one stream, fetched page by page.
pub struct Consumers {
    /// Context used to send page requests.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of infos received so far; sent as `offset` in the next request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos from the last page that have not been yielded yet.
    consumers: Vec<super::consumer::Info>,
    /// Set once the last page has been received.
    done: bool,
}
2281
2282impl futures_util::Stream for Consumers {
2283    type Item = Result<super::consumer::Info, ConsumersError>;
2284
2285    fn poll_next(
2286        mut self: Pin<&mut Self>,
2287        cx: &mut std::task::Context<'_>,
2288    ) -> std::task::Poll<Option<Self::Item>> {
2289        match self.page_request.as_mut() {
2290            Some(page) => match page.try_poll_unpin(cx) {
2291                std::task::Poll::Ready(page) => {
2292                    self.page_request = None;
2293                    let page = page.map_err(|err| {
2294                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2295                    })?;
2296                    if let Some(consumers) = page.consumers {
2297                        self.offset += consumers.len();
2298                        self.consumers = consumers;
2299                        if self.offset >= page.total {
2300                            self.done = true;
2301                        }
2302                        match self.consumers.pop() {
2303                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2304                            None => Poll::Ready(None),
2305                        }
2306                    } else {
2307                        Poll::Ready(None)
2308                    }
2309                }
2310                std::task::Poll::Pending => std::task::Poll::Pending,
2311            },
2312            None => {
2313                if let Some(stream) = self.consumers.pop() {
2314                    Poll::Ready(Some(Ok(stream)))
2315                } else {
2316                    if self.done {
2317                        return Poll::Ready(None);
2318                    }
2319                    let context = self.context.clone();
2320                    let offset = self.offset;
2321                    let stream = self.stream.clone();
2322                    self.page_request = Some(Box::pin(async move {
2323                        match context
2324                            .request(
2325                                format!("CONSUMER.LIST.{stream}"),
2326                                &json!({
2327                                    "offset": offset,
2328                                }),
2329                            )
2330                            .await?
2331                        {
2332                            Response::Err { error } => Err(RequestError::with_source(
2333                                super::context::RequestErrorKind::Other,
2334                                error,
2335                            )),
2336                            Response::Ok(page) => Ok(page),
2337                        }
2338                    }));
2339                    self.poll_next(cx)
2340                }
2341            }
2342        }
2343    }
2344}
2345
/// Kinds of errors for raw-message lookups.
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
    /// No message was found.
    NoMessageFound,
    /// The subject is invalid.
    InvalidSubject,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2353
2354impl Display for LastRawMessageErrorKind {
2355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2356        match self {
2357            Self::NoMessageFound => write!(f, "no message found"),
2358            Self::InvalidSubject => write!(f, "invalid subject"),
2359            Self::Other => write!(f, "failed to get last raw message"),
2360            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2361        }
2362    }
2363}
2364
/// Error carrying a [`LastRawMessageErrorKind`].
pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
/// Alias of [`LastRawMessageErrorKind`].
pub type RawMessageErrorKind = LastRawMessageErrorKind;
/// Alias of [`LastRawMessageError`].
pub type RawMessageError = LastRawMessageError;
2368
/// Kinds of errors for consumer operations on a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerErrorKind {
    //TODO: get last should have timeout, which should be mapped here.
    /// The request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2379
2380impl Display for ConsumerErrorKind {
2381    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2382        match self {
2383            Self::TimedOut => write!(f, "timed out"),
2384            Self::Request => write!(f, "request failed"),
2385            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2386            Self::Other => write!(f, "consumer error"),
2387            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2388            Self::InvalidName => write!(f, "invalid consumer name"),
2389        }
2390    }
2391}
2392
/// Error carrying a [`ConsumerErrorKind`].
pub type ConsumerError = Error<ConsumerErrorKind>;
2394
/// Kinds of errors for a consumer reset.
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerResetErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout or missing responders.
    Request,
    /// Stream or consumer does not exist.
    ///
    /// Surfaced when the server replies with `CONSUMER_NOT_FOUND` /
    /// `STREAM_NOT_FOUND`, or via core NATS `NoResponders` if the JS API
    /// has no handler for the subject.
    NotFound,
    /// Reset request violates the consumer's `DeliverPolicy` constraints
    /// (e.g. seq below `OptStartSeq`, or non-zero seq with a `DeliverPolicy`
    /// other than `All` / `ByStartSequence` / `ByStartTime`).
    InvalidReset,
    /// Any other JetStream error returned by the server.
    JetStream(super::errors::Error),
}
2413
#[cfg(feature = "server_2_14")]
#[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
impl Display for ConsumerResetErrorKind {
    /// Writes a short human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::JetStream(err) => return write!(f, "JetStream error: {err}"),
            Self::TimedOut => "timed out",
            Self::Request => "request failed",
            Self::NotFound => "stream or consumer not found",
            Self::InvalidReset => "invalid reset",
        };
        f.write_str(text)
    }
}
2427
/// Error carrying a [`ConsumerResetErrorKind`].
#[cfg(feature = "server_2_14")]
pub type ConsumerResetError = Error<ConsumerResetErrorKind>;
2430
#[cfg(feature = "server_2_14")]
impl From<super::errors::Error> for ConsumerResetError {
    /// Maps a JetStream error onto a reset error kind by its error code:
    /// `CONSUMER_INVALID_RESET` becomes `InvalidReset`, `CONSUMER_NOT_FOUND`
    /// and `STREAM_NOT_FOUND` become `NotFound`, anything else is wrapped in
    /// `JetStream`.
    fn from(err: super::errors::Error) -> Self {
        let kind = match err.error_code() {
            ErrorCode::CONSUMER_INVALID_RESET => ConsumerResetErrorKind::InvalidReset,
            ErrorCode::CONSUMER_NOT_FOUND | ErrorCode::STREAM_NOT_FOUND => {
                ConsumerResetErrorKind::NotFound
            }
            _ => ConsumerResetErrorKind::JetStream(err),
        };
        Self::new(kind)
    }
}
2446
#[cfg(feature = "server_2_14")]
impl From<super::context::RequestError> for ConsumerResetError {
    /// Maps a request error onto a reset error: a timeout becomes `TimedOut`,
    /// missing responders become `NotFound`, and anything else becomes
    /// `Request` with the original error attached as its source.
    fn from(err: super::context::RequestError) -> Self {
        match err.kind() {
            RequestErrorKind::NoResponders => Self::new(ConsumerResetErrorKind::NotFound),
            RequestErrorKind::TimedOut => Self::new(ConsumerResetErrorKind::TimedOut),
            _ => Self::with_source(ConsumerResetErrorKind::Request, err),
        }
    }
}
2459
/// Kinds of errors for strict consumer creation.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerCreateStrictErrorKind {
    //TODO: get last should have timeout, which should be mapped here.
    /// The request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer already exists.
    AlreadyExists,
    /// Any other JetStream error returned by the server.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2471
2472impl Display for ConsumerCreateStrictErrorKind {
2473    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2474        match self {
2475            Self::TimedOut => write!(f, "timed out"),
2476            Self::Request => write!(f, "request failed"),
2477            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2478            Self::Other => write!(f, "consumer error"),
2479            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2480            Self::InvalidName => write!(f, "invalid consumer name"),
2481            Self::AlreadyExists => write!(f, "consumer already exists"),
2482        }
2483    }
2484}
2485
/// Error carrying a [`ConsumerCreateStrictErrorKind`].
pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2487
/// Kinds of errors for consumer updates.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerUpdateErrorKind {
    //TODO: get last should have timeout, which should be mapped here.
    /// The request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// The consumer type is invalid.
    InvalidConsumerType,
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer does not exist.
    DoesNotExist,
    /// Any other JetStream error returned by the server.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2499
2500impl Display for ConsumerUpdateErrorKind {
2501    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2502        match self {
2503            Self::TimedOut => write!(f, "timed out"),
2504            Self::Request => write!(f, "request failed"),
2505            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2506            Self::Other => write!(f, "consumer error"),
2507            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2508            Self::InvalidName => write!(f, "invalid consumer name"),
2509            Self::DoesNotExist => write!(f, "consumer does not exist"),
2510        }
2511    }
2512}
2513
/// Error carrying a [`ConsumerUpdateErrorKind`].
pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2515
2516impl From<super::errors::Error> for ConsumerError {
2517    fn from(err: super::errors::Error) -> Self {
2518        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2519    }
2520}
2521impl From<super::errors::Error> for ConsumerCreateStrictError {
2522    fn from(err: super::errors::Error) -> Self {
2523        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2524            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2525        } else {
2526            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2527        }
2528    }
2529}
2530impl From<super::errors::Error> for ConsumerUpdateError {
2531    fn from(err: super::errors::Error) -> Self {
2532        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2533            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2534        } else {
2535            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2536        }
2537    }
2538}
2539impl From<ConsumerError> for ConsumerUpdateError {
2540    fn from(err: ConsumerError) -> Self {
2541        match err.kind() {
2542            ConsumerErrorKind::JetStream(err) => {
2543                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2544                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2545                } else {
2546                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2547                }
2548            }
2549            ConsumerErrorKind::Request => {
2550                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2551            }
2552            ConsumerErrorKind::TimedOut => {
2553                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2554            }
2555            ConsumerErrorKind::InvalidConsumerType => {
2556                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2557            }
2558            ConsumerErrorKind::InvalidName => {
2559                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2560            }
2561            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2562        }
2563    }
2564}
2565
2566impl From<ConsumerError> for ConsumerCreateStrictError {
2567    fn from(err: ConsumerError) -> Self {
2568        match err.kind() {
2569            ConsumerErrorKind::JetStream(err) => {
2570                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2571                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2572                } else {
2573                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2574                }
2575            }
2576            ConsumerErrorKind::Request => {
2577                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2578            }
2579            ConsumerErrorKind::TimedOut => {
2580                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2581            }
2582            ConsumerErrorKind::InvalidConsumerType => {
2583                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2584            }
2585            ConsumerErrorKind::InvalidName => {
2586                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2587            }
2588            ConsumerErrorKind::Other => {
2589                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2590            }
2591        }
2592    }
2593}
2594
2595impl From<super::context::RequestError> for ConsumerError {
2596    fn from(err: super::context::RequestError) -> Self {
2597        match err.kind() {
2598            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2599            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2600        }
2601    }
2602}
2603impl From<super::context::RequestError> for ConsumerUpdateError {
2604    fn from(err: super::context::RequestError) -> Self {
2605        match err.kind() {
2606            RequestErrorKind::TimedOut => {
2607                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2608            }
2609            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2610        }
2611    }
2612}
2613impl From<super::context::RequestError> for ConsumerCreateStrictError {
2614    fn from(err: super::context::RequestError) -> Self {
2615        match err.kind() {
2616            RequestErrorKind::TimedOut => {
2617                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2618            }
2619            _ => {
2620                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2621            }
2622        }
2623    }
2624}
2625
2626#[derive(Debug, Serialize, Default)]
2627pub struct DirectGetRequest {
2628    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2629    sequence: Option<u64>,
2630    #[serde(rename = "last_by_subj", skip_serializing)]
2631    last_by_subject: Option<String>,
2632    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2633    next_by_subject: Option<String>,
2634}
2635
/// Type-level marker: the response should include headers.
///
/// This is the default type parameter of the get builders.
pub struct WithHeaders;
2638
/// Type-level marker: the response should exclude headers, leaving only the payload.
pub struct WithoutHeaders;
2641
2642/// Trait for converting a Message response into the appropriate return type.
2643trait DirectGetResponse: Sized {
2644    fn from_message(message: crate::Message) -> Result<Self, DirectGetError>;
2645}
2646
2647impl DirectGetResponse for StreamMessage {
2648    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2649        StreamMessage::try_from(message).map_err(Into::into)
2650    }
2651}
2652
2653impl DirectGetResponse for StreamValue {
2654    fn from_message(message: crate::Message) -> Result<Self, DirectGetError> {
2655        Ok(StreamValue {
2656            data: message.payload,
2657        })
2658    }
2659}
2660
2661pub struct DirectGetBuilder<T = WithHeaders> {
2662    context: Context,
2663    stream_name: String,
2664    request: DirectGetRequest,
2665    _phantom: std::marker::PhantomData<T>,
2666}
2667
2668impl DirectGetBuilder<WithHeaders> {
2669    fn new(context: Context, stream_name: String) -> DirectGetBuilder<WithHeaders> {
2670        DirectGetBuilder {
2671            context,
2672            stream_name,
2673            request: DirectGetRequest::default(),
2674            _phantom: std::marker::PhantomData,
2675        }
2676    }
2677}
2678
2679impl<T> DirectGetBuilder<T> {
2680    /// Internal method to send the direct get request and convert to the appropriate type.
2681    async fn send_internal<R: DirectGetResponse>(&self) -> Result<R, DirectGetError> {
2682        // When last_by_subject is used, the subject is embedded in the URL and we should send an empty payload
2683        // to match the NATS protocol expectation (similar to nats.go implementation)
2684        let payload = if self.request.last_by_subject.is_some() {
2685            Bytes::new()
2686        } else {
2687            serde_json::to_vec(&self.request).map(Bytes::from)?
2688        };
2689
2690        let request_subject = if let Some(ref subject) = self.request.last_by_subject {
2691            format!(
2692                "{}.DIRECT.GET.{}.{}",
2693                &self.context.prefix, &self.stream_name, subject
2694            )
2695        } else {
2696            format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.stream_name)
2697        };
2698
2699        let response = self
2700            .context
2701            .client
2702            .request(request_subject, payload)
2703            .await?;
2704
2705        // Check for error status
2706        if let Some(status) = response.status {
2707            if let Some(ref description) = response.description {
2708                match status {
2709                    StatusCode::NOT_FOUND => {
2710                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
2711                    }
2712                    // 408 is used in Direct Message for bad/empty payload.
2713                    StatusCode::TIMEOUT => {
2714                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
2715                    }
2716                    _ => {
2717                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
2718                            status,
2719                            description.to_string(),
2720                        )));
2721                    }
2722                }
2723            }
2724        }
2725
2726        R::from_message(response)
2727    }
2728
2729    /// Sets the sequence for the direct get request.
2730    pub fn sequence(mut self, seq: u64) -> Self {
2731        self.request.sequence = Some(seq);
2732        self
2733    }
2734
2735    /// Sets the last_by_subject for the direct get request.
2736    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2737        self.request.last_by_subject = Some(subject.into());
2738        self
2739    }
2740
2741    /// Sets the next_by_subject for the direct get request.
2742    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2743        self.request.next_by_subject = Some(subject.into());
2744        self
2745    }
2746}
2747
2748impl DirectGetBuilder<WithHeaders> {
2749    /// Sends the get request and returns a StreamMessage with headers.
2750    pub async fn send(self) -> Result<StreamMessage, DirectGetError> {
2751        self.send_internal::<StreamMessage>().await
2752    }
2753}
2754
2755impl DirectGetBuilder<WithoutHeaders> {
2756    /// Sends the get request and returns only the payload bytes without headers.
2757    pub async fn send(self) -> Result<StreamValue, DirectGetError> {
2758        self.send_internal::<StreamValue>().await
2759    }
2760}
2761
2762pub struct StreamValue {
2763    pub data: Bytes,
2764}
2765
2766#[derive(Debug, Serialize, Default)]
2767pub struct RawMessageRequest {
2768    #[serde(rename = "seq", skip_serializing_if = "Option::is_none")]
2769    sequence: Option<u64>,
2770    #[serde(rename = "last_by_subj", skip_serializing_if = "Option::is_none")]
2771    last_by_subject: Option<String>,
2772    #[serde(rename = "next_by_subj", skip_serializing_if = "Option::is_none")]
2773    next_by_subject: Option<String>,
2774}
2775
2776/// Trait for converting a RawMessage response into the appropriate return type.
2777trait RawMessageResponse: Sized {
2778    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError>;
2779}
2780
2781impl RawMessageResponse for StreamMessage {
2782    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2783        StreamMessage::try_from(message)
2784            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2785    }
2786}
2787
2788impl RawMessageResponse for StreamValue {
2789    fn from_raw_message(message: RawMessage) -> Result<Self, RawMessageError> {
2790        use base64::engine::general_purpose::STANDARD;
2791        use base64::Engine;
2792
2793        let decoded_payload = STANDARD.decode(message.payload).map_err(|err| {
2794            RawMessageError::with_source(
2795                RawMessageErrorKind::Other,
2796                Box::new(std::io::Error::other(err)),
2797            )
2798        })?;
2799
2800        Ok(StreamValue {
2801            data: decoded_payload.into(),
2802        })
2803    }
2804}
2805
2806pub struct RawMessageBuilder<T = WithHeaders> {
2807    context: Context,
2808    stream_name: String,
2809    request: RawMessageRequest,
2810    _phantom: std::marker::PhantomData<T>,
2811}
2812
2813impl RawMessageBuilder<WithHeaders> {
2814    fn new(context: Context, stream_name: String) -> Self {
2815        RawMessageBuilder {
2816            context,
2817            stream_name,
2818            request: RawMessageRequest::default(),
2819            _phantom: std::marker::PhantomData,
2820        }
2821    }
2822}
2823
2824impl<T> RawMessageBuilder<T> {
2825    /// Internal method to send the raw message request and convert to the appropriate type.
2826    async fn send_internal<R: RawMessageResponse>(&self) -> Result<R, RawMessageError> {
2827        // Validate subjects
2828        for subject in [&self.request.last_by_subject, &self.request.next_by_subject]
2829            .into_iter()
2830            .flatten()
2831        {
2832            if !is_valid_subject(subject) {
2833                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
2834            }
2835        }
2836
2837        let subject = format!("STREAM.MSG.GET.{}", &self.stream_name);
2838
2839        let response: Response<GetRawMessage> = self
2840            .context
2841            .request(subject, &self.request)
2842            .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err))
2843            .await?;
2844
2845        match response {
2846            Response::Err { error } => {
2847                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
2848                    Err(RawMessageError::new(RawMessageErrorKind::NoMessageFound))
2849                } else {
2850                    Err(RawMessageError::new(RawMessageErrorKind::JetStream(error)))
2851                }
2852            }
2853            Response::Ok(value) => R::from_raw_message(value.message),
2854        }
2855    }
2856
2857    /// Sets the sequence for the raw message request.
2858    pub fn sequence(mut self, seq: u64) -> Self {
2859        self.request.sequence = Some(seq);
2860        self
2861    }
2862
2863    /// Sets the last_by_subject for the raw message request.
2864    pub fn last_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2865        self.request.last_by_subject = Some(subject.into());
2866        self
2867    }
2868
2869    /// Sets the next_by_subject for the raw message request.
2870    pub fn next_by_subject<S: Into<String>>(mut self, subject: S) -> Self {
2871        self.request.next_by_subject = Some(subject.into());
2872        self
2873    }
2874}
2875
2876impl RawMessageBuilder<WithHeaders> {
2877    /// Sends the raw message request and returns a StreamMessage with headers.
2878    pub async fn send(self) -> Result<StreamMessage, RawMessageError> {
2879        self.send_internal::<StreamMessage>().await
2880    }
2881}
2882
2883impl RawMessageBuilder<WithoutHeaders> {
2884    /// Sends the raw message request and returns only the payload as StreamValue.
2885    pub async fn send(self) -> Result<StreamValue, RawMessageError> {
2886        self.send_internal::<StreamValue>().await
2887    }
2888}
2889
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let original = Config::default();

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original, decoded);
    }
}