async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self, ErrorKind},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
41    errors::ErrorCode,
42    message::{StreamMessage, StreamMessageError},
43    response::Response,
44    Context, Message,
45};
46
47pub type InfoError = RequestError;
48
49#[derive(Clone, Debug, PartialEq)]
50pub enum DirectGetErrorKind {
51    NotFound,
52    InvalidSubject,
53    TimedOut,
54    Request,
55    ErrorResponse(StatusCode, String),
56    Other,
57}
58
59impl Display for DirectGetErrorKind {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            Self::InvalidSubject => write!(f, "invalid subject"),
63            Self::NotFound => write!(f, "message not found"),
64            Self::ErrorResponse(status, description) => {
65                write!(f, "unable to get message: {} {}", status, description)
66            }
67            Self::Other => write!(f, "error getting message"),
68            Self::TimedOut => write!(f, "timed out"),
69            Self::Request => write!(f, "request failed"),
70        }
71    }
72}
73
74pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77    fn from(err: crate::RequestError) -> Self {
78        match err.kind() {
79            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80            crate::RequestErrorKind::NoResponders => {
81                DirectGetError::new(DirectGetErrorKind::ErrorResponse(
82                    StatusCode::NO_RESPONDERS,
83                    "no responders".to_string(),
84                ))
85            }
86            crate::RequestErrorKind::Other => {
87                DirectGetError::with_source(DirectGetErrorKind::Other, err)
88            }
89        }
90    }
91}
92
93impl From<serde_json::Error> for DirectGetError {
94    fn from(err: serde_json::Error) -> Self {
95        DirectGetError::with_source(DirectGetErrorKind::Other, err)
96    }
97}
98
99impl From<StreamMessageError> for DirectGetError {
100    fn from(err: StreamMessageError) -> Self {
101        DirectGetError::with_source(DirectGetErrorKind::Other, err)
102    }
103}
104
105#[derive(Clone, Debug, PartialEq)]
106pub enum DeleteMessageErrorKind {
107    Request,
108    TimedOut,
109    JetStream(super::errors::Error),
110}
111
112impl Display for DeleteMessageErrorKind {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            Self::Request => write!(f, "request failed"),
116            Self::TimedOut => write!(f, "timed out"),
117            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
118        }
119    }
120}
121
122pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
123
124/// Handle to operations that can be performed on a `Stream`.
125/// It's generic over the type of `info` field to allow `Stream` with or without
126/// info contents.
127#[derive(Debug, Clone)]
128pub struct Stream<T = Info> {
129    pub(crate) info: T,
130    pub(crate) context: Context,
131    pub(crate) name: String,
132}
133
134impl Stream<Info> {
135    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
136    /// [Stream] and returns it.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// # #[tokio::main]
142    /// # async fn main() -> Result<(), async_nats::Error> {
143    /// let client = async_nats::connect("localhost:4222").await?;
144    /// let jetstream = async_nats::jetstream::new(client);
145    ///
146    /// let mut stream = jetstream.get_stream("events").await?;
147    ///
148    /// let info = stream.info().await?;
149    /// # Ok(())
150    /// # }
151    /// ```
152    pub async fn info(&mut self) -> Result<&Info, InfoError> {
153        let subject = format!("STREAM.INFO.{}", self.info.config.name);
154
155        match self.context.request(subject, &json!({})).await? {
156            Response::Ok::<Info>(info) => {
157                self.info = info;
158                Ok(&self.info)
159            }
160            Response::Err { error } => Err(error.into()),
161        }
162    }
163
164    /// Returns cached [Info] for the [Stream].
165    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
166    /// [Stream::info].
167    ///
168    /// # Examples
169    ///
170    /// ```no_run
171    /// # #[tokio::main]
172    /// # async fn main() -> Result<(), async_nats::Error> {
173    /// let client = async_nats::connect("localhost:4222").await?;
174    /// let jetstream = async_nats::jetstream::new(client);
175    ///
176    /// let stream = jetstream.get_stream("events").await?;
177    ///
178    /// let info = stream.cached_info();
179    /// # Ok(())
180    /// # }
181    /// ```
182    pub fn cached_info(&self) -> &Info {
183        &self.info
184    }
185}
186
187impl<I> Stream<I> {
188    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
189    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
190    pub async fn get_info(&self) -> Result<Info, InfoError> {
191        let subject = format!("STREAM.INFO.{}", self.name);
192
193        match self.context.request(subject, &json!({})).await? {
194            Response::Ok::<Info>(info) => Ok(info),
195            Response::Err { error } => Err(error.into()),
196        }
197    }
198
199    /// Retrieves [[Info]] from the server and returns a [[futures::Stream]] that allows
200    /// iterating over all subjects in the stream fetched via paged API.
201    ///
202    /// # Examples
203    ///
204    /// ```no_run
205    /// # #[tokio::main]
206    /// # async fn main() -> Result<(), async_nats::Error> {
207    /// use futures::TryStreamExt;
208    /// let client = async_nats::connect("localhost:4222").await?;
209    /// let jetstream = async_nats::jetstream::new(client);
210    ///
211    /// let mut stream = jetstream.get_stream("events").await?;
212    ///
213    /// let mut info = stream.info_with_subjects("events.>").await?;
214    ///
215    /// while let Some((subject, count)) = info.try_next().await? {
216    ///     println!("Subject: {} count: {}", subject, count);
217    /// }
218    /// # Ok(())
219    /// # }
220    /// ```
221    pub async fn info_with_subjects<F: AsRef<str>>(
222        &self,
223        subjects_filter: F,
224    ) -> Result<InfoWithSubjects, InfoError> {
225        let subjects_filter = subjects_filter.as_ref().to_string();
226        // TODO: validate the subject and decide if this should be a `Subject`
227        let info = stream_info_with_details(
228            self.context.clone(),
229            self.name.clone(),
230            0,
231            false,
232            subjects_filter.clone(),
233        )
234        .await?;
235
236        Ok(InfoWithSubjects::new(
237            self.context.clone(),
238            info,
239            subjects_filter,
240        ))
241    }
242
243    /// Creates a builder that allows to customize `Stream::Info`.
244    ///
245    /// # Examples
246    /// ```no_run
247    /// # #[tokio::main]
248    /// # async fn main() -> Result<(), async_nats::Error> {
249    /// use futures::TryStreamExt;
250    /// let client = async_nats::connect("localhost:4222").await?;
251    /// let jetstream = async_nats::jetstream::new(client);
252    ///
253    /// let mut stream = jetstream.get_stream("events").await?;
254    ///
255    /// let mut info = stream
256    ///     .info_builder()
257    ///     .with_deleted(true)
258    ///     .subjects("events.>")
259    ///     .fetch()
260    ///     .await?;
261    ///
262    /// while let Some((subject, count)) = info.try_next().await? {
263    ///     println!("Subject: {} count: {}", subject, count);
264    /// }
265    /// # Ok(())
266    /// # }
267    /// ```
268    pub fn info_builder(&self) -> StreamInfoBuilder {
269        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
270    }
271
272    /// Gets next message for a [Stream].
273    ///
274    /// Requires a [Stream] with `allow_direct` set to `true`.
275    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
276    /// from any replica member. This means read after write is possible,
277    /// as that given replica might not yet catch up with the leader.
278    ///
279    /// # Examples
280    ///
281    /// ```no_run
282    /// # #[tokio::main]
283    /// # async fn main() -> Result<(), async_nats::Error> {
284    /// let client = async_nats::connect("demo.nats.io").await?;
285    /// let jetstream = async_nats::jetstream::new(client);
286    ///
287    /// let stream = jetstream
288    ///     .create_stream(async_nats::jetstream::stream::Config {
289    ///         name: "events".to_string(),
290    ///         subjects: vec!["events.>".to_string()],
291    ///         allow_direct: true,
292    ///         ..Default::default()
293    ///     })
294    ///     .await?;
295    ///
296    /// jetstream.publish("events.data", "data".into()).await?;
297    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
298    ///
299    /// let message = stream
300    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
301    ///     .await?;
302    ///
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub async fn direct_get_next_for_subject<T: AsRef<str>>(
307        &self,
308        subject: T,
309        sequence: Option<u64>,
310    ) -> Result<Message, DirectGetError> {
311        if !is_valid_subject(&subject) {
312            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
313        }
314        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
315        let payload;
316        if let Some(sequence) = sequence {
317            payload = json!({
318                "seq": sequence,
319                "next_by_subj": subject.as_ref(),
320            });
321        } else {
322            payload = json!({
323                 "next_by_subj": subject.as_ref(),
324            });
325        }
326
327        let response = self
328            .context
329            .client
330            .request(
331                request_subject,
332                serde_json::to_vec(&payload).map(Bytes::from)?,
333            )
334            .await
335            .map(|message| Message {
336                message,
337                context: self.context.clone(),
338            })?;
339
340        if let Some(status) = response.status {
341            if let Some(ref description) = response.description {
342                match status {
343                    StatusCode::NOT_FOUND => {
344                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
345                    }
346                    // 408 is used in Direct Message for bad/empty payload.
347                    StatusCode::TIMEOUT => {
348                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
349                    }
350                    _ => {
351                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
352                            status,
353                            description.to_string(),
354                        )));
355                    }
356                }
357            }
358        }
359        Ok(response)
360    }
361
362    /// Gets first message from [Stream].
363    ///
364    /// Requires a [Stream] with `allow_direct` set to `true`.
365    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
366    /// from any replica member. This means read after write is possible,
367    /// as that given replica might not yet catch up with the leader.
368    ///
369    /// # Examples
370    ///
371    /// ```no_run
372    /// # #[tokio::main]
373    /// # async fn main() -> Result<(), async_nats::Error> {
374    /// let client = async_nats::connect("demo.nats.io").await?;
375    /// let jetstream = async_nats::jetstream::new(client);
376    ///
377    /// let stream = jetstream
378    ///     .create_stream(async_nats::jetstream::stream::Config {
379    ///         name: "events".to_string(),
380    ///         subjects: vec!["events.>".to_string()],
381    ///         allow_direct: true,
382    ///         ..Default::default()
383    ///     })
384    ///     .await?;
385    ///
386    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
387    ///
388    /// let message = stream.direct_get_first_for_subject("events.data").await?;
389    ///
390    /// # Ok(())
391    /// # }
392    /// ```
393    pub async fn direct_get_first_for_subject<T: AsRef<str>>(
394        &self,
395        subject: T,
396    ) -> Result<Message, DirectGetError> {
397        if !is_valid_subject(&subject) {
398            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
399        }
400        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
401        let payload = json!({
402            "next_by_subj": subject.as_ref(),
403        });
404
405        let response = self
406            .context
407            .client
408            .request(
409                request_subject,
410                serde_json::to_vec(&payload).map(Bytes::from)?,
411            )
412            .await
413            .map(|message| Message {
414                message,
415                context: self.context.clone(),
416            })?;
417        if let Some(status) = response.status {
418            if let Some(ref description) = response.description {
419                match status {
420                    StatusCode::NOT_FOUND => {
421                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
422                    }
423                    // 408 is used in Direct Message for bad/empty payload.
424                    StatusCode::TIMEOUT => {
425                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
426                    }
427                    _ => {
428                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
429                            status,
430                            description.to_string(),
431                        )));
432                    }
433                }
434            }
435        }
436        Ok(response)
437    }
438
439    /// Gets message from [Stream] with given `sequence id`.
440    ///
441    /// Requires a [Stream] with `allow_direct` set to `true`.
442    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
443    /// from any replica member. This means read after write is possible,
444    /// as that given replica might not yet catch up with the leader.
445    ///
446    /// # Examples
447    ///
448    /// ```no_run
449    /// # #[tokio::main]
450    /// # async fn main() -> Result<(), async_nats::Error> {
451    /// let client = async_nats::connect("demo.nats.io").await?;
452    /// let jetstream = async_nats::jetstream::new(client);
453    ///
454    /// let stream = jetstream
455    ///     .create_stream(async_nats::jetstream::stream::Config {
456    ///         name: "events".to_string(),
457    ///         subjects: vec!["events.>".to_string()],
458    ///         allow_direct: true,
459    ///         ..Default::default()
460    ///     })
461    ///     .await?;
462    ///
463    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
464    ///
465    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
466    ///
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
471        let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
472        let payload = json!({
473            "seq": sequence,
474        });
475
476        let response = self
477            .context
478            .client
479            .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
480            .await?;
481
482        if let Some(status) = response.status {
483            if let Some(ref description) = response.description {
484                match status {
485                    StatusCode::NOT_FOUND => {
486                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
487                    }
488                    // 408 is used in Direct Message for bad/empty payload.
489                    StatusCode::TIMEOUT => {
490                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
491                    }
492                    _ => {
493                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
494                            status,
495                            description.to_string(),
496                        )));
497                    }
498                }
499            }
500        }
501        StreamMessage::try_from(response).map_err(Into::into)
502    }
503
504    /// Gets last message for a given `subject`.
505    ///
506    /// Requires a [Stream] with `allow_direct` set to `true`.
507    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
508    /// from any replica member. This means read after write is possible,
509    /// as that given replica might not yet catch up with the leader.
510    ///
511    /// # Examples
512    ///
513    /// ```no_run
514    /// # #[tokio::main]
515    /// # async fn main() -> Result<(), async_nats::Error> {
516    /// let client = async_nats::connect("demo.nats.io").await?;
517    /// let jetstream = async_nats::jetstream::new(client);
518    ///
519    /// let stream = jetstream
520    ///     .create_stream(async_nats::jetstream::stream::Config {
521    ///         name: "events".to_string(),
522    ///         subjects: vec!["events.>".to_string()],
523    ///         allow_direct: true,
524    ///         ..Default::default()
525    ///     })
526    ///     .await?;
527    ///
528    /// jetstream.publish("events.data", "data".into()).await?;
529    ///
530    /// let message = stream.direct_get_last_for_subject("events.data").await?;
531    ///
532    /// # Ok(())
533    /// # }
534    /// ```
535    pub async fn direct_get_last_for_subject<T: AsRef<str>>(
536        &self,
537        subject: T,
538    ) -> Result<StreamMessage, DirectGetError> {
539        let subject = format!(
540            "{}.DIRECT.GET.{}.{}",
541            &self.context.prefix,
542            &self.name,
543            subject.as_ref()
544        );
545
546        let response = self.context.client.request(subject, "".into()).await?;
547        if let Some(status) = response.status {
548            if let Some(ref description) = response.description {
549                match status {
550                    StatusCode::NOT_FOUND => {
551                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
552                    }
553                    // 408 is used in Direct Message for bad/empty payload.
554                    StatusCode::TIMEOUT => {
555                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
556                    }
557                    _ => {
558                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
559                            status,
560                            description.to_string(),
561                        )));
562                    }
563                }
564            }
565        }
566        StreamMessage::try_from(response).map_err(Into::into)
567    }
568    /// Get a raw message from the stream for a given stream sequence.
569    /// This low-level API always reaches stream leader.
570    /// This should be discouraged in favor of using [Stream::direct_get].
571    ///
572    /// # Examples
573    ///
574    /// ```no_run
575    /// #[tokio::main]
576    /// # async fn main() -> Result<(), async_nats::Error> {
577    /// use futures::StreamExt;
578    /// use futures::TryStreamExt;
579    ///
580    /// let client = async_nats::connect("localhost:4222").await?;
581    /// let context = async_nats::jetstream::new(client);
582    ///
583    /// let stream = context
584    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
585    ///         name: "events".to_string(),
586    ///         max_messages: 10_000,
587    ///         ..Default::default()
588    ///     })
589    ///     .await?;
590    ///
591    /// let publish_ack = context.publish("events", "data".into()).await?;
592    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
593    /// println!("Retrieved raw message {:?}", raw_message);
594    /// # Ok(())
595    /// # }
596    /// ```
597    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
598        self.raw_message(StreamGetMessage {
599            sequence: Some(sequence),
600            last_by_subject: None,
601            next_by_subject: None,
602        })
603        .await
604    }
605
606    /// Get a fist message from the stream for a given subject starting from provided sequence.
607    /// This low-level API always reaches stream leader.
608    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
609    ///
610    /// # Examples
611    ///
612    /// ```no_run
613    /// #[tokio::main]
614    /// # async fn main() -> Result<(), async_nats::Error> {
615    /// use futures::StreamExt;
616    /// use futures::TryStreamExt;
617    ///
618    /// let client = async_nats::connect("localhost:4222").await?;
619    /// let context = async_nats::jetstream::new(client);
620    /// let stream = context.get_stream_no_info("events").await?;
621    ///
622    /// let raw_message = stream
623    ///     .get_first_raw_message_by_subject("events.created", 10)
624    ///     .await?;
625    /// println!("Retrieved raw message {:?}", raw_message);
626    /// # Ok(())
627    /// # }
628    /// ```
629    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
630        &self,
631        subject: T,
632        sequence: u64,
633    ) -> Result<StreamMessage, RawMessageError> {
634        self.raw_message(StreamGetMessage {
635            sequence: Some(sequence),
636            last_by_subject: None,
637            next_by_subject: Some(subject.as_ref().to_string()),
638        })
639        .await
640    }
641
642    /// Get a next message from the stream for a given subject.
643    /// This low-level API always reaches stream leader.
644    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
645    ///
646    /// # Examples
647    ///
648    /// ```no_run
649    /// #[tokio::main]
650    /// # async fn main() -> Result<(), async_nats::Error> {
651    /// use futures::StreamExt;
652    /// use futures::TryStreamExt;
653    ///
654    /// let client = async_nats::connect("localhost:4222").await?;
655    /// let context = async_nats::jetstream::new(client);
656    /// let stream = context.get_stream_no_info("events").await?;
657    ///
658    /// let raw_message = stream
659    ///     .get_next_raw_message_by_subject("events.created")
660    ///     .await?;
661    /// println!("Retrieved raw message {:?}", raw_message);
662    /// # Ok(())
663    /// # }
664    /// ```
665    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
666        &self,
667        subject: T,
668    ) -> Result<StreamMessage, RawMessageError> {
669        self.raw_message(StreamGetMessage {
670            sequence: None,
671            last_by_subject: None,
672            next_by_subject: Some(subject.as_ref().to_string()),
673        })
674        .await
675    }
676
677    async fn raw_message(
678        &self,
679        request: StreamGetMessage,
680    ) -> Result<StreamMessage, RawMessageError> {
681        for subject in [&request.last_by_subject, &request.next_by_subject]
682            .into_iter()
683            .flatten()
684        {
685            if !is_valid_subject(subject) {
686                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
687            }
688        }
689        let subject = format!("STREAM.MSG.GET.{}", &self.name);
690
691        let response: Response<GetRawMessage> = self
692            .context
693            .request(subject, &request)
694            .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
695            .await?;
696
697        match response {
698            Response::Err { error } => {
699                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
700                    Err(LastRawMessageError::new(
701                        LastRawMessageErrorKind::NoMessageFound,
702                    ))
703                } else {
704                    Err(LastRawMessageError::new(
705                        LastRawMessageErrorKind::JetStream(error),
706                    ))
707                }
708            }
709            Response::Ok(value) => StreamMessage::try_from(value.message)
710                .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
711        }
712    }
713
714    /// Get a last message from the stream for a given subject.
715    /// This low-level API always reaches stream leader.
716    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
717    ///
718    /// # Examples
719    ///
720    /// ```no_run
721    /// #[tokio::main]
722    /// # async fn main() -> Result<(), async_nats::Error> {
723    /// use futures::StreamExt;
724    /// use futures::TryStreamExt;
725    ///
726    /// let client = async_nats::connect("localhost:4222").await?;
727    /// let context = async_nats::jetstream::new(client);
728    /// let stream = context.get_stream_no_info("events").await?;
729    ///
730    /// let raw_message = stream
731    ///     .get_last_raw_message_by_subject("events.created")
732    ///     .await?;
733    /// println!("Retrieved raw message {:?}", raw_message);
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub async fn get_last_raw_message_by_subject(
738        &self,
739        stream_subject: &str,
740    ) -> Result<StreamMessage, LastRawMessageError> {
741        self.raw_message(StreamGetMessage {
742            sequence: None,
743            last_by_subject: Some(stream_subject.to_string()),
744            next_by_subject: None,
745        })
746        .await
747    }
748
749    /// Delete a message from the stream.
750    ///
751    /// # Examples
752    ///
753    /// ```no_run
754    /// # #[tokio::main]
755    /// # async fn main() -> Result<(), async_nats::Error> {
756    /// let client = async_nats::connect("localhost:4222").await?;
757    /// let context = async_nats::jetstream::new(client);
758    ///
759    /// let stream = context
760    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
761    ///         name: "events".to_string(),
762    ///         max_messages: 10_000,
763    ///         ..Default::default()
764    ///     })
765    ///     .await?;
766    ///
767    /// let publish_ack = context.publish("events", "data".into()).await?;
768    /// stream.delete_message(publish_ack.await?.sequence).await?;
769    /// # Ok(())
770    /// # }
771    /// ```
772    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
773        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
774        let payload = json!({
775            "seq": sequence,
776        });
777
778        let response: Response<DeleteStatus> = self
779            .context
780            .request(subject, &payload)
781            .map_err(|err| match err.kind() {
782                RequestErrorKind::TimedOut => {
783                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
784                }
785                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
786            })
787            .await?;
788
789        match response {
790            Response::Err { error } => Err(DeleteMessageError::new(
791                DeleteMessageErrorKind::JetStream(error),
792            )),
793            Response::Ok(value) => Ok(value.success),
794        }
795    }
796
797    /// Purge `Stream` messages.
798    ///
799    /// # Examples
800    ///
801    /// ```no_run
802    /// # #[tokio::main]
803    /// # async fn main() -> Result<(), async_nats::Error> {
804    /// let client = async_nats::connect("demo.nats.io").await?;
805    /// let jetstream = async_nats::jetstream::new(client);
806    ///
807    /// let stream = jetstream.get_stream("events").await?;
808    /// stream.purge().await?;
809    /// # Ok(())
810    /// # }
811    /// ```
812    pub fn purge(&self) -> Purge<No, No> {
813        Purge::build(self)
814    }
815
816    /// Purge `Stream` messages for a matching subject.
817    ///
818    /// # Examples
819    ///
820    /// ```no_run
821    /// # #[tokio::main]
822    /// # #[allow(deprecated)]
823    /// # async fn main() -> Result<(), async_nats::Error> {
824    /// let client = async_nats::connect("demo.nats.io").await?;
825    /// let jetstream = async_nats::jetstream::new(client);
826    ///
827    /// let stream = jetstream.get_stream("events").await?;
828    /// stream.purge_subject("data").await?;
829    /// # Ok(())
830    /// # }
831    /// ```
832    #[deprecated(
833        since = "0.25.0",
834        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
835    )]
836    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
837    where
838        T: Into<String>,
839    {
840        self.purge().filter(subject).await
841    }
842
843    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
844    /// returns the info from the server about created [Consumer]
845    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
846    ///
847    /// # Examples
848    ///
849    /// ```no_run
850    /// # #[tokio::main]
851    /// # async fn main() -> Result<(), async_nats::Error> {
852    /// use async_nats::jetstream::consumer;
853    /// let client = async_nats::connect("localhost:4222").await?;
854    /// let jetstream = async_nats::jetstream::new(client);
855    ///
856    /// let stream = jetstream.get_stream("events").await?;
857    /// let info = stream
858    ///     .create_consumer(consumer::pull::Config {
859    ///         durable_name: Some("pull".to_string()),
860    ///         ..Default::default()
861    ///     })
862    ///     .await?;
863    /// # Ok(())
864    /// # }
865    /// ```
866    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
867        &self,
868        config: C,
869    ) -> Result<Consumer<C>, ConsumerError> {
870        self.context
871            .create_consumer_on_stream(config, self.name.clone())
872            .await
873    }
874
875    /// Update an existing consumer.
876    /// This call will fail if the consumer does not exist.
877    /// returns the info from the server about updated [Consumer].
878    ///
879    /// # Examples
880    ///
881    /// ```no_run
882    /// # #[tokio::main]
883    /// # async fn main() -> Result<(), async_nats::Error> {
884    /// use async_nats::jetstream::consumer;
885    /// let client = async_nats::connect("localhost:4222").await?;
886    /// let jetstream = async_nats::jetstream::new(client);
887    ///
888    /// let stream = jetstream.get_stream("events").await?;
889    /// let info = stream
890    ///     .update_consumer(consumer::pull::Config {
891    ///         durable_name: Some("pull".to_string()),
892    ///         ..Default::default()
893    ///     })
894    ///     .await?;
895    /// # Ok(())
896    /// # }
897    /// ```
898    #[cfg(feature = "server_2_10")]
899    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
900        &self,
901        config: C,
902    ) -> Result<Consumer<C>, ConsumerUpdateError> {
903        self.context
904            .update_consumer_on_stream(config, self.name.clone())
905            .await
906    }
907
908    /// Create consumer, but only if it does not exist or the existing config is exactly
909    /// the same.
910    /// This method will fail if consumer is already present with different config.
911    /// returns the info from the server about created [Consumer].
912    ///
913    /// # Examples
914    ///
915    /// ```no_run
916    /// # #[tokio::main]
917    /// # async fn main() -> Result<(), async_nats::Error> {
918    /// use async_nats::jetstream::consumer;
919    /// let client = async_nats::connect("localhost:4222").await?;
920    /// let jetstream = async_nats::jetstream::new(client);
921    ///
922    /// let stream = jetstream.get_stream("events").await?;
923    /// let info = stream
924    ///     .create_consumer_strict(consumer::pull::Config {
925    ///         durable_name: Some("pull".to_string()),
926    ///         ..Default::default()
927    ///     })
928    ///     .await?;
929    /// # Ok(())
930    /// # }
931    /// ```
932    #[cfg(feature = "server_2_10")]
933    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
934        &self,
935        config: C,
936    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
937        self.context
938            .create_consumer_strict_on_stream(config, self.name.clone())
939            .await
940    }
941
942    /// Retrieve [Info] about [Consumer] from the server.
943    ///
944    /// # Examples
945    ///
946    /// ```no_run
947    /// # #[tokio::main]
948    /// # async fn main() -> Result<(), async_nats::Error> {
949    /// use async_nats::jetstream::consumer;
950    /// let client = async_nats::connect("localhost:4222").await?;
951    /// let jetstream = async_nats::jetstream::new(client);
952    ///
953    /// let stream = jetstream.get_stream("events").await?;
954    /// let info = stream.consumer_info("pull").await?;
955    /// # Ok(())
956    /// # }
957    /// ```
958    pub async fn consumer_info<T: AsRef<str>>(
959        &self,
960        name: T,
961    ) -> Result<consumer::Info, crate::Error> {
962        let name = name.as_ref();
963
964        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
965
966        match self.context.request(subject, &json!({})).await? {
967            Response::Ok(info) => Ok(info),
968            Response::Err { error } => Err(Box::new(std::io::Error::new(
969                ErrorKind::Other,
970                format!("nats: error while getting consumer info: {}", error),
971            ))),
972        }
973    }
974
975    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
976    /// [Messages][crate::jetstream::Message] for a given [Consumer].
977    ///
978    /// # Examples
979    ///
980    /// ```no_run
981    /// # #[tokio::main]
982    /// # async fn main() -> Result<(), async_nats::Error> {
983    /// use async_nats::jetstream::consumer;
984    /// use futures::StreamExt;
985    /// let client = async_nats::connect("localhost:4222").await?;
986    /// let jetstream = async_nats::jetstream::new(client);
987    ///
988    /// let stream = jetstream.get_stream("events").await?;
989    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
990    /// # Ok(())
991    /// # }
992    /// ```
993    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
994        &self,
995        name: &str,
996    ) -> Result<Consumer<T>, crate::Error> {
997        let info = self.consumer_info(name).await?;
998
999        Ok(Consumer::new(
1000            T::try_from_consumer_config(info.config.clone())?,
1001            info,
1002            self.context.clone(),
1003        ))
1004    }
1005
1006    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
1007    ///
1008    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
1009    ///
1010    /// # Examples
1011    ///
1012    /// ```no_run
1013    /// # #[tokio::main]
1014    /// # async fn main() -> Result<(), async_nats::Error> {
1015    /// use async_nats::jetstream::consumer;
1016    /// use futures::StreamExt;
1017    /// let client = async_nats::connect("localhost:4222").await?;
1018    /// let jetstream = async_nats::jetstream::new(client);
1019    ///
1020    /// let stream = jetstream.get_stream("events").await?;
1021    /// let consumer = stream
1022    ///     .get_or_create_consumer(
1023    ///         "pull",
1024    ///         consumer::pull::Config {
1025    ///             durable_name: Some("pull".to_string()),
1026    ///             ..Default::default()
1027    ///         },
1028    ///     )
1029    ///     .await?;
1030    /// # Ok(())
1031    /// # }
1032    /// ```
1033    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1034        &self,
1035        name: &str,
1036        config: T,
1037    ) -> Result<Consumer<T>, ConsumerError> {
1038        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1039
1040        match self.context.request(subject, &json!({})).await? {
1041            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1042            Response::Err { error } => Err(error.into()),
1043            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1044                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1045                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1046                })?,
1047                info,
1048                self.context.clone(),
1049            )),
1050        }
1051    }
1052
1053    /// Delete a [Consumer] from the server.
1054    ///
1055    /// # Examples
1056    ///
1057    /// ```no_run
1058    /// # #[tokio::main]
1059    /// # async fn main() -> Result<(), async_nats::Error> {
1060    /// use async_nats::jetstream::consumer;
1061    /// use futures::StreamExt;
1062    /// let client = async_nats::connect("localhost:4222").await?;
1063    /// let jetstream = async_nats::jetstream::new(client);
1064    ///
1065    /// jetstream
1066    ///     .get_stream("events")
1067    ///     .await?
1068    ///     .delete_consumer("pull")
1069    ///     .await?;
1070    /// # Ok(())
1071    /// # }
1072    /// ```
1073    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1074        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1075
1076        match self.context.request(subject, &json!({})).await? {
1077            Response::Ok(delete_status) => Ok(delete_status),
1078            Response::Err { error } => Err(error.into()),
1079        }
1080    }
1081
1082    /// Pause a [Consumer] until the given time.
1083    /// It will not deliver any messages to clients during that time.
1084    ///
1085    /// # Examples
1086    ///
1087    /// ```no_run
1088    /// # #[tokio::main]
1089    /// # async fn main() -> Result<(), async_nats::Error> {
1090    /// use async_nats::jetstream::consumer;
1091    /// use futures::StreamExt;
1092    /// let client = async_nats::connect("localhost:4222").await?;
1093    /// let jetstream = async_nats::jetstream::new(client);
1094    /// let pause_until =
1095    ///     time::OffsetDateTime::now_utc().saturating_add(time::Duration::seconds_f32(10.0));
1096    ///
1097    /// jetstream
1098    ///     .get_stream("events")
1099    ///     .await?
1100    ///     .pause_consumer("my_consumer", pause_until)
1101    ///     .await?;
1102    /// # Ok(())
1103    /// # }
1104    /// ```
1105    #[cfg(feature = "server_2_11")]
1106    pub async fn pause_consumer(
1107        &self,
1108        name: &str,
1109        pause_until: OffsetDateTime,
1110    ) -> Result<PauseResponse, ConsumerError> {
1111        self.request_pause_consumer(name, Some(pause_until)).await
1112    }
1113
1114    /// Resume a paused [Consumer].
1115    ///
1116    /// # Examples
1117    ///
1118    /// ```no_run
1119    /// # #[tokio::main]
1120    /// # async fn main() -> Result<(), async_nats::Error> {
1121    /// use async_nats::jetstream::consumer;
1122    /// use futures::StreamExt;
1123    /// let client = async_nats::connect("localhost:4222").await?;
1124    /// let jetstream = async_nats::jetstream::new(client);
1125    ///
1126    /// jetstream
1127    ///     .get_stream("events")
1128    ///     .await?
1129    ///     .resume_consumer("my_consumer")
1130    ///     .await?;
1131    /// # Ok(())
1132    /// # }
1133    /// ```
1134    #[cfg(feature = "server_2_11")]
1135    pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
1136        self.request_pause_consumer(name, None).await
1137    }
1138
1139    #[cfg(feature = "server_2_11")]
1140    async fn request_pause_consumer(
1141        &self,
1142        name: &str,
1143        pause_until: Option<OffsetDateTime>,
1144    ) -> Result<PauseResponse, ConsumerError> {
1145        let subject = format!("CONSUMER.PAUSE.{}.{}", self.name, name);
1146        let payload = &PauseResumeConsumerRequest { pause_until };
1147
1148        match self.context.request(subject, payload).await? {
1149            Response::Ok::<PauseResponse>(resp) => Ok(resp),
1150            Response::Err { error } => Err(error.into()),
1151        }
1152    }
1153
1154    /// Lists names of all consumers for current stream.
1155    ///
1156    /// # Examples
1157    ///
1158    /// ```no_run
1159    /// # #[tokio::main]
1160    /// # async fn main() -> Result<(), async_nats::Error> {
1161    /// use futures::TryStreamExt;
1162    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1163    /// let jetstream = async_nats::jetstream::new(client);
1164    /// let stream = jetstream.get_stream("stream").await?;
1165    /// let mut names = stream.consumer_names();
1166    /// while let Some(consumer) = names.try_next().await? {
1167    ///     println!("consumer: {stream:?}");
1168    /// }
1169    /// # Ok(())
1170    /// # }
1171    /// ```
1172    pub fn consumer_names(&self) -> ConsumerNames {
1173        ConsumerNames {
1174            context: self.context.clone(),
1175            stream: self.name.clone(),
1176            offset: 0,
1177            page_request: None,
1178            consumers: Vec::new(),
1179            done: false,
1180        }
1181    }
1182
1183    /// Lists all consumers info for current stream.
1184    ///
1185    /// # Examples
1186    ///
1187    /// ```no_run
1188    /// # #[tokio::main]
1189    /// # async fn main() -> Result<(), async_nats::Error> {
1190    /// use futures::TryStreamExt;
1191    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1192    /// let jetstream = async_nats::jetstream::new(client);
1193    /// let stream = jetstream.get_stream("stream").await?;
1194    /// let mut consumers = stream.consumers();
1195    /// while let Some(consumer) = consumers.try_next().await? {
1196    ///     println!("consumer: {consumer:?}");
1197    /// }
1198    /// # Ok(())
1199    /// # }
1200    /// ```
1201    pub fn consumers(&self) -> Consumers {
1202        Consumers {
1203            context: self.context.clone(),
1204            stream: self.name.clone(),
1205            offset: 0,
1206            page_request: None,
1207            consumers: Vec::new(),
1208            done: false,
1209        }
1210    }
1211}
1212
1213pub struct StreamInfoBuilder {
1214    pub(crate) context: Context,
1215    pub(crate) name: String,
1216    pub(crate) deleted: bool,
1217    pub(crate) subject: String,
1218}
1219
1220impl StreamInfoBuilder {
1221    fn new(context: Context, name: String) -> Self {
1222        Self {
1223            context,
1224            name,
1225            deleted: false,
1226            subject: "".to_string(),
1227        }
1228    }
1229
1230    pub fn with_deleted(mut self, deleted: bool) -> Self {
1231        self.deleted = deleted;
1232        self
1233    }
1234
1235    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1236        self.subject = subject.into();
1237        self
1238    }
1239
1240    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1241        let info = stream_info_with_details(
1242            self.context.clone(),
1243            self.name.clone(),
1244            0,
1245            self.deleted,
1246            self.subject.clone(),
1247        )
1248        .await?;
1249
1250        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1251    }
1252}
1253
1254/// `StreamConfig` determines the properties for a stream.
1255/// There are sensible defaults for most. If no subjects are
1256/// given the name will be used as the only subject.
1257#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1258pub struct Config {
1259    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
1260    pub name: String,
1261    /// How large the Stream may become in total bytes before the configured discard policy kicks in
1262    pub max_bytes: i64,
1263    /// How large the Stream may become in total messages before the configured discard policy kicks in
1264    #[serde(rename = "max_msgs")]
1265    pub max_messages: i64,
1266    /// Maximum amount of messages to keep per subject
1267    #[serde(rename = "max_msgs_per_subject")]
1268    pub max_messages_per_subject: i64,
1269    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
1270    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
1271    pub discard: DiscardPolicy,
1272    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
1273    #[serde(default, skip_serializing_if = "is_default")]
1274    pub discard_new_per_subject: bool,
1275    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
1276    /// configured stream `name`.
1277    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1278    pub subjects: Vec<String>,
1279    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
1280    pub retention: RetentionPolicy,
1281    /// How many Consumers can be defined for a given Stream, -1 for unlimited
1282    pub max_consumers: i32,
1283    /// Maximum age of any message in the stream, expressed in nanoseconds
1284    #[serde(with = "serde_nanos")]
1285    pub max_age: Duration,
1286    /// The largest message that will be accepted by the Stream
1287    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1288    pub max_message_size: i32,
1289    /// The type of storage backend, `File` (default) and `Memory`
1290    pub storage: StorageType,
1291    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
1292    pub num_replicas: usize,
1293    /// Disables acknowledging messages that are received by the Stream
1294    #[serde(default, skip_serializing_if = "is_default")]
1295    pub no_ack: bool,
1296    /// The window within which to track duplicate messages.
1297    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1298    pub duplicate_window: Duration,
1299    /// The owner of the template associated with this stream.
1300    #[serde(default, skip_serializing_if = "is_default")]
1301    pub template_owner: String,
1302    /// Indicates the stream is sealed and cannot be modified in any way
1303    #[serde(default, skip_serializing_if = "is_default")]
1304    pub sealed: bool,
1305    /// A short description of the purpose of this stream.
1306    #[serde(default, skip_serializing_if = "is_default")]
1307    pub description: Option<String>,
1308    #[serde(
1309        default,
1310        rename = "allow_rollup_hdrs",
1311        skip_serializing_if = "is_default"
1312    )]
1313    /// Indicates if rollups will be allowed or not.
1314    pub allow_rollup: bool,
1315    #[serde(default, skip_serializing_if = "is_default")]
1316    /// Indicates deletes will be denied or not.
1317    pub deny_delete: bool,
1318    /// Indicates if purges will be denied or not.
1319    #[serde(default, skip_serializing_if = "is_default")]
1320    pub deny_purge: bool,
1321
1322    /// Optional republish config.
1323    #[serde(default, skip_serializing_if = "is_default")]
1324    pub republish: Option<Republish>,
1325
1326    /// Enables direct get, which would get messages from
1327    /// non-leader.
1328    #[serde(default, skip_serializing_if = "is_default")]
1329    pub allow_direct: bool,
1330
1331    /// Enable direct access also for mirrors.
1332    #[serde(default, skip_serializing_if = "is_default")]
1333    pub mirror_direct: bool,
1334
1335    /// Stream mirror configuration.
1336    #[serde(default, skip_serializing_if = "Option::is_none")]
1337    pub mirror: Option<Source>,
1338
1339    /// Sources configuration.
1340    #[serde(default, skip_serializing_if = "Option::is_none")]
1341    pub sources: Option<Vec<Source>>,
1342
1343    #[cfg(feature = "server_2_10")]
1344    /// Additional stream metadata.
1345    #[serde(default, skip_serializing_if = "is_default")]
1346    pub metadata: HashMap<String, String>,
1347
1348    #[cfg(feature = "server_2_10")]
1349    /// Allow applying a subject transform to incoming messages
1350    #[serde(default, skip_serializing_if = "Option::is_none")]
1351    pub subject_transform: Option<SubjectTransform>,
1352
1353    #[cfg(feature = "server_2_10")]
1354    /// Override compression config for this stream.
1355    /// Wrapping enum that has `None` type with [Option] is there
1356    /// because [Stream] can override global compression set to [Compression::S2]
1357    /// to [Compression::None], which is different from not overriding global config with anything.
1358    #[serde(default, skip_serializing_if = "Option::is_none")]
1359    pub compression: Option<Compression>,
1360    #[cfg(feature = "server_2_10")]
1361    /// Set limits on consumers that are created on this stream.
1362    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1363    pub consumer_limits: Option<ConsumerLimits>,
1364
1365    #[cfg(feature = "server_2_10")]
1366    /// Sets the first sequence for the stream.
1367    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1368    pub first_sequence: Option<u64>,
1369
1370    /// Placement configuration for clusters and tags.
1371    #[serde(default, skip_serializing_if = "Option::is_none")]
1372    pub placement: Option<Placement>,
1373    /// For suspending the consumer until the deadline.
1374    #[cfg(feature = "server_2_11")]
1375    #[serde(
1376        default,
1377        with = "rfc3339::option",
1378        skip_serializing_if = "Option::is_none"
1379    )]
1380    pub pause_until: Option<OffsetDateTime>,
1381
1382    /// Allows setting a TTL for a message in the stream.
1383    #[cfg(feature = "server_2_11")]
1384    #[serde(default, skip_serializing_if = "is_default", rename = "allow_msg_ttl")]
1385    pub allow_message_ttl: bool,
1386
1387    /// Enables delete markers for messages deleted from the stream and sets the TTL
1388    /// for how long the marker should be kept.
1389    #[cfg(feature = "server_2_11")]
1390    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1391    pub subject_delete_marker_ttl: Option<Duration>,
1392}
1393
1394impl From<&Config> for Config {
1395    fn from(sc: &Config) -> Config {
1396        sc.clone()
1397    }
1398}
1399
1400impl From<&str> for Config {
1401    fn from(s: &str) -> Config {
1402        Config {
1403            name: s.to_string(),
1404            ..Default::default()
1405        }
1406    }
1407}
1408
1409#[cfg(feature = "server_2_10")]
1410fn default_consumer_limits_as_none<'de, D>(
1411    deserializer: D,
1412) -> Result<Option<ConsumerLimits>, D::Error>
1413where
1414    D: Deserializer<'de>,
1415{
1416    let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1417    if let Some(cl) = consumer_limits {
1418        if cl == ConsumerLimits::default() {
1419            Ok(None)
1420        } else {
1421            Ok(Some(cl))
1422        }
1423    } else {
1424        Ok(None)
1425    }
1426}
1427#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1428pub struct ConsumerLimits {
1429    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
1430    #[serde(default, with = "serde_nanos")]
1431    pub inactive_threshold: std::time::Duration,
1432    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
1433    #[serde(default)]
1434    pub max_ack_pending: i64,
1435}
1436
1437#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1438pub enum Compression {
1439    #[serde(rename = "s2")]
1440    S2,
1441    #[serde(rename = "none")]
1442    None,
1443}
1444
1445// SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
1446#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1447pub struct SubjectTransform {
1448    #[serde(rename = "src")]
1449    pub source: String,
1450
1451    #[serde(rename = "dest")]
1452    pub destination: String,
1453}
1454
1455// Republish is for republishing messages once committed to a stream.
1456#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1457pub struct Republish {
1458    /// Subject that should be republished.
1459    #[serde(rename = "src")]
1460    pub source: String,
1461    /// Subject where messages will be republished.
1462    #[serde(rename = "dest")]
1463    pub destination: String,
1464    /// If true, only headers should be republished.
1465    #[serde(default)]
1466    pub headers_only: bool,
1467}
1468
1469/// Placement describes on which cluster or tags the stream should be placed.
1470#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1471pub struct Placement {
1472    // Cluster where the stream should be placed.
1473    #[serde(default, skip_serializing_if = "is_default")]
1474    pub cluster: Option<String>,
1475    // Matching tags for stream placement.
1476    #[serde(default, skip_serializing_if = "is_default")]
1477    pub tags: Vec<String>,
1478}
1479
1480/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
1481/// remove older messages. `New` will fail to store the new message.
1482#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1483#[repr(u8)]
1484pub enum DiscardPolicy {
1485    /// will remove older messages when limits are hit.
1486    #[default]
1487    #[serde(rename = "old")]
1488    Old = 0,
1489    /// will error on a StoreMsg call when limits are hit
1490    #[serde(rename = "new")]
1491    New = 1,
1492}
1493
1494/// `RetentionPolicy` determines how messages in a set are retained.
1495#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1496#[repr(u8)]
1497pub enum RetentionPolicy {
1498    /// `Limits` (default) means that messages are retained until any given limit is reached.
1499    /// This could be one of messages, bytes, or age.
1500    #[default]
1501    #[serde(rename = "limits")]
1502    Limits = 0,
1503    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
1504    #[serde(rename = "interest")]
1505    Interest = 1,
1506    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
1507    #[serde(rename = "workqueue")]
1508    WorkQueue = 2,
1509}
1510
1511/// determines how messages are stored for retention.
1512#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1513#[repr(u8)]
1514pub enum StorageType {
1515    /// Stream data is kept in files. This is the default.
1516    #[default]
1517    #[serde(rename = "file")]
1518    File = 0,
1519    /// Stream data is kept only in memory.
1520    #[serde(rename = "memory")]
1521    Memory = 1,
1522}
1523
1524async fn stream_info_with_details(
1525    context: Context,
1526    stream: String,
1527    offset: usize,
1528    deleted_details: bool,
1529    subjects_filter: String,
1530) -> Result<Info, InfoError> {
1531    let subject = format!("STREAM.INFO.{}", stream);
1532
1533    let payload = StreamInfoRequest {
1534        offset,
1535        deleted_details,
1536        subjects_filter,
1537    };
1538
1539    let response: Response<Info> = context.request(subject, &payload).await?;
1540
1541    match response {
1542        Response::Ok(info) => Ok(info),
1543        Response::Err { error } => Err(error.into()),
1544    }
1545}
1546
1547type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1548
1549#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1550pub struct StreamInfoRequest {
1551    offset: usize,
1552    deleted_details: bool,
1553    subjects_filter: String,
1554}
1555
1556pub struct InfoWithSubjects {
1557    stream: String,
1558    context: Context,
1559    pub info: Info,
1560    offset: usize,
1561    subjects: collections::hash_map::IntoIter<String, usize>,
1562    info_request: Option<InfoRequest>,
1563    subjects_filter: String,
1564    pages_done: bool,
1565}
1566
1567impl InfoWithSubjects {
1568    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1569        let subjects = info.state.subjects.take().unwrap_or_default();
1570        let name = info.config.name.clone();
1571        InfoWithSubjects {
1572            context,
1573            info,
1574            pages_done: subjects.is_empty(),
1575            offset: subjects.len(),
1576            subjects: subjects.into_iter(),
1577            subjects_filter: subject,
1578            stream: name,
1579            info_request: None,
1580        }
1581    }
1582}
1583
1584impl futures::Stream for InfoWithSubjects {
1585    type Item = Result<(String, usize), InfoError>;
1586
1587    fn poll_next(
1588        mut self: Pin<&mut Self>,
1589        cx: &mut std::task::Context<'_>,
1590    ) -> Poll<Option<Self::Item>> {
1591        match self.subjects.next() {
1592            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1593            None => {
1594                // If we have already requested all pages, stop the iterator.
1595                if self.pages_done {
1596                    return Poll::Ready(None);
1597                }
1598                let stream = self.stream.clone();
1599                let context = self.context.clone();
1600                let subjects_filter = self.subjects_filter.clone();
1601                let offset = self.offset;
1602                match self
1603                    .info_request
1604                    .get_or_insert_with(|| {
1605                        Box::pin(stream_info_with_details(
1606                            context,
1607                            stream,
1608                            offset,
1609                            false,
1610                            subjects_filter,
1611                        ))
1612                    })
1613                    .poll_unpin(cx)
1614                {
1615                    Poll::Ready(resp) => match resp {
1616                        Ok(info) => {
1617                            let subjects = info.state.subjects.clone();
1618                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1619                            self.info_request = None;
1620                            let subjects = subjects.unwrap_or_default();
1621                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1622                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1623                            if total <= self.offset || subjects.is_empty() {
1624                                self.pages_done = true;
1625                            }
1626                            match self.subjects.next() {
1627                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1628                                None => Poll::Ready(None),
1629                            }
1630                        }
1631                        Err(err) => Poll::Ready(Some(Err(err))),
1632                    },
1633                    Poll::Pending => Poll::Pending,
1634                }
1635            }
1636        }
1637    }
1638}
1639
1640/// Shows config and current state for this stream.
1641#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1642pub struct Info {
1643    /// The configuration associated with this stream.
1644    pub config: Config,
1645    /// The time that this stream was created.
1646    #[serde(with = "rfc3339")]
1647    pub created: time::OffsetDateTime,
1648    /// Various metrics associated with this stream.
1649    pub state: State,
1650    /// Information about leader and replicas.
1651    pub cluster: Option<ClusterInfo>,
1652    /// Information about mirror config if present.
1653    #[serde(default)]
1654    pub mirror: Option<SourceInfo>,
1655    /// Information about sources configs if present.
1656    #[serde(default)]
1657    pub sources: Vec<SourceInfo>,
1658    #[serde(flatten)]
1659    paged_info: Option<PagedInfo>,
1660}
1661
1662#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1663pub struct PagedInfo {
1664    offset: usize,
1665    total: usize,
1666    limit: usize,
1667}
1668
1669#[derive(Deserialize)]
1670pub struct DeleteStatus {
1671    pub success: bool,
1672}
1673
1674#[cfg(feature = "server_2_11")]
1675#[derive(Deserialize)]
1676pub struct PauseResponse {
1677    pub paused: bool,
1678    #[serde(with = "rfc3339")]
1679    pub pause_until: OffsetDateTime,
1680    #[serde(default, with = "serde_nanos")]
1681    pub pause_remaining: Option<Duration>,
1682}
1683
1684#[cfg(feature = "server_2_11")]
1685#[derive(Serialize, Debug)]
1686struct PauseResumeConsumerRequest {
1687    #[serde(with = "rfc3339::option", skip_serializing_if = "Option::is_none")]
1688    pause_until: Option<OffsetDateTime>,
1689}
1690
1691/// information about the given stream.
1692#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1693pub struct State {
1694    /// The number of messages contained in this stream
1695    pub messages: u64,
1696    /// The number of bytes of all messages contained in this stream
1697    pub bytes: u64,
1698    /// The lowest sequence number still present in this stream
1699    #[serde(rename = "first_seq")]
1700    pub first_sequence: u64,
1701    /// The time associated with the oldest message still present in this stream
1702    #[serde(with = "rfc3339", rename = "first_ts")]
1703    pub first_timestamp: time::OffsetDateTime,
1704    /// The last sequence number assigned to a message in this stream
1705    #[serde(rename = "last_seq")]
1706    pub last_sequence: u64,
1707    /// The time that the last message was received by this stream
1708    #[serde(with = "rfc3339", rename = "last_ts")]
1709    pub last_timestamp: time::OffsetDateTime,
1710    /// The number of consumers configured to consume this stream
1711    pub consumer_count: usize,
1712    /// The number of subjects in the stream
1713    #[serde(default, rename = "num_subjects")]
1714    pub subjects_count: u64,
1715    /// The number of deleted messages in the stream
1716    #[serde(default, rename = "num_deleted")]
1717    pub deleted_count: Option<u64>,
1718    /// The list of deleted subjects from the Stream.
1719    /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
1720    #[serde(default)]
1721    pub deleted: Option<Vec<u64>>,
1722
1723    pub(crate) subjects: Option<HashMap<String, usize>>,
1724}
1725
1726/// A raw stream message in the representation it is stored.
1727#[derive(Debug, Serialize, Deserialize, Clone)]
1728pub struct RawMessage {
1729    /// Subject of the message.
1730    #[serde(rename = "subject")]
1731    pub subject: String,
1732
1733    /// Sequence of the message.
1734    #[serde(rename = "seq")]
1735    pub sequence: u64,
1736
1737    /// Raw payload of the message as a base64 encoded string.
1738    #[serde(default, rename = "data")]
1739    pub payload: String,
1740
1741    /// Raw header string, if any.
1742    #[serde(default, rename = "hdrs")]
1743    pub headers: Option<String>,
1744
1745    /// The time the message was published.
1746    #[serde(rename = "time", with = "rfc3339")]
1747    pub time: time::OffsetDateTime,
1748}
1749
1750impl TryFrom<RawMessage> for StreamMessage {
1751    type Error = crate::Error;
1752
1753    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1754        let decoded_payload = STANDARD
1755            .decode(value.payload)
1756            .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1757        let decoded_headers = value
1758            .headers
1759            .map(|header| STANDARD.decode(header))
1760            .map_or(Ok(None), |v| v.map(Some))?;
1761
1762        let (headers, _, _) = decoded_headers
1763            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1764
1765        Ok(StreamMessage {
1766            subject: value.subject.into(),
1767            payload: decoded_payload.into(),
1768            headers,
1769            sequence: value.sequence,
1770            time: value.time,
1771        })
1772    }
1773}
1774
1775fn is_continuation(c: char) -> bool {
1776    c == ' ' || c == '\t'
1777}
1778const HEADER_LINE: &str = "NATS/1.0";
1779
1780#[allow(clippy::type_complexity)]
1781fn parse_headers(
1782    buf: &[u8],
1783) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1784    let mut headers = HeaderMap::new();
1785    let mut maybe_status: Option<StatusCode> = None;
1786    let mut maybe_description: Option<String> = None;
1787    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1788        line.lines().peekable()
1789    } else {
1790        return Err(Box::new(std::io::Error::new(
1791            ErrorKind::Other,
1792            "invalid header",
1793        )));
1794    };
1795
1796    if let Some(line) = lines.next() {
1797        let line = line
1798            .strip_prefix(HEADER_LINE)
1799            .ok_or_else(|| {
1800                Box::new(std::io::Error::new(
1801                    ErrorKind::Other,
1802                    "version line does not start with NATS/1.0",
1803                ))
1804            })?
1805            .trim();
1806
1807        match line.split_once(' ') {
1808            Some((status, description)) => {
1809                if !status.is_empty() {
1810                    maybe_status = Some(status.parse()?);
1811                }
1812
1813                if !description.is_empty() {
1814                    maybe_description = Some(description.trim().to_string());
1815                }
1816            }
1817            None => {
1818                if !line.is_empty() {
1819                    maybe_status = Some(line.parse()?);
1820                }
1821            }
1822        }
1823    } else {
1824        return Err(Box::new(std::io::Error::new(
1825            ErrorKind::Other,
1826            "expected header information not found",
1827        )));
1828    };
1829
1830    while let Some(line) = lines.next() {
1831        if line.is_empty() {
1832            continue;
1833        }
1834
1835        if let Some((k, v)) = line.split_once(':').to_owned() {
1836            let mut s = String::from(v.trim());
1837            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1838                s.push(' ');
1839                s.push_str(v.trim());
1840            }
1841
1842            headers.insert(
1843                HeaderName::from_str(k)?,
1844                HeaderValue::from_str(&s)
1845                    .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1846            );
1847        } else {
1848            return Err(Box::new(std::io::Error::new(
1849                ErrorKind::Other,
1850                "malformed header line",
1851            )));
1852        }
1853    }
1854
1855    if headers.is_empty() {
1856        Ok((HeaderMap::new(), maybe_status, maybe_description))
1857    } else {
1858        Ok((headers, maybe_status, maybe_description))
1859    }
1860}
1861
1862#[derive(Debug, Serialize, Deserialize, Clone)]
1863struct GetRawMessage {
1864    pub(crate) message: RawMessage,
1865}
1866
1867fn is_default<T: Default + Eq>(t: &T) -> bool {
1868    t == &T::default()
1869}
1870/// Information about the stream's, consumer's associated `JetStream` cluster
1871#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1872pub struct ClusterInfo {
1873    /// The cluster name.
1874    pub name: Option<String>,
1875    /// The server name of the RAFT leader.
1876    pub leader: Option<String>,
1877    /// The members of the RAFT cluster.
1878    #[serde(default)]
1879    pub replicas: Vec<PeerInfo>,
1880}
1881
1882/// The members of the RAFT cluster
1883#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1884pub struct PeerInfo {
1885    /// The server name of the peer.
1886    pub name: String,
1887    /// Indicates if the server is up to date and synchronized.
1888    pub current: bool,
1889    /// Nanoseconds since this peer was last seen.
1890    #[serde(with = "serde_nanos")]
1891    pub active: Duration,
1892    /// Indicates the node is considered offline by the group.
1893    #[serde(default)]
1894    pub offline: bool,
1895    /// How many uncommitted operations this peer is behind the leader.
1896    pub lag: Option<u64>,
1897}
1898
1899#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1900pub struct SourceInfo {
1901    /// Source name.
1902    pub name: String,
1903    /// Number of messages this source is lagging behind.
1904    pub lag: u64,
1905    /// Last time the source was seen active.
1906    #[serde(deserialize_with = "negative_duration_as_none")]
1907    pub active: Option<std::time::Duration>,
1908    /// Filtering for the source.
1909    #[serde(default)]
1910    pub filter_subject: Option<String>,
1911    /// Source destination subject.
1912    #[serde(default)]
1913    pub subject_transform_dest: Option<String>,
1914    /// List of transforms.
1915    #[serde(default)]
1916    pub subject_transforms: Vec<SubjectTransform>,
1917}
1918
1919fn negative_duration_as_none<'de, D>(
1920    deserializer: D,
1921) -> Result<Option<std::time::Duration>, D::Error>
1922where
1923    D: Deserializer<'de>,
1924{
1925    let n = i64::deserialize(deserializer)?;
1926    if n.is_negative() {
1927        Ok(None)
1928    } else {
1929        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1930    }
1931}
1932
1933/// The response generated by trying to purge a stream.
1934#[derive(Debug, Deserialize, Clone, Copy)]
1935pub struct PurgeResponse {
1936    /// Whether the purge request was successful.
1937    pub success: bool,
1938    /// The number of purged messages in a stream.
1939    pub purged: u64,
1940}
1941/// The payload used to generate a purge request.
1942#[derive(Default, Debug, Serialize, Clone)]
1943pub struct PurgeRequest {
1944    /// Purge up to but not including sequence.
1945    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1946    pub sequence: Option<u64>,
1947
1948    /// Subject to match against messages for the purge command.
1949    #[serde(default, skip_serializing_if = "is_default")]
1950    pub filter: Option<String>,
1951
1952    /// Number of messages to keep.
1953    #[serde(default, skip_serializing_if = "is_default")]
1954    pub keep: Option<u64>,
1955}
1956
1957#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1958pub struct Source {
1959    /// Name of the stream source.
1960    pub name: String,
1961    /// Optional source start sequence.
1962    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1963    pub start_sequence: Option<u64>,
1964    #[serde(
1965        default,
1966        rename = "opt_start_time",
1967        skip_serializing_if = "is_default",
1968        with = "rfc3339::option"
1969    )]
1970    /// Optional source start time.
1971    pub start_time: Option<OffsetDateTime>,
1972    /// Optional additional filter subject.
1973    #[serde(default, skip_serializing_if = "is_default")]
1974    pub filter_subject: Option<String>,
1975    /// Optional config for sourcing streams from another prefix, used for cross-account.
1976    #[serde(default, skip_serializing_if = "Option::is_none")]
1977    pub external: Option<External>,
1978    /// Optional config to set a domain, if source is residing in different one.
1979    #[serde(default, skip_serializing_if = "is_default")]
1980    pub domain: Option<String>,
1981    /// Subject transforms for Stream.
1982    #[cfg(feature = "server_2_10")]
1983    #[serde(default, skip_serializing_if = "is_default")]
1984    pub subject_transforms: Vec<SubjectTransform>,
1985}
1986
1987#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1988pub struct External {
1989    /// Api prefix of external source.
1990    #[serde(rename = "api")]
1991    pub api_prefix: String,
1992    /// Optional configuration of delivery prefix.
1993    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1994    pub delivery_prefix: Option<String>,
1995}
1996
1997use std::marker::PhantomData;
1998
1999#[derive(Debug, Default)]
2000pub struct Yes;
2001#[derive(Debug, Default)]
2002pub struct No;
2003
2004pub trait ToAssign: Debug {}
2005
2006impl ToAssign for Yes {}
2007impl ToAssign for No {}
2008
2009#[derive(Debug)]
2010pub struct Purge<SEQUENCE, KEEP>
2011where
2012    SEQUENCE: ToAssign,
2013    KEEP: ToAssign,
2014{
2015    inner: PurgeRequest,
2016    sequence_set: PhantomData<SEQUENCE>,
2017    keep_set: PhantomData<KEEP>,
2018    context: Context,
2019    stream_name: String,
2020}
2021
2022impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
2023where
2024    SEQUENCE: ToAssign,
2025    KEEP: ToAssign,
2026{
2027    /// Adds subject filter to [PurgeRequest]
2028    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
2029        self.inner.filter = Some(filter.into());
2030        self
2031    }
2032}
2033
2034impl Purge<No, No> {
2035    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
2036        Purge {
2037            context: stream.context.clone(),
2038            stream_name: stream.name.clone(),
2039            inner: Default::default(),
2040            sequence_set: PhantomData {},
2041            keep_set: PhantomData {},
2042        }
2043    }
2044}
2045
2046impl<KEEP> Purge<No, KEEP>
2047where
2048    KEEP: ToAssign,
2049{
2050    /// Creates a new [PurgeRequest].
2051    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
2052    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
2053        Purge {
2054            context: self.context.clone(),
2055            stream_name: self.stream_name.clone(),
2056            sequence_set: PhantomData {},
2057            keep_set: PhantomData {},
2058            inner: PurgeRequest {
2059                keep: Some(keep),
2060                ..self.inner
2061            },
2062        }
2063    }
2064}
2065impl<SEQUENCE> Purge<SEQUENCE, No>
2066where
2067    SEQUENCE: ToAssign,
2068{
2069    /// Creates a new [PurgeRequest].
2070    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
2071    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
2072        Purge {
2073            context: self.context.clone(),
2074            stream_name: self.stream_name.clone(),
2075            sequence_set: PhantomData {},
2076            keep_set: PhantomData {},
2077            inner: PurgeRequest {
2078                sequence: Some(sequence),
2079                ..self.inner
2080            },
2081        }
2082    }
2083}
2084
2085#[derive(Clone, Debug, PartialEq)]
2086pub enum PurgeErrorKind {
2087    Request,
2088    TimedOut,
2089    JetStream(super::errors::Error),
2090}
2091
2092impl Display for PurgeErrorKind {
2093    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2094        match self {
2095            Self::Request => write!(f, "request failed"),
2096            Self::TimedOut => write!(f, "timed out"),
2097            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2098        }
2099    }
2100}
2101
2102pub type PurgeError = Error<PurgeErrorKind>;
2103
2104impl<S, K> IntoFuture for Purge<S, K>
2105where
2106    S: ToAssign + std::marker::Send,
2107    K: ToAssign + std::marker::Send,
2108{
2109    type Output = Result<PurgeResponse, PurgeError>;
2110
2111    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
2112
2113    fn into_future(self) -> Self::IntoFuture {
2114        Box::pin(std::future::IntoFuture::into_future(async move {
2115            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2116            let response: Response<PurgeResponse> = self
2117                .context
2118                .request(request_subject, &self.inner)
2119                .map_err(|err| match err.kind() {
2120                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2121                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2122                })
2123                .await?;
2124
2125            match response {
2126                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2127                Response::Ok(response) => Ok(response),
2128            }
2129        }))
2130    }
2131}
2132
2133#[derive(Deserialize, Debug)]
2134struct ConsumerPage {
2135    total: usize,
2136    consumers: Option<Vec<String>>,
2137}
2138
2139#[derive(Deserialize, Debug)]
2140struct ConsumerInfoPage {
2141    total: usize,
2142    consumers: Option<Vec<super::consumer::Info>>,
2143}
2144
2145type ConsumerNamesErrorKind = StreamsErrorKind;
2146type ConsumerNamesError = StreamsError;
2147type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2148
2149pub struct ConsumerNames {
2150    context: Context,
2151    stream: String,
2152    offset: usize,
2153    page_request: Option<PageRequest>,
2154    consumers: Vec<String>,
2155    done: bool,
2156}
2157
2158impl futures::Stream for ConsumerNames {
2159    type Item = Result<String, ConsumerNamesError>;
2160
2161    fn poll_next(
2162        mut self: Pin<&mut Self>,
2163        cx: &mut std::task::Context<'_>,
2164    ) -> std::task::Poll<Option<Self::Item>> {
2165        match self.page_request.as_mut() {
2166            Some(page) => match page.try_poll_unpin(cx) {
2167                std::task::Poll::Ready(page) => {
2168                    self.page_request = None;
2169                    let page = page.map_err(|err| {
2170                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2171                    })?;
2172
2173                    if let Some(consumers) = page.consumers {
2174                        self.offset += consumers.len();
2175                        self.consumers = consumers;
2176                        if self.offset >= page.total {
2177                            self.done = true;
2178                        }
2179                        match self.consumers.pop() {
2180                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2181                            None => Poll::Ready(None),
2182                        }
2183                    } else {
2184                        Poll::Ready(None)
2185                    }
2186                }
2187                std::task::Poll::Pending => std::task::Poll::Pending,
2188            },
2189            None => {
2190                if let Some(stream) = self.consumers.pop() {
2191                    Poll::Ready(Some(Ok(stream)))
2192                } else {
2193                    if self.done {
2194                        return Poll::Ready(None);
2195                    }
2196                    let context = self.context.clone();
2197                    let offset = self.offset;
2198                    let stream = self.stream.clone();
2199                    self.page_request = Some(Box::pin(async move {
2200                        match context
2201                            .request(
2202                                format!("CONSUMER.NAMES.{stream}"),
2203                                &json!({
2204                                    "offset": offset,
2205                                }),
2206                            )
2207                            .await?
2208                        {
2209                            Response::Err { error } => Err(RequestError::with_source(
2210                                super::context::RequestErrorKind::Other,
2211                                error,
2212                            )),
2213                            Response::Ok(page) => Ok(page),
2214                        }
2215                    }));
2216                    self.poll_next(cx)
2217                }
2218            }
2219        }
2220    }
2221}
2222
2223pub type ConsumersErrorKind = StreamsErrorKind;
2224pub type ConsumersError = StreamsError;
2225type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2226
2227pub struct Consumers {
2228    context: Context,
2229    stream: String,
2230    offset: usize,
2231    page_request: Option<PageInfoRequest>,
2232    consumers: Vec<super::consumer::Info>,
2233    done: bool,
2234}
2235
2236impl futures::Stream for Consumers {
2237    type Item = Result<super::consumer::Info, ConsumersError>;
2238
2239    fn poll_next(
2240        mut self: Pin<&mut Self>,
2241        cx: &mut std::task::Context<'_>,
2242    ) -> std::task::Poll<Option<Self::Item>> {
2243        match self.page_request.as_mut() {
2244            Some(page) => match page.try_poll_unpin(cx) {
2245                std::task::Poll::Ready(page) => {
2246                    self.page_request = None;
2247                    let page = page.map_err(|err| {
2248                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2249                    })?;
2250                    if let Some(consumers) = page.consumers {
2251                        self.offset += consumers.len();
2252                        self.consumers = consumers;
2253                        if self.offset >= page.total {
2254                            self.done = true;
2255                        }
2256                        match self.consumers.pop() {
2257                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2258                            None => Poll::Ready(None),
2259                        }
2260                    } else {
2261                        Poll::Ready(None)
2262                    }
2263                }
2264                std::task::Poll::Pending => std::task::Poll::Pending,
2265            },
2266            None => {
2267                if let Some(stream) = self.consumers.pop() {
2268                    Poll::Ready(Some(Ok(stream)))
2269                } else {
2270                    if self.done {
2271                        return Poll::Ready(None);
2272                    }
2273                    let context = self.context.clone();
2274                    let offset = self.offset;
2275                    let stream = self.stream.clone();
2276                    self.page_request = Some(Box::pin(async move {
2277                        match context
2278                            .request(
2279                                format!("CONSUMER.LIST.{stream}"),
2280                                &json!({
2281                                    "offset": offset,
2282                                }),
2283                            )
2284                            .await?
2285                        {
2286                            Response::Err { error } => Err(RequestError::with_source(
2287                                super::context::RequestErrorKind::Other,
2288                                error,
2289                            )),
2290                            Response::Ok(page) => Ok(page),
2291                        }
2292                    }));
2293                    self.poll_next(cx)
2294                }
2295            }
2296        }
2297    }
2298}
2299
2300#[derive(Clone, Debug, PartialEq)]
2301pub enum LastRawMessageErrorKind {
2302    NoMessageFound,
2303    InvalidSubject,
2304    JetStream(super::errors::Error),
2305    Other,
2306}
2307
2308impl Display for LastRawMessageErrorKind {
2309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2310        match self {
2311            Self::NoMessageFound => write!(f, "no message found"),
2312            Self::InvalidSubject => write!(f, "invalid subject"),
2313            Self::Other => write!(f, "failed to get last raw message"),
2314            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2315        }
2316    }
2317}
2318
2319pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2320pub type RawMessageErrorKind = LastRawMessageErrorKind;
2321pub type RawMessageError = LastRawMessageError;
2322
2323#[derive(Clone, Debug, PartialEq)]
2324pub enum ConsumerErrorKind {
2325    //TODO: get last should have timeout, which should be mapped here.
2326    TimedOut,
2327    Request,
2328    InvalidConsumerType,
2329    InvalidName,
2330    JetStream(super::errors::Error),
2331    Other,
2332}
2333
2334impl Display for ConsumerErrorKind {
2335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2336        match self {
2337            Self::TimedOut => write!(f, "timed out"),
2338            Self::Request => write!(f, "request failed"),
2339            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2340            Self::Other => write!(f, "consumer error"),
2341            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2342            Self::InvalidName => write!(f, "invalid consumer name"),
2343        }
2344    }
2345}
2346
2347pub type ConsumerError = Error<ConsumerErrorKind>;
2348
2349#[derive(Clone, Debug, PartialEq)]
2350pub enum ConsumerCreateStrictErrorKind {
2351    //TODO: get last should have timeout, which should be mapped here.
2352    TimedOut,
2353    Request,
2354    InvalidConsumerType,
2355    InvalidName,
2356    AlreadyExists,
2357    JetStream(super::errors::Error),
2358    Other,
2359}
2360
2361impl Display for ConsumerCreateStrictErrorKind {
2362    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2363        match self {
2364            Self::TimedOut => write!(f, "timed out"),
2365            Self::Request => write!(f, "request failed"),
2366            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2367            Self::Other => write!(f, "consumer error"),
2368            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2369            Self::InvalidName => write!(f, "invalid consumer name"),
2370            Self::AlreadyExists => write!(f, "consumer already exists"),
2371        }
2372    }
2373}
2374
2375pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2376
2377#[derive(Clone, Debug, PartialEq)]
2378pub enum ConsumerUpdateErrorKind {
2379    //TODO: get last should have timeout, which should be mapped here.
2380    TimedOut,
2381    Request,
2382    InvalidConsumerType,
2383    InvalidName,
2384    DoesNotExist,
2385    JetStream(super::errors::Error),
2386    Other,
2387}
2388
2389impl Display for ConsumerUpdateErrorKind {
2390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2391        match self {
2392            Self::TimedOut => write!(f, "timed out"),
2393            Self::Request => write!(f, "request failed"),
2394            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2395            Self::Other => write!(f, "consumer error"),
2396            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2397            Self::InvalidName => write!(f, "invalid consumer name"),
2398            Self::DoesNotExist => write!(f, "consumer does not exist"),
2399        }
2400    }
2401}
2402
2403pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2404
2405impl From<super::errors::Error> for ConsumerError {
2406    fn from(err: super::errors::Error) -> Self {
2407        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2408    }
2409}
2410impl From<super::errors::Error> for ConsumerCreateStrictError {
2411    fn from(err: super::errors::Error) -> Self {
2412        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2413            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2414        } else {
2415            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2416        }
2417    }
2418}
2419impl From<super::errors::Error> for ConsumerUpdateError {
2420    fn from(err: super::errors::Error) -> Self {
2421        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2422            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2423        } else {
2424            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2425        }
2426    }
2427}
2428impl From<ConsumerError> for ConsumerUpdateError {
2429    fn from(err: ConsumerError) -> Self {
2430        match err.kind() {
2431            ConsumerErrorKind::JetStream(err) => {
2432                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2433                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2434                } else {
2435                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2436                }
2437            }
2438            ConsumerErrorKind::Request => {
2439                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2440            }
2441            ConsumerErrorKind::TimedOut => {
2442                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2443            }
2444            ConsumerErrorKind::InvalidConsumerType => {
2445                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2446            }
2447            ConsumerErrorKind::InvalidName => {
2448                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2449            }
2450            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2451        }
2452    }
2453}
2454
2455impl From<ConsumerError> for ConsumerCreateStrictError {
2456    fn from(err: ConsumerError) -> Self {
2457        match err.kind() {
2458            ConsumerErrorKind::JetStream(err) => {
2459                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2460                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2461                } else {
2462                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2463                }
2464            }
2465            ConsumerErrorKind::Request => {
2466                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2467            }
2468            ConsumerErrorKind::TimedOut => {
2469                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2470            }
2471            ConsumerErrorKind::InvalidConsumerType => {
2472                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2473            }
2474            ConsumerErrorKind::InvalidName => {
2475                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2476            }
2477            ConsumerErrorKind::Other => {
2478                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2479            }
2480        }
2481    }
2482}
2483
2484impl From<super::context::RequestError> for ConsumerError {
2485    fn from(err: super::context::RequestError) -> Self {
2486        match err.kind() {
2487            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2488            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2489        }
2490    }
2491}
2492impl From<super::context::RequestError> for ConsumerUpdateError {
2493    fn from(err: super::context::RequestError) -> Self {
2494        match err.kind() {
2495            RequestErrorKind::TimedOut => {
2496                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2497            }
2498            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2499        }
2500    }
2501}
2502impl From<super::context::RequestError> for ConsumerCreateStrictError {
2503    fn from(err: super::context::RequestError) -> Self {
2504        match err.kind() {
2505            RequestErrorKind::TimedOut => {
2506                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2507            }
2508            _ => {
2509                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2510            }
2511        }
2512    }
2513}
2514
2515#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
2516pub struct StreamGetMessage {
2517    #[serde(rename = "seq", skip_serializing_if = "is_default")]
2518    sequence: Option<u64>,
2519    #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
2520    next_by_subject: Option<String>,
2521    #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
2522    last_by_subject: Option<String>,
2523}
2524
2525#[cfg(test)]
2526mod tests {
2527    use super::*;
2528
2529    #[test]
2530    fn consumer_limits_de() {
2531        let config = Config {
2532            ..Default::default()
2533        };
2534
2535        let roundtrip: Config = {
2536            let ser = serde_json::to_string(&config).unwrap();
2537            serde_json::from_str(&ser).unwrap()
2538        };
2539        assert_eq!(config, roundtrip);
2540    }
2541}