async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{
41        ConsumerInfoError, ConsumerInfoErrorKind, RequestError, RequestErrorKind, StreamsError,
42        StreamsErrorKind,
43    },
44    errors::ErrorCode,
45    is_valid_name,
46    message::{StreamMessage, StreamMessageError},
47    response::Response,
48    Context, Message,
49};
50
/// Error returned by the stream info requests ([`Stream::info`], [`Stream::get_info`]);
/// an alias of [`RequestError`].
pub type InfoError = RequestError;
52
/// Kinds of failure reported by the direct get operations on a [Stream].
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The requested message was not found.
    NotFound,
    /// The subject was rejected as invalid.
    InvalidSubject,
    /// The request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// An error response, carrying its status code and description.
    ErrorResponse(StatusCode, String),
    /// Any other failure while getting a message.
    Other,
}
62
63impl Display for DirectGetErrorKind {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        match self {
66            Self::InvalidSubject => write!(f, "invalid subject"),
67            Self::NotFound => write!(f, "message not found"),
68            Self::ErrorResponse(status, description) => {
69                write!(f, "unable to get message: {} {}", status, description)
70            }
71            Self::Other => write!(f, "error getting message"),
72            Self::TimedOut => write!(f, "timed out"),
73            Self::Request => write!(f, "request failed"),
74        }
75    }
76}
77
/// Error type returned by the direct get operations; its kind is a [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
79
80impl From<crate::RequestError> for DirectGetError {
81    fn from(err: crate::RequestError) -> Self {
82        match err.kind() {
83            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
84            crate::RequestErrorKind::NoResponders => {
85                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
86                    StatusCode::NO_RESPONDERS,
87                    "no responders".to_string(),
88                ))
89            }
90            crate::RequestErrorKind::Other => {
91                DirectGetError::with_source(DirectGetErrorKind::Other, err)
92            }
93        }
94    }
95}
96
97impl From<serde_json::Error> for DirectGetError {
98    fn from(err: serde_json::Error) -> Self {
99        DirectGetError::with_source(DirectGetErrorKind::Other, err)
100    }
101}
102
103impl From<StreamMessageError> for DirectGetError {
104    fn from(err: StreamMessageError) -> Self {
105        DirectGetError::with_source(DirectGetErrorKind::Other, err)
106    }
107}
108
/// Kinds of failure reported by [`Stream::delete_message`].
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
115
116impl Display for DeleteMessageErrorKind {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        match self {
119            Self::Request => write!(f, "request failed"),
120            Self::TimedOut => write!(f, "timed out"),
121            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
122        }
123    }
124}
125
/// Error type returned by [`Stream::delete_message`]; its kind is a [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
127
/// Handle to operations that can be performed on a `Stream`.
/// It's generic over the type of `info` field to allow `Stream` with or without
/// info contents.
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Cached stream info; its type is the generic parameter `T` (defaults to [Info]).
    pub(crate) info: T,
    /// JetStream context through which this handle sends its requests.
    pub(crate) context: Context,
    /// Stream name, interpolated into the request subjects built by the methods.
    pub(crate) name: String,
}
137
138impl Stream<Info> {
139    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
140    /// [Stream] and returns it.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// # #[tokio::main]
146    /// # async fn main() -> Result<(), async_nats::Error> {
147    /// let client = async_nats::connect("localhost:4222").await?;
148    /// let jetstream = async_nats::jetstream::new(client);
149    ///
150    /// let mut stream = jetstream.get_stream("events").await?;
151    ///
152    /// let info = stream.info().await?;
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub async fn info(&mut self) -> Result<&Info, InfoError> {
157        let subject = format!("STREAM.INFO.{}", self.info.config.name);
158
159        match self.context.request(subject, &json!({})).await? {
160            Response::Ok::<Info>(info) => {
161                self.info = info;
162                Ok(&self.info)
163            }
164            Response::Err { error } => Err(error.into()),
165        }
166    }
167
168    /// Returns cached [Info] for the [Stream].
169    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
170    /// [Stream::info].
171    ///
172    /// # Examples
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn main() -> Result<(), async_nats::Error> {
177    /// let client = async_nats::connect("localhost:4222").await?;
178    /// let jetstream = async_nats::jetstream::new(client);
179    ///
180    /// let stream = jetstream.get_stream("events").await?;
181    ///
182    /// let info = stream.cached_info();
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub fn cached_info(&self) -> &Info {
187        &self.info
188    }
189}
190
191impl<I> Stream<I> {
192    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
193    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
194    pub async fn get_info(&self) -> Result<Info, InfoError> {
195        let subject = format!("STREAM.INFO.{}", self.name);
196
197        match self.context.request(subject, &json!({})).await? {
198            Response::Ok::<Info>(info) => Ok(info),
199            Response::Err { error } => Err(error.into()),
200        }
201    }
202
203    /// Retrieves [[Info]] from the server and returns a [[futures::Stream]] that allows
204    /// iterating over all subjects in the stream fetched via paged API.
205    ///
206    /// # Examples
207    ///
208    /// ```no_run
209    /// # #[tokio::main]
210    /// # async fn main() -> Result<(), async_nats::Error> {
211    /// use futures::TryStreamExt;
212    /// let client = async_nats::connect("localhost:4222").await?;
213    /// let jetstream = async_nats::jetstream::new(client);
214    ///
215    /// let mut stream = jetstream.get_stream("events").await?;
216    ///
217    /// let mut info = stream.info_with_subjects("events.>").await?;
218    ///
219    /// while let Some((subject, count)) = info.try_next().await? {
220    ///     println!("Subject: {} count: {}", subject, count);
221    /// }
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub async fn info_with_subjects<F: AsRef<str>>(
226        &self,
227        subjects_filter: F,
228    ) -> Result<InfoWithSubjects, InfoError> {
229        let subjects_filter = subjects_filter.as_ref().to_string();
230        // TODO: validate the subject and decide if this should be a `Subject`
231        let info = stream_info_with_details(
232            self.context.clone(),
233            self.name.clone(),
234            0,
235            false,
236            subjects_filter.clone(),
237        )
238        .await?;
239
240        Ok(InfoWithSubjects::new(
241            self.context.clone(),
242            info,
243            subjects_filter,
244        ))
245    }
246
247    /// Creates a builder that allows to customize `Stream::Info`.
248    ///
249    /// # Examples
250    /// ```no_run
251    /// # #[tokio::main]
252    /// # async fn main() -> Result<(), async_nats::Error> {
253    /// use futures::TryStreamExt;
254    /// let client = async_nats::connect("localhost:4222").await?;
255    /// let jetstream = async_nats::jetstream::new(client);
256    ///
257    /// let mut stream = jetstream.get_stream("events").await?;
258    ///
259    /// let mut info = stream
260    ///     .info_builder()
261    ///     .with_deleted(true)
262    ///     .subjects("events.>")
263    ///     .fetch()
264    ///     .await?;
265    ///
266    /// while let Some((subject, count)) = info.try_next().await? {
267    ///     println!("Subject: {} count: {}", subject, count);
268    /// }
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub fn info_builder(&self) -> StreamInfoBuilder {
273        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
274    }
275
276    /// Gets next message for a [Stream].
277    ///
278    /// Requires a [Stream] with `allow_direct` set to `true`.
279    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
280    /// from any replica member. This means read after write is possible,
281    /// as that given replica might not yet catch up with the leader.
282    ///
283    /// # Examples
284    ///
285    /// ```no_run
286    /// # #[tokio::main]
287    /// # async fn main() -> Result<(), async_nats::Error> {
288    /// let client = async_nats::connect("demo.nats.io").await?;
289    /// let jetstream = async_nats::jetstream::new(client);
290    ///
291    /// let stream = jetstream
292    ///     .create_stream(async_nats::jetstream::stream::Config {
293    ///         name: "events".to_string(),
294    ///         subjects: vec!["events.>".to_string()],
295    ///         allow_direct: true,
296    ///         ..Default::default()
297    ///     })
298    ///     .await?;
299    ///
300    /// jetstream.publish("events.data", "data".into()).await?;
301    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
302    ///
303    /// let message = stream
304    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
305    ///     .await?;
306    ///
307    /// # Ok(())
308    /// # }
309    /// ```
310    pub async fn direct_get_next_for_subject<T: AsRef<str>>(
311        &self,
312        subject: T,
313        sequence: Option<u64>,
314    ) -> Result<Message, DirectGetError> {
315        if !is_valid_subject(&subject) {
316            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
317        }
318        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
319        let payload;
320        if let Some(sequence) = sequence {
321            payload = json!({
322                "seq": sequence,
323                "next_by_subj": subject.as_ref(),
324            });
325        } else {
326            payload = json!({
327                 "next_by_subj": subject.as_ref(),
328            });
329        }
330
331        let response = self
332            .context
333            .client
334            .request(
335                request_subject,
336                serde_json::to_vec(&payload).map(Bytes::from)?,
337            )
338            .await
339            .map(|message| Message {
340                message,
341                context: self.context.clone(),
342            })?;
343
344        if let Some(status) = response.status {
345            if let Some(ref description) = response.description {
346                match status {
347                    StatusCode::NOT_FOUND => {
348                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
349                    }
350                    // 408 is used in Direct Message for bad/empty payload.
351                    StatusCode::TIMEOUT => {
352                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
353                    }
354                    _ => {
355                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
356                            status,
357                            description.to_string(),
358                        )));
359                    }
360                }
361            }
362        }
363        Ok(response)
364    }
365
366    /// Gets first message from [Stream].
367    ///
368    /// Requires a [Stream] with `allow_direct` set to `true`.
369    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
370    /// from any replica member. This means read after write is possible,
371    /// as that given replica might not yet catch up with the leader.
372    ///
373    /// # Examples
374    ///
375    /// ```no_run
376    /// # #[tokio::main]
377    /// # async fn main() -> Result<(), async_nats::Error> {
378    /// let client = async_nats::connect("demo.nats.io").await?;
379    /// let jetstream = async_nats::jetstream::new(client);
380    ///
381    /// let stream = jetstream
382    ///     .create_stream(async_nats::jetstream::stream::Config {
383    ///         name: "events".to_string(),
384    ///         subjects: vec!["events.>".to_string()],
385    ///         allow_direct: true,
386    ///         ..Default::default()
387    ///     })
388    ///     .await?;
389    ///
390    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
391    ///
392    /// let message = stream.direct_get_first_for_subject("events.data").await?;
393    ///
394    /// # Ok(())
395    /// # }
396    /// ```
397    pub async fn direct_get_first_for_subject<T: AsRef<str>>(
398        &self,
399        subject: T,
400    ) -> Result<Message, DirectGetError> {
401        if !is_valid_subject(&subject) {
402            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
403        }
404        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
405        let payload = json!({
406            "next_by_subj": subject.as_ref(),
407        });
408
409        let response = self
410            .context
411            .client
412            .request(
413                request_subject,
414                serde_json::to_vec(&payload).map(Bytes::from)?,
415            )
416            .await
417            .map(|message| Message {
418                message,
419                context: self.context.clone(),
420            })?;
421        if let Some(status) = response.status {
422            if let Some(ref description) = response.description {
423                match status {
424                    StatusCode::NOT_FOUND => {
425                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
426                    }
427                    // 408 is used in Direct Message for bad/empty payload.
428                    StatusCode::TIMEOUT => {
429                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
430                    }
431                    _ => {
432                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
433                            status,
434                            description.to_string(),
435                        )));
436                    }
437                }
438            }
439        }
440        Ok(response)
441    }
442
443    /// Gets message from [Stream] with given `sequence id`.
444    ///
445    /// Requires a [Stream] with `allow_direct` set to `true`.
446    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
447    /// from any replica member. This means read after write is possible,
448    /// as that given replica might not yet catch up with the leader.
449    ///
450    /// # Examples
451    ///
452    /// ```no_run
453    /// # #[tokio::main]
454    /// # async fn main() -> Result<(), async_nats::Error> {
455    /// let client = async_nats::connect("demo.nats.io").await?;
456    /// let jetstream = async_nats::jetstream::new(client);
457    ///
458    /// let stream = jetstream
459    ///     .create_stream(async_nats::jetstream::stream::Config {
460    ///         name: "events".to_string(),
461    ///         subjects: vec!["events.>".to_string()],
462    ///         allow_direct: true,
463    ///         ..Default::default()
464    ///     })
465    ///     .await?;
466    ///
467    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
468    ///
469    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
470    ///
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
475        let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
476        let payload = json!({
477            "seq": sequence,
478        });
479
480        let response = self
481            .context
482            .client
483            .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
484            .await?;
485
486        if let Some(status) = response.status {
487            if let Some(ref description) = response.description {
488                match status {
489                    StatusCode::NOT_FOUND => {
490                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
491                    }
492                    // 408 is used in Direct Message for bad/empty payload.
493                    StatusCode::TIMEOUT => {
494                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
495                    }
496                    _ => {
497                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
498                            status,
499                            description.to_string(),
500                        )));
501                    }
502                }
503            }
504        }
505        StreamMessage::try_from(response).map_err(Into::into)
506    }
507
508    /// Gets last message for a given `subject`.
509    ///
510    /// Requires a [Stream] with `allow_direct` set to `true`.
511    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
512    /// from any replica member. This means read after write is possible,
513    /// as that given replica might not yet catch up with the leader.
514    ///
515    /// # Examples
516    ///
517    /// ```no_run
518    /// # #[tokio::main]
519    /// # async fn main() -> Result<(), async_nats::Error> {
520    /// let client = async_nats::connect("demo.nats.io").await?;
521    /// let jetstream = async_nats::jetstream::new(client);
522    ///
523    /// let stream = jetstream
524    ///     .create_stream(async_nats::jetstream::stream::Config {
525    ///         name: "events".to_string(),
526    ///         subjects: vec!["events.>".to_string()],
527    ///         allow_direct: true,
528    ///         ..Default::default()
529    ///     })
530    ///     .await?;
531    ///
532    /// jetstream.publish("events.data", "data".into()).await?;
533    ///
534    /// let message = stream.direct_get_last_for_subject("events.data").await?;
535    ///
536    /// # Ok(())
537    /// # }
538    /// ```
539    pub async fn direct_get_last_for_subject<T: AsRef<str>>(
540        &self,
541        subject: T,
542    ) -> Result<StreamMessage, DirectGetError> {
543        let subject = format!(
544            "{}.DIRECT.GET.{}.{}",
545            &self.context.prefix,
546            &self.name,
547            subject.as_ref()
548        );
549
550        let response = self.context.client.request(subject, "".into()).await?;
551        if let Some(status) = response.status {
552            if let Some(ref description) = response.description {
553                match status {
554                    StatusCode::NOT_FOUND => {
555                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
556                    }
557                    // 408 is used in Direct Message for bad/empty payload.
558                    StatusCode::TIMEOUT => {
559                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
560                    }
561                    _ => {
562                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
563                            status,
564                            description.to_string(),
565                        )));
566                    }
567                }
568            }
569        }
570        StreamMessage::try_from(response).map_err(Into::into)
571    }
572    /// Get a raw message from the stream for a given stream sequence.
573    /// This low-level API always reaches stream leader.
574    /// This should be discouraged in favor of using [Stream::direct_get].
575    ///
576    /// # Examples
577    ///
578    /// ```no_run
579    /// #[tokio::main]
580    /// # async fn main() -> Result<(), async_nats::Error> {
581    /// use futures::StreamExt;
582    /// use futures::TryStreamExt;
583    ///
584    /// let client = async_nats::connect("localhost:4222").await?;
585    /// let context = async_nats::jetstream::new(client);
586    ///
587    /// let stream = context
588    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
589    ///         name: "events".to_string(),
590    ///         max_messages: 10_000,
591    ///         ..Default::default()
592    ///     })
593    ///     .await?;
594    ///
595    /// let publish_ack = context.publish("events", "data".into()).await?;
596    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
597    /// println!("Retrieved raw message {:?}", raw_message);
598    /// # Ok(())
599    /// # }
600    /// ```
601    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
602        self.raw_message(StreamGetMessage {
603            sequence: Some(sequence),
604            last_by_subject: None,
605            next_by_subject: None,
606        })
607        .await
608    }
609
610    /// Get a first message from the stream for a given subject starting from provided sequence.
611    /// This low-level API always reaches stream leader.
612    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
613    ///
614    /// # Examples
615    ///
616    /// ```no_run
617    /// #[tokio::main]
618    /// # async fn main() -> Result<(), async_nats::Error> {
619    /// use futures::StreamExt;
620    /// use futures::TryStreamExt;
621    ///
622    /// let client = async_nats::connect("localhost:4222").await?;
623    /// let context = async_nats::jetstream::new(client);
624    /// let stream = context.get_stream_no_info("events").await?;
625    ///
626    /// let raw_message = stream
627    ///     .get_first_raw_message_by_subject("events.created", 10)
628    ///     .await?;
629    /// println!("Retrieved raw message {:?}", raw_message);
630    /// # Ok(())
631    /// # }
632    /// ```
633    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
634        &self,
635        subject: T,
636        sequence: u64,
637    ) -> Result<StreamMessage, RawMessageError> {
638        self.raw_message(StreamGetMessage {
639            sequence: Some(sequence),
640            last_by_subject: None,
641            next_by_subject: Some(subject.as_ref().to_string()),
642        })
643        .await
644    }
645
646    /// Get a next message from the stream for a given subject.
647    /// This low-level API always reaches stream leader.
648    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
649    ///
650    /// # Examples
651    ///
652    /// ```no_run
653    /// #[tokio::main]
654    /// # async fn main() -> Result<(), async_nats::Error> {
655    /// use futures::StreamExt;
656    /// use futures::TryStreamExt;
657    ///
658    /// let client = async_nats::connect("localhost:4222").await?;
659    /// let context = async_nats::jetstream::new(client);
660    /// let stream = context.get_stream_no_info("events").await?;
661    ///
662    /// let raw_message = stream
663    ///     .get_next_raw_message_by_subject("events.created")
664    ///     .await?;
665    /// println!("Retrieved raw message {:?}", raw_message);
666    /// # Ok(())
667    /// # }
668    /// ```
669    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
670        &self,
671        subject: T,
672    ) -> Result<StreamMessage, RawMessageError> {
673        self.raw_message(StreamGetMessage {
674            sequence: None,
675            last_by_subject: None,
676            next_by_subject: Some(subject.as_ref().to_string()),
677        })
678        .await
679    }
680
681    async fn raw_message(
682        &self,
683        request: StreamGetMessage,
684    ) -> Result<StreamMessage, RawMessageError> {
685        for subject in [&request.last_by_subject, &request.next_by_subject]
686            .into_iter()
687            .flatten()
688        {
689            if !is_valid_subject(subject) {
690                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
691            }
692        }
693        let subject = format!("STREAM.MSG.GET.{}", &self.name);
694
695        let response: Response<GetRawMessage> = self
696            .context
697            .request(subject, &request)
698            .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
699            .await?;
700
701        match response {
702            Response::Err { error } => {
703                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
704                    Err(LastRawMessageError::new(
705                        LastRawMessageErrorKind::NoMessageFound,
706                    ))
707                } else {
708                    Err(LastRawMessageError::new(
709                        LastRawMessageErrorKind::JetStream(error),
710                    ))
711                }
712            }
713            Response::Ok(value) => StreamMessage::try_from(value.message)
714                .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
715        }
716    }
717
718    /// Get a last message from the stream for a given subject.
719    /// This low-level API always reaches stream leader.
720    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
721    ///
722    /// # Examples
723    ///
724    /// ```no_run
725    /// #[tokio::main]
726    /// # async fn main() -> Result<(), async_nats::Error> {
727    /// use futures::StreamExt;
728    /// use futures::TryStreamExt;
729    ///
730    /// let client = async_nats::connect("localhost:4222").await?;
731    /// let context = async_nats::jetstream::new(client);
732    /// let stream = context.get_stream_no_info("events").await?;
733    ///
734    /// let raw_message = stream
735    ///     .get_last_raw_message_by_subject("events.created")
736    ///     .await?;
737    /// println!("Retrieved raw message {:?}", raw_message);
738    /// # Ok(())
739    /// # }
740    /// ```
741    pub async fn get_last_raw_message_by_subject(
742        &self,
743        stream_subject: &str,
744    ) -> Result<StreamMessage, LastRawMessageError> {
745        self.raw_message(StreamGetMessage {
746            sequence: None,
747            last_by_subject: Some(stream_subject.to_string()),
748            next_by_subject: None,
749        })
750        .await
751    }
752
753    /// Delete a message from the stream.
754    ///
755    /// # Examples
756    ///
757    /// ```no_run
758    /// # #[tokio::main]
759    /// # async fn main() -> Result<(), async_nats::Error> {
760    /// let client = async_nats::connect("localhost:4222").await?;
761    /// let context = async_nats::jetstream::new(client);
762    ///
763    /// let stream = context
764    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
765    ///         name: "events".to_string(),
766    ///         max_messages: 10_000,
767    ///         ..Default::default()
768    ///     })
769    ///     .await?;
770    ///
771    /// let publish_ack = context.publish("events", "data".into()).await?;
772    /// stream.delete_message(publish_ack.await?.sequence).await?;
773    /// # Ok(())
774    /// # }
775    /// ```
776    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
777        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
778        let payload = json!({
779            "seq": sequence,
780        });
781
782        let response: Response<DeleteStatus> = self
783            .context
784            .request(subject, &payload)
785            .map_err(|err| match err.kind() {
786                RequestErrorKind::TimedOut => {
787                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
788                }
789                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
790            })
791            .await?;
792
793        match response {
794            Response::Err { error } => Err(DeleteMessageError::new(
795                DeleteMessageErrorKind::JetStream(error),
796            )),
797            Response::Ok(value) => Ok(value.success),
798        }
799    }
800
    /// Purge `Stream` messages.
    ///
    /// Returns a [Purge] builder created for this stream; as the example shows, the
    /// builder is awaited to perform the purge.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let stream = jetstream.get_stream("events").await?;
    /// stream.purge().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
