async_nats_wrpc/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16#[cfg(feature = "server_2_10")]
17use std::collections::HashMap;
18use std::{
19    fmt::{self, Debug, Display},
20    future::IntoFuture,
21    io::{self, ErrorKind},
22    pin::Pin,
23    str::FromStr,
24    task::Poll,
25    time::Duration,
26};
27
28use crate::{
29    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
30};
31use base64::engine::general_purpose::STANDARD;
32use base64::engine::Engine;
33use bytes::Bytes;
34use futures::{future::BoxFuture, TryFutureExt};
35use serde::{Deserialize, Deserializer, Serialize};
36use serde_json::json;
37use time::{serde::rfc3339, OffsetDateTime};
38
39use super::{
40    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
41    context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
42    errors::ErrorCode,
43    response::Response,
44    Context, Message,
45};
46
/// Error returned by [Stream::info]; an alias for the JetStream context's [RequestError].
pub type InfoError = RequestError;
48
/// Kinds of failures reported by the `direct_get*` methods of [Stream].
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The reply carried a not-found status.
    NotFound,
    /// The subject failed validation, or the reply carried status 408, which Direct Get
    /// uses for a bad or empty payload.
    InvalidSubject,
    /// The underlying request timed out.
    TimedOut,
    /// The request failed.
    Request,
    /// The reply carried some other status; holds that status and its description.
    ErrorResponse(StatusCode, String),
    /// Any other failure, such as no responders or a payload serialization error.
    Other,
}
58
59impl Display for DirectGetErrorKind {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            Self::InvalidSubject => write!(f, "invalid subject"),
63            Self::NotFound => write!(f, "message not found"),
64            Self::ErrorResponse(status, description) => {
65                write!(f, "unable to get message: {} {}", status, description)
66            }
67            Self::Other => write!(f, "error getting message"),
68            Self::TimedOut => write!(f, "timed out"),
69            Self::Request => write!(f, "request failed"),
70        }
71    }
72}
73
/// Error returned by the `direct_get*` methods of [Stream], classified by [DirectGetErrorKind].
pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77    fn from(err: crate::RequestError) -> Self {
78        match err.kind() {
79            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80            crate::RequestErrorKind::NoResponders => DirectGetError::new(DirectGetErrorKind::Other),
81            crate::RequestErrorKind::Other => {
82                DirectGetError::with_source(DirectGetErrorKind::Other, err)
83            }
84        }
85    }
86}
87
88impl From<serde_json::Error> for DirectGetError {
89    fn from(err: serde_json::Error) -> Self {
90        DirectGetError::with_source(DirectGetErrorKind::Other, err)
91    }
92}
93
/// Kinds of failures reported by [Stream::delete_message].
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error response.
    JetStream(super::errors::Error),
}
100
101impl Display for DeleteMessageErrorKind {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        match self {
104            Self::Request => write!(f, "request failed"),
105            Self::TimedOut => write!(f, "timed out"),
106            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
107        }
108    }
109}
110
/// Error returned by [Stream::delete_message], classified by [DeleteMessageErrorKind].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
112
/// Handle to operations that can be performed on a `Stream`.
#[derive(Debug, Clone)]
pub struct Stream {
    /// Cached stream [Info]; replaced by [Stream::info] and read by [Stream::cached_info].
    pub(crate) info: Info,
    /// JetStream [Context] through which this handle issues its requests.
    pub(crate) context: Context,
}
119
120impl Stream {
121    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
122    /// [Stream] and returns it.
123    ///
124    /// # Examples
125    ///
126    /// ```no_run
127    /// # #[tokio::main]
128    /// # async fn main() -> Result<(), async_nats::Error> {
129    /// let client = async_nats::connect("localhost:4222").await?;
130    /// let jetstream = async_nats::jetstream::new(client);
131    ///
132    /// let mut stream = jetstream.get_stream("events").await?;
133    ///
134    /// let info = stream.info().await?;
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub async fn info(&mut self) -> Result<&Info, InfoError> {
139        let subject = format!("STREAM.INFO.{}", self.info.config.name);
140
141        match self.context.request(subject, &json!({})).await? {
142            Response::Ok::<Info>(info) => {
143                self.info = info;
144                Ok(&self.info)
145            }
146            Response::Err { error } => Err(error.into()),
147        }
148    }
149
150    /// Returns cached [Info] for the [Stream].
151    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
152    /// [Stream::info].
153    ///
154    /// # Examples
155    ///
156    /// ```no_run
157    /// # #[tokio::main]
158    /// # async fn main() -> Result<(), async_nats::Error> {
159    /// let client = async_nats::connect("localhost:4222").await?;
160    /// let jetstream = async_nats::jetstream::new(client);
161    ///
162    /// let stream = jetstream.get_stream("events").await?;
163    ///
164    /// let info = stream.cached_info();
165    /// # Ok(())
166    /// # }
167    /// ```
    pub fn cached_info(&self) -> &Info {
        // Pure accessor: reads the `info` field stored on this handle, no request is made.
        &self.info
    }
171
172    /// Gets next message for a [Stream].
173    ///
174    /// Requires a [Stream] with `allow_direct` set to `true`.
175    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
176    /// from any replica member. This means read after write is possible,
177    /// as that given replica might not yet catch up with the leader.
178    ///
179    /// # Examples
180    ///
181    /// ```no_run
182    /// # #[tokio::main]
183    /// # async fn main() -> Result<(), async_nats::Error> {
184    /// let client = async_nats::connect("demo.nats.io").await?;
185    /// let jetstream = async_nats::jetstream::new(client);
186    ///
187    /// let stream = jetstream
188    ///     .create_stream(async_nats::jetstream::stream::Config {
189    ///         name: "events".to_string(),
190    ///         subjects: vec!["events.>".to_string()],
191    ///         allow_direct: true,
192    ///         ..Default::default()
193    ///     })
194    ///     .await?;
195    ///
196    /// jetstream.publish("events.data", "data".into()).await?;
197    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
198    ///
199    /// let message = stream
200    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
201    ///     .await?;
202    ///
203    /// # Ok(())
204    /// # }
205    /// ```
206    pub async fn direct_get_next_for_subject<T: AsRef<str>>(
207        &self,
208        subject: T,
209        sequence: Option<u64>,
210    ) -> Result<Message, DirectGetError> {
211        if !is_valid_subject(&subject) {
212            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
213        }
214        let request_subject = format!(
215            "{}.DIRECT.GET.{}",
216            &self.context.prefix, &self.info.config.name
217        );
218        let payload;
219        if let Some(sequence) = sequence {
220            payload = json!({
221                "seq": sequence,
222                "next_by_subj": subject.as_ref(),
223            });
224        } else {
225            payload = json!({
226                 "next_by_subj": subject.as_ref(),
227            });
228        }
229
230        let response = self
231            .context
232            .client
233            .request(
234                request_subject,
235                serde_json::to_vec(&payload).map(Bytes::from)?,
236            )
237            .await
238            .map(|message| Message {
239                message,
240                context: self.context.clone(),
241            })?;
242        if let Some(status) = response.status {
243            if let Some(ref description) = response.description {
244                match status {
245                    StatusCode::NOT_FOUND => {
246                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
247                    }
248                    // 408 is used in Direct Message for bad/empty payload.
249                    StatusCode::TIMEOUT => {
250                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
251                    }
252                    _ => {
253                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
254                            status,
255                            description.to_string(),
256                        )));
257                    }
258                }
259            }
260        }
261        Ok(response)
262    }
263
264    /// Gets first message from [Stream].
265    ///
266    /// Requires a [Stream] with `allow_direct` set to `true`.
267    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
268    /// from any replica member. This means read after write is possible,
269    /// as that given replica might not yet catch up with the leader.
270    ///
271    /// # Examples
272    ///
273    /// ```no_run
274    /// # #[tokio::main]
275    /// # async fn main() -> Result<(), async_nats::Error> {
276    /// let client = async_nats::connect("demo.nats.io").await?;
277    /// let jetstream = async_nats::jetstream::new(client);
278    ///
279    /// let stream = jetstream
280    ///     .create_stream(async_nats::jetstream::stream::Config {
281    ///         name: "events".to_string(),
282    ///         subjects: vec!["events.>".to_string()],
283    ///         allow_direct: true,
284    ///         ..Default::default()
285    ///     })
286    ///     .await?;
287    ///
288    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
289    ///
290    /// let message = stream.direct_get_first_for_subject("events.data").await?;
291    ///
292    /// # Ok(())
293    /// # }
294    /// ```
295    pub async fn direct_get_first_for_subject<T: AsRef<str>>(
296        &self,
297        subject: T,
298    ) -> Result<Message, DirectGetError> {
299        if !is_valid_subject(&subject) {
300            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
301        }
302        let request_subject = format!(
303            "{}.DIRECT.GET.{}",
304            &self.context.prefix, &self.info.config.name
305        );
306        let payload = json!({
307            "next_by_subj": subject.as_ref(),
308        });
309
310        let response = self
311            .context
312            .client
313            .request(
314                request_subject,
315                serde_json::to_vec(&payload).map(Bytes::from)?,
316            )
317            .await
318            .map(|message| Message {
319                message,
320                context: self.context.clone(),
321            })?;
322        if let Some(status) = response.status {
323            if let Some(ref description) = response.description {
324                match status {
325                    StatusCode::NOT_FOUND => {
326                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
327                    }
328                    // 408 is used in Direct Message for bad/empty payload.
329                    StatusCode::TIMEOUT => {
330                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
331                    }
332                    _ => {
333                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
334                            status,
335                            description.to_string(),
336                        )));
337                    }
338                }
339            }
340        }
341        Ok(response)
342    }
343
344    /// Gets message from [Stream] with given `sequence id`.
345    ///
346    /// Requires a [Stream] with `allow_direct` set to `true`.
347    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
348    /// from any replica member. This means read after write is possible,
349    /// as that given replica might not yet catch up with the leader.
350    ///
351    /// # Examples
352    ///
353    /// ```no_run
354    /// # #[tokio::main]
355    /// # async fn main() -> Result<(), async_nats::Error> {
356    /// let client = async_nats::connect("demo.nats.io").await?;
357    /// let jetstream = async_nats::jetstream::new(client);
358    ///
359    /// let stream = jetstream
360    ///     .create_stream(async_nats::jetstream::stream::Config {
361    ///         name: "events".to_string(),
362    ///         subjects: vec!["events.>".to_string()],
363    ///         allow_direct: true,
364    ///         ..Default::default()
365    ///     })
366    ///     .await?;
367    ///
368    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
369    ///
370    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
371    ///
372    /// # Ok(())
373    /// # }
374    /// ```
375    pub async fn direct_get(&self, sequence: u64) -> Result<Message, DirectGetError> {
376        let subject = format!(
377            "{}.DIRECT.GET.{}",
378            &self.context.prefix, &self.info.config.name
379        );
380        let payload = json!({
381            "seq": sequence,
382        });
383
384        let response = self
385            .context
386            .client
387            .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
388            .await
389            .map(|message| Message {
390                context: self.context.clone(),
391                message,
392            })?;
393
394        if let Some(status) = response.status {
395            if let Some(ref description) = response.description {
396                match status {
397                    StatusCode::NOT_FOUND => {
398                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
399                    }
400                    // 408 is used in Direct Message for bad/empty payload.
401                    StatusCode::TIMEOUT => {
402                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
403                    }
404                    _ => {
405                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
406                            status,
407                            description.to_string(),
408                        )));
409                    }
410                }
411            }
412        }
413        Ok(response)
414    }
415
416    /// Gets last message for a given `subject`.
417    ///
418    /// Requires a [Stream] with `allow_direct` set to `true`.
419    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
420    /// from any replica member. This means read after write is possible,
421    /// as that given replica might not yet catch up with the leader.
422    ///
423    /// # Examples
424    ///
425    /// ```no_run
426    /// # #[tokio::main]
427    /// # async fn main() -> Result<(), async_nats::Error> {
428    /// let client = async_nats::connect("demo.nats.io").await?;
429    /// let jetstream = async_nats::jetstream::new(client);
430    ///
431    /// let stream = jetstream
432    ///     .create_stream(async_nats::jetstream::stream::Config {
433    ///         name: "events".to_string(),
434    ///         subjects: vec!["events.>".to_string()],
435    ///         allow_direct: true,
436    ///         ..Default::default()
437    ///     })
438    ///     .await?;
439    ///
440    /// jetstream.publish("events.data", "data".into()).await?;
441    ///
442    /// let message = stream.direct_get_last_for_subject("events.data").await?;
443    ///
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub async fn direct_get_last_for_subject<T: AsRef<str>>(
448        &self,
449        subject: T,
450    ) -> Result<Message, DirectGetError> {
451        let subject = format!(
452            "{}.DIRECT.GET.{}.{}",
453            &self.context.prefix,
454            &self.info.config.name,
455            subject.as_ref()
456        );
457
458        let response = self
459            .context
460            .client
461            .request(subject, "".into())
462            .await
463            .map(|message| Message {
464                context: self.context.clone(),
465                message,
466            })?;
467        if let Some(status) = response.status {
468            if let Some(ref description) = response.description {
469                match status {
470                    StatusCode::NOT_FOUND => {
471                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
472                    }
473                    // 408 is used in Direct Message for bad/empty payload.
474                    StatusCode::TIMEOUT => {
475                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
476                    }
477                    _ => {
478                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
479                            status,
480                            description.to_string(),
481                        )));
482                    }
483                }
484            }
485        }
486        Ok(response)
487    }
488    /// Get a raw message from the stream.
489    ///
490    /// # Examples
491    ///
492    /// ```no_run
493    /// #[tokio::main]
494    /// # async fn mains() -> Result<(), async_nats::Error> {
495    /// use futures::StreamExt;
496    /// use futures::TryStreamExt;
497    ///
498    /// let client = async_nats::connect("localhost:4222").await?;
499    /// let context = async_nats::jetstream::new(client);
500    ///
501    /// let stream = context
502    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
503    ///         name: "events".to_string(),
504    ///         max_messages: 10_000,
505    ///         ..Default::default()
506    ///     })
507    ///     .await?;
508    ///
509    /// let publish_ack = context.publish("events", "data".into()).await?;
510    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
511    /// println!("Retrieved raw message {:?}", raw_message);
512    /// # Ok(())
513    /// # }
514    /// ```
515    pub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, crate::Error> {
516        let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
517        let payload = json!({
518            "seq": sequence,
519        });
520
521        let response: Response<GetRawMessage> = self.context.request(subject, &payload).await?;
522        match response {
523            Response::Err { error } => Err(Box::new(std::io::Error::new(
524                ErrorKind::Other,
525                format!("nats: error while getting message: {}", error),
526            ))),
527            Response::Ok(value) => Ok(value.message),
528        }
529    }
530
531    /// Get the last raw message from the stream by subject.
532    ///
533    /// # Examples
534    ///
535    /// ```no_run
536    /// #[tokio::main]
537    /// # async fn mains() -> Result<(), async_nats::Error> {
538    /// use futures::StreamExt;
539    /// use futures::TryStreamExt;
540    ///
541    /// let client = async_nats::connect("localhost:4222").await?;
542    /// let context = async_nats::jetstream::new(client);
543    ///
544    /// let stream = context
545    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
546    ///         name: "events".to_string(),
547    ///         max_messages: 10_000,
548    ///         ..Default::default()
549    ///     })
550    ///     .await?;
551    ///
552    /// let publish_ack = context.publish("events", "data".into()).await?;
553    /// let raw_message = stream.get_last_raw_message_by_subject("events").await?;
554    /// println!("Retrieved raw message {:?}", raw_message);
555    /// # Ok(())
556    /// # }
557    /// ```
558    pub async fn get_last_raw_message_by_subject(
559        &self,
560        stream_subject: &str,
561    ) -> Result<RawMessage, LastRawMessageError> {
562        let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
563        let payload = json!({
564            "last_by_subj":  stream_subject,
565        });
566
567        let response: Response<GetRawMessage> = self
568            .context
569            .request(subject, &payload)
570            .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
571            .await?;
572        match response {
573            Response::Err { error } => {
574                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
575                    Err(LastRawMessageError::new(
576                        LastRawMessageErrorKind::NoMessageFound,
577                    ))
578                } else {
579                    Err(LastRawMessageError::new(
580                        LastRawMessageErrorKind::JetStream(error),
581                    ))
582                }
583            }
584            Response::Ok(value) => Ok(value.message),
585        }
586    }
587
588    /// Delete a message from the stream.
589    ///
590    /// # Examples
591    ///
592    /// ```no_run
593    /// # #[tokio::main]
594    /// # async fn main() -> Result<(), async_nats::Error> {
595    /// let client = async_nats::connect("localhost:4222").await?;
596    /// let context = async_nats::jetstream::new(client);
597    ///
598    /// let stream = context
599    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
600    ///         name: "events".to_string(),
601    ///         max_messages: 10_000,
602    ///         ..Default::default()
603    ///     })
604    ///     .await?;
605    ///
606    /// let publish_ack = context.publish("events", "data".into()).await?;
607    /// stream.delete_message(publish_ack.await?.sequence).await?;
608    /// # Ok(())
609    /// # }
610    /// ```
611    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
612        let subject = format!("STREAM.MSG.DELETE.{}", &self.info.config.name);
613        let payload = json!({
614            "seq": sequence,
615        });
616
617        let response: Response<DeleteStatus> = self
618            .context
619            .request(subject, &payload)
620            .map_err(|err| match err.kind() {
621                RequestErrorKind::TimedOut => {
622                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
623                }
624                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
625            })
626            .await?;
627
628        match response {
629            Response::Err { error } => Err(DeleteMessageError::new(
630                DeleteMessageErrorKind::JetStream(error),
631            )),
632            Response::Ok(value) => Ok(value.success),
633        }
634    }
635
636    /// Purge `Stream` messages.
637    ///
638    /// # Examples
639    ///
640    /// ```no_run
641    /// # #[tokio::main]
642    /// # async fn main() -> Result<(), async_nats::Error> {
643    /// let client = async_nats::connect("demo.nats.io").await?;
644    /// let jetstream = async_nats::jetstream::new(client);
645    ///
646    /// let stream = jetstream.get_stream("events").await?;
647    /// stream.purge().await?;
648    /// # Ok(())
649    /// # }
650    /// ```
    pub fn purge(&self) -> Purge<No, No> {
        // Delegates construction of the purge builder to `Purge::build`, passing this
        // stream handle.
        Purge::build(self)
    }
