async_nats_flyradar/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self, ErrorKind},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
41    errors::ErrorCode,
42    message::{StreamMessage, StreamMessageError},
43    response::Response,
44    Context, Message,
45};
46
47pub type InfoError = RequestError;
48
49#[derive(Clone, Debug, PartialEq)]
50pub enum DirectGetErrorKind {
51    NotFound,
52    InvalidSubject,
53    TimedOut,
54    Request,
55    ErrorResponse(StatusCode, String),
56    Other,
57}
58
59impl Display for DirectGetErrorKind {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            Self::InvalidSubject => write!(f, "invalid subject"),
63            Self::NotFound => write!(f, "message not found"),
64            Self::ErrorResponse(status, description) => {
65                write!(f, "unable to get message: {} {}", status, description)
66            }
67            Self::Other => write!(f, "error getting message"),
68            Self::TimedOut => write!(f, "timed out"),
69            Self::Request => write!(f, "request failed"),
70        }
71    }
72}
73
74pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77    fn from(err: crate::RequestError) -> Self {
78        match err.kind() {
79            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80            crate::RequestErrorKind::NoResponders => {
81                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
82                    StatusCode::NO_RESPONDERS,
83                    "no responders".to_string(),
84                ))
85            }
86            crate::RequestErrorKind::Other => {
87                DirectGetError::with_source(DirectGetErrorKind::Other, err)
88            }
89        }
90    }
91}
92
93impl From<serde_json::Error> for DirectGetError {
94    fn from(err: serde_json::Error) -> Self {
95        DirectGetError::with_source(DirectGetErrorKind::Other, err)
96    }
97}
98
99impl From<StreamMessageError> for DirectGetError {
100    fn from(err: StreamMessageError) -> Self {
101        DirectGetError::with_source(DirectGetErrorKind::Other, err)
102    }
103}
104
105#[derive(Clone, Debug, PartialEq)]
106pub enum DeleteMessageErrorKind {
107    Request,
108    TimedOut,
109    JetStream(super::errors::Error),
110}
111
112impl Display for DeleteMessageErrorKind {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            Self::Request => write!(f, "request failed"),
116            Self::TimedOut => write!(f, "timed out"),
117            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
118        }
119    }
120}
121
122pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
123
124/// Handle to operations that can be performed on a `Stream`.
125/// It's generic over the type of `info` field to allow `Stream` with or without
126/// info contents.
127#[derive(Debug, Clone)]
128pub struct Stream<T = Info> {
129    pub(crate) info: T,
130    pub(crate) context: Context,
131    pub(crate) name: String,
132}
133
134impl Stream<Info> {
135    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
136    /// [Stream] and returns it.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// # #[tokio::main]
142    /// # async fn main() -> Result<(), async_nats::Error> {
143    /// let client = async_nats::connect("localhost:4222").await?;
144    /// let jetstream = async_nats::jetstream::new(client);
145    ///
146    /// let mut stream = jetstream.get_stream("events").await?;
147    ///
148    /// let info = stream.info().await?;
149    /// # Ok(())
150    /// # }
151    /// ```
152    pub async fn info(&mut self) -> Result<&Info, InfoError> {
153        let subject = format!("STREAM.INFO.{}", self.info.config.name);
154
155        match self.context.request(subject, &json!({})).await? {
156            Response::Ok::<Info>(info) => {
157                self.info = info;
158                Ok(&self.info)
159            }
160            Response::Err { error } => Err(error.into()),
161        }
162    }
163
164    /// Returns cached [Info] for the [Stream].
165    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
166    /// [Stream::info].
167    ///
168    /// # Examples
169    ///
170    /// ```no_run
171    /// # #[tokio::main]
172    /// # async fn main() -> Result<(), async_nats::Error> {
173    /// let client = async_nats::connect("localhost:4222").await?;
174    /// let jetstream = async_nats::jetstream::new(client);
175    ///
176    /// let stream = jetstream.get_stream("events").await?;
177    ///
178    /// let info = stream.cached_info();
179    /// # Ok(())
180    /// # }
181    /// ```
182    pub fn cached_info(&self) -> &Info {
183        &self.info
184    }
185}
186
187impl<I> Stream<I> {
188    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
189    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
190    pub async fn get_info(&self) -> Result<Info, InfoError> {
191        let subject = format!("STREAM.INFO.{}", self.name);
192
193        match self.context.request(subject, &json!({})).await? {
194            Response::Ok::<Info>(info) => Ok(info),
195            Response::Err { error } => Err(error.into()),
196        }
197    }
198
199    /// Retrieves [[Info]] from the server and returns a [[futures::Stream]] that allows
200    /// iterating over all subjects in the stream fetched via paged API.
201    ///
202    /// # Examples
203    ///
204    /// ```no_run
205    /// # #[tokio::main]
206    /// # async fn main() -> Result<(), async_nats::Error> {
207    /// use futures::TryStreamExt;
208    /// let client = async_nats::connect("localhost:4222").await?;
209    /// let jetstream = async_nats::jetstream::new(client);
210    ///
211    /// let mut stream = jetstream.get_stream("events").await?;
212    ///
213    /// let mut info = stream.info_with_subjects("events.>").await?;
214    ///
215    /// while let Some((subject, count)) = info.try_next().await? {
216    ///     println!("Subject: {} count: {}", subject, count);
217    /// }
218    /// # Ok(())
219    /// # }
220    /// ```
221    pub async fn info_with_subjects<F: AsRef<str>>(
222        &self,
223        subjects_filter: F,
224    ) -> Result<InfoWithSubjects, InfoError> {
225        let subjects_filter = subjects_filter.as_ref().to_string();
226        // TODO: validate the subject and decide if this should be a `Subject`
227        let info = stream_info_with_details(
228            self.context.clone(),
229            self.name.clone(),
230            0,
231            false,
232            subjects_filter.clone(),
233        )
234        .await?;
235
236        Ok(InfoWithSubjects::new(
237            self.context.clone(),
238            info,
239            subjects_filter,
240        ))
241    }
242
243    /// Creates a builder that allows to customize `Stream::Info`.
244    ///
245    /// # Examples
246    /// ```no_run
247    /// # #[tokio::main]
248    /// # async fn main() -> Result<(), async_nats::Error> {
249    /// use futures::TryStreamExt;
250    /// let client = async_nats::connect("localhost:4222").await?;
251    /// let jetstream = async_nats::jetstream::new(client);
252    ///
253    /// let mut stream = jetstream.get_stream("events").await?;
254    ///
255    /// let mut info = stream
256    ///     .info_builder()
257    ///     .with_deleted(true)
258    ///     .subjects("events.>")
259    ///     .fetch()
260    ///     .await?;
261    ///
262    /// while let Some((subject, count)) = info.try_next().await? {
263    ///     println!("Subject: {} count: {}", subject, count);
264    /// }
265    /// # Ok(())
266    /// # }
267    /// ```
268    pub fn info_builder(&self) -> StreamInfoBuilder {
269        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
270    }
271
272    /// Gets next message for a [Stream].
273    ///
274    /// Requires a [Stream] with `allow_direct` set to `true`.
275    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
276    /// from any replica member. This means read after write is possible,
277    /// as that given replica might not yet catch up with the leader.
278    ///
279    /// # Examples
280    ///
281    /// ```no_run
282    /// # #[tokio::main]
283    /// # async fn main() -> Result<(), async_nats::Error> {
284    /// let client = async_nats::connect("demo.nats.io").await?;
285    /// let jetstream = async_nats::jetstream::new(client);
286    ///
287    /// let stream = jetstream
288    ///     .create_stream(async_nats::jetstream::stream::Config {
289    ///         name: "events".to_string(),
290    ///         subjects: vec!["events.>".to_string()],
291    ///         allow_direct: true,
292    ///         ..Default::default()
293    ///     })
294    ///     .await?;
295    ///
296    /// jetstream.publish("events.data", "data".into()).await?;
297    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
298    ///
299    /// let message = stream
300    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
301    ///     .await?;
302    ///
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub async fn direct_get_next_for_subject<T: AsRef<str>>(
307        &self,
308        subject: T,
309        sequence: Option<u64>,
310    ) -> Result<Message, DirectGetError> {
311        if !is_valid_subject(&subject) {
312            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
313        }
314        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
315        let payload;
316        if let Some(sequence) = sequence {
317            payload = json!({
318                "seq": sequence,
319                "next_by_subj": subject.as_ref(),
320            });
321        } else {
322            payload = json!({
323                 "next_by_subj": subject.as_ref(),
324            });
325        }
326
327        let response = self
328            .context
329            .client
330            .request(
331                request_subject,
332                serde_json::to_vec(&payload).map(Bytes::from)?,
333            )
334            .await
335            .map(|message| Message {
336                message,
337                context: self.context.clone(),
338            })?;
339
340        if let Some(status) = response.status {
341            if let Some(ref description) = response.description {
342                match status {
343                    StatusCode::NOT_FOUND => {
344                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
345                    }
346                    // 408 is used in Direct Message for bad/empty payload.
347                    StatusCode::TIMEOUT => {
348                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
349                    }
350                    _ => {
351                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
352                            status,
353                            description.to_string(),
354                        )));
355                    }
356                }
357            }
358        }
359        Ok(response)
360    }
361
362    /// Gets first message from [Stream].
363    ///
364    /// Requires a [Stream] with `allow_direct` set to `true`.
365    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
366    /// from any replica member. This means read after write is possible,
367    /// as that given replica might not yet catch up with the leader.
368    ///
369    /// # Examples
370    ///
371    /// ```no_run
372    /// # #[tokio::main]
373    /// # async fn main() -> Result<(), async_nats::Error> {
374    /// let client = async_nats::connect("demo.nats.io").await?;
375    /// let jetstream = async_nats::jetstream::new(client);
376    ///
377    /// let stream = jetstream
378    ///     .create_stream(async_nats::jetstream::stream::Config {
379    ///         name: "events".to_string(),
380    ///         subjects: vec!["events.>".to_string()],
381    ///         allow_direct: true,
382    ///         ..Default::default()
383    ///     })
384    ///     .await?;
385    ///
386    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
387    ///
388    /// let message = stream.direct_get_first_for_subject("events.data").await?;
389    ///
390    /// # Ok(())
391    /// # }
392    /// ```
393    pub async fn direct_get_first_for_subject<T: AsRef<str>>(
394        &self,
395        subject: T,
396    ) -> Result<Message, DirectGetError> {
397        if !is_valid_subject(&subject) {
398            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
399        }
400        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
401        let payload = json!({
402            "next_by_subj": subject.as_ref(),
403        });
404
405        let response = self
406            .context
407            .client
408            .request(
409                request_subject,
410                serde_json::to_vec(&payload).map(Bytes::from)?,
411            )
412            .await
413            .map(|message| Message {
414                message,
415                context: self.context.clone(),
416            })?;
417        if let Some(status) = response.status {
418            if let Some(ref description) = response.description {
419                match status {
420                    StatusCode::NOT_FOUND => {
421                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
422                    }
423                    // 408 is used in Direct Message for bad/empty payload.
424                    StatusCode::TIMEOUT => {
425                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
426                    }
427                    _ => {
428                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
429                            status,
430                            description.to_string(),
431                        )));
432                    }
433                }
434            }
435        }
436        Ok(response)
437    }
438
439    /// Gets message from [Stream] with given `sequence id`.
440    ///
441    /// Requires a [Stream] with `allow_direct` set to `true`.
442    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
443    /// from any replica member. This means read after write is possible,
444    /// as that given replica might not yet catch up with the leader.
445    ///
446    /// # Examples
447    ///
448    /// ```no_run
449    /// # #[tokio::main]
450    /// # async fn main() -> Result<(), async_nats::Error> {
451    /// let client = async_nats::connect("demo.nats.io").await?;
452    /// let jetstream = async_nats::jetstream::new(client);
453    ///
454    /// let stream = jetstream
455    ///     .create_stream(async_nats::jetstream::stream::Config {
456    ///         name: "events".to_string(),
457    ///         subjects: vec!["events.>".to_string()],
458    ///         allow_direct: true,
459    ///         ..Default::default()
460    ///     })
461    ///     .await?;
462    ///
463    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
464    ///
465    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
466    ///
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
471        let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
472        let payload = json!({
473            "seq": sequence,
474        });
475
476        let response = self
477            .context
478            .client
479            .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
480            .await?;
481
482        if let Some(status) = response.status {
483            if let Some(ref description) = response.description {
484                match status {
485                    StatusCode::NOT_FOUND => {
486                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
487                    }
488                    // 408 is used in Direct Message for bad/empty payload.
489                    StatusCode::TIMEOUT => {
490                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
491                    }
492                    _ => {
493                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
494                            status,
495                            description.to_string(),
496                        )));
497                    }
498                }
499            }
500        }
501        StreamMessage::try_from(response).map_err(Into::into)
502    }
503
504    /// Gets last message for a given `subject`.
505    ///
506    /// Requires a [Stream] with `allow_direct` set to `true`.
507    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
508    /// from any replica member. This means read after write is possible,
509    /// as that given replica might not yet catch up with the leader.
510    ///
511    /// # Examples
512    ///
513    /// ```no_run
514    /// # #[tokio::main]
515    /// # async fn main() -> Result<(), async_nats::Error> {
516    /// let client = async_nats::connect("demo.nats.io").await?;
517    /// let jetstream = async_nats::jetstream::new(client);
518    ///
519    /// let stream = jetstream
520    ///     .create_stream(async_nats::jetstream::stream::Config {
521    ///         name: "events".to_string(),
522    ///         subjects: vec!["events.>".to_string()],
523    ///         allow_direct: true,
524    ///         ..Default::default()
525    ///     })
526    ///     .await?;
527    ///
528    /// jetstream.publish("events.data", "data".into()).await?;
529    ///
530    /// let message = stream.direct_get_last_for_subject("events.data").await?;
531    ///
532    /// # Ok(())
533    /// # }
534    /// ```
535    pub async fn direct_get_last_for_subject<T: AsRef<str>>(
536        &self,
537        subject: T,
538    ) -> Result<StreamMessage, DirectGetError> {
539        let subject = format!(
540            "{}.DIRECT.GET.{}.{}",
541            &self.context.prefix,
542            &self.name,
543            subject.as_ref()
544        );
545
546        let response = self.context.client.request(subject, "".into()).await?;
547        if let Some(status) = response.status {
548            if let Some(ref description) = response.description {
549                match status {
550                    StatusCode::NOT_FOUND => {
551                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
552                    }
553                    // 408 is used in Direct Message for bad/empty payload.
554                    StatusCode::TIMEOUT => {
555                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
556                    }
557                    _ => {
558                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
559                            status,
560                            description.to_string(),
561                        )));
562                    }
563                }
564            }
565        }
566        StreamMessage::try_from(response).map_err(Into::into)
567    }
568    /// Get a raw message from the stream for a given stream sequence.
569    /// This low-level API always reaches stream leader.
570    /// This should be discouraged in favor of using [Stream::direct_get].
571    ///
572    /// # Examples
573    ///
574    /// ```no_run
575    /// #[tokio::main]
576    /// # async fn main() -> Result<(), async_nats::Error> {
577    /// use futures::StreamExt;
578    /// use futures::TryStreamExt;
579    ///
580    /// let client = async_nats::connect("localhost:4222").await?;
581    /// let context = async_nats::jetstream::new(client);
582    ///
583    /// let stream = context
584    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
585    ///         name: "events".to_string(),
586    ///         max_messages: 10_000,
587    ///         ..Default::default()
588    ///     })
589    ///     .await?;
590    ///
591    /// let publish_ack = context.publish("events", "data".into()).await?;
592    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
593    /// println!("Retrieved raw message {:?}", raw_message);
594    /// # Ok(())
595    /// # }
596    /// ```
597    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
598        self.raw_message(StreamGetMessage {
599            sequence: Some(sequence),
600            last_by_subject: None,
601            next_by_subject: None,
602        })
603        .await
604    }
605
606    /// Get a fist message from the stream for a given subject starting from provided sequence.
607    /// This low-level API always reaches stream leader.
608    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
609    ///
610    /// # Examples
611    ///
612    /// ```no_run
613    /// #[tokio::main]
614    /// # async fn main() -> Result<(), async_nats::Error> {
615    /// use futures::StreamExt;
616    /// use futures::TryStreamExt;
617    ///
618    /// let client = async_nats::connect("localhost:4222").await?;
619    /// let context = async_nats::jetstream::new(client);
620    /// let stream = context.get_stream_no_info("events").await?;
621    ///
622    /// let raw_message = stream
623    ///     .get_first_raw_message_by_subject("events.created", 10)
624    ///     .await?;
625    /// println!("Retrieved raw message {:?}", raw_message);
626    /// # Ok(())
627    /// # }
628    /// ```
629    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
630        &self,
631        subject: T,
632        sequence: u64,
633    ) -> Result<StreamMessage, RawMessageError> {
634        self.raw_message(StreamGetMessage {
635            sequence: Some(sequence),
636            last_by_subject: None,
637            next_by_subject: Some(subject.as_ref().to_string()),
638        })
639        .await
640    }
641
642    /// Get a next message from the stream for a given subject.
643    /// This low-level API always reaches stream leader.
644    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
645    ///
646    /// # Examples
647    ///
648    /// ```no_run
649    /// #[tokio::main]
650    /// # async fn main() -> Result<(), async_nats::Error> {
651    /// use futures::StreamExt;
652    /// use futures::TryStreamExt;
653    ///
654    /// let client = async_nats::connect("localhost:4222").await?;
655    /// let context = async_nats::jetstream::new(client);
656    /// let stream = context.get_stream_no_info("events").await?;
657    ///
658    /// let raw_message = stream
659    ///     .get_next_raw_message_by_subject("events.created")
660    ///     .await?;
661    /// println!("Retrieved raw message {:?}", raw_message);
662    /// # Ok(())
663    /// # }
664    /// ```
665    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
666        &self,
667        subject: T,
668    ) -> Result<StreamMessage, RawMessageError> {
669        self.raw_message(StreamGetMessage {
670            sequence: None,
671            last_by_subject: None,
672            next_by_subject: Some(subject.as_ref().to_string()),
673        })
674        .await
675    }
676
677    async fn raw_message(
678        &self,
679        request: StreamGetMessage,
680    ) -> Result<StreamMessage, RawMessageError> {
681        for subject in [&request.last_by_subject, &request.next_by_subject]
682            .into_iter()
683            .flatten()
684        {
685            if !is_valid_subject(subject) {
686                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
687            }
688        }
689        let subject = format!("STREAM.MSG.GET.{}", &self.name);
690
691        let response: Response<GetRawMessage> = self
692            .context
693            .request(subject, &request)
694            .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
695            .await?;
696
697        match response {
698            Response::Err { error } => {
699                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
700                    Err(LastRawMessageError::new(
701                        LastRawMessageErrorKind::NoMessageFound,
702                    ))
703                } else {
704                    Err(LastRawMessageError::new(
705                        LastRawMessageErrorKind::JetStream(error),
706                    ))
707                }
708            }
709            Response::Ok(value) => StreamMessage::try_from(value.message)
710                .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
711        }
712    }
713
714    /// Get a last message from the stream for a given subject.
715    /// This low-level API always reaches stream leader.
716    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
717    ///
718    /// # Examples
719    ///
720    /// ```no_run
721    /// #[tokio::main]
722    /// # async fn main() -> Result<(), async_nats::Error> {
723    /// use futures::StreamExt;
724    /// use futures::TryStreamExt;
725    ///
726    /// let client = async_nats::connect("localhost:4222").await?;
727    /// let context = async_nats::jetstream::new(client);
728    /// let stream = context.get_stream_no_info("events").await?;
729    ///
730    /// let raw_message = stream
731    ///     .get_last_raw_message_by_subject("events.created")
732    ///     .await?;
733    /// println!("Retrieved raw message {:?}", raw_message);
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub async fn get_last_raw_message_by_subject(
738        &self,
739        stream_subject: &str,
740    ) -> Result<StreamMessage, LastRawMessageError> {
741        self.raw_message(StreamGetMessage {
742            sequence: None,
743            last_by_subject: Some(stream_subject.to_string()),
744            next_by_subject: None,
745        })
746        .await
747    }
748
749    /// Delete a message from the stream.
750    ///
751    /// # Examples
752    ///
753    /// ```no_run
754    /// # #[tokio::main]
755    /// # async fn main() -> Result<(), async_nats::Error> {
756    /// let client = async_nats::connect("localhost:4222").await?;
757    /// let context = async_nats::jetstream::new(client);
758    ///
759    /// let stream = context
760    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
761    ///         name: "events".to_string(),
762    ///         max_messages: 10_000,
763    ///         ..Default::default()
764    ///     })
765    ///     .await?;
766    ///
767    /// let publish_ack = context.publish("events", "data".into()).await?;
768    /// stream.delete_message(publish_ack.await?.sequence).await?;
769    /// # Ok(())
770    /// # }
771    /// ```
772    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
773        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
774        let payload = json!({
775            "seq": sequence,
776        });
777
778        let response: Response<DeleteStatus> = self
779            .context
780            .request(subject, &payload)
781            .map_err(|err| match err.kind() {
782                RequestErrorKind::TimedOut => {
783                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
784                }
785                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
786            })
787            .await?;
788
789        match response {
790            Response::Err { error } => Err(DeleteMessageError::new(
791                DeleteMessageErrorKind::JetStream(error),
792            )),
793            Response::Ok(value) => Ok(value.success),
794        }
795    }
796
797    /// Purge `Stream` messages.
798    ///
799    /// # Examples
800    ///
801    /// ```no_run
802    /// # #[tokio::main]
803    /// # async fn main() -> Result<(), async_nats::Error> {
804    /// let client = async_nats::connect("demo.nats.io").await?;
805    /// let jetstream = async_nats::jetstream::new(client);
806    ///
807    /// let stream = jetstream.get_stream("events").await?;
808    /// stream.purge().await?;
809    /// # Ok(())
810    /// # }
811    /// ```
812    pub fn purge(&self) -> Purge<No, No> {
813        Purge::build(self)
814    }
815
816    /// Purge `Stream` messages for a matching subject.
817    ///
818    /// # Examples
819    ///
820    /// ```no_run
821    /// # #[tokio::main]
822    /// # #[allow(deprecated)]
823    /// # async fn main() -> Result<(), async_nats::Error> {
824    /// let client = async_nats::connect("demo.nats.io").await?;
825    /// let jetstream = async_nats::jetstream::new(client);
826    ///
827    /// let stream = jetstream.get_stream("events").await?;
828    /// stream.purge_subject("data").await?;
829    /// # Ok(())
830    /// # }
831    /// ```
832    #[deprecated(
833        since = "0.25.0",
834        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
835    )]
836    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
837    where
838        T: Into<String>,
839    {
840        self.purge().filter(subject).await
841    }
842
843    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
844    /// returns the info from the server about created [Consumer]
845    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
846    ///
847    /// # Examples
848    ///
849    /// ```no_run
850    /// # #[tokio::main]
851    /// # async fn main() -> Result<(), async_nats::Error> {
852    /// use async_nats::jetstream::consumer;
853    /// let client = async_nats::connect("localhost:4222").await?;
854    /// let jetstream = async_nats::jetstream::new(client);
855    ///
856    /// let stream = jetstream.get_stream("events").await?;
857    /// let info = stream
858    ///     .create_consumer(consumer::pull::Config {
859    ///         durable_name: Some("pull".to_string()),
860    ///         ..Default::default()
861    ///     })
862    ///     .await?;
863    /// # Ok(())
864    /// # }
865    /// ```
866    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
867        &self,
868        config: C,
869    ) -> Result<Consumer<C>, ConsumerError> {
870        self.context
871            .create_consumer_on_stream(config, self.name.clone())
872            .await
873    }
874
875    /// Update an existing consumer.
876    /// This call will fail if the consumer does not exist.
877    /// returns the info from the server about updated [Consumer].
878    ///
879    /// # Examples
880    ///
881    /// ```no_run
882    /// # #[tokio::main]
883    /// # async fn main() -> Result<(), async_nats::Error> {
884    /// use async_nats::jetstream::consumer;
885    /// let client = async_nats::connect("localhost:4222").await?;
886    /// let jetstream = async_nats::jetstream::new(client);
887    ///
888    /// let stream = jetstream.get_stream("events").await?;
889    /// let info = stream
890    ///     .update_consumer(consumer::pull::Config {
891    ///         durable_name: Some("pull".to_string()),
892    ///         ..Default::default()
893    ///     })
894    ///     .await?;
895    /// # Ok(())
896    /// # }
897    /// ```
898    #[cfg(feature = "server_2_10")]
899    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
900        &self,
901        config: C,
902    ) -> Result<Consumer<C>, ConsumerUpdateError> {
903        self.context
904            .update_consumer_on_stream(config, self.name.clone())
905            .await
906    }
907
908    /// Create consumer, but only if it does not exist or the existing config is exactly
909    /// the same.
910    /// This method will fail if consumer is already present with different config.
911    /// returns the info from the server about created [Consumer].
912    ///
913    /// # Examples
914    ///
915    /// ```no_run
916    /// # #[tokio::main]
917    /// # async fn main() -> Result<(), async_nats::Error> {
918    /// use async_nats::jetstream::consumer;
919    /// let client = async_nats::connect("localhost:4222").await?;
920    /// let jetstream = async_nats::jetstream::new(client);
921    ///
922    /// let stream = jetstream.get_stream("events").await?;
923    /// let info = stream
924    ///     .create_consumer_strict(consumer::pull::Config {
925    ///         durable_name: Some("pull".to_string()),
926    ///         ..Default::default()
927    ///     })
928    ///     .await?;
929    /// # Ok(())
930    /// # }
931    /// ```
932    #[cfg(feature = "server_2_10")]
933    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
934        &self,
935        config: C,
936    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
937        self.context
938            .create_consumer_strict_on_stream(config, self.name.clone())
939            .await
940    }
941
942    /// Retrieve [Info] about [Consumer] from the server.
943    ///
944    /// # Examples
945    ///
946    /// ```no_run
947    /// # #[tokio::main]
948    /// # async fn main() -> Result<(), async_nats::Error> {
949    /// use async_nats::jetstream::consumer;
950    /// let client = async_nats::connect("localhost:4222").await?;
951    /// let jetstream = async_nats::jetstream::new(client);
952    ///
953    /// let stream = jetstream.get_stream("events").await?;
954    /// let info = stream.consumer_info("pull").await?;
955    /// # Ok(())
956    /// # }
957    /// ```
958    pub async fn consumer_info<T: AsRef<str>>(
959        &self,
960        name: T,
961    ) -> Result<consumer::Info, crate::Error> {
962        let name = name.as_ref();
963
964        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
965
966        match self.context.request(subject, &json!({})).await? {
967            Response::Ok(info) => Ok(info),
968            Response::Err { error } => Err(Box::new(std::io::Error::new(
969                ErrorKind::Other,
970                format!("nats: error while getting consumer info: {}", error),
971            ))),
972        }
973    }
974
975    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
976    /// [Messages][crate::jetstream::Message] for a given [Consumer].
977    ///
978    /// # Examples
979    ///
980    /// ```no_run
981    /// # #[tokio::main]
982    /// # async fn main() -> Result<(), async_nats::Error> {
983    /// use async_nats::jetstream::consumer;
984    /// use futures::StreamExt;
985    /// let client = async_nats::connect("localhost:4222").await?;
986    /// let jetstream = async_nats::jetstream::new(client);
987    ///
988    /// let stream = jetstream.get_stream("events").await?;
989    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
990    /// # Ok(())
991    /// # }
992    /// ```
993    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
994        &self,
995        name: &str,
996    ) -> Result<Consumer<T>, crate::Error> {
997        let info = self.consumer_info(name).await?;
998
999        Ok(Consumer::new(
1000            T::try_from_consumer_config(info.config.clone())?,
1001            info,
1002            self.context.clone(),
1003        ))
1004    }
1005
1006    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
1007    ///
1008    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
1009    ///
1010    /// # Examples
1011    ///
1012    /// ```no_run
1013    /// # #[tokio::main]
1014    /// # async fn main() -> Result<(), async_nats::Error> {
1015    /// use async_nats::jetstream::consumer;
1016    /// use futures::StreamExt;
1017    /// let client = async_nats::connect("localhost:4222").await?;
1018    /// let jetstream = async_nats::jetstream::new(client);
1019    ///
1020    /// let stream = jetstream.get_stream("events").await?;
1021    /// let consumer = stream
1022    ///     .get_or_create_consumer(
1023    ///         "pull",
1024    ///         consumer::pull::Config {
1025    ///             durable_name: Some("pull".to_string()),
1026    ///             ..Default::default()
1027    ///         },
1028    ///     )
1029    ///     .await?;
1030    /// # Ok(())
1031    /// # }
1032    /// ```
1033    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1034        &self,
1035        name: &str,
1036        config: T,
1037    ) -> Result<Consumer<T>, ConsumerError> {
1038        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1039
1040        match self.context.request(subject, &json!({})).await? {
1041            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1042            Response::Err { error } => Err(error.into()),
1043            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1044                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1045                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1046                })?,
1047                info,
1048                self.context.clone(),
1049            )),
1050        }
1051    }
1052
1053    /// Delete a [Consumer] from the server.
1054    ///
1055    /// # Examples
1056    ///
1057    /// ```no_run
1058    /// # #[tokio::main]
1059    /// # async fn main() -> Result<(), async_nats::Error> {
1060    /// use async_nats::jetstream::consumer;
1061    /// use futures::StreamExt;
1062    /// let client = async_nats::connect("localhost:4222").await?;
1063    /// let jetstream = async_nats::jetstream::new(client);
1064    ///
1065    /// jetstream
1066    ///     .get_stream("events")
1067    ///     .await?
1068    ///     .delete_consumer("pull")
1069    ///     .await?;
1070    /// # Ok(())
1071    /// # }
1072    /// ```
1073    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1074        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1075
1076        match self.context.request(subject, &json!({})).await? {
1077            Response::Ok(delete_status) => Ok(delete_status),
1078            Response::Err { error } => Err(error.into()),
1079        }
1080    }
1081
1082    /// Lists names of all consumers for current stream.
1083    ///
1084    /// # Examples
1085    ///
1086    /// ```no_run
1087    /// # #[tokio::main]
1088    /// # async fn main() -> Result<(), async_nats::Error> {
1089    /// use futures::TryStreamExt;
1090    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1091    /// let jetstream = async_nats::jetstream::new(client);
1092    /// let stream = jetstream.get_stream("stream").await?;
1093    /// let mut names = stream.consumer_names();
1094    /// while let Some(consumer) = names.try_next().await? {
1095    ///     println!("consumer: {stream:?}");
1096    /// }
1097    /// # Ok(())
1098    /// # }
1099    /// ```
1100    pub fn consumer_names(&self) -> ConsumerNames {
1101        ConsumerNames {
1102            context: self.context.clone(),
1103            stream: self.name.clone(),
1104            offset: 0,
1105            page_request: None,
1106            consumers: Vec::new(),
1107            done: false,
1108        }
1109    }
1110
1111    /// Lists all consumers info for current stream.
1112    ///
1113    /// # Examples
1114    ///
1115    /// ```no_run
1116    /// # #[tokio::main]
1117    /// # async fn main() -> Result<(), async_nats::Error> {
1118    /// use futures::TryStreamExt;
1119    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1120    /// let jetstream = async_nats::jetstream::new(client);
1121    /// let stream = jetstream.get_stream("stream").await?;
1122    /// let mut consumers = stream.consumers();
1123    /// while let Some(consumer) = consumers.try_next().await? {
1124    ///     println!("consumer: {consumer:?}");
1125    /// }
1126    /// # Ok(())
1127    /// # }
1128    /// ```
1129    pub fn consumers(&self) -> Consumers {
1130        Consumers {
1131            context: self.context.clone(),
1132            stream: self.name.clone(),
1133            offset: 0,
1134            page_request: None,
1135            consumers: Vec::new(),
1136            done: false,
1137        }
1138    }
1139}
1140
1141pub struct StreamInfoBuilder {
1142    pub(crate) context: Context,
1143    pub(crate) name: String,
1144    pub(crate) deleted: bool,
1145    pub(crate) subject: String,
1146}
1147
1148impl StreamInfoBuilder {
1149    fn new(context: Context, name: String) -> Self {
1150        Self {
1151            context,
1152            name,
1153            deleted: false,
1154            subject: "".to_string(),
1155        }
1156    }
1157
1158    pub fn with_deleted(mut self, deleted: bool) -> Self {
1159        self.deleted = deleted;
1160        self
1161    }
1162
1163    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1164        self.subject = subject.into();
1165        self
1166    }
1167
1168    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1169        let info = stream_info_with_details(
1170            self.context.clone(),
1171            self.name.clone(),
1172            0,
1173            self.deleted,
1174            self.subject.clone(),
1175        )
1176        .await?;
1177
1178        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1179    }
1180}
1181
1182/// `StreamConfig` determines the properties for a stream.
1183/// There are sensible defaults for most. If no subjects are
1184/// given the name will be used as the only subject.
1185#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1186pub struct Config {
1187    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
1188    pub name: String,
1189    /// How large the Stream may become in total bytes before the configured discard policy kicks in
1190    pub max_bytes: i64,
1191    /// How large the Stream may become in total messages before the configured discard policy kicks in
1192    #[serde(rename = "max_msgs")]
1193    pub max_messages: i64,
1194    /// Maximum amount of messages to keep per subject
1195    #[serde(rename = "max_msgs_per_subject")]
1196    pub max_messages_per_subject: i64,
1197    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
1198    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
1199    pub discard: DiscardPolicy,
1200    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
1201    #[serde(default, skip_serializing_if = "is_default")]
1202    pub discard_new_per_subject: bool,
1203    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
1204    /// configured stream `name`.
1205    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1206    pub subjects: Vec<String>,
1207    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
1208    pub retention: RetentionPolicy,
1209    /// How many Consumers can be defined for a given Stream, -1 for unlimited
1210    pub max_consumers: i32,
1211    /// Maximum age of any message in the stream, expressed in nanoseconds
1212    #[serde(with = "serde_nanos")]
1213    pub max_age: Duration,
1214    /// The largest message that will be accepted by the Stream
1215    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1216    pub max_message_size: i32,
1217    /// The type of storage backend, `File` (default) and `Memory`
1218    pub storage: StorageType,
1219    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
1220    pub num_replicas: usize,
1221    /// Disables acknowledging messages that are received by the Stream
1222    #[serde(default, skip_serializing_if = "is_default")]
1223    pub no_ack: bool,
1224    /// The window within which to track duplicate messages.
1225    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1226    pub duplicate_window: Duration,
1227    /// The owner of the template associated with this stream.
1228    #[serde(default, skip_serializing_if = "is_default")]
1229    pub template_owner: String,
1230    /// Indicates the stream is sealed and cannot be modified in any way
1231    #[serde(default, skip_serializing_if = "is_default")]
1232    pub sealed: bool,
1233    /// A short description of the purpose of this stream.
1234    #[serde(default, skip_serializing_if = "is_default")]
1235    pub description: Option<String>,
1236    #[serde(
1237        default,
1238        rename = "allow_rollup_hdrs",
1239        skip_serializing_if = "is_default"
1240    )]
1241    /// Indicates if rollups will be allowed or not.
1242    pub allow_rollup: bool,
1243    #[serde(default, skip_serializing_if = "is_default")]
1244    /// Indicates deletes will be denied or not.
1245    pub deny_delete: bool,
1246    /// Indicates if purges will be denied or not.
1247    #[serde(default, skip_serializing_if = "is_default")]
1248    pub deny_purge: bool,
1249
1250    /// Optional republish config.
1251    #[serde(default, skip_serializing_if = "is_default")]
1252    pub republish: Option<Republish>,
1253
1254    /// Enables direct get, which would get messages from
1255    /// non-leader.
1256    #[serde(default, skip_serializing_if = "is_default")]
1257    pub allow_direct: bool,
1258
1259    /// Enable direct access also for mirrors.
1260    #[serde(default, skip_serializing_if = "is_default")]
1261    pub mirror_direct: bool,
1262
1263    /// Stream mirror configuration.
1264    #[serde(default, skip_serializing_if = "Option::is_none")]
1265    pub mirror: Option<Source>,
1266
1267    /// Sources configuration.
1268    #[serde(default, skip_serializing_if = "Option::is_none")]
1269    pub sources: Option<Vec<Source>>,
1270
1271    #[cfg(feature = "server_2_10")]
1272    /// Additional stream metadata.
1273    #[serde(default, skip_serializing_if = "is_default")]
1274    pub metadata: HashMap<String, String>,
1275
1276    #[cfg(feature = "server_2_10")]
1277    /// Allow applying a subject transform to incoming messages
1278    #[serde(default, skip_serializing_if = "Option::is_none")]
1279    pub subject_transform: Option<SubjectTransform>,
1280
1281    #[cfg(feature = "server_2_10")]
1282    /// Override compression config for this stream.
1283    /// Wrapping enum that has `None` type with [Option] is there
1284    /// because [Stream] can override global compression set to [Compression::S2]
1285    /// to [Compression::None], which is different from not overriding global config with anything.
1286    #[serde(default, skip_serializing_if = "Option::is_none")]
1287    pub compression: Option<Compression>,
1288    #[cfg(feature = "server_2_10")]
1289    /// Set limits on consumers that are created on this stream.
1290    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1291    pub consumer_limits: Option<ConsumerLimits>,
1292
1293    #[cfg(feature = "server_2_10")]
1294    /// Sets the first sequence for the stream.
1295    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1296    pub first_sequence: Option<u64>,
1297
1298    /// Placement configuration for clusters and tags.
1299    #[serde(default, skip_serializing_if = "Option::is_none")]
1300    pub placement: Option<Placement>,
1301}
1302
1303impl From<&Config> for Config {
1304    fn from(sc: &Config) -> Config {
1305        sc.clone()
1306    }
1307}
1308
1309impl From<&str> for Config {
1310    fn from(s: &str) -> Config {
1311        Config {
1312            name: s.to_string(),
1313            ..Default::default()
1314        }
1315    }
1316}
1317
1318#[cfg(feature = "server_2_10")]
1319fn default_consumer_limits_as_none<'de, D>(
1320    deserializer: D,
1321) -> Result<Option<ConsumerLimits>, D::Error>
1322where
1323    D: Deserializer<'de>,
1324{
1325    let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1326    if let Some(cl) = consumer_limits {
1327        if cl == ConsumerLimits::default() {
1328            Ok(None)
1329        } else {
1330            Ok(Some(cl))
1331        }
1332    } else {
1333        Ok(None)
1334    }
1335}
1336#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1337pub struct ConsumerLimits {
1338    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
1339    #[serde(default, with = "serde_nanos")]
1340    pub inactive_threshold: std::time::Duration,
1341    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
1342    #[serde(default)]
1343    pub max_ack_pending: i64,
1344}
1345
1346#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1347pub enum Compression {
1348    #[serde(rename = "s2")]
1349    S2,
1350    #[serde(rename = "none")]
1351    None,
1352}
1353
1354// SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
1355#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1356pub struct SubjectTransform {
1357    #[serde(rename = "src")]
1358    pub source: String,
1359
1360    #[serde(rename = "dest")]
1361    pub destination: String,
1362}
1363
1364// Republish is for republishing messages once committed to a stream.
1365#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1366pub struct Republish {
1367    /// Subject that should be republished.
1368    #[serde(rename = "src")]
1369    pub source: String,
1370    /// Subject where messages will be republished.
1371    #[serde(rename = "dest")]
1372    pub destination: String,
1373    /// If true, only headers should be republished.
1374    #[serde(default)]
1375    pub headers_only: bool,
1376}
1377
1378/// Placement describes on which cluster or tags the stream should be placed.
1379#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1380pub struct Placement {
1381    // Cluster where the stream should be placed.
1382    #[serde(default, skip_serializing_if = "is_default")]
1383    pub cluster: Option<String>,
1384    // Matching tags for stream placement.
1385    #[serde(default, skip_serializing_if = "is_default")]
1386    pub tags: Vec<String>,
1387}
1388
1389/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
1390/// remove older messages. `New` will fail to store the new message.
1391#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1392#[repr(u8)]
1393pub enum DiscardPolicy {
1394    /// will remove older messages when limits are hit.
1395    #[default]
1396    #[serde(rename = "old")]
1397    Old = 0,
1398    /// will error on a StoreMsg call when limits are hit
1399    #[serde(rename = "new")]
1400    New = 1,
1401}
1402
1403/// `RetentionPolicy` determines how messages in a set are retained.
1404#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1405#[repr(u8)]
1406pub enum RetentionPolicy {
1407    /// `Limits` (default) means that messages are retained until any given limit is reached.
1408    /// This could be one of messages, bytes, or age.
1409    #[default]
1410    #[serde(rename = "limits")]
1411    Limits = 0,
1412    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
1413    #[serde(rename = "interest")]
1414    Interest = 1,
1415    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
1416    #[serde(rename = "workqueue")]
1417    WorkQueue = 2,
1418}
1419
1420/// determines how messages are stored for retention.
1421#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1422#[repr(u8)]
1423pub enum StorageType {
1424    /// Stream data is kept in files. This is the default.
1425    #[default]
1426    #[serde(rename = "file")]
1427    File = 0,
1428    /// Stream data is kept only in memory.
1429    #[serde(rename = "memory")]
1430    Memory = 1,
1431}
1432
1433async fn stream_info_with_details(
1434    context: Context,
1435    stream: String,
1436    offset: usize,
1437    deleted_details: bool,
1438    subjects_filter: String,
1439) -> Result<Info, InfoError> {
1440    let subject = format!("STREAM.INFO.{}", stream);
1441
1442    let payload = StreamInfoRequest {
1443        offset,
1444        deleted_details,
1445        subjects_filter,
1446    };
1447
1448    let response: Response<Info> = context.request(subject, &payload).await?;
1449
1450    match response {
1451        Response::Ok(info) => Ok(info),
1452        Response::Err { error } => Err(error.into()),
1453    }
1454}
1455
1456type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1457
1458#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1459pub struct StreamInfoRequest {
1460    offset: usize,
1461    deleted_details: bool,
1462    subjects_filter: String,
1463}
1464
1465pub struct InfoWithSubjects {
1466    stream: String,
1467    context: Context,
1468    pub info: Info,
1469    offset: usize,
1470    subjects: collections::hash_map::IntoIter<String, usize>,
1471    info_request: Option<InfoRequest>,
1472    subjects_filter: String,
1473    pages_done: bool,
1474}
1475
1476impl InfoWithSubjects {
1477    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1478        let subjects = info.state.subjects.take().unwrap_or_default();
1479        let name = info.config.name.clone();
1480        InfoWithSubjects {
1481            context,
1482            info,
1483            pages_done: subjects.is_empty(),
1484            offset: subjects.len(),
1485            subjects: subjects.into_iter(),
1486            subjects_filter: subject,
1487            stream: name,
1488            info_request: None,
1489        }
1490    }
1491}
1492
1493impl futures::Stream for InfoWithSubjects {
1494    type Item = Result<(String, usize), InfoError>;
1495
1496    fn poll_next(
1497        mut self: Pin<&mut Self>,
1498        cx: &mut std::task::Context<'_>,
1499    ) -> Poll<Option<Self::Item>> {
1500        match self.subjects.next() {
1501            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1502            None => {
1503                // If we have already requested all pages, stop the iterator.
1504                if self.pages_done {
1505                    return Poll::Ready(None);
1506                }
1507                let stream = self.stream.clone();
1508                let context = self.context.clone();
1509                let subjects_filter = self.subjects_filter.clone();
1510                let offset = self.offset;
1511                match self
1512                    .info_request
1513                    .get_or_insert_with(|| {
1514                        Box::pin(stream_info_with_details(
1515                            context,
1516                            stream,
1517                            offset,
1518                            false,
1519                            subjects_filter,
1520                        ))
1521                    })
1522                    .poll_unpin(cx)
1523                {
1524                    Poll::Ready(resp) => match resp {
1525                        Ok(info) => {
1526                            let subjects = info.state.subjects.clone();
1527                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1528                            self.info_request = None;
1529                            let subjects = subjects.unwrap_or_default();
1530                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1531                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1532                            if total <= self.offset || subjects.is_empty() {
1533                                self.pages_done = true;
1534                            }
1535                            match self.subjects.next() {
1536                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1537                                None => Poll::Ready(None),
1538                            }
1539                        }
1540                        Err(err) => Poll::Ready(Some(Err(err))),
1541                    },
1542                    Poll::Pending => Poll::Pending,
1543                }
1544            }
1545        }
1546    }
1547}
1548
1549/// Shows config and current state for this stream.
1550#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1551pub struct Info {
1552    /// The configuration associated with this stream.
1553    pub config: Config,
1554    /// The time that this stream was created.
1555    #[serde(with = "rfc3339")]
1556    pub created: time::OffsetDateTime,
1557    /// Various metrics associated with this stream.
1558    pub state: State,
1559    /// Information about leader and replicas.
1560    pub cluster: Option<ClusterInfo>,
1561    /// Information about mirror config if present.
1562    #[serde(default)]
1563    pub mirror: Option<SourceInfo>,
1564    /// Information about sources configs if present.
1565    #[serde(default)]
1566    pub sources: Vec<SourceInfo>,
1567    #[serde(flatten)]
1568    paged_info: Option<PagedInfo>,
1569}
1570
1571#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1572pub struct PagedInfo {
1573    offset: usize,
1574    total: usize,
1575    limit: usize,
1576}
1577
1578#[derive(Deserialize)]
1579pub struct DeleteStatus {
1580    pub success: bool,
1581}
1582
1583/// information about the given stream.
1584#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1585pub struct State {
1586    /// The number of messages contained in this stream
1587    pub messages: u64,
1588    /// The number of bytes of all messages contained in this stream
1589    pub bytes: u64,
1590    /// The lowest sequence number still present in this stream
1591    #[serde(rename = "first_seq")]
1592    pub first_sequence: u64,
1593    /// The time associated with the oldest message still present in this stream
1594    #[serde(with = "rfc3339", rename = "first_ts")]
1595    pub first_timestamp: time::OffsetDateTime,
1596    /// The last sequence number assigned to a message in this stream
1597    #[serde(rename = "last_seq")]
1598    pub last_sequence: u64,
1599    /// The time that the last message was received by this stream
1600    #[serde(with = "rfc3339", rename = "last_ts")]
1601    pub last_timestamp: time::OffsetDateTime,
1602    /// The number of consumers configured to consume this stream
1603    pub consumer_count: usize,
1604    /// The number of subjects in the stream
1605    #[serde(default, rename = "num_subjects")]
1606    pub subjects_count: u64,
1607    /// The number of deleted messages in the stream
1608    #[serde(default, rename = "num_deleted")]
1609    pub deleted_count: Option<u64>,
1610    /// The list of deleted subjects from the Stream.
1611    /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
1612    #[serde(default)]
1613    pub deleted: Option<Vec<u64>>,
1614
1615    pub(crate) subjects: Option<HashMap<String, usize>>,
1616}
1617
1618/// A raw stream message in the representation it is stored.
1619#[derive(Debug, Serialize, Deserialize, Clone)]
1620pub struct RawMessage {
1621    /// Subject of the message.
1622    #[serde(rename = "subject")]
1623    pub subject: String,
1624
1625    /// Sequence of the message.
1626    #[serde(rename = "seq")]
1627    pub sequence: u64,
1628
1629    /// Raw payload of the message as a base64 encoded string.
1630    #[serde(default, rename = "data")]
1631    pub payload: String,
1632
1633    /// Raw header string, if any.
1634    #[serde(default, rename = "hdrs")]
1635    pub headers: Option<String>,
1636
1637    /// The time the message was published.
1638    #[serde(rename = "time", with = "rfc3339")]
1639    pub time: time::OffsetDateTime,
1640}
1641
1642impl TryFrom<RawMessage> for StreamMessage {
1643    type Error = crate::Error;
1644
1645    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1646        let decoded_payload = STANDARD
1647            .decode(value.payload)
1648            .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1649        let decoded_headers = value
1650            .headers
1651            .map(|header| STANDARD.decode(header))
1652            .map_or(Ok(None), |v| v.map(Some))?;
1653
1654        let (headers, _, _) = decoded_headers
1655            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1656
1657        Ok(StreamMessage {
1658            subject: value.subject.into(),
1659            payload: decoded_payload.into(),
1660            headers,
1661            sequence: value.sequence,
1662            time: value.time,
1663        })
1664    }
1665}
1666
1667fn is_continuation(c: char) -> bool {
1668    c == ' ' || c == '\t'
1669}
1670const HEADER_LINE: &str = "NATS/1.0";
1671
1672#[allow(clippy::type_complexity)]
1673fn parse_headers(
1674    buf: &[u8],
1675) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1676    let mut headers = HeaderMap::new();
1677    let mut maybe_status: Option<StatusCode> = None;
1678    let mut maybe_description: Option<String> = None;
1679    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1680        line.lines().peekable()
1681    } else {
1682        return Err(Box::new(std::io::Error::new(
1683            ErrorKind::Other,
1684            "invalid header",
1685        )));
1686    };
1687
1688    if let Some(line) = lines.next() {
1689        let line = line
1690            .strip_prefix(HEADER_LINE)
1691            .ok_or_else(|| {
1692                Box::new(std::io::Error::new(
1693                    ErrorKind::Other,
1694                    "version line does not start with NATS/1.0",
1695                ))
1696            })?
1697            .trim();
1698
1699        match line.split_once(' ') {
1700            Some((status, description)) => {
1701                if !status.is_empty() {
1702                    maybe_status = Some(status.parse()?);
1703                }
1704
1705                if !description.is_empty() {
1706                    maybe_description = Some(description.trim().to_string());
1707                }
1708            }
1709            None => {
1710                if !line.is_empty() {
1711                    maybe_status = Some(line.parse()?);
1712                }
1713            }
1714        }
1715    } else {
1716        return Err(Box::new(std::io::Error::new(
1717            ErrorKind::Other,
1718            "expected header information not found",
1719        )));
1720    };
1721
1722    while let Some(line) = lines.next() {
1723        if line.is_empty() {
1724            continue;
1725        }
1726
1727        if let Some((k, v)) = line.split_once(':').to_owned() {
1728            let mut s = String::from(v.trim());
1729            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1730                s.push(' ');
1731                s.push_str(v.trim());
1732            }
1733
1734            headers.insert(
1735                HeaderName::from_str(k)?,
1736                HeaderValue::from_str(&s)
1737                    .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1738            );
1739        } else {
1740            return Err(Box::new(std::io::Error::new(
1741                ErrorKind::Other,
1742                "malformed header line",
1743            )));
1744        }
1745    }
1746
1747    if headers.is_empty() {
1748        Ok((HeaderMap::new(), maybe_status, maybe_description))
1749    } else {
1750        Ok((headers, maybe_status, maybe_description))
1751    }
1752}
1753
1754#[derive(Debug, Serialize, Deserialize, Clone)]
1755struct GetRawMessage {
1756    pub(crate) message: RawMessage,
1757}
1758
1759fn is_default<T: Default + Eq>(t: &T) -> bool {
1760    t == &T::default()
1761}
1762/// Information about the stream's, consumer's associated `JetStream` cluster
1763#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1764pub struct ClusterInfo {
1765    /// The cluster name.
1766    pub name: Option<String>,
1767    /// The server name of the RAFT leader.
1768    pub leader: Option<String>,
1769    /// The members of the RAFT cluster.
1770    #[serde(default)]
1771    pub replicas: Vec<PeerInfo>,
1772}
1773
1774/// The members of the RAFT cluster
1775#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1776pub struct PeerInfo {
1777    /// The server name of the peer.
1778    pub name: String,
1779    /// Indicates if the server is up to date and synchronized.
1780    pub current: bool,
1781    /// Nanoseconds since this peer was last seen.
1782    #[serde(with = "serde_nanos")]
1783    pub active: Duration,
1784    /// Indicates the node is considered offline by the group.
1785    #[serde(default)]
1786    pub offline: bool,
1787    /// How many uncommitted operations this peer is behind the leader.
1788    pub lag: Option<u64>,
1789}
1790
1791#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1792pub struct SourceInfo {
1793    /// Source name.
1794    pub name: String,
1795    /// Number of messages this source is lagging behind.
1796    pub lag: u64,
1797    /// Last time the source was seen active.
1798    #[serde(deserialize_with = "negative_duration_as_none")]
1799    pub active: Option<std::time::Duration>,
1800    /// Filtering for the source.
1801    #[serde(default)]
1802    pub filter_subject: Option<String>,
1803    /// Source destination subject.
1804    #[serde(default)]
1805    pub subject_transform_dest: Option<String>,
1806    /// List of transforms.
1807    #[serde(default)]
1808    pub subject_transforms: Vec<SubjectTransform>,
1809}
1810
1811fn negative_duration_as_none<'de, D>(
1812    deserializer: D,
1813) -> Result<Option<std::time::Duration>, D::Error>
1814where
1815    D: Deserializer<'de>,
1816{
1817    let n = i64::deserialize(deserializer)?;
1818    if n.is_negative() {
1819        Ok(None)
1820    } else {
1821        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1822    }
1823}
1824
1825/// The response generated by trying to purge a stream.
1826#[derive(Debug, Deserialize, Clone, Copy)]
1827pub struct PurgeResponse {
1828    /// Whether the purge request was successful.
1829    pub success: bool,
1830    /// The number of purged messages in a stream.
1831    pub purged: u64,
1832}
1833/// The payload used to generate a purge request.
1834#[derive(Default, Debug, Serialize, Clone)]
1835pub struct PurgeRequest {
1836    /// Purge up to but not including sequence.
1837    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1838    pub sequence: Option<u64>,
1839
1840    /// Subject to match against messages for the purge command.
1841    #[serde(default, skip_serializing_if = "is_default")]
1842    pub filter: Option<String>,
1843
1844    /// Number of messages to keep.
1845    #[serde(default, skip_serializing_if = "is_default")]
1846    pub keep: Option<u64>,
1847}
1848
1849#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1850pub struct Source {
1851    /// Name of the stream source.
1852    pub name: String,
1853    /// Optional source start sequence.
1854    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1855    pub start_sequence: Option<u64>,
1856    #[serde(
1857        default,
1858        rename = "opt_start_time",
1859        skip_serializing_if = "is_default",
1860        with = "rfc3339::option"
1861    )]
1862    /// Optional source start time.
1863    pub start_time: Option<OffsetDateTime>,
1864    /// Optional additional filter subject.
1865    #[serde(default, skip_serializing_if = "is_default")]
1866    pub filter_subject: Option<String>,
1867    /// Optional config for sourcing streams from another prefix, used for cross-account.
1868    #[serde(default, skip_serializing_if = "Option::is_none")]
1869    pub external: Option<External>,
1870    /// Optional config to set a domain, if source is residing in different one.
1871    #[serde(default, skip_serializing_if = "is_default")]
1872    pub domain: Option<String>,
1873    /// Subject transforms for Stream.
1874    #[cfg(feature = "server_2_10")]
1875    #[serde(default, skip_serializing_if = "is_default")]
1876    pub subject_transforms: Vec<SubjectTransform>,
1877}
1878
1879#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1880pub struct External {
1881    /// Api prefix of external source.
1882    #[serde(rename = "api")]
1883    pub api_prefix: String,
1884    /// Optional configuration of delivery prefix.
1885    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1886    pub delivery_prefix: Option<String>,
1887}
1888
1889use std::marker::PhantomData;
1890
1891#[derive(Debug, Default)]
1892pub struct Yes;
1893#[derive(Debug, Default)]
1894pub struct No;
1895
1896pub trait ToAssign: Debug {}
1897
1898impl ToAssign for Yes {}
1899impl ToAssign for No {}
1900
1901#[derive(Debug)]
1902pub struct Purge<SEQUENCE, KEEP>
1903where
1904    SEQUENCE: ToAssign,
1905    KEEP: ToAssign,
1906{
1907    inner: PurgeRequest,
1908    sequence_set: PhantomData<SEQUENCE>,
1909    keep_set: PhantomData<KEEP>,
1910    context: Context,
1911    stream_name: String,
1912}
1913
1914impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1915where
1916    SEQUENCE: ToAssign,
1917    KEEP: ToAssign,
1918{
1919    /// Adds subject filter to [PurgeRequest]
1920    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1921        self.inner.filter = Some(filter.into());
1922        self
1923    }
1924}
1925
1926impl Purge<No, No> {
1927    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1928        Purge {
1929            context: stream.context.clone(),
1930            stream_name: stream.name.clone(),
1931            inner: Default::default(),
1932            sequence_set: PhantomData {},
1933            keep_set: PhantomData {},
1934        }
1935    }
1936}
1937
1938impl<KEEP> Purge<No, KEEP>
1939where
1940    KEEP: ToAssign,
1941{
1942    /// Creates a new [PurgeRequest].
1943    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
1944    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1945        Purge {
1946            context: self.context.clone(),
1947            stream_name: self.stream_name.clone(),
1948            sequence_set: PhantomData {},
1949            keep_set: PhantomData {},
1950            inner: PurgeRequest {
1951                keep: Some(keep),
1952                ..self.inner
1953            },
1954        }
1955    }
1956}
1957impl<SEQUENCE> Purge<SEQUENCE, No>
1958where
1959    SEQUENCE: ToAssign,
1960{
1961    /// Creates a new [PurgeRequest].
1962    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
1963    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
1964        Purge {
1965            context: self.context.clone(),
1966            stream_name: self.stream_name.clone(),
1967            sequence_set: PhantomData {},
1968            keep_set: PhantomData {},
1969            inner: PurgeRequest {
1970                sequence: Some(sequence),
1971                ..self.inner
1972            },
1973        }
1974    }
1975}
1976
1977#[derive(Clone, Debug, PartialEq)]
1978pub enum PurgeErrorKind {
1979    Request,
1980    TimedOut,
1981    JetStream(super::errors::Error),
1982}
1983
1984impl Display for PurgeErrorKind {
1985    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1986        match self {
1987            Self::Request => write!(f, "request failed"),
1988            Self::TimedOut => write!(f, "timed out"),
1989            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1990        }
1991    }
1992}
1993
1994pub type PurgeError = Error<PurgeErrorKind>;
1995
1996impl<S, K> IntoFuture for Purge<S, K>
1997where
1998    S: ToAssign + std::marker::Send,
1999    K: ToAssign + std::marker::Send,
2000{
2001    type Output = Result<PurgeResponse, PurgeError>;
2002
2003    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2004
2005    fn into_future(self) -> Self::IntoFuture {
2006        Box::pin(std::future::IntoFuture::into_future(async move {
2007            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2008            let response: Response<PurgeResponse> = self
2009                .context
2010                .request(request_subject, &self.inner)
2011                .map_err(|err| match err.kind() {
2012                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2013                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2014                })
2015                .await?;
2016
2017            match response {
2018                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2019                Response::Ok(response) => Ok(response),
2020            }
2021        }))
2022    }
2023}
2024
2025#[derive(Deserialize, Debug)]
2026struct ConsumerPage {
2027    total: usize,
2028    consumers: Option<Vec<String>>,
2029}
2030
2031#[derive(Deserialize, Debug)]
2032struct ConsumerInfoPage {
2033    total: usize,
2034    consumers: Option<Vec<super::consumer::Info>>,
2035}
2036
2037type ConsumerNamesErrorKind = StreamsErrorKind;
2038type ConsumerNamesError = StreamsError;
2039type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2040
2041pub struct ConsumerNames {
2042    context: Context,
2043    stream: String,
2044    offset: usize,
2045    page_request: Option<PageRequest>,
2046    consumers: Vec<String>,
2047    done: bool,
2048}
2049
2050impl futures::Stream for ConsumerNames {
2051    type Item = Result<String, ConsumerNamesError>;
2052
2053    fn poll_next(
2054        mut self: Pin<&mut Self>,
2055        cx: &mut std::task::Context<'_>,
2056    ) -> std::task::Poll<Option<Self::Item>> {
2057        match self.page_request.as_mut() {
2058            Some(page) => match page.try_poll_unpin(cx) {
2059                std::task::Poll::Ready(page) => {
2060                    self.page_request = None;
2061                    let page = page.map_err(|err| {
2062                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2063                    })?;
2064
2065                    if let Some(consumers) = page.consumers {
2066                        self.offset += consumers.len();
2067                        self.consumers = consumers;
2068                        if self.offset >= page.total {
2069                            self.done = true;
2070                        }
2071                        match self.consumers.pop() {
2072                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2073                            None => Poll::Ready(None),
2074                        }
2075                    } else {
2076                        Poll::Ready(None)
2077                    }
2078                }
2079                std::task::Poll::Pending => std::task::Poll::Pending,
2080            },
2081            None => {
2082                if let Some(stream) = self.consumers.pop() {
2083                    Poll::Ready(Some(Ok(stream)))
2084                } else {
2085                    if self.done {
2086                        return Poll::Ready(None);
2087                    }
2088                    let context = self.context.clone();
2089                    let offset = self.offset;
2090                    let stream = self.stream.clone();
2091                    self.page_request = Some(Box::pin(async move {
2092                        match context
2093                            .request(
2094                                format!("CONSUMER.NAMES.{stream}"),
2095                                &json!({
2096                                    "offset": offset,
2097                                }),
2098                            )
2099                            .await?
2100                        {
2101                            Response::Err { error } => Err(RequestError::with_source(
2102                                super::context::RequestErrorKind::Other,
2103                                error,
2104                            )),
2105                            Response::Ok(page) => Ok(page),
2106                        }
2107                    }));
2108                    self.poll_next(cx)
2109                }
2110            }
2111        }
2112    }
2113}
2114
2115pub type ConsumersErrorKind = StreamsErrorKind;
2116pub type ConsumersError = StreamsError;
2117type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2118
2119pub struct Consumers {
2120    context: Context,
2121    stream: String,
2122    offset: usize,
2123    page_request: Option<PageInfoRequest>,
2124    consumers: Vec<super::consumer::Info>,
2125    done: bool,
2126}
2127
2128impl futures::Stream for Consumers {
2129    type Item = Result<super::consumer::Info, ConsumersError>;
2130
2131    fn poll_next(
2132        mut self: Pin<&mut Self>,
2133        cx: &mut std::task::Context<'_>,
2134    ) -> std::task::Poll<Option<Self::Item>> {
2135        match self.page_request.as_mut() {
2136            Some(page) => match page.try_poll_unpin(cx) {
2137                std::task::Poll::Ready(page) => {
2138                    self.page_request = None;
2139                    let page = page.map_err(|err| {
2140                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2141                    })?;
2142                    if let Some(consumers) = page.consumers {
2143                        self.offset += consumers.len();
2144                        self.consumers = consumers;
2145                        if self.offset >= page.total {
2146                            self.done = true;
2147                        }
2148                        match self.consumers.pop() {
2149                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2150                            None => Poll::Ready(None),
2151                        }
2152                    } else {
2153                        Poll::Ready(None)
2154                    }
2155                }
2156                std::task::Poll::Pending => std::task::Poll::Pending,
2157            },
2158            None => {
2159                if let Some(stream) = self.consumers.pop() {
2160                    Poll::Ready(Some(Ok(stream)))
2161                } else {
2162                    if self.done {
2163                        return Poll::Ready(None);
2164                    }
2165                    let context = self.context.clone();
2166                    let offset = self.offset;
2167                    let stream = self.stream.clone();
2168                    self.page_request = Some(Box::pin(async move {
2169                        match context
2170                            .request(
2171                                format!("CONSUMER.LIST.{stream}"),
2172                                &json!({
2173                                    "offset": offset,
2174                                }),
2175                            )
2176                            .await?
2177                        {
2178                            Response::Err { error } => Err(RequestError::with_source(
2179                                super::context::RequestErrorKind::Other,
2180                                error,
2181                            )),
2182                            Response::Ok(page) => Ok(page),
2183                        }
2184                    }));
2185                    self.poll_next(cx)
2186                }
2187            }
2188        }
2189    }
2190}
2191
2192#[derive(Clone, Debug, PartialEq)]
2193pub enum LastRawMessageErrorKind {
2194    NoMessageFound,
2195    InvalidSubject,
2196    JetStream(super::errors::Error),
2197    Other,
2198}
2199
2200impl Display for LastRawMessageErrorKind {
2201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2202        match self {
2203            Self::NoMessageFound => write!(f, "no message found"),
2204            Self::InvalidSubject => write!(f, "invalid subject"),
2205            Self::Other => write!(f, "failed to get last raw message"),
2206            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2207        }
2208    }
2209}
2210
2211pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2212pub type RawMessageErrorKind = LastRawMessageErrorKind;
2213pub type RawMessageError = LastRawMessageError;
2214
2215#[derive(Clone, Debug, PartialEq)]
2216pub enum ConsumerErrorKind {
2217    //TODO: get last should have timeout, which should be mapped here.
2218    TimedOut,
2219    Request,
2220    InvalidConsumerType,
2221    InvalidName,
2222    JetStream(super::errors::Error),
2223    Other,
2224}
2225
2226impl Display for ConsumerErrorKind {
2227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2228        match self {
2229            Self::TimedOut => write!(f, "timed out"),
2230            Self::Request => write!(f, "request failed"),
2231            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2232            Self::Other => write!(f, "consumer error"),
2233            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2234            Self::InvalidName => write!(f, "invalid consumer name"),
2235        }
2236    }
2237}
2238
2239pub type ConsumerError = Error<ConsumerErrorKind>;
2240
2241#[derive(Clone, Debug, PartialEq)]
2242pub enum ConsumerCreateStrictErrorKind {
2243    //TODO: get last should have timeout, which should be mapped here.
2244    TimedOut,
2245    Request,
2246    InvalidConsumerType,
2247    InvalidName,
2248    AlreadyExists,
2249    JetStream(super::errors::Error),
2250    Other,
2251}
2252
2253impl Display for ConsumerCreateStrictErrorKind {
2254    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2255        match self {
2256            Self::TimedOut => write!(f, "timed out"),
2257            Self::Request => write!(f, "request failed"),
2258            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2259            Self::Other => write!(f, "consumer error"),
2260            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2261            Self::InvalidName => write!(f, "invalid consumer name"),
2262            Self::AlreadyExists => write!(f, "consumer already exists"),
2263        }
2264    }
2265}
2266
2267pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2268
2269#[derive(Clone, Debug, PartialEq)]
2270pub enum ConsumerUpdateErrorKind {
2271    //TODO: get last should have timeout, which should be mapped here.
2272    TimedOut,
2273    Request,
2274    InvalidConsumerType,
2275    InvalidName,
2276    DoesNotExist,
2277    JetStream(super::errors::Error),
2278    Other,
2279}
2280
2281impl Display for ConsumerUpdateErrorKind {
2282    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2283        match self {
2284            Self::TimedOut => write!(f, "timed out"),
2285            Self::Request => write!(f, "request failed"),
2286            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2287            Self::Other => write!(f, "consumer error"),
2288            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2289            Self::InvalidName => write!(f, "invalid consumer name"),
2290            Self::DoesNotExist => write!(f, "consumer does not exist"),
2291        }
2292    }
2293}
2294
2295pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2296
2297impl From<super::errors::Error> for ConsumerError {
2298    fn from(err: super::errors::Error) -> Self {
2299        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2300    }
2301}
2302impl From<super::errors::Error> for ConsumerCreateStrictError {
2303    fn from(err: super::errors::Error) -> Self {
2304        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2305            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2306        } else {
2307            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2308        }
2309    }
2310}
2311impl From<super::errors::Error> for ConsumerUpdateError {
2312    fn from(err: super::errors::Error) -> Self {
2313        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2314            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2315        } else {
2316            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2317        }
2318    }
2319}
2320impl From<ConsumerError> for ConsumerUpdateError {
2321    fn from(err: ConsumerError) -> Self {
2322        match err.kind() {
2323            ConsumerErrorKind::JetStream(err) => {
2324                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2325                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2326                } else {
2327                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2328                }
2329            }
2330            ConsumerErrorKind::Request => {
2331                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2332            }
2333            ConsumerErrorKind::TimedOut => {
2334                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2335            }
2336            ConsumerErrorKind::InvalidConsumerType => {
2337                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2338            }
2339            ConsumerErrorKind::InvalidName => {
2340                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2341            }
2342            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2343        }
2344    }
2345}
2346
2347impl From<ConsumerError> for ConsumerCreateStrictError {
2348    fn from(err: ConsumerError) -> Self {
2349        match err.kind() {
2350            ConsumerErrorKind::JetStream(err) => {
2351                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2352                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2353                } else {
2354                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2355                }
2356            }
2357            ConsumerErrorKind::Request => {
2358                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2359            }
2360            ConsumerErrorKind::TimedOut => {
2361                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2362            }
2363            ConsumerErrorKind::InvalidConsumerType => {
2364                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2365            }
2366            ConsumerErrorKind::InvalidName => {
2367                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2368            }
2369            ConsumerErrorKind::Other => {
2370                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2371            }
2372        }
2373    }
2374}
2375
2376impl From<super::context::RequestError> for ConsumerError {
2377    fn from(err: super::context::RequestError) -> Self {
2378        match err.kind() {
2379            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2380            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2381        }
2382    }
2383}
2384impl From<super::context::RequestError> for ConsumerUpdateError {
2385    fn from(err: super::context::RequestError) -> Self {
2386        match err.kind() {
2387            RequestErrorKind::TimedOut => {
2388                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2389            }
2390            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2391        }
2392    }
2393}
2394impl From<super::context::RequestError> for ConsumerCreateStrictError {
2395    fn from(err: super::context::RequestError) -> Self {
2396        match err.kind() {
2397            RequestErrorKind::TimedOut => {
2398                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2399            }
2400            _ => {
2401                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2402            }
2403        }
2404    }
2405}
2406
2407#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
2408pub struct StreamGetMessage {
2409    #[serde(rename = "seq", skip_serializing_if = "is_default")]
2410    sequence: Option<u64>,
2411    #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
2412    next_by_subject: Option<String>,
2413    #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
2414    last_by_subject: Option<String>,
2415}
2416
2417#[cfg(test)]
2418mod tests {
2419    use super::*;
2420
2421    #[test]
2422    fn consumer_limits_de() {
2423        let config = Config {
2424            ..Default::default()
2425        };
2426
2427        let roundtrip: Config = {
2428            let ser = serde_json::to_string(&config).unwrap();
2429            serde_json::from_str(&ser).unwrap()
2430        };
2431        assert_eq!(config, roundtrip);
2432    }
2433}