819
820    /// Purge `Stream` messages for a matching subject.
821    ///
822    /// # Examples
823    ///
824    /// ```no_run
825    /// # #[tokio::main]
826    /// # #[allow(deprecated)]
827    /// # async fn main() -> Result<(), async_nats::Error> {
828    /// let client = async_nats::connect("demo.nats.io").await?;
829    /// let jetstream = async_nats::jetstream::new(client);
830    ///
831    /// let stream = jetstream.get_stream("events").await?;
832    /// stream.purge_subject("data").await?;
833    /// # Ok(())
834    /// # }
835    /// ```
836    #[deprecated(
837        since = "0.25.0",
838        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
839    )]
840    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
841    where
842        T: Into<String>,
843    {
844        self.purge().filter(subject).await
845    }
846
    /// Create or update a `Durable` or `Ephemeral` Consumer (ephemeral if `durable_name` was not
    /// provided) and return a [Consumer] handle for it.
    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
    ///
    /// The call is delegated to the [Context], passing this stream's name along with `config`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let stream = jetstream.get_stream("events").await?;
    /// let info = stream
    ///     .create_consumer(consumer::pull::Config {
    ///         durable_name: Some("pull".to_string()),
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerError> {
        self.context
            .create_consumer_on_stream(config, self.name.clone())
            .await
    }