654
655    /// Purge `Stream` messages for a matching subject.
656    ///
657    /// # Examples
658    ///
659    /// ```no_run
660    /// # #[tokio::main]
661    /// # #[allow(deprecated)]
662    /// # async fn main() -> Result<(), async_nats::Error> {
663    /// let client = async_nats::connect("demo.nats.io").await?;
664    /// let jetstream = async_nats::jetstream::new(client);
665    ///
666    /// let stream = jetstream.get_stream("events").await?;
667    /// stream.purge_subject("data").await?;
668    /// # Ok(())
669    /// # }
670    /// ```
    #[deprecated(
        since = "0.25.0",
        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
    )]
    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
    where
        T: Into<String>,
    {
        // Compatibility shim: forwards to the builder returned by `purge()`, applying
        // `subject` as its filter and awaiting the result.
        self.purge().filter(subject).await
    }
681
    /// Creates or updates a `Durable` Consumer, or an `Ephemeral` one if `durable_name` was not
    /// provided, and returns the info from the server about the created [Consumer].
    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
685    ///
686    /// # Examples
687    ///
688    /// ```no_run
689    /// # #[tokio::main]
690    /// # async fn main() -> Result<(), async_nats::Error> {
691    /// use async_nats::jetstream::consumer;
692    /// let client = async_nats::connect("localhost:4222").await?;
693    /// let jetstream = async_nats::jetstream::new(client);
694    ///
695    /// let stream = jetstream.get_stream("events").await?;
696    /// let info = stream
697    ///     .create_consumer(consumer::pull::Config {
698    ///         durable_name: Some("pull".to_string()),
699    ///         ..Default::default()
700    ///     })
701    ///     .await?;
702    /// # Ok(())
703    /// # }
704    /// ```
705    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
706        &self,
707        config: C,
708    ) -> Result<Consumer<C>, ConsumerError> {
709        self.context
710            .create_consumer_on_stream(config, self.info.config.name.clone())
711            .await
712    }
713
    /// Update an existing consumer.
    /// This call will fail if the consumer does not exist.
    /// Returns the info from the server about the updated [Consumer].