878
    /// Update an existing consumer.
    /// This call will fail if the consumer does not exist.
    /// Returns a [Consumer] handle for the updated consumer.
    ///
    /// Only available when the `server_2_10` feature is enabled. The call is delegated to the
    /// [Context], passing this stream's name along with `config`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let stream = jetstream.get_stream("events").await?;
    /// let info = stream
    ///     .update_consumer(consumer::pull::Config {
    ///         durable_name: Some("pull".to_string()),
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "server_2_10")]
    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerUpdateError> {
        self.context
            .update_consumer_on_stream(config, self.name.clone())
            .await
    }
911
    /// Create a consumer, but only if it does not exist or the existing config is exactly
    /// the same.
    /// This method will fail if the consumer is already present with a different config.
    /// Returns a [Consumer] handle for the created consumer.
    ///
    /// Only available when the `server_2_10` feature is enabled. The call is delegated to the
    /// [Context], passing this stream's name along with `config`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let stream = jetstream.get_stream("events").await?;
    /// let info = stream
    ///     .create_consumer_strict(consumer::pull::Config {
    ///         durable_name: Some("pull".to_string()),
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "server_2_10")]
    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
        &self,
        config: C,
    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
        self.context
            .create_consumer_strict_on_stream(config, self.name.clone())
            .await
    }