717    ///
718    /// # Examples
719    ///
720    /// ```no_run
721    /// # #[tokio::main]
722    /// # async fn main() -> Result<(), async_nats::Error> {
723    /// use async_nats::jetstream::consumer;
724    /// let client = async_nats::connect("localhost:4222").await?;
725    /// let jetstream = async_nats::jetstream::new(client);
726    ///
727    /// let stream = jetstream.get_stream("events").await?;
728    /// let info = stream
729    ///     .update_consumer(consumer::pull::Config {
730    ///         durable_name: Some("pull".to_string()),
731    ///         ..Default::default()
732    ///     })
733    ///     .await?;
734    /// # Ok(())
735    /// # }
736    /// ```
737    #[cfg(feature = "server_2_10")]
738    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
739        &self,
740        config: C,
741    ) -> Result<Consumer<C>, ConsumerUpdateError> {
742        self.context
743            .update_consumer_on_stream(config, self.info.config.name.clone())
744            .await
745    }
746
    /// Create a consumer, but only if it does not exist or the existing config is exactly
    /// the same.
    /// This method will fail if the consumer is already present with a different config.
    /// Returns the info from the server about the created [Consumer].
751    ///
752    /// # Examples
753    ///
754    /// ```no_run
755    /// # #[tokio::main]
756    /// # async fn main() -> Result<(), async_nats::Error> {
757    /// use async_nats::jetstream::consumer;
758    /// let client = async_nats::connect("localhost:4222").await?;
759    /// let jetstream = async_nats::jetstream::new(client);
760    ///
761    /// let stream = jetstream.get_stream("events").await?;
762    /// let info = stream
763    ///     .create_consumer_strict(consumer::pull::Config {
764    ///         durable_name: Some("pull".to_string()),
765    ///         ..Default::default()
766    ///     })
767    ///     .await?;
768    /// # Ok(())
769    /// # }
770    /// ```
771    #[cfg(feature = "server_2_10")]
772    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
773        &self,
774        config: C,
775    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
776        self.context
777            .create_consumer_strict_on_stream(config, self.info.config.name.clone())
778            .await
779    }
780
781    /// Retrieve [Info] about [Consumer] from the server.
782    ///
783    /// # Examples
784    ///
785    /// ```no_run
786    /// # #[tokio::main]
787    /// # async fn main() -> Result<(), async_nats::Error> {
788    /// use async_nats::jetstream::consumer;
789    /// let client = async_nats::connect("localhost:4222").await?;
790    /// let jetstream = async_nats::jetstream::new(client);
791    ///
792    /// let stream = jetstream.get_stream("events").await?;
793    /// let info = stream.consumer_info("pull").await?;
794    /// # Ok(())
795    /// # }
796    /// ```
797    pub async fn consumer_info<T: AsRef<str>>(
798        &self,
799        name: T,
800    ) -> Result<consumer::Info, crate::Error> {
801        let name = name.as_ref();
802
803        let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
804
805        match self.context.request(subject, &json!({})).await? {
806            Response::Ok(info) => Ok(info),
807            Response::Err { error } => Err(Box::new(std::io::Error::new(
808                ErrorKind::Other,
809                format!("nats: error while getting consumer info: {}", error),
810            ))),
811        }
812    }
813
814    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
815    /// [Messages][crate::jetstream::Message] for a given [Consumer].
816    ///
817    /// # Examples
818    ///
819    /// ```no_run
820    /// # #[tokio::main]
821    /// # async fn main() -> Result<(), async_nats::Error> {
822    /// use async_nats::jetstream::consumer;
823    /// use futures::StreamExt;
824    /// let client = async_nats::connect("localhost:4222").await?;
825    /// let jetstream = async_nats::jetstream::new(client);
826    ///
827    /// let stream = jetstream.get_stream("events").await?;
828    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
829    /// # Ok(())
830    /// # }
831    /// ```
832    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
833        &self,
834        name: &str,
835    ) -> Result<Consumer<T>, crate::Error> {
836        let info = self.consumer_info(name).await?;
837
838        Ok(Consumer::new(
839            T::try_from_consumer_config(info.config.clone())?,
840            info,
841            self.context.clone(),
842        ))
843    }
844
845    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
846    ///
847    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
848    ///
849    /// # Examples
850    ///
851    /// ```no_run
852    /// # #[tokio::main]
853    /// # async fn main() -> Result<(), async_nats::Error> {
854    /// use async_nats::jetstream::consumer;
855    /// use futures::StreamExt;
856    /// let client = async_nats::connect("localhost:4222").await?;
857    /// let jetstream = async_nats::jetstream::new(client);
858    ///
859    /// let stream = jetstream.get_stream("events").await?;
860    /// let consumer = stream
861    ///     .get_or_create_consumer(
862    ///         "pull",
863    ///         consumer::pull::Config {
864    ///             durable_name: Some("pull".to_string()),
865    ///             ..Default::default()
866    ///         },
867    ///     )
868    ///     .await?;
869    /// # Ok(())
870    /// # }
871    /// ```
872    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
873        &self,
874        name: &str,
875        config: T,
876    ) -> Result<Consumer<T>, ConsumerError> {
877        let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
878
879        match self.context.request(subject, &json!({})).await? {
880            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
881            Response::Err { error } => Err(error.into()),
882            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
883                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
884                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
885                })?,
886                info,
887                self.context.clone(),
888            )),
889        }
890    }
891
892    /// Delete a [Consumer] from the server.
893    ///
894    /// # Examples
895    ///
896    /// ```no_run
897    /// # #[tokio::main]
898    /// # async fn main() -> Result<(), async_nats::Error> {
899    /// use async_nats::jetstream::consumer;
900    /// use futures::StreamExt;
901    /// let client = async_nats::connect("localhost:4222").await?;
902    /// let jetstream = async_nats::jetstream::new(client);
903    ///
904    /// jetstream
905    ///     .get_stream("events")
906    ///     .await?
907    ///     .delete_consumer("pull")
908    ///     .await?;
909    /// # Ok(())
910    /// # }
911    /// ```
912    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
913        let subject = format!("CONSUMER.DELETE.{}.{}", self.info.config.name, name);
914
915        match self.context.request(subject, &json!({})).await? {
916            Response::Ok(delete_status) => Ok(delete_status),
917            Response::Err { error } => Err(error.into()),
918        }
919    }
920
921    /// Lists names of all consumers for current stream.
922    ///
923    /// # Examples
924    ///
925    /// ```no_run
926    /// # #[tokio::main]
927    /// # async fn main() -> Result<(), async_nats::Error> {
928    /// use futures::TryStreamExt;
929    /// let client = async_nats::connect("demo.nats.io:4222").await?;
930    /// let jetstream = async_nats::jetstream::new(client);
931    /// let stream = jetstream.get_stream("stream").await?;
932    /// let mut names = stream.consumer_names();
933    /// while let Some(consumer) = names.try_next().await? {
934    ///     println!("consumer: {stream:?}");
935    /// }
936    /// # Ok(())
937    /// # }
938    /// ```
939    pub fn consumer_names(&self) -> ConsumerNames {
940        ConsumerNames {
941            context: self.context.clone(),
942            stream: self.info.config.name.clone(),
943            offset: 0,
944            page_request: None,
945            consumers: Vec::new(),
946            done: false,
947        }
948    }
949
950    /// Lists all consumers info for current stream.
951    ///
952    /// # Examples
953    ///
954    /// ```no_run
955    /// # #[tokio::main]
956    /// # async fn main() -> Result<(), async_nats::Error> {
957    /// use futures::TryStreamExt;
958    /// let client = async_nats::connect("demo.nats.io:4222").await?;
959    /// let jetstream = async_nats::jetstream::new(client);
960    /// let stream = jetstream.get_stream("stream").await?;
961    /// let mut consumers = stream.consumers();
962    /// while let Some(consumer) = consumers.try_next().await? {
963    ///     println!("consumer: {consumer:?}");
964    /// }
965    /// # Ok(())
966    /// # }
967    /// ```
968    pub fn consumers(&self) -> Consumers {
969        Consumers {
970            context: self.context.clone(),
971            stream: self.info.config.name.clone(),
972            offset: 0,
973            page_request: None,
974            consumers: Vec::new(),
975            done: false,
976        }
977    }
978}
979
980/// `StreamConfig` determines the properties for a stream.
981/// There are sensible defaults for most. If no subjects are
982/// given the name will be used as the only subject.
983#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
984pub struct Config {
985    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
986    pub name: String,
987    /// How large the Stream may become in total bytes before the configured discard policy kicks in
988    pub max_bytes: i64,
989    /// How large the Stream may become in total messages before the configured discard policy kicks in
990    #[serde(rename = "max_msgs")]
991    pub max_messages: i64,
992    /// Maximum amount of messages to keep per subject
993    #[serde(rename = "max_msgs_per_subject")]
994    pub max_messages_per_subject: i64,
995    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
996    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
997    pub discard: DiscardPolicy,
998    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
999    #[serde(default, skip_serializing_if = "is_default")]
1000    pub discard_new_per_subject: bool,
1001    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
1002    /// configured stream `name`.
1003    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1004    pub subjects: Vec<String>,
1005    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
1006    pub retention: RetentionPolicy,
1007    /// How many Consumers can be defined for a given Stream, -1 for unlimited
1008    pub max_consumers: i32,
1009    /// Maximum age of any message in the stream, expressed in nanoseconds
1010    #[serde(with = "serde_nanos")]
1011    pub max_age: Duration,
1012    /// The largest message that will be accepted by the Stream
1013    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1014    pub max_message_size: i32,
1015    /// The type of storage backend, `File` (default) and `Memory`
1016    pub storage: StorageType,
1017    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
1018    pub num_replicas: usize,
1019    /// Disables acknowledging messages that are received by the Stream
1020    #[serde(default, skip_serializing_if = "is_default")]
1021    pub no_ack: bool,
1022    /// The window within which to track duplicate messages.
1023    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1024    pub duplicate_window: Duration,
1025    /// The owner of the template associated with this stream.
1026    #[serde(default, skip_serializing_if = "is_default")]
1027    pub template_owner: String,
1028    /// Indicates the stream is sealed and cannot be modified in any way
1029    #[serde(default, skip_serializing_if = "is_default")]
1030    pub sealed: bool,
1031    /// A short description of the purpose of this stream.
1032    #[serde(default, skip_serializing_if = "is_default")]
1033    pub description: Option<String>,
1034    #[serde(
1035        default,
1036        rename = "allow_rollup_hdrs",
1037        skip_serializing_if = "is_default"
1038    )]
1039    /// Indicates if rollups will be allowed or not.
1040    pub allow_rollup: bool,
1041    #[serde(default, skip_serializing_if = "is_default")]
1042    /// Indicates deletes will be denied or not.
1043    pub deny_delete: bool,
1044    /// Indicates if purges will be denied or not.
1045    #[serde(default, skip_serializing_if = "is_default")]
1046    pub deny_purge: bool,
1047
1048    /// Optional republish config.
1049    #[serde(default, skip_serializing_if = "is_default")]
1050    pub republish: Option<Republish>,
1051
1052    /// Enables direct get, which would get messages from
1053    /// non-leader.
1054    #[serde(default, skip_serializing_if = "is_default")]
1055    pub allow_direct: bool,
1056
1057    /// Enable direct access also for mirrors.
1058    #[serde(default, skip_serializing_if = "is_default")]
1059    pub mirror_direct: bool,
1060
1061    /// Stream mirror configuration.
1062    #[serde(default, skip_serializing_if = "Option::is_none")]
1063    pub mirror: Option<Source>,
1064
1065    /// Sources configuration.
1066    #[serde(default, skip_serializing_if = "Option::is_none")]
1067    pub sources: Option<Vec<Source>>,
1068
1069    #[cfg(feature = "server_2_10")]
1070    /// Additional stream metadata.
1071    #[serde(default, skip_serializing_if = "is_default")]
1072    pub metadata: HashMap<String, String>,
1073
1074    #[cfg(feature = "server_2_10")]
1075    /// Allow applying a subject transform to incoming messages
1076    #[serde(default, skip_serializing_if = "Option::is_none")]
1077    pub subject_transform: Option<SubjectTransform>,
1078
1079    #[cfg(feature = "server_2_10")]
1080    /// Override compression config for this stream.
1081    /// Wrapping enum that has `None` type with [Option] is there
1082    /// because [Stream] can override global compression set to [Compression::S2]
1083    /// to [Compression::None], which is different from not overriding global config with anything.
1084    #[serde(default, skip_serializing_if = "Option::is_none")]
1085    pub compression: Option<Compression>,
1086    #[cfg(feature = "server_2_10")]
1087    /// Set limits on consumers that are created on this stream.
1088    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1089    pub consumer_limits: Option<ConsumerLimits>,
1090
1091    #[cfg(feature = "server_2_10")]
1092    /// Sets the first sequence for the stream.
1093    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1094    pub first_sequence: Option<u64>,
1095
1096    /// Placement configuration for clusters and tags.
1097    #[serde(default, skip_serializing_if = "Option::is_none")]
1098    pub placement: Option<Placement>,
1099}
1100
1101impl From<&Config> for Config {
1102    fn from(sc: &Config) -> Config {
1103        sc.clone()
1104    }
1105}
1106
1107impl From<&str> for Config {
1108    fn from(s: &str) -> Config {
1109        Config {
1110            name: s.to_string(),
1111            ..Default::default()
1112        }
1113    }
1114}
1115
/// Deserializes consumer limits, mapping limits that carry no information to `None`.
///
/// Returns `None` when the input is `null` or when the deserialized limits equal
/// `ConsumerLimits::default()`; otherwise returns the limits wrapped in `Some`.
///
/// Accepting `null` matters because [Config] serializes `consumer_limits: None` as
/// `null` (the field has no `skip_serializing_if`), so a serialized [Config] must be
/// readable again through this function.
///
/// # Errors
///
/// Returns the deserializer's error if the input is neither `null` nor valid
/// [ConsumerLimits].
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
    // All-default limits are treated the same as no limits at all.
    Ok(consumer_limits.filter(|limits| *limits != ConsumerLimits::default()))
}
1130
1131#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1132pub struct ConsumerLimits {
1133    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
1134    #[serde(default, with = "serde_nanos")]
1135    pub inactive_threshold: std::time::Duration,
1136    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
1137    #[serde(default)]
1138    pub max_ack_pending: i64,
1139}
1140
1141#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1142pub enum Compression {
1143    #[serde(rename = "s2")]
1144    S2,
1145    #[serde(rename = "none")]
1146    None,
1147}
1148
1149// SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
1150#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1151pub struct SubjectTransform {
1152    #[serde(rename = "src")]
1153    pub source: String,
1154
1155    #[serde(rename = "dest")]
1156    pub destination: String,
1157}
1158
1159// Republish is for republishing messages once committed to a stream.
1160#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1161pub struct Republish {
1162    /// Subject that should be republished.
1163    #[serde(rename = "src")]
1164    pub source: String,
1165    /// Subject where messages will be republished.
1166    #[serde(rename = "dest")]
1167    pub destination: String,
1168    /// If true, only headers should be republished.
1169    #[serde(default)]
1170    pub headers_only: bool,
1171}
1172
1173/// Placement describes on which cluster or tags the stream should be placed.
1174#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1175pub struct Placement {
1176    // Cluster where the stream should be placed.
1177    #[serde(default, skip_serializing_if = "is_default")]
1178    pub cluster: Option<String>,
1179    // Matching tags for stream placement.
1180    #[serde(default, skip_serializing_if = "is_default")]
1181    pub tags: Vec<String>,
1182}
1183
1184/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
1185/// remove older messages. `New` will fail to store the new message.
1186#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1187#[repr(u8)]
1188pub enum DiscardPolicy {
1189    /// will remove older messages when limits are hit.
1190    #[default]
1191    #[serde(rename = "old")]
1192    Old = 0,
1193    /// will error on a StoreMsg call when limits are hit
1194    #[serde(rename = "new")]
1195    New = 1,
1196}
1197
1198/// `RetentionPolicy` determines how messages in a set are retained.
1199#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1200#[repr(u8)]
1201pub enum RetentionPolicy {
1202    /// `Limits` (default) means that messages are retained until any given limit is reached.
1203    /// This could be one of messages, bytes, or age.
1204    #[default]
1205    #[serde(rename = "limits")]
1206    Limits = 0,
1207    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
1208    #[serde(rename = "interest")]
1209    Interest = 1,
1210    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
1211    #[serde(rename = "workqueue")]
1212    WorkQueue = 2,
1213}
1214
1215/// determines how messages are stored for retention.
1216#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1217#[repr(u8)]
1218pub enum StorageType {
1219    /// Stream data is kept in files. This is the default.
1220    #[default]
1221    #[serde(rename = "file")]
1222    File = 0,
1223    /// Stream data is kept only in memory.
1224    #[serde(rename = "memory")]
1225    Memory = 1,
1226}
1227
1228/// Shows config and current state for this stream.
1229#[derive(Debug, Deserialize, Clone)]
1230pub struct Info {
1231    /// The configuration associated with this stream.
1232    pub config: Config,
1233    /// The time that this stream was created.
1234    #[serde(with = "rfc3339")]
1235    pub created: time::OffsetDateTime,
1236    /// Various metrics associated with this stream.
1237    pub state: State,
1238    /// Information about leader and replicas.
1239    pub cluster: Option<ClusterInfo>,
1240    /// Information about mirror config if present.
1241    #[serde(default)]
1242    pub mirror: Option<SourceInfo>,
1243    /// Information about sources configs if present.
1244    #[serde(default)]
1245    pub sources: Vec<SourceInfo>,
1246}
1247
1248#[derive(Deserialize)]
1249pub struct DeleteStatus {
1250    pub success: bool,
1251}
1252
1253/// information about the given stream.
1254#[derive(Debug, Deserialize, Clone, Copy)]
1255pub struct State {
1256    /// The number of messages contained in this stream
1257    pub messages: u64,
1258    /// The number of bytes of all messages contained in this stream
1259    pub bytes: u64,
1260    /// The lowest sequence number still present in this stream
1261    #[serde(rename = "first_seq")]
1262    pub first_sequence: u64,
1263    /// The time associated with the oldest message still present in this stream
1264    #[serde(with = "rfc3339", rename = "first_ts")]
1265    pub first_timestamp: time::OffsetDateTime,
1266    /// The last sequence number assigned to a message in this stream
1267    #[serde(rename = "last_seq")]
1268    pub last_sequence: u64,
1269    /// The time that the last message was received by this stream
1270    #[serde(with = "rfc3339", rename = "last_ts")]
1271    pub last_timestamp: time::OffsetDateTime,
1272    /// The number of consumers configured to consume this stream
1273    pub consumer_count: usize,
1274}
1275
1276/// A raw stream message in the representation it is stored.
1277#[derive(Debug, Serialize, Deserialize, Clone)]
1278pub struct RawMessage {
1279    /// Subject of the message.
1280    #[serde(rename = "subject")]
1281    pub subject: String,
1282
1283    /// Sequence of the message.
1284    #[serde(rename = "seq")]
1285    pub sequence: u64,
1286
1287    /// Raw payload of the message as a base64 encoded string.
1288    #[serde(default, rename = "data")]
1289    pub payload: String,
1290
1291    /// Raw header string, if any.
1292    #[serde(default, rename = "hdrs")]
1293    pub headers: Option<String>,
1294
1295    /// The time the message was published.
1296    #[serde(rename = "time", with = "rfc3339")]
1297    pub time: time::OffsetDateTime,
1298}
1299
1300impl TryFrom<RawMessage> for crate::Message {
1301    type Error = crate::Error;
1302
1303    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1304        let decoded_payload = STANDARD
1305            .decode(value.payload)
1306            .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1307        let decoded_headers = value
1308            .headers
1309            .map(|header| STANDARD.decode(header))
1310            .map_or(Ok(None), |v| v.map(Some))?;
1311
1312        let length = decoded_headers
1313            .as_ref()
1314            .map_or_else(|| 0, |headers| headers.len())
1315            + decoded_payload.len()
1316            + value.subject.len();
1317
1318        let (headers, status, description) =
1319            decoded_headers.map_or_else(|| Ok((None, None, None)), |h| parse_headers(&h))?;
1320
1321        Ok(crate::Message {
1322            subject: value.subject.into(),
1323            reply: None,
1324            payload: decoded_payload.into(),
1325            headers,
1326            status,
1327            description,
1328            length,
1329        })
1330    }
1331}
1332
/// Returns `true` if `c` is a space or a horizontal tab.
///
/// `parse_headers` treats a line starting with such a character as the continuation of
/// the previous header value.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}

/// Prefix that the first line of a header block must start with.
const HEADER_LINE: &str = "NATS/1.0";
1337
1338#[allow(clippy::type_complexity)]
1339fn parse_headers(
1340    buf: &[u8],
1341) -> Result<(Option<HeaderMap>, Option<StatusCode>, Option<String>), crate::Error> {
1342    let mut headers = HeaderMap::new();
1343    let mut maybe_status: Option<StatusCode> = None;
1344    let mut maybe_description: Option<String> = None;
1345    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1346        line.lines().peekable()
1347    } else {
1348        return Err(Box::new(std::io::Error::new(
1349            ErrorKind::Other,
1350            "invalid header",
1351        )));
1352    };
1353
1354    if let Some(line) = lines.next() {
1355        let line = line
1356            .strip_prefix(HEADER_LINE)
1357            .ok_or_else(|| {
1358                Box::new(std::io::Error::new(
1359                    ErrorKind::Other,
1360                    "version line does not start with NATS/1.0",
1361                ))
1362            })?
1363            .trim();
1364
1365        match line.split_once(' ') {
1366            Some((status, description)) => {
1367                if !status.is_empty() {
1368                    maybe_status = Some(status.parse()?);
1369                }
1370
1371                if !description.is_empty() {
1372                    maybe_description = Some(description.trim().to_string());
1373                }
1374            }
1375            None => {
1376                if !line.is_empty() {
1377                    maybe_status = Some(line.parse()?);
1378                }
1379            }
1380        }
1381    } else {
1382        return Err(Box::new(std::io::Error::new(
1383            ErrorKind::Other,
1384            "expected header information not found",
1385        )));
1386    };
1387
1388    while let Some(line) = lines.next() {
1389        if line.is_empty() {
1390            continue;
1391        }
1392
1393        if let Some((k, v)) = line.split_once(':').to_owned() {
1394            let mut s = String::from(v.trim());
1395            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1396                s.push(' ');
1397                s.push_str(v.trim());
1398            }
1399
1400            headers.insert(
1401                HeaderName::from_str(k)?,
1402                HeaderValue::from_str(&s)
1403                    .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1404            );
1405        } else {
1406            return Err(Box::new(std::io::Error::new(
1407                ErrorKind::Other,
1408                "malformed header line",
1409            )));
1410        }
1411    }
1412
1413    if headers.is_empty() {
1414        Ok((None, maybe_status, maybe_description))
1415    } else {
1416        Ok((Some(headers), maybe_status, maybe_description))
1417    }
1418}
1419
1420#[derive(Debug, Serialize, Deserialize, Clone)]
1421struct GetRawMessage {
1422    pub(crate) message: RawMessage,
1423}
1424
/// Returns `true` if `t` equals the default value of its type.
///
/// Used as a `skip_serializing_if` predicate so that fields left at their default are
/// omitted from the serialized output.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
1428/// Information about the stream's, consumer's associated `JetStream` cluster
1429#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1430pub struct ClusterInfo {
1431    /// The cluster name.
1432    pub name: Option<String>,
1433    /// The server name of the RAFT leader.
1434    pub leader: Option<String>,
1435    /// The members of the RAFT cluster.
1436    #[serde(default)]
1437    pub replicas: Vec<PeerInfo>,
1438}
1439
1440/// The members of the RAFT cluster
1441#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1442pub struct PeerInfo {
1443    /// The server name of the peer.
1444    pub name: String,
1445    /// Indicates if the server is up to date and synchronized.
1446    pub current: bool,
1447    /// Nanoseconds since this peer was last seen.
1448    #[serde(with = "serde_nanos")]
1449    pub active: Duration,
1450    /// Indicates the node is considered offline by the group.
1451    #[serde(default)]
1452    pub offline: bool,
1453    /// How many uncommitted operations this peer is behind the leader.
1454    pub lag: Option<u64>,
1455}
1456
1457#[derive(Debug, Clone, Deserialize)]
1458pub struct SourceInfo {
1459    /// Source name.
1460    pub name: String,
1461    /// Number of messages this source is lagging behind.
1462    pub lag: u64,
1463    /// Last time the source was seen active.
1464    #[serde(deserialize_with = "negative_duration_as_none")]
1465    pub active: Option<std::time::Duration>,
1466    /// Filtering for the source.
1467    #[serde(default)]
1468    pub filter_subject: Option<String>,
1469    /// Source destination subject.
1470    #[serde(default)]
1471    pub subject_transform_dest: Option<String>,
1472    /// List of transforms.
1473    #[serde(default)]
1474    pub subject_transforms: Vec<SubjectTransform>,
1475}
1476
1477fn negative_duration_as_none<'de, D>(
1478    deserializer: D,
1479) -> Result<Option<std::time::Duration>, D::Error>
1480where
1481    D: Deserializer<'de>,
1482{
1483    let n = i64::deserialize(deserializer)?;
1484    if n.is_negative() {
1485        Ok(None)
1486    } else {
1487        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1488    }
1489}
1490
1491/// The response generated by trying to purge a stream.
1492#[derive(Debug, Deserialize, Clone, Copy)]
1493pub struct PurgeResponse {
1494    /// Whether the purge request was successful.
1495    pub success: bool,
1496    /// The number of purged messages in a stream.
1497    pub purged: u64,
1498}
1499/// The payload used to generate a purge request.
1500#[derive(Default, Debug, Serialize, Clone)]
1501pub struct PurgeRequest {
1502    /// Purge up to but not including sequence.
1503    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1504    pub sequence: Option<u64>,
1505
1506    /// Subject to match against messages for the purge command.
1507    #[serde(default, skip_serializing_if = "is_default")]
1508    pub filter: Option<String>,
1509
1510    /// Number of messages to keep.
1511    #[serde(default, skip_serializing_if = "is_default")]
1512    pub keep: Option<u64>,
1513}
1514
1515#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1516pub struct Source {
1517    /// Name of the stream source.
1518    pub name: String,
1519    /// Optional source start sequence.
1520    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1521    pub start_sequence: Option<u64>,
1522    #[serde(
1523        default,
1524        rename = "opt_start_time",
1525        skip_serializing_if = "is_default",
1526        with = "rfc3339::option"
1527    )]
1528    /// Optional source start time.
1529    pub start_time: Option<OffsetDateTime>,
1530    /// Optional additional filter subject.
1531    #[serde(default, skip_serializing_if = "is_default")]
1532    pub filter_subject: Option<String>,
1533    /// Optional config for sourcing streams from another prefix, used for cross-account.
1534    #[serde(default, skip_serializing_if = "Option::is_none")]
1535    pub external: Option<External>,
1536    /// Optional config to set a domain, if source is residing in different one.
1537    #[serde(default, skip_serializing_if = "is_default")]
1538    pub domain: Option<String>,
1539    /// Subject transforms for Stream.
1540    #[cfg(feature = "server_2_10")]
1541    #[serde(default, skip_serializing_if = "is_default")]
1542    pub subject_transforms: Vec<SubjectTransform>,
1543}
1544
1545#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1546pub struct External {
1547    /// Api prefix of external source.
1548    #[serde(rename = "api")]
1549    pub api_prefix: String,
1550    /// Optional configuration of delivery prefix.
1551    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1552    pub delivery_prefix: Option<String>,
1553}
1554
1555use std::marker::PhantomData;
1556
/// Marker type used as a type parameter of [Purge] to record that an option has been set.
#[derive(Default, Debug)]
pub struct Yes;
/// Marker type used as a type parameter of [Purge] to record that an option has not been set.
#[derive(Default, Debug)]
pub struct No;