945
946    /// Retrieve [Info] about [Consumer] from the server.
947    ///
948    /// # Examples
949    ///
950    /// ```no_run
951    /// # #[tokio::main]
952    /// # async fn main() -> Result<(), async_nats::Error> {
953    /// use async_nats::jetstream::consumer;
954    /// let client = async_nats::connect("localhost:4222").await?;
955    /// let jetstream = async_nats::jetstream::new(client);
956    ///
957    /// let stream = jetstream.get_stream("events").await?;
958    /// let info = stream.consumer_info("pull").await?;
959    /// # Ok(())
960    /// # }
961    /// ```
962    pub async fn consumer_info<T: AsRef<str>>(
963        &self,
964        name: T,
965    ) -> Result<consumer::Info, ConsumerInfoError> {
966        let name = name.as_ref();
967
968        if !is_valid_name(name) {
969            return Err(ConsumerInfoError::new(ConsumerInfoErrorKind::InvalidName));
970        }
971
972        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
973
974        match self.context.request(subject, &json!({})).await? {
975            Response::Ok(info) => Ok(info),
976            Response::Err { error } => Err(error.into()),
977        }
978    }
979
980    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
981    /// [Messages][crate::jetstream::Message] for a given [Consumer].
982    ///
983    /// # Examples
984    ///
985    /// ```no_run
986    /// # #[tokio::main]
987    /// # async fn main() -> Result<(), async_nats::Error> {
988    /// use async_nats::jetstream::consumer;
989    /// use futures::StreamExt;
990    /// let client = async_nats::connect("localhost:4222").await?;
991    /// let jetstream = async_nats::jetstream::new(client);
992    ///
993    /// let stream = jetstream.get_stream("events").await?;
994    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
995    /// # Ok(())
996    /// # }
997    /// ```
998    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
999        &self,
1000        name: &str,
1001    ) -> Result<Consumer<T>, crate::Error> {
1002        let info = self.consumer_info(name).await?;
1003
1004        Ok(Consumer::new(
1005            T::try_from_consumer_config(info.config.clone())?,
1006            info,
1007            self.context.clone(),
1008        ))
1009    }
1010
1011    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
1012    ///
1013    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
1014    ///
1015    /// # Examples
1016    ///
1017    /// ```no_run
1018    /// # #[tokio::main]
1019    /// # async fn main() -> Result<(), async_nats::Error> {
1020    /// use async_nats::jetstream::consumer;
1021    /// use futures::StreamExt;
1022    /// let client = async_nats::connect("localhost:4222").await?;
1023    /// let jetstream = async_nats::jetstream::new(client);
1024    ///
1025    /// let stream = jetstream.get_stream("events").await?;
1026    /// let consumer = stream
1027    ///     .get_or_create_consumer(
1028    ///         "pull",
1029    ///         consumer::pull::Config {
1030    ///             durable_name: Some("pull".to_string()),
1031    ///             ..Default::default()
1032    ///         },
1033    ///     )
1034    ///     .await?;
1035    /// # Ok(())
1036    /// # }
1037    /// ```
1038    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1039        &self,
1040        name: &str,
1041        config: T,
1042    ) -> Result<Consumer<T>, ConsumerError> {
1043        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1044
1045        match self.context.request(subject, &json!({})).await? {
1046            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1047            Response::Err { error } => Err(error.into()),
1048            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1049                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1050                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1051                })?,
1052                info,
1053                self.context.clone(),
1054            )),
1055        }
1056    }
1057
1058    /// Delete a [Consumer] from the server.
1059    ///
1060    /// # Examples
1061    ///
1062    /// ```no_run
1063    /// # #[tokio::main]
1064    /// # async fn main() -> Result<(), async_nats::Error> {
1065    /// use async_nats::jetstream::consumer;
1066    /// use futures::StreamExt;
1067    /// let client = async_nats::connect("localhost:4222").await?;
1068    /// let jetstream = async_nats::jetstream::new(client);
1069    ///
1070    /// jetstream
1071    ///     .get_stream("events")
1072    ///     .await?
1073    ///     .delete_consumer("pull")
1074    ///     .await?;
1075    /// # Ok(())
1076    /// # }
1077    /// ```
1078    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1079        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1080
1081        match self.context.request(subject, &json!({})).await? {
1082            Response::Ok(delete_status) => Ok(delete_status),
1083            Response::Err { error } => Err(error.into()),
1084        }
1085    }
1086
    /// Pause a [Consumer] until the given time.
    /// It will not deliver any messages to clients during that time.
    ///
    /// Only available when the `server_2_11` feature is enabled. Sends the pause request
    /// with `pause_until` set and returns the server's [PauseResponse].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer;
    /// use futures::StreamExt;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let pause_until =
    ///     time::OffsetDateTime::now_utc().saturating_add(time::Duration::seconds_f32(10.0));
    ///
    /// jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .pause_consumer("my_consumer", pause_until)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "server_2_11")]
    pub async fn pause_consumer(
        &self,
        name: &str,
        pause_until: OffsetDateTime,
    ) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, Some(pause_until)).await
    }
1118
    /// Resume a paused [Consumer].
    ///
    /// Only available when the `server_2_11` feature is enabled. Sends the same request as
    /// [Stream::pause_consumer], but without a `pause_until` deadline, and returns the
    /// server's [PauseResponse].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer;
    /// use futures::StreamExt;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .resume_consumer("my_consumer")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "server_2_11")]
    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
        self.request_pause_consumer(name, None).await
    }
1143
1144    #[cfg(feature = "server_2_11")]
1145    async fn request_pause_consumer(
1146        &self,
1147        name: &str,
1148        pause_until: Option<OffsetDateTime>,
1149    ) -> Result<PauseResponse, ConsumerError> {
1150        let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1151        let payload = &PauseResumeConsumerRequest { pause_until };
1152
1153        match self.context.request(subject, payload).await? {
1154            Response::Ok::<PauseResponse>(resp) => Ok(resp),
1155            Response::Err { error } => Err(error.into()),
1156        }
1157    }
1158
    /// Lists names of all consumers for current stream.
    ///
    /// Returns a [ConsumerNames] value initialized at offset `0` with no request in flight;
    /// as the example shows, it is consumed as a stream of names.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use futures::TryStreamExt;
    /// let client = async_nats::connect("demo.nats.io:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let stream = jetstream.get_stream("stream").await?;
    /// let mut names = stream.consumer_names();
    /// while let Some(consumer) = names.try_next().await? {
    ///     println!("consumer: {consumer:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn consumer_names(&self) -> ConsumerNames {
        ConsumerNames {
            context: self.context.clone(),
            stream: self.name.clone(),
            offset: 0,
            page_request: None,
            consumers: Vec::new(),
            done: false,
        }
    }
1187
    /// Lists all consumers info for current stream.
    ///
    /// Returns a [Consumers] value initialized at offset `0` with no request in flight;
    /// as the example shows, it is consumed as a stream of consumer infos.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use futures::TryStreamExt;
    /// let client = async_nats::connect("demo.nats.io:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let stream = jetstream.get_stream("stream").await?;
    /// let mut consumers = stream.consumers();
    /// while let Some(consumer) = consumers.try_next().await? {
    ///     println!("consumer: {consumer:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn consumers(&self) -> Consumers {
        Consumers {
            context: self.context.clone(),
            stream: self.name.clone(),
            offset: 0,
            page_request: None,
            consumers: Vec::new(),
            done: false,
        }
    }
1216}
1217
/// Builder for a stream info request with optional details.
///
/// Configure it with [StreamInfoBuilder::with_deleted] and [StreamInfoBuilder::subjects],
/// then call [StreamInfoBuilder::fetch] to perform the request.
pub struct StreamInfoBuilder {
    /// JetStream context used to send the request.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Whether details about deleted messages are requested.
    pub(crate) deleted: bool,
    /// Subjects filter sent with the request; empty by default.
    pub(crate) subject: String,
}
1224
1225impl StreamInfoBuilder {
1226    fn new(context: Context, name: String) -> Self {
1227        Self {
1228            context,
1229            name,
1230            deleted: false,
1231            subject: "".to_string(),
1232        }
1233    }
1234
1235    pub fn with_deleted(mut self, deleted: bool) -> Self {
1236        self.deleted = deleted;
1237        self
1238    }
1239
1240    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1241        self.subject = subject.into();
1242        self
1243    }
1244
1245    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1246        let info = stream_info_with_details(
1247            self.context.clone(),
1248            self.name.clone(),
1249            0,
1250            self.deleted,
1251            self.subject.clone(),
1252        )
1253        .await?;
1254
1255        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1256    }
1257}
1258
/// `Config` (the stream configuration) determines the properties for a stream.
/// There are sensible defaults for most. If no subjects are
/// given the name will be used as the only subject.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
    pub name: String,
    /// How large the Stream may become in total bytes before the configured discard policy kicks in
    pub max_bytes: i64,
    /// How large the Stream may become in total messages before the configured discard policy kicks in
    #[serde(rename = "max_msgs")]
    pub max_messages: i64,
    /// Maximum amount of messages to keep per subject
    #[serde(rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
    pub discard: DiscardPolicy,
    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
    /// configured stream `name`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
    pub retention: RetentionPolicy,
    /// How many Consumers can be defined for a given Stream, -1 for unlimited
    pub max_consumers: i32,
    /// Maximum age of any message in the stream, expressed in nanoseconds
    /// (the duration is (de)serialized through `serde_nanos`).
    #[serde(with = "serde_nanos")]
    pub max_age: Duration,
    /// The largest message that will be accepted by the Stream
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
    pub num_replicas: usize,
    /// Disables acknowledging messages that are received by the Stream
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// The window within which to track duplicate messages.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// The owner of the template associated with this stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Indicates the stream is sealed and cannot be modified in any way
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// A short description of the purpose of this stream.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// Indicates if rollups will be allowed or not.
    /// Serialized as `allow_rollup_hdrs`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Indicates deletes will be denied or not.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Indicates if purges will be denied or not.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish config.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Enables direct get, which would get messages from
    /// non-leader.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Enable direct access also for mirrors.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Stream mirror configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Sources configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// Additional stream metadata.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Allow applying a subject transform to incoming messages
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Override compression config for this stream.
    /// Wrapping enum that has `None` type with [Option] is there
    /// because [Stream] can override global compression set to [Compression::S2]
    /// to [Compression::None], which is different from not overriding global config with anything.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Set limits on consumers that are created on this stream.
    /// When deserializing, limits equal to `ConsumerLimits::default()` are turned into `None`.
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// Sets the first sequence for the stream.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Placement configuration for clusters and tags.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,
    /// For suspending the consumer until the deadline.
    // NOTE(review): this doc talks about a consumer although the field lives on the
    // stream configuration — confirm the intended semantics of this field.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,

    /// Allows setting a TTL for a message in the stream.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
    pub allow_message_ttl: bool,

    /// Enables delete markers for messages deleted from the stream and sets the TTL
    /// for how long the marker should be kept.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub subject_delete_marker_ttl: Option<Duration>,
}
1398
1399impl From<&Config> for Config {
1400    fn from(sc: &Config) -> Config {
1401        sc.clone()
1402    }
1403}
1404
1405impl From<&str> for Config {
1406    fn from(s: &str) -> Config {
1407        Config {
1408            name: s.to_string(),
1409            ..Default::default()
1410        }
1411    }
1412}
1413
/// Deserializes optional [ConsumerLimits], mapping limits that are equal to
/// `ConsumerLimits::default()` to `None`, just like an absent value.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let parsed = Option::<ConsumerLimits>::deserialize(deserializer)?;

    // Keep only limits that differ from the all-default value.
    Ok(parsed.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Limits applied to consumers created on a stream (see the `consumer_limits` field of
/// the stream [Config]).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
    /// (De)serialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1441
/// Compression setting for a stream (see the `compression` field of the stream [Config]).
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1449
/// `SubjectTransform` is for applying a subject transform (to matching messages) when a new message is received.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// Source of the transform; serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// Destination of the transform; serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1459
/// `Republish` is for republishing messages once committed to a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// Subject that should be republished. Serialized as `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// Subject where messages will be republished. Serialized as `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// If true, only headers should be republished. Defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1473
/// Placement describes on which cluster or tags the stream should be placed.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Cluster where the stream should be placed.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Matching tags for stream placement.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1484
/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
/// remove older messages. `New` will fail to store the new message.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Will remove older messages when limits are hit. Serialized as `"old"`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Will error on a StoreMsg call when limits are hit. Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1498
/// `RetentionPolicy` determines how messages in a set are retained.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// `Limits` (default) means that messages are retained until any given limit is reached.
    /// This could be one of messages, bytes, or age. Serialized as `"limits"`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1515
/// `StorageType` determines how messages are stored for retention.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Stream data is kept in files. This is the default. Serialized as `"file"`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Stream data is kept only in memory. Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1528
1529async fn stream_info_with_details(
1530    context: Context,
1531    stream: String,
1532    offset: usize,
1533    deleted_details: bool,
1534    subjects_filter: String,
1535) -> Result<Info, InfoError> {
1536    let subject = format!("STREAM.INFO.{}", stream);
1537
1538    let payload = StreamInfoRequest {
1539        offset,
1540        deleted_details,
1541        subjects_filter,
1542    };
1543
1544    let response: Response<Info> = context.request(subject, &payload).await?;
1545
1546    match response {
1547        Response::Ok(info) => Ok(info),
1548        Response::Err { error } => Err(error.into()),
1549    }
1550}
1551
/// Boxed, `'static` future of a stream info request, as stored by [InfoWithSubjects]
/// while a page request is in flight.
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1553
/// Payload serialized and sent with a stream info request.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset; `InfoWithSubjects` passes the number of subjects fetched so far.
    offset: usize,
    /// Whether details about deleted messages are requested.
    deleted_details: bool,
    /// Filter for the subjects to report.
    subjects_filter: String,
}
1560
/// Stream [Info] together with a paginated stream of `(subject, count)` pairs.
///
/// The first page comes from the initial response; further pages are requested
/// when the value is polled as a [futures::Stream].
pub struct InfoWithSubjects {
    /// Name of the stream, used for follow-up page requests.
    stream: String,
    /// JetStream context used to send follow-up page requests.
    context: Context,
    /// The stream info from the initial response (its subjects map is moved out into `subjects`).
    pub info: Info,
    /// Number of subjects fetched so far; sent as the offset of the next page request.
    offset: usize,
    /// Subjects of the current page that were not yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// In-flight request for the next page, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter sent with every page request.
    subjects_filter: String,
    /// Set once no further pages need to be requested.
    pages_done: bool,
}
1571
1572impl InfoWithSubjects {
1573    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1574        let subjects = info.state.subjects.take().unwrap_or_default();
1575        let name = info.config.name.clone();
1576        InfoWithSubjects {
1577            context,
1578            info,
1579            pages_done: subjects.is_empty(),
1580            offset: subjects.len(),
1581            subjects: subjects.into_iter(),
1582            subjects_filter: subject,
1583            stream: name,
1584            info_request: None,
1585        }
1586    }
1587}
1588
1589impl futures::Stream for InfoWithSubjects {
1590    type Item = Result<(String, usize), InfoError>;
1591
1592    fn poll_next(
1593        mut self: Pin<&mut Self>,
1594        cx: &mut std::task::Context<'_>,
1595    ) -> Poll<Option<Self::Item>> {
1596        match self.subjects.next() {
1597            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1598            None => {
1599                // If we have already requested all pages, stop the iterator.
1600                if self.pages_done {
1601                    return Poll::Ready(None);
1602                }
1603                let stream = self.stream.clone();
1604                let context = self.context.clone();
1605                let subjects_filter = self.subjects_filter.clone();
1606                let offset = self.offset;
1607                match self
1608                    .info_request
1609                    .get_or_insert_with(|| {
1610                        Box::pin(stream_info_with_details(
1611                            context,
1612                            stream,
1613                            offset,
1614                            false,
1615                            subjects_filter,
1616                        ))
1617                    })
1618                    .poll_unpin(cx)
1619                {
1620                    Poll::Ready(resp) => match resp {
1621                        Ok(info) => {
1622                            let subjects = info.state.subjects.clone();
1623                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1624                            self.info_request = None;
1625                            let subjects = subjects.unwrap_or_default();
1626                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1627                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1628                            if total <= self.offset || subjects.is_empty() {
1629                                self.pages_done = true;
1630                            }
1631                            match self.subjects.next() {
1632                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1633                                None => Poll::Ready(None),
1634                            }
1635                        }
1636                        Err(err) => Poll::Ready(Some(Err(err))),
1637                    },
1638                    Poll::Pending => Poll::Pending,
1639                }
1640            }
1641        }
1642    }
1643}
1644
/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// The configuration associated with this stream.
    pub config: Config,
    /// The time that this stream was created.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// Various metrics associated with this stream.
    pub state: State,
    /// Information about leader and replicas.
    pub cluster: Option<ClusterInfo>,
    /// Information about mirror config if present.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Information about sources configs if present.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging metadata, deserialized from the same (flattened) level as the other fields.
    /// Its `total` is read by `InfoWithSubjects` to decide whether more pages remain.
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1666
/// Paging metadata of a paged response.
// NOTE(review): only `total` is read in the visible code (by `InfoWithSubjects`); the
// exact meaning of `offset` and `limit` is presumed from their names — confirm against
// the server API.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    offset: usize,
    /// Compared against the number of subjects fetched so far to detect the last page.
    total: usize,
    limit: usize,
}
1673
1674#[derive(Deserialize)]
1675pub struct DeleteStatus {
1676    pub success: bool,
1677}
1678
/// Response to a consumer pause/resume request.
///
/// Derives `Debug` so that results carrying it can be logged and used with
/// `Result::unwrap_err` / `Result::expect_err`.
#[cfg(feature = "server_2_11")]
#[derive(Debug, Deserialize)]
pub struct PauseResponse {
    /// Value of the `paused` field of the server response.
    pub paused: bool,
    /// Value of the `pause_until` field, parsed as an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub pause_until: OffsetDateTime,
    /// Value of the optional `pause_remaining` field, deserialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
1688
/// Request payload for pausing or resuming a consumer (only with the `server_2_11` feature).
#[cfg(feature = "server_2_11")]
#[derive(Serialize, Debug)]
struct PauseResumeConsumerRequest {
    /// Pause deadline as an RFC 3339 timestamp; the field is omitted entirely when `None`.
    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
    pause_until: Option<OffsetDateTime>,
}
1695
1696/// information about the given stream.
1697#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1698pub struct State {
1699    /// The number of messages contained in this stream
1700    pub messages: u64,
1701    /// The number of bytes of all messages contained in this stream
1702    pub bytes: u64,
1703    /// The lowest sequence number still present in this stream
1704    #[serde(rename = "first_seq")]
1705    pub first_sequence: u64,
1706    /// The time associated with the oldest message still present in this stream
1707    #[serde(with = "rfc3339", rename = "first_ts")]
1708    pub first_timestamp: time::OffsetDateTime,
1709    /// The last sequence number assigned to a message in this stream
1710    #[serde(rename = "last_seq")]
1711    pub last_sequence: u64,
1712    /// The time that the last message was received by this stream
1713    #[serde(with = "rfc3339", rename = "last_ts")]
1714    pub last_timestamp: time::OffsetDateTime,
1715    /// The number of consumers configured to consume this stream
1716    pub consumer_count: usize,
1717    /// The number of subjects in the stream
1718    #[serde(default, rename = "num_subjects")]
1719    pub subjects_count: u64,
1720    /// The number of deleted messages in the stream
1721    #[serde(default, rename = "num_deleted")]
1722    pub deleted_count: Option<u64>,
1723    /// The list of deleted subjects from the Stream.
1724    /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
1725    #[serde(default)]
1726    pub deleted: Option<Vec<u64>>,
1727
1728    pub(crate) subjects: Option<HashMap<String, usize>>,
1729}
1730
1731/// A raw stream message in the representation it is stored.
1732#[derive(Debug, Serialize, Deserialize, Clone)]
1733pub struct RawMessage {
1734    /// Subject of the message.
1735    #[serde(rename = "subject")]
1736    pub subject: String,
1737
1738    /// Sequence of the message.
1739    #[serde(rename = "seq")]
1740    pub sequence: u64,
1741
1742    /// Raw payload of the message as a base64 encoded string.
1743    #[serde(default, rename = "data")]
1744    pub payload: String,
1745
1746    /// Raw header string, if any.
1747    #[serde(default, rename = "hdrs")]
1748    pub headers: Option<String>,
1749
1750    /// The time the message was published.
1751    #[serde(rename = "time", with = "rfc3339")]
1752    pub time: time::OffsetDateTime,
1753}
1754
1755impl TryFrom<RawMessage> for StreamMessage {
1756    type Error = crate::Error;
1757
1758    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1759        let decoded_payload = STANDARD
1760            .decode(value.payload)
1761            .map_err(|err| Box::new(std::io::Error::other(err)))?;
1762        let decoded_headers = value
1763            .headers
1764            .map(|header| STANDARD.decode(header))
1765            .map_or(Ok(None), |v| v.map(Some))?;
1766
1767        let (headers, _, _) = decoded_headers
1768            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1769
1770        Ok(StreamMessage {
1771            subject: value.subject.into(),
1772            payload: decoded_payload.into(),
1773            headers,
1774            sequence: value.sequence,
1775            time: value.time,
1776        })
1777    }
1778}
1779
/// Reports whether `c` is a space or a tab, the characters that open a folded
/// (continued) header line.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}