/// Implemented by the marker types [Yes] and [No].
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
1566
1567#[derive(Debug)]
1568pub struct Purge<'a, SEQUENCE, KEEP>
1569where
1570    SEQUENCE: ToAssign,
1571    KEEP: ToAssign,
1572{
1573    stream: &'a Stream,
1574    inner: PurgeRequest,
1575    sequence_set: PhantomData<SEQUENCE>,
1576    keep_set: PhantomData<KEEP>,
1577}
1578
1579impl<'a, SEQUENCE, KEEP> Purge<'a, SEQUENCE, KEEP>
1580where
1581    SEQUENCE: ToAssign,
1582    KEEP: ToAssign,
1583{
1584    /// Adds subject filter to [PurgeRequest]
1585    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<'a, SEQUENCE, KEEP> {
1586        self.inner.filter = Some(filter.into());
1587        self
1588    }
1589}
1590
1591impl<'a> Purge<'a, No, No> {
1592    pub(crate) fn build(stream: &'a Stream) -> Purge<'a, No, No> {
1593        Purge {
1594            stream,
1595            inner: Default::default(),
1596            sequence_set: PhantomData {},
1597            keep_set: PhantomData {},
1598        }
1599    }
1600}
1601
1602impl<'a, KEEP> Purge<'a, No, KEEP>
1603where
1604    KEEP: ToAssign,
1605{
1606    /// Creates a new [PurgeRequest].
1607    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
1608    pub fn keep(self, keep: u64) -> Purge<'a, No, Yes> {
1609        Purge {
1610            stream: self.stream,
1611            sequence_set: PhantomData {},
1612            keep_set: PhantomData {},
1613            inner: PurgeRequest {
1614                keep: Some(keep),
1615                ..self.inner
1616            },
1617        }
1618    }
1619}
1620impl<'a, SEQUENCE> Purge<'a, SEQUENCE, No>
1621where
1622    SEQUENCE: ToAssign,
1623{
1624    /// Creates a new [PurgeRequest].
1625    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
1626    pub fn sequence(self, sequence: u64) -> Purge<'a, Yes, No> {
1627        Purge {
1628            stream: self.stream,
1629            sequence_set: PhantomData {},
1630            keep_set: PhantomData {},
1631            inner: PurgeRequest {
1632                sequence: Some(sequence),
1633                ..self.inner
1634            },
1635        }
1636    }
1637}
1638
1639#[derive(Clone, Debug, PartialEq)]
1640pub enum PurgeErrorKind {
1641    Request,
1642    TimedOut,
1643    JetStream(super::errors::Error),
1644}
1645
1646impl Display for PurgeErrorKind {
1647    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1648        match self {
1649            Self::Request => write!(f, "request failed"),
1650            Self::TimedOut => write!(f, "timed out"),
1651            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1652        }
1653    }
1654}
1655
1656pub type PurgeError = Error<PurgeErrorKind>;
1657
1658impl<'a, S, K> IntoFuture for Purge<'a, S, K>
1659where
1660    S: ToAssign + std::marker::Send,
1661    K: ToAssign + std::marker::Send,
1662{
1663    type Output = Result<PurgeResponse, PurgeError>;
1664
1665    type IntoFuture = BoxFuture<'a, Result<PurgeResponse, PurgeError>>;
1666
1667    fn into_future(self) -> Self::IntoFuture {
1668        Box::pin(std::future::IntoFuture::into_future(async move {
1669            let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name);
1670            let response: Response<PurgeResponse> = self
1671                .stream
1672                .context
1673                .request(request_subject, &self.inner)
1674                .map_err(|err| match err.kind() {
1675                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
1676                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
1677                })
1678                .await?;
1679
1680            match response {
1681                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
1682                Response::Ok(response) => Ok(response),
1683            }
1684        }))
1685    }
1686}
1687
1688#[derive(Deserialize, Debug)]
1689struct ConsumerPage {
1690    total: usize,
1691    consumers: Option<Vec<String>>,
1692}
1693
1694#[derive(Deserialize, Debug)]
1695struct ConsumerInfoPage {
1696    total: usize,
1697    consumers: Option<Vec<super::consumer::Info>>,
1698}
1699
/// Error kind of the consumer names listing; shared with the streams listing.
type ConsumerNamesErrorKind = StreamsErrorKind;
/// Error yielded by the consumer names listing; shared with the streams listing.
type ConsumerNamesError = StreamsError;
/// In-flight request for one page of consumer names.
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;