/// Version prefix expected at the start of a header block.
const HEADER_LINE: &str = "NATS/1.0";
1784
1785#[allow(clippy::type_complexity)]
1786fn parse_headers(
1787    buf: &[u8],
1788) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1789    let mut headers = HeaderMap::new();
1790    let mut maybe_status: Option<StatusCode> = None;
1791    let mut maybe_description: Option<String> = None;
1792    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1793        line.lines().peekable()
1794    } else {
1795        return Err(Box::new(std::io::Error::other("invalid header")));
1796    };
1797
1798    if let Some(line) = lines.next() {
1799        let line = line
1800            .strip_prefix(HEADER_LINE)
1801            .ok_or_else(|| {
1802                Box::new(std::io::Error::other(
1803                    "version line does not start with NATS/1.0",
1804                ))
1805            })?
1806            .trim();
1807
1808        match line.split_once(' ') {
1809            Some((status, description)) => {
1810                if !status.is_empty() {
1811                    maybe_status = Some(status.parse()?);
1812                }
1813
1814                if !description.is_empty() {
1815                    maybe_description = Some(description.trim().to_string());
1816                }
1817            }
1818            None => {
1819                if !line.is_empty() {
1820                    maybe_status = Some(line.parse()?);
1821                }
1822            }
1823        }
1824    } else {
1825        return Err(Box::new(std::io::Error::other(
1826            "expected header information not found",
1827        )));
1828    };
1829
1830    while let Some(line) = lines.next() {
1831        if line.is_empty() {
1832            continue;
1833        }
1834
1835        if let Some((k, v)) = line.split_once(':').to_owned() {
1836            let mut s = String::from(v.trim());
1837            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1838                s.push(' ');
1839                s.push_str(v.trim());
1840            }
1841
1842            headers.insert(
1843                HeaderName::from_str(k)?,
1844                HeaderValue::from_str(&s).map_err(|err| Box::new(io::Error::other(err)))?,
1845            );
1846        } else {
1847            return Err(Box::new(std::io::Error::other("malformed header line")));
1848        }
1849    }
1850
1851    if headers.is_empty() {
1852        Ok((HeaderMap::new(), maybe_status, maybe_description))
1853    } else {
1854        Ok((headers, maybe_status, maybe_description))
1855    }
1856}
1857
1858#[derive(Debug, Serialize, Deserialize, Clone)]
1859struct GetRawMessage {
1860    pub(crate) message: RawMessage,
1861}
1862
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a `skip_serializing_if` predicate so default-valued fields are omitted.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
1866/// Information about the stream's, consumer's associated `JetStream` cluster
1867#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1868pub struct ClusterInfo {
1869    /// The cluster name.
1870    pub name: Option<String>,
1871    /// The server name of the RAFT leader.
1872    pub leader: Option<String>,
1873    /// The members of the RAFT cluster.
1874    #[serde(default)]
1875    pub replicas: Vec<PeerInfo>,
1876}
1877
1878/// The members of the RAFT cluster
1879#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1880pub struct PeerInfo {
1881    /// The server name of the peer.
1882    pub name: String,
1883    /// Indicates if the server is up to date and synchronized.
1884    pub current: bool,
1885    /// Nanoseconds since this peer was last seen.
1886    #[serde(with = "serde_nanos")]
1887    pub active: Duration,
1888    /// Indicates the node is considered offline by the group.
1889    #[serde(default)]
1890    pub offline: bool,
1891    /// How many uncommitted operations this peer is behind the leader.
1892    pub lag: Option<u64>,
1893}
1894
1895#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1896pub struct SourceInfo {
1897    /// Source name.
1898    pub name: String,
1899    /// Number of messages this source is lagging behind.
1900    pub lag: u64,
1901    /// Last time the source was seen active.
1902    #[serde(deserialize_with = "negative_duration_as_none")]
1903    pub active: Option<std::time::Duration>,
1904    /// Filtering for the source.
1905    #[serde(default)]
1906    pub filter_subject: Option<String>,
1907    /// Source destination subject.
1908    #[serde(default)]
1909    pub subject_transform_dest: Option<String>,
1910    /// List of transforms.
1911    #[serde(default)]
1912    pub subject_transforms: Vec<SubjectTransform>,
1913}
1914
1915fn negative_duration_as_none<'de, D>(
1916    deserializer: D,
1917) -> Result<Option<std::time::Duration>, D::Error>
1918where
1919    D: Deserializer<'de>,
1920{
1921    let n = i64::deserialize(deserializer)?;
1922    if n.is_negative() {
1923        Ok(None)
1924    } else {
1925        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1926    }
1927}
1928
1929/// The response generated by trying to purge a stream.
1930#[derive(Debug, Deserialize, Clone, Copy)]
1931pub struct PurgeResponse {
1932    /// Whether the purge request was successful.
1933    pub success: bool,
1934    /// The number of purged messages in a stream.
1935    pub purged: u64,
1936}
1937/// The payload used to generate a purge request.
1938#[derive(Default, Debug, Serialize, Clone)]
1939pub struct PurgeRequest {
1940    /// Purge up to but not including sequence.
1941    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1942    pub sequence: Option<u64>,
1943
1944    /// Subject to match against messages for the purge command.
1945    #[serde(default, skip_serializing_if = "is_default")]
1946    pub filter: Option<String>,
1947
1948    /// Number of messages to keep.
1949    #[serde(default, skip_serializing_if = "is_default")]
1950    pub keep: Option<u64>,
1951}
1952
1953#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1954pub struct Source {
1955    /// Name of the stream source.
1956    pub name: String,
1957    /// Optional source start sequence.
1958    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1959    pub start_sequence: Option<u64>,
1960    #[serde(
1961        default,
1962        rename = "opt_start_time",
1963        skip_serializing_if = "is_default",
1964        with = "rfc3339::option"
1965    )]
1966    /// Optional source start time.
1967    pub start_time: Option<OffsetDateTime>,
1968    /// Optional additional filter subject.
1969    #[serde(default, skip_serializing_if = "is_default")]
1970    pub filter_subject: Option<String>,
1971    /// Optional config for sourcing streams from another prefix, used for cross-account.
1972    #[serde(default, skip_serializing_if = "Option::is_none")]
1973    pub external: Option<External>,
1974    /// Optional config to set a domain, if source is residing in different one.
1975    #[serde(default, skip_serializing_if = "is_default")]
1976    pub domain: Option<String>,
1977    /// Subject transforms for Stream.
1978    #[cfg(feature = "server_2_10")]
1979    #[serde(default, skip_serializing_if = "is_default")]
1980    pub subject_transforms: Vec<SubjectTransform>,
1981}
1982
1983#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1984pub struct External {
1985    /// Api prefix of external source.
1986    #[serde(rename = "api")]
1987    pub api_prefix: String,
1988    /// Optional configuration of delivery prefix.
1989    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1990    pub delivery_prefix: Option<String>,
1991}
1992
1993use std::marker::PhantomData;
1994
/// Marker trait for the type-level flags used by the [`Purge`] builder.
pub trait ToAssign: Debug {}

/// Type-level flag: the corresponding builder option has been set.
#[derive(Debug, Default)]
pub struct Yes;

/// Type-level flag: the corresponding builder option has not been set.
#[derive(Debug, Default)]
pub struct No;

impl ToAssign for Yes {}
impl ToAssign for No {}
2004
2005#[derive(Debug)]
2006pub struct Purge<SEQUENCE, KEEP>
2007where
2008    SEQUENCE: ToAssign,
2009    KEEP: ToAssign,
2010{
2011    inner: PurgeRequest,
2012    sequence_set: PhantomData<SEQUENCE>,
2013    keep_set: PhantomData<KEEP>,
2014    context: Context,
2015    stream_name: String,
2016}
2017
2018impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2019where
2020    SEQUENCE: ToAssign,
2021    KEEP: ToAssign,
2022{
2023    /// Adds subject filter to [PurgeRequest]
2024    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2025        self.inner.filter = Some(filter.into());
2026        self
2027    }
2028}
2029
2030impl Purge<No, No> {
2031    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2032        Purge {
2033            context: stream.context.clone(),
2034            stream_name: stream.name.clone(),
2035            inner: Default::default(),
2036            sequence_set: PhantomData {},
2037            keep_set: PhantomData {},
2038        }
2039    }
2040}
2041
2042impl<KEEP> Purge<No, KEEP>
2043where
2044    KEEP: ToAssign,
2045{
2046    /// Creates a new [PurgeRequest].
2047    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
2048    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2049        Purge {
2050            context: self.context.clone(),
2051            stream_name: self.stream_name.clone(),
2052            sequence_set: PhantomData {},
2053            keep_set: PhantomData {},
2054            inner: PurgeRequest {
2055                keep: Some(keep),
2056                ..self.inner
2057            },
2058        }
2059    }
2060}
2061impl<SEQUENCE> Purge<SEQUENCE, No>
2062where
2063    SEQUENCE: ToAssign,
2064{
2065    /// Creates a new [PurgeRequest].
2066    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
2067    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2068        Purge {
2069            context: self.context.clone(),
2070            stream_name: self.stream_name.clone(),
2071            sequence_set: PhantomData {},
2072            keep_set: PhantomData {},
2073            inner: PurgeRequest {
2074                sequence: Some(sequence),
2075                ..self.inner
2076            },
2077        }
2078    }
2079}
2080
2081#[derive(Clone, Debug, PartialEq)]
2082pub enum PurgeErrorKind {
2083    Request,
2084    TimedOut,
2085    JetStream(super::errors::Error),
2086}
2087
2088impl Display for PurgeErrorKind {
2089    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2090        match self {
2091            Self::Request => write!(f, "request failed"),
2092            Self::TimedOut => write!(f, "timed out"),
2093            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2094        }
2095    }
2096}
2097
2098pub type PurgeError = Error<PurgeErrorKind>;
2099
2100impl<S, K> IntoFuture for Purge<S, K>
2101where
2102    S: ToAssign + std::marker::Send,
2103    K: ToAssign + std::marker::Send,
2104{
2105    type Output = Result<PurgeResponse, PurgeError>;
2106
2107    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2108
2109    fn into_future(self) -> Self::IntoFuture {
2110        Box::pin(std::future::IntoFuture::into_future(async move {
2111            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2112            let response: Response<PurgeResponse> = self
2113                .context
2114                .request(request_subject, &self.inner)
2115                .map_err(|err| match err.kind() {
2116                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2117                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2118                })
2119                .await?;
2120
2121            match response {
2122                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2123                Response::Ok(response) => Ok(response),
2124            }
2125        }))
2126    }
2127}
2128
2129#[derive(Deserialize, Debug)]
2130struct ConsumerPage {
2131    total: usize,
2132    consumers: Option<Vec<String>>,
2133}
2134
2135#[derive(Deserialize, Debug)]
2136struct ConsumerInfoPage {
2137    total: usize,
2138    consumers: Option<Vec<super::consumer::Info>>,
2139}
2140
2141type ConsumerNamesErrorKind = StreamsErrorKind;
2142type ConsumerNamesError = StreamsError;
2143type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2144
2145pub struct ConsumerNames {
2146    context: Context,
2147    stream: String,
2148    offset: usize,
2149    page_request: Option<PageRequest>,
2150    consumers: Vec<String>,
2151    done: bool,
2152}
2153
2154impl futures::Stream for ConsumerNames {
2155    type Item = Result<String, ConsumerNamesError>;
2156
2157    fn poll_next(
2158        mut self: Pin<&mut Self>,
2159        cx: &mut std::task::Context<'_>,
2160    ) -> std::task::Poll<Option<Self::Item>> {
2161        match self.page_request.as_mut() {
2162            Some(page) => match page.try_poll_unpin(cx) {
2163                std::task::Poll::Ready(page) => {
2164                    self.page_request = None;
2165                    let page = page.map_err(|err| {
2166                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2167                    })?;
2168
2169                    if let Some(consumers) = page.consumers {
2170                        self.offset += consumers.len();
2171                        self.consumers = consumers;
2172                        if self.offset >= page.total {
2173                            self.done = true;
2174                        }
2175                        match self.consumers.pop() {
2176                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2177                            None => Poll::Ready(None),
2178                        }
2179                    } else {
2180                        Poll::Ready(None)
2181                    }
2182                }
2183                std::task::Poll::Pending => std::task::Poll::Pending,
2184            },
2185            None => {
2186                if let Some(stream) = self.consumers.pop() {
2187                    Poll::Ready(Some(Ok(stream)))
2188                } else {
2189                    if self.done {
2190                        return Poll::Ready(None);
2191                    }
2192                    let context = self.context.clone();
2193                    let offset = self.offset;
2194                    let stream = self.stream.clone();
2195                    self.page_request = Some(Box::pin(async move {
2196                        match context
2197                            .request(
2198                                format!("CONSUMER.NAMES.{stream}"),
2199                                &json!({
2200                                    "offset": offset,
2201                                }),
2202                            )
2203                            .await?
2204                        {
2205                            Response::Err { error } => Err(RequestError::with_source(
2206                                super::context::RequestErrorKind::Other,
2207                                error,
2208                            )),
2209                            Response::Ok(page) => Ok(page),
2210                        }
2211                    }));
2212                    self.poll_next(cx)
2213                }
2214            }
2215        }
2216    }
2217}
2218
2219pub type ConsumersErrorKind = StreamsErrorKind;
2220pub type ConsumersError = StreamsError;
2221type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2222
2223pub struct Consumers {
2224    context: Context,
2225    stream: String,
2226    offset: usize,
2227    page_request: Option<PageInfoRequest>,
2228    consumers: Vec<super::consumer::Info>,
2229    done: bool,
2230}
2231
2232impl futures::Stream for Consumers {
2233    type Item = Result<super::consumer::Info, ConsumersError>;
2234
2235    fn poll_next(
2236        mut self: Pin<&mut Self>,
2237        cx: &mut std::task::Context<'_>,
2238    ) -> std::task::Poll<Option<Self::Item>> {
2239        match self.page_request.as_mut() {
2240            Some(page) => match page.try_poll_unpin(cx) {
2241                std::task::Poll::Ready(page) => {
2242                    self.page_request = None;
2243                    let page = page.map_err(|err| {
2244                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2245                    })?;
2246                    if let Some(consumers) = page.consumers {
2247                        self.offset += consumers.len();
2248                        self.consumers = consumers;
2249                        if self.offset >= page.total {
2250                            self.done = true;
2251                        }
2252                        match self.consumers.pop() {
2253                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2254                            None => Poll::Ready(None),
2255                        }
2256                    } else {
2257                        Poll::Ready(None)
2258                    }
2259                }
2260                std::task::Poll::Pending => std::task::Poll::Pending,
2261            },
2262            None => {
2263                if let Some(stream) = self.consumers.pop() {
2264                    Poll::Ready(Some(Ok(stream)))
2265                } else {
2266                    if self.done {
2267                        return Poll::Ready(None);
2268                    }
2269                    let context = self.context.clone();
2270                    let offset = self.offset;
2271                    let stream = self.stream.clone();
2272                    self.page_request = Some(Box::pin(async move {
2273                        match context
2274                            .request(
2275                                format!("CONSUMER.LIST.{stream}"),
2276                                &json!({
2277                                    "offset": offset,
2278                                }),
2279                            )
2280                            .await?
2281                        {
2282                            Response::Err { error } => Err(RequestError::with_source(
2283                                super::context::RequestErrorKind::Other,
2284                                error,
2285                            )),
2286                            Response::Ok(page) => Ok(page),
2287                        }
2288                    }));
2289                    self.poll_next(cx)
2290                }
2291            }
2292        }
2293    }
2294}
2295
2296#[derive(Clone, Debug, PartialEq)]
2297pub enum LastRawMessageErrorKind {
2298    NoMessageFound,
2299    InvalidSubject,
2300    JetStream(super::errors::Error),
2301    Other,
2302}
2303
2304impl Display for LastRawMessageErrorKind {
2305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2306        match self {
2307            Self::NoMessageFound => write!(f, "no message found"),
2308            Self::InvalidSubject => write!(f, "invalid subject"),
2309            Self::Other => write!(f, "failed to get last raw message"),
2310            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2311        }
2312    }
2313}
2314
2315pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2316pub type RawMessageErrorKind = LastRawMessageErrorKind;
2317pub type RawMessageError = LastRawMessageError;
2318
2319#[derive(Clone, Debug, PartialEq)]
2320pub enum ConsumerErrorKind {
2321    //TODO: get last should have timeout, which should be mapped here.
2322    TimedOut,
2323    Request,
2324    InvalidConsumerType,
2325    InvalidName,
2326    JetStream(super::errors::Error),
2327    Other,
2328}
2329
2330impl Display for ConsumerErrorKind {
2331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2332        match self {
2333            Self::TimedOut => write!(f, "timed out"),
2334            Self::Request => write!(f, "request failed"),
2335            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2336            Self::Other => write!(f, "consumer error"),
2337            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2338            Self::InvalidName => write!(f, "invalid consumer name"),
2339        }
2340    }
2341}
2342
2343pub type ConsumerError = Error<ConsumerErrorKind>;
2344
2345#[derive(Clone, Debug, PartialEq)]
2346pub enum ConsumerCreateStrictErrorKind {
2347    //TODO: get last should have timeout, which should be mapped here.
2348    TimedOut,
2349    Request,
2350    InvalidConsumerType,
2351    InvalidName,
2352    AlreadyExists,
2353    JetStream(super::errors::Error),
2354    Other,
2355}
2356
2357impl Display for ConsumerCreateStrictErrorKind {
2358    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2359        match self {
2360            Self::TimedOut => write!(f, "timed out"),
2361            Self::Request => write!(f, "request failed"),
2362            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2363            Self::Other => write!(f, "consumer error"),
2364            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2365            Self::InvalidName => write!(f, "invalid consumer name"),
2366            Self::AlreadyExists => write!(f, "consumer already exists"),
2367        }
2368    }
2369}
2370
2371pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2372
2373#[derive(Clone, Debug, PartialEq)]
2374pub enum ConsumerUpdateErrorKind {
2375    //TODO: get last should have timeout, which should be mapped here.
2376    TimedOut,
2377    Request,
2378    InvalidConsumerType,
2379    InvalidName,
2380    DoesNotExist,
2381    JetStream(super::errors::Error),
2382    Other,
2383}
2384
2385impl Display for ConsumerUpdateErrorKind {
2386    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2387        match self {
2388            Self::TimedOut => write!(f, "timed out"),
2389            Self::Request => write!(f, "request failed"),
2390            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2391            Self::Other => write!(f, "consumer error"),
2392            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2393            Self::InvalidName => write!(f, "invalid consumer name"),
2394            Self::DoesNotExist => write!(f, "consumer does not exist"),
2395        }
2396    }
2397}
2398
2399pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2400
2401impl From<super::errors::Error> for ConsumerError {
2402    fn from(err: super::errors::Error) -> Self {
2403        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2404    }
2405}
2406impl From<super::errors::Error> for ConsumerCreateStrictError {
2407    fn from(err: super::errors::Error) -> Self {
2408        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2409            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2410        } else {
2411            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2412        }
2413    }
2414}
2415impl From<super::errors::Error> for ConsumerUpdateError {
2416    fn from(err: super::errors::Error) -> Self {
2417        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2418            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2419        } else {
2420            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2421        }
2422    }
2423}
2424impl From<ConsumerError> for ConsumerUpdateError {
2425    fn from(err: ConsumerError) -> Self {
2426        match err.kind() {
2427            ConsumerErrorKind::JetStream(err) => {
2428                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2429                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2430                } else {
2431                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2432                }
2433            }
2434            ConsumerErrorKind::Request => {
2435                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2436            }
2437            ConsumerErrorKind::TimedOut => {
2438                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2439            }
2440            ConsumerErrorKind::InvalidConsumerType => {
2441                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2442            }
2443            ConsumerErrorKind::InvalidName => {
2444                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2445            }
2446            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2447        }
2448    }
2449}
2450
2451impl From<ConsumerError> for ConsumerCreateStrictError {
2452    fn from(err: ConsumerError) -> Self {
2453        match err.kind() {
2454            ConsumerErrorKind::JetStream(err) => {
2455                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2456                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2457                } else {
2458                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2459                }
2460            }
2461            ConsumerErrorKind::Request => {
2462                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2463            }
2464            ConsumerErrorKind::TimedOut => {
2465                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2466            }
2467            ConsumerErrorKind::InvalidConsumerType => {
2468                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2469            }
2470            ConsumerErrorKind::InvalidName => {
2471                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2472            }
2473            ConsumerErrorKind::Other => {
2474                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2475            }
2476        }
2477    }
2478}
2479
2480impl From<super::context::RequestError> for ConsumerError {
2481    fn from(err: super::context::RequestError) -> Self {
2482        match err.kind() {
2483            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2484            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2485        }
2486    }
2487}
2488impl From<super::context::RequestError> for ConsumerUpdateError {
2489    fn from(err: super::context::RequestError) -> Self {
2490        match err.kind() {
2491            RequestErrorKind::TimedOut => {
2492                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2493            }
2494            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2495        }
2496    }
2497}
2498impl From<super::context::RequestError> for ConsumerCreateStrictError {
2499    fn from(err: super::context::RequestError) -> Self {
2500        match err.kind() {
2501            RequestErrorKind::TimedOut => {
2502                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2503            }
2504            _ => {
2505                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2506            }
2507        }
2508    }
2509}
2510
2511#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
2512pub struct StreamGetMessage {
2513    #[serde(rename = "seq", skip_serializing_if = "is_default")]
2514    sequence: Option<u64>,
2515    #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
2516    next_by_subject: Option<String>,
2517    #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
2518    last_by_subject: Option<String>,
2519}
2520
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}