/// Paged listing of the names of the consumers of a stream, created by
/// `Stream::consumer_names`.
///
/// Names are fetched one page at a time and handed out from an internal buffer.
pub struct ConsumerNames {
    /// Context used to send the page requests.
    context: Context,
    /// Name of the stream whose consumers are listed.
    stream: String,
    /// Number of names received so far; sent as the offset of the next page request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names of the current page that have not been yielded yet (taken from the end).
    consumers: Vec<String>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
1712
1713impl futures::Stream for ConsumerNames {
1714    type Item = Result<String, ConsumerNamesError>;
1715
1716    fn poll_next(
1717        mut self: Pin<&mut Self>,
1718        cx: &mut std::task::Context<'_>,
1719    ) -> std::task::Poll<Option<Self::Item>> {
1720        match self.page_request.as_mut() {
1721            Some(page) => match page.try_poll_unpin(cx) {
1722                std::task::Poll::Ready(page) => {
1723                    self.page_request = None;
1724                    let page = page.map_err(|err| {
1725                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
1726                    })?;
1727
1728                    if let Some(consumers) = page.consumers {
1729                        self.offset += consumers.len();
1730                        self.consumers = consumers;
1731                        if self.offset >= page.total {
1732                            self.done = true;
1733                        }
1734                        match self.consumers.pop() {
1735                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1736                            None => Poll::Ready(None),
1737                        }
1738                    } else {
1739                        Poll::Ready(None)
1740                    }
1741                }
1742                std::task::Poll::Pending => std::task::Poll::Pending,
1743            },
1744            None => {
1745                if let Some(stream) = self.consumers.pop() {
1746                    Poll::Ready(Some(Ok(stream)))
1747                } else {
1748                    if self.done {
1749                        return Poll::Ready(None);
1750                    }
1751                    let context = self.context.clone();
1752                    let offset = self.offset;
1753                    let stream = self.stream.clone();
1754                    self.page_request = Some(Box::pin(async move {
1755                        match context
1756                            .request(
1757                                format!("CONSUMER.NAMES.{stream}"),
1758                                &json!({
1759                                    "offset": offset,
1760                                }),
1761                            )
1762                            .await?
1763                        {
1764                            Response::Err { error } => Err(RequestError::with_source(
1765                                super::context::RequestErrorKind::Other,
1766                                error,
1767                            )),
1768                            Response::Ok(page) => Ok(page),
1769                        }
1770                    }));
1771                    self.poll_next(cx)
1772                }
1773            }
1774        }
1775    }
1776}
1777
1778pub type ConsumersErrorKind = StreamsErrorKind;
1779pub type ConsumersError = StreamsError;
1780type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
1781
1782pub struct Consumers {
1783    context: Context,
1784    stream: String,
1785    offset: usize,
1786    page_request: Option<PageInfoRequest>,
1787    consumers: Vec<super::consumer::Info>,
1788    done: bool,
1789}
1790
1791impl futures::Stream for Consumers {
1792    type Item = Result<super::consumer::Info, ConsumersError>;
1793
1794    fn poll_next(
1795        mut self: Pin<&mut Self>,
1796        cx: &mut std::task::Context<'_>,
1797    ) -> std::task::Poll<Option<Self::Item>> {
1798        match self.page_request.as_mut() {
1799            Some(page) => match page.try_poll_unpin(cx) {
1800                std::task::Poll::Ready(page) => {
1801                    self.page_request = None;
1802                    let page = page.map_err(|err| {
1803                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
1804                    })?;
1805                    if let Some(consumers) = page.consumers {
1806                        self.offset += consumers.len();
1807                        self.consumers = consumers;
1808                        if self.offset >= page.total {
1809                            self.done = true;
1810                        }
1811                        match self.consumers.pop() {
1812                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
1813                            None => Poll::Ready(None),
1814                        }
1815                    } else {
1816                        Poll::Ready(None)
1817                    }
1818                }
1819                std::task::Poll::Pending => std::task::Poll::Pending,
1820            },
1821            None => {
1822                if let Some(stream) = self.consumers.pop() {
1823                    Poll::Ready(Some(Ok(stream)))
1824                } else {
1825                    if self.done {
1826                        return Poll::Ready(None);
1827                    }
1828                    let context = self.context.clone();
1829                    let offset = self.offset;
1830                    let stream = self.stream.clone();
1831                    self.page_request = Some(Box::pin(async move {
1832                        match context
1833                            .request(
1834                                format!("CONSUMER.LIST.{stream}"),
1835                                &json!({
1836                                    "offset": offset,
1837                                }),
1838                            )
1839                            .await?
1840                        {
1841                            Response::Err { error } => Err(RequestError::with_source(
1842                                super::context::RequestErrorKind::Other,
1843                                error,
1844                            )),
1845                            Response::Ok(page) => Ok(page),
1846                        }
1847                    }));
1848                    self.poll_next(cx)
1849                }
1850            }
1851        }
1852    }
1853}
1854
1855#[derive(Clone, Debug, PartialEq)]
1856pub enum LastRawMessageErrorKind {
1857    NoMessageFound,
1858    JetStream(super::errors::Error),
1859    Other,
1860}
1861
1862impl Display for LastRawMessageErrorKind {
1863    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1864        match self {
1865            Self::NoMessageFound => write!(f, "no message found"),
1866            Self::Other => write!(f, "failed to get last raw message"),
1867            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1868        }
1869    }
1870}
1871
1872pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
1873
1874#[derive(Clone, Debug, PartialEq)]
1875pub enum ConsumerErrorKind {
1876    //TODO: get last should have timeout, which should be mapped here.
1877    TimedOut,
1878    Request,
1879    InvalidConsumerType,
1880    InvalidName,
1881    JetStream(super::errors::Error),
1882    Other,
1883}
1884
1885impl Display for ConsumerErrorKind {
1886    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1887        match self {
1888            Self::TimedOut => write!(f, "timed out"),
1889            Self::Request => write!(f, "request failed"),
1890            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1891            Self::Other => write!(f, "consumer error"),
1892            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
1893            Self::InvalidName => write!(f, "invalid consumer name"),
1894        }
1895    }
1896}
1897
1898pub type ConsumerError = Error<ConsumerErrorKind>;
1899
1900#[derive(Clone, Debug, PartialEq)]
1901pub enum ConsumerCreateStrictErrorKind {
1902    //TODO: get last should have timeout, which should be mapped here.
1903    TimedOut,
1904    Request,
1905    InvalidConsumerType,
1906    InvalidName,
1907    AlreadyExists,
1908    JetStream(super::errors::Error),
1909    Other,
1910}
1911
1912impl Display for ConsumerCreateStrictErrorKind {
1913    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1914        match self {
1915            Self::TimedOut => write!(f, "timed out"),
1916            Self::Request => write!(f, "request failed"),
1917            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1918            Self::Other => write!(f, "consumer error"),
1919            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
1920            Self::InvalidName => write!(f, "invalid consumer name"),
1921            Self::AlreadyExists => write!(f, "consumer already exists"),
1922        }
1923    }
1924}
1925
1926pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
1927
1928#[derive(Clone, Debug, PartialEq)]
1929pub enum ConsumerUpdateErrorKind {
1930    //TODO: get last should have timeout, which should be mapped here.
1931    TimedOut,
1932    Request,
1933    InvalidConsumerType,
1934    InvalidName,
1935    DoesNotExist,
1936    JetStream(super::errors::Error),
1937    Other,
1938}
1939
1940impl Display for ConsumerUpdateErrorKind {
1941    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1942        match self {
1943            Self::TimedOut => write!(f, "timed out"),
1944            Self::Request => write!(f, "request failed"),
1945            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1946            Self::Other => write!(f, "consumer error"),
1947            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
1948            Self::InvalidName => write!(f, "invalid consumer name"),
1949            Self::DoesNotExist => write!(f, "consumer does not exist"),
1950        }
1951    }
1952}
1953
1954pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
1955
1956impl From<super::errors::Error> for ConsumerError {
1957    fn from(err: super::errors::Error) -> Self {
1958        ConsumerError::new(ConsumerErrorKind::JetStream(err))
1959    }
1960}
1961impl From<super::errors::Error> for ConsumerCreateStrictError {
1962    fn from(err: super::errors::Error) -> Self {
1963        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
1964            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
1965        } else {
1966            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
1967        }
1968    }
1969}
1970impl From<super::errors::Error> for ConsumerUpdateError {
1971    fn from(err: super::errors::Error) -> Self {
1972        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
1973            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
1974        } else {
1975            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
1976        }
1977    }
1978}
1979impl From<ConsumerError> for ConsumerUpdateError {
1980    fn from(err: ConsumerError) -> Self {
1981        match err.kind() {
1982            ConsumerErrorKind::JetStream(err) => {
1983                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
1984                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
1985                } else {
1986                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
1987                }
1988            }
1989            ConsumerErrorKind::Request => {
1990                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
1991            }
1992            ConsumerErrorKind::TimedOut => {
1993                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
1994            }
1995            ConsumerErrorKind::InvalidConsumerType => {
1996                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
1997            }
1998            ConsumerErrorKind::InvalidName => {
1999                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2000            }
2001            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2002        }
2003    }
2004}
2005
2006impl From<ConsumerError> for ConsumerCreateStrictError {
2007    fn from(err: ConsumerError) -> Self {
2008        match err.kind() {
2009            ConsumerErrorKind::JetStream(err) => {
2010                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2011                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2012                } else {
2013                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2014                }
2015            }
2016            ConsumerErrorKind::Request => {
2017                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2018            }
2019            ConsumerErrorKind::TimedOut => {
2020                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2021            }
2022            ConsumerErrorKind::InvalidConsumerType => {
2023                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2024            }
2025            ConsumerErrorKind::InvalidName => {
2026                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2027            }
2028            ConsumerErrorKind::Other => {
2029                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2030            }
2031        }
2032    }
2033}
2034
2035impl From<super::context::RequestError> for ConsumerError {
2036    fn from(err: super::context::RequestError) -> Self {
2037        match err.kind() {
2038            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2039            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2040        }
2041    }
2042}
2043impl From<super::context::RequestError> for ConsumerUpdateError {
2044    fn from(err: super::context::RequestError) -> Self {
2045        match err.kind() {
2046            RequestErrorKind::TimedOut => {
2047                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2048            }
2049            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2050        }
2051    }
2052}
2053impl From<super::context::RequestError> for ConsumerCreateStrictError {
2054    fn from(err: super::context::RequestError) -> Self {
2055        match err.kind() {
2056            RequestErrorKind::TimedOut => {
2057                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2058            }
2059            _ => {
2060                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2061            }
2062        }
2063    }
2064}