async_nats_wrpc/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode};
23use bytes::Bytes;
24use futures::future::BoxFuture;
25use futures::{Future, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Display;
31use std::future::IntoFuture;
32use std::pin::Pin;
33use std::str::from_utf8;
34use std::task::Poll;
35use std::time::Duration;
36use tokio::sync::oneshot;
37use tracing::debug;
38
39use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
40use super::errors::ErrorCode;
41use super::is_valid_name;
42use super::kv::{Store, MAX_HISTORY};
43use super::object_store::{is_valid_bucket_name, ObjectStore};
44use super::stream::{
45    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
46    Stream,
47};
48#[cfg(feature = "server_2_10")]
49use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
50
51/// A context which can perform jetstream scoped requests.
52#[derive(Debug, Clone)]
53pub struct Context {
54    pub(crate) client: Client,
55    pub(crate) prefix: String,
56    pub(crate) timeout: Duration,
57}
58
59impl Context {
60    pub(crate) fn new(client: Client) -> Context {
61        Context {
62            client,
63            prefix: "$JS.API".to_string(),
64            timeout: Duration::from_secs(5),
65        }
66    }
67
68    pub fn set_timeout(&mut self, timeout: Duration) {
69        self.timeout = timeout
70    }
71
72    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
73        Context {
74            client,
75            prefix: prefix.to_string(),
76            timeout: Duration::from_secs(5),
77        }
78    }
79
80    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
81        Context {
82            client,
83            prefix: format!("$JS.{}.API", domain.as_ref()),
84            timeout: Duration::from_secs(5),
85        }
86    }
87
88    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
89    /// acknowledgment from the server that the message has been successfully delivered.
90    ///
91    /// Acknowledgment future that can be polled is returned instead.
92    ///
93    /// If the stream does not exist, `no responders` error will be returned.
94    ///
95    /// # Examples
96    ///
97    /// Publish, and after each publish, await for acknowledgment.
98    ///
99    /// ```no_run
100    /// # #[tokio::main]
101    /// # async fn main() -> Result<(), async_nats::Error> {
102    /// let client = async_nats::connect("localhost:4222").await?;
103    /// let jetstream = async_nats::jetstream::new(client);
104    ///
105    /// let ack = jetstream.publish("events", "data".into()).await?;
106    /// ack.await?;
107    /// jetstream.publish("events", "data".into()).await?.await?;
108    /// # Ok(())
109    /// # }
110    /// ```
111    ///
112    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
113    /// ignored entirely.
114    ///
115    /// ```no_run
116    /// # #[tokio::main]
117    /// # async fn main() -> Result<(), async_nats::Error> {
118    /// let client = async_nats::connect("localhost:4222").await?;
119    /// let jetstream = async_nats::jetstream::new(client);
120    ///
121    /// let first_ack = jetstream.publish("events", "data".into()).await?;
122    /// let second_ack = jetstream.publish("events", "data".into()).await?;
123    /// first_ack.await?;
124    /// second_ack.await?;
125    /// # Ok(())
126    /// # }
127    /// ```
128    pub async fn publish<S: ToSubject>(
129        &self,
130        subject: S,
131        payload: Bytes,
132    ) -> Result<PublishAckFuture, PublishError> {
133        self.send_publish(subject, Publish::build().payload(payload))
134            .await
135    }
136
137    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
138    /// the server that the message has been successfully delivered.
139    ///
140    /// If the stream does not exist, `no responders` error will be returned.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// # #[tokio::main]
146    /// # async fn main() -> Result<(), async_nats::Error> {
147    /// let client = async_nats::connect("localhost:4222").await?;
148    /// let jetstream = async_nats::jetstream::new(client);
149    ///
150    /// let mut headers = async_nats::HeaderMap::new();
151    /// headers.append("X-key", "Value");
152    /// let ack = jetstream
153    ///     .publish_with_headers("events", headers, "data".into())
154    ///     .await?;
155    /// # Ok(())
156    /// # }
157    /// ```
158    pub async fn publish_with_headers<S: ToSubject>(
159        &self,
160        subject: S,
161        headers: crate::header::HeaderMap,
162        payload: Bytes,
163    ) -> Result<PublishAckFuture, PublishError> {
164        self.send_publish(subject, Publish::build().payload(payload).headers(headers))
165            .await
166    }
167
168    /// Publish a message built by [Publish] and returns an acknowledgment future.
169    ///
170    /// If the stream does not exist, `no responders` error will be returned.
171    ///
172    /// # Examples
173    ///
174    /// ```no_run
175    /// # use async_nats::jetstream::context::Publish;
176    /// # #[tokio::main]
177    /// # async fn main() -> Result<(), async_nats::Error> {
178    /// let client = async_nats::connect("localhost:4222").await?;
179    /// let jetstream = async_nats::jetstream::new(client);
180    ///
181    /// let ack = jetstream
182    ///     .send_publish(
183    ///         "events",
184    ///         Publish::build().payload("data".into()).message_id("uuid"),
185    ///     )
186    ///     .await?;
187    /// # Ok(())
188    /// # }
189    /// ```
190    pub async fn send_publish<S: ToSubject>(
191        &self,
192        subject: S,
193        publish: Publish,
194    ) -> Result<PublishAckFuture, PublishError> {
195        let subject = subject.to_subject();
196        let (sender, receiver) = oneshot::channel();
197
198        let respond = self.client.new_inbox().into();
199
200        let send_fut = self
201            .client
202            .sender
203            .send(Command::Request {
204                subject,
205                payload: publish.payload,
206                respond,
207                headers: publish.headers,
208                sender,
209            })
210            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
211
212        tokio::time::timeout(self.timeout, send_fut)
213            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
214            .await??;
215
216        Ok(PublishAckFuture {
217            timeout: self.timeout,
218            subscription: receiver,
219        })
220    }
221
222    /// Query the server for account information
223    pub async fn query_account(&self) -> Result<Account, AccountError> {
224        let response: Response<Account> = self.request("INFO", b"").await?;
225
226        match response {
227            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
228            Response::Ok(account) => Ok(account),
229        }
230    }
231
232    /// Create a JetStream [Stream] with given config and return a handle to it.
233    /// That handle can be used to manage and use [Consumer].
234    ///
235    /// # Examples
236    ///
237    /// ```no_run
238    /// # #[tokio::main]
239    /// # async fn main() -> Result<(), async_nats::Error> {
240    /// use async_nats::jetstream::stream::Config;
241    /// use async_nats::jetstream::stream::DiscardPolicy;
242    /// let client = async_nats::connect("localhost:4222").await?;
243    /// let jetstream = async_nats::jetstream::new(client);
244    ///
245    /// let stream = jetstream
246    ///     .create_stream(Config {
247    ///         name: "events".to_string(),
248    ///         max_messages: 100_000,
249    ///         discard: DiscardPolicy::Old,
250    ///         ..Default::default()
251    ///     })
252    ///     .await?;
253    /// # Ok(())
254    /// # }
255    /// ```
256    pub async fn create_stream<S>(&self, stream_config: S) -> Result<Stream, CreateStreamError>
257    where
258        Config: From<S>,
259    {
260        let mut config: Config = stream_config.into();
261        if config.name.is_empty() {
262            return Err(CreateStreamError::new(
263                CreateStreamErrorKind::EmptyStreamName,
264            ));
265        }
266        if !is_valid_name(config.name.as_str()) {
267            return Err(CreateStreamError::new(
268                CreateStreamErrorKind::InvalidStreamName,
269            ));
270        }
271        if let Some(ref mut mirror) = config.mirror {
272            if let Some(ref mut domain) = mirror.domain {
273                if mirror.external.is_some() {
274                    return Err(CreateStreamError::new(
275                        CreateStreamErrorKind::DomainAndExternalSet,
276                    ));
277                }
278                mirror.external = Some(External {
279                    api_prefix: format!("$JS.{domain}.API"),
280                    delivery_prefix: None,
281                })
282            }
283        }
284
285        if let Some(ref mut sources) = config.sources {
286            for source in sources {
287                if let Some(ref mut domain) = source.domain {
288                    if source.external.is_some() {
289                        return Err(CreateStreamError::new(
290                            CreateStreamErrorKind::DomainAndExternalSet,
291                        ));
292                    }
293                    source.external = Some(External {
294                        api_prefix: format!("$JS.{domain}.API"),
295                        delivery_prefix: None,
296                    })
297                }
298            }
299        }
300        let subject = format!("STREAM.CREATE.{}", config.name);
301        let response: Response<Info> = self.request(subject, &config).await?;
302
303        match response {
304            Response::Err { error } => Err(error.into()),
305            Response::Ok(info) => Ok(Stream {
306                context: self.clone(),
307                info,
308            }),
309        }
310    }
311
312    /// Checks for [Stream] existence on the server and returns handle to it.
313    /// That handle can be used to manage and use [Consumer].
314    ///
315    /// # Examples
316    ///
317    /// ```no_run
318    /// # #[tokio::main]
319    /// # async fn main() -> Result<(), async_nats::Error> {
320    /// let client = async_nats::connect("localhost:4222").await?;
321    /// let jetstream = async_nats::jetstream::new(client);
322    ///
323    /// let stream = jetstream.get_stream("events").await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
328        let stream = stream.as_ref();
329        if stream.is_empty() {
330            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
331        }
332
333        if !is_valid_name(stream) {
334            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
335        }
336
337        let subject = format!("STREAM.INFO.{stream}");
338        let request: Response<Info> = self
339            .request(subject, &())
340            .await
341            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
342        match request {
343            Response::Err { error } => {
344                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
345            }
346            Response::Ok(info) => Ok(Stream {
347                context: self.clone(),
348                info,
349            }),
350        }
351    }
352
353    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
354    ///
355    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
356    ///
357    /// # Examples
358    ///
359    /// ```no_run
360    /// # #[tokio::main]
361    /// # async fn main() -> Result<(), async_nats::Error> {
362    /// use async_nats::jetstream::stream::Config;
363    /// let client = async_nats::connect("localhost:4222").await?;
364    /// let jetstream = async_nats::jetstream::new(client);
365    ///
366    /// let stream = jetstream
367    ///     .get_or_create_stream(Config {
368    ///         name: "events".to_string(),
369    ///         max_messages: 10_000,
370    ///         ..Default::default()
371    ///     })
372    ///     .await?;
373    /// # Ok(())
374    /// # }
375    /// ```
376    pub async fn get_or_create_stream<S>(
377        &self,
378        stream_config: S,
379    ) -> Result<Stream, CreateStreamError>
380    where
381        S: Into<Config>,
382    {
383        let config: Config = stream_config.into();
384
385        if config.name.is_empty() {
386            return Err(CreateStreamError::new(
387                CreateStreamErrorKind::EmptyStreamName,
388            ));
389        }
390
391        if !is_valid_name(config.name.as_str()) {
392            return Err(CreateStreamError::new(
393                CreateStreamErrorKind::InvalidStreamName,
394            ));
395        }
396        let subject = format!("STREAM.INFO.{}", config.name);
397
398        let request: Response<Info> = self.request(subject, &()).await?;
399        match request {
400            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
401            Response::Err { error } => Err(error.into()),
402            Response::Ok(info) => Ok(Stream {
403                context: self.clone(),
404                info,
405            }),
406        }
407    }
408
409    /// Deletes a [Stream] with a given name.
410    ///
411    /// # Examples
412    ///
413    /// ```no_run
414    /// # #[tokio::main]
415    /// # async fn main() -> Result<(), async_nats::Error> {
416    /// use async_nats::jetstream::stream::Config;
417    /// let client = async_nats::connect("localhost:4222").await?;
418    /// let jetstream = async_nats::jetstream::new(client);
419    ///
420    /// let stream = jetstream.delete_stream("events").await?;
421    /// # Ok(())
422    /// # }
423    /// ```
424    pub async fn delete_stream<T: AsRef<str>>(
425        &self,
426        stream: T,
427    ) -> Result<DeleteStatus, DeleteStreamError> {
428        let stream = stream.as_ref();
429        if stream.is_empty() {
430            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
431        }
432
433        if !is_valid_name(stream) {
434            return Err(DeleteStreamError::new(
435                DeleteStreamErrorKind::InvalidStreamName,
436            ));
437        }
438
439        let subject = format!("STREAM.DELETE.{stream}");
440        match self
441            .request(subject, &json!({}))
442            .await
443            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
444        {
445            Response::Err { error } => Err(DeleteStreamError::new(
446                DeleteStreamErrorKind::JetStream(error),
447            )),
448            Response::Ok(delete_response) => Ok(delete_response),
449        }
450    }
451
452    /// Updates a [Stream] with a given config. If specific field cannot be updated,
453    /// error is returned.
454    ///
455    /// # Examples
456    ///
457    /// ```no_run
458    /// # #[tokio::main]
459    /// # async fn main() -> Result<(), async_nats::Error> {
460    /// use async_nats::jetstream::stream::Config;
461    /// use async_nats::jetstream::stream::DiscardPolicy;
462    /// let client = async_nats::connect("localhost:4222").await?;
463    /// let jetstream = async_nats::jetstream::new(client);
464    ///
465    /// let stream = jetstream
466    ///     .update_stream(&Config {
467    ///         name: "events".to_string(),
468    ///         discard: DiscardPolicy::New,
469    ///         max_messages: 50_000,
470    ///         ..Default::default()
471    ///     })
472    ///     .await?;
473    /// # Ok(())
474    /// # }
475    /// ```
476    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
477    where
478        S: Borrow<Config>,
479    {
480        let config = config.borrow();
481
482        if config.name.is_empty() {
483            return Err(CreateStreamError::new(
484                CreateStreamErrorKind::EmptyStreamName,
485            ));
486        }
487
488        if !is_valid_name(config.name.as_str()) {
489            return Err(CreateStreamError::new(
490                CreateStreamErrorKind::InvalidStreamName,
491            ));
492        }
493
494        let subject = format!("STREAM.UPDATE.{}", config.name);
495        match self.request(subject, config).await? {
496            Response::Err { error } => Err(error.into()),
497            Response::Ok(info) => Ok(info),
498        }
499    }
500
501    /// Lists names of all streams for current context.
502    ///
503    /// # Examples
504    ///
505    /// ```no_run
506    /// # #[tokio::main]
507    /// # async fn main() -> Result<(), async_nats::Error> {
508    /// use futures::TryStreamExt;
509    /// let client = async_nats::connect("demo.nats.io:4222").await?;
510    /// let jetstream = async_nats::jetstream::new(client);
511    /// let mut names = jetstream.stream_names();
512    /// while let Some(stream) = names.try_next().await? {
513    ///     println!("stream: {}", stream);
514    /// }
515    /// # Ok(())
516    /// # }
517    /// ```
518    pub fn stream_names(&self) -> StreamNames {
519        StreamNames {
520            context: self.clone(),
521            offset: 0,
522            page_request: None,
523            streams: Vec::new(),
524            done: false,
525        }
526    }
527
528    /// Lists all streams info for current context.
529    ///
530    /// # Examples
531    ///
532    /// ```no_run
533    /// # #[tokio::main]
534    /// # async fn main() -> Result<(), async_nats::Error> {
535    /// use futures::TryStreamExt;
536    /// let client = async_nats::connect("demo.nats.io:4222").await?;
537    /// let jetstream = async_nats::jetstream::new(client);
538    /// let mut streams = jetstream.streams();
539    /// while let Some(stream) = streams.try_next().await? {
540    ///     println!("stream: {:?}", stream);
541    /// }
542    /// # Ok(())
543    /// # }
544    /// ```
545    pub fn streams(&self) -> Streams {
546        Streams {
547            context: self.clone(),
548            offset: 0,
549            page_request: None,
550            streams: Vec::new(),
551            done: false,
552        }
553    }
554    /// Returns an existing key-value bucket.
555    ///
556    /// # Examples
557    ///
558    /// ```no_run
559    /// # #[tokio::main]
560    /// # async fn main() -> Result<(), async_nats::Error> {
561    /// let client = async_nats::connect("demo.nats.io:4222").await?;
562    /// let jetstream = async_nats::jetstream::new(client);
563    /// let kv = jetstream.get_key_value("bucket").await?;
564    /// # Ok(())
565    /// # }
566    /// ```
567    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
568        let bucket: String = bucket.into();
569        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
570            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
571        }
572
573        let stream_name = format!("KV_{}", &bucket);
574        let stream = self
575            .get_stream(stream_name.clone())
576            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
577            .await?;
578
579        if stream.info.config.max_messages_per_subject < 1 {
580            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
581        }
582        let mut store = Store {
583            prefix: format!("$KV.{}.", &bucket),
584            name: bucket,
585            stream_name,
586            stream: stream.clone(),
587            put_prefix: None,
588            use_jetstream_prefix: self.prefix != "$JS.API",
589        };
590        if let Some(ref mirror) = stream.info.config.mirror {
591            let bucket = mirror.name.trim_start_matches("KV_");
592            if let Some(ref external) = mirror.external {
593                if !external.api_prefix.is_empty() {
594                    store.use_jetstream_prefix = false;
595                    store.prefix = format!("$KV.{bucket}.");
596                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
597                } else {
598                    store.put_prefix = Some(format!("$KV.{bucket}."));
599                }
600            }
601        };
602
603        Ok(store)
604    }
605
606    /// Creates a new key-value bucket.
607    ///
608    /// # Examples
609    ///
610    /// ```no_run
611    /// # #[tokio::main]
612    /// # async fn main() -> Result<(), async_nats::Error> {
613    /// let client = async_nats::connect("demo.nats.io:4222").await?;
614    /// let jetstream = async_nats::jetstream::new(client);
615    /// let kv = jetstream
616    ///     .create_key_value(async_nats::jetstream::kv::Config {
617    ///         bucket: "kv".to_string(),
618    ///         history: 10,
619    ///         ..Default::default()
620    ///     })
621    ///     .await?;
622    /// # Ok(())
623    /// # }
624    /// ```
625    pub async fn create_key_value(
626        &self,
627        mut config: crate::jetstream::kv::Config,
628    ) -> Result<Store, CreateKeyValueError> {
629        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
630            return Err(CreateKeyValueError::new(
631                CreateKeyValueErrorKind::InvalidStoreName,
632            ));
633        }
634
635        let history = if config.history > 0 {
636            if config.history > MAX_HISTORY {
637                return Err(CreateKeyValueError::new(
638                    CreateKeyValueErrorKind::TooLongHistory,
639                ));
640            }
641            config.history
642        } else {
643            1
644        };
645
646        let num_replicas = if config.num_replicas == 0 {
647            1
648        } else {
649            config.num_replicas
650        };
651
652        let mut subjects = Vec::new();
653        if let Some(ref mut mirror) = config.mirror {
654            if !mirror.name.starts_with("KV_") {
655                mirror.name = format!("KV_{}", mirror.name);
656            }
657            config.mirror_direct = true;
658        } else if let Some(ref mut sources) = config.sources {
659            for source in sources {
660                if !source.name.starts_with("KV_") {
661                    source.name = format!("KV_{}", source.name);
662                }
663            }
664        } else {
665            subjects = vec![format!("$KV.{}.>", config.bucket)];
666        }
667
668        let stream = self
669            .create_stream(stream::Config {
670                name: format!("KV_{}", config.bucket),
671                description: Some(config.description),
672                subjects,
673                max_messages_per_subject: history,
674                max_bytes: config.max_bytes,
675                max_age: config.max_age,
676                max_message_size: config.max_value_size,
677                storage: config.storage,
678                republish: config.republish,
679                allow_rollup: true,
680                deny_delete: true,
681                deny_purge: false,
682                allow_direct: true,
683                sources: config.sources,
684                mirror: config.mirror,
685                num_replicas,
686                discard: stream::DiscardPolicy::New,
687                mirror_direct: config.mirror_direct,
688                #[cfg(feature = "server_2_10")]
689                compression: if config.compression {
690                    Some(stream::Compression::S2)
691                } else {
692                    None
693                },
694                placement: config.placement,
695                ..Default::default()
696            })
697            .await
698            .map_err(|err| {
699                if err.kind() == CreateStreamErrorKind::TimedOut {
700                    CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
701                } else {
702                    CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
703                }
704            })?;
705
706        let mut store = Store {
707            prefix: format!("$KV.{}.", &config.bucket),
708            name: config.bucket,
709            stream: stream.clone(),
710            stream_name: stream.info.config.name,
711            put_prefix: None,
712            use_jetstream_prefix: self.prefix != "$JS.API",
713        };
714        if let Some(ref mirror) = stream.info.config.mirror {
715            let bucket = mirror.name.trim_start_matches("KV_");
716            if let Some(ref external) = mirror.external {
717                if !external.api_prefix.is_empty() {
718                    store.use_jetstream_prefix = false;
719                    store.prefix = format!("$KV.{bucket}.");
720                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
721                } else {
722                    store.put_prefix = Some(format!("$KV.{bucket}."));
723                }
724            }
725        };
726
727        Ok(store)
728    }
729
730    /// Deletes given key-value bucket.
731    ///
732    /// # Examples
733    ///
734    /// ```no_run
735    /// # #[tokio::main]
736    /// # async fn main() -> Result<(), async_nats::Error> {
737    /// let client = async_nats::connect("demo.nats.io:4222").await?;
738    /// let jetstream = async_nats::jetstream::new(client);
739    /// let kv = jetstream
740    ///     .create_key_value(async_nats::jetstream::kv::Config {
741    ///         bucket: "kv".to_string(),
742    ///         history: 10,
743    ///         ..Default::default()
744    ///     })
745    ///     .await?;
746    /// # Ok(())
747    /// # }
748    /// ```
749    pub async fn delete_key_value<T: AsRef<str>>(
750        &self,
751        bucket: T,
752    ) -> Result<DeleteStatus, KeyValueError> {
753        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
754            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
755        }
756
757        let stream_name = format!("KV_{}", bucket.as_ref());
758        self.delete_stream(stream_name)
759            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
760            .await
761    }
762
763    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
764    //     let config = config.borrow();
765    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
766    //         return Err(Box::new(std::io::Error::new(
767    //             ErrorKind::Other,
768    //             "invalid bucket name",
769    //         )));
770    //     }
771
772    //     let stream_name = format!("KV_{}", config.bucket);
773    //     self.update_stream()
774    //         .await
775    //         .and_then(|info| Ok(()))
776    // }
777
778    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
779    ///
780    /// It has one less interaction with the server when binding to only one
781    /// [crate::jetstream::consumer::Consumer].
782    ///
783    /// # Examples:
784    ///
785    /// ```no_run
786    /// # #[tokio::main]
787    /// # async fn main() -> Result<(), async_nats::Error> {
788    /// use async_nats::jetstream::consumer::PullConsumer;
789    ///
790    /// let client = async_nats::connect("localhost:4222").await?;
791    /// let jetstream = async_nats::jetstream::new(client);
792    ///
793    /// let consumer: PullConsumer = jetstream
794    ///     .get_consumer_from_stream("consumer", "stream")
795    ///     .await?;
796    ///
797    /// # Ok(())
798    /// # }
799    /// ```
800    pub async fn get_consumer_from_stream<T, C, S>(
801        &self,
802        consumer: C,
803        stream: S,
804    ) -> Result<Consumer<T>, ConsumerError>
805    where
806        T: FromConsumer + IntoConsumerConfig,
807        S: AsRef<str>,
808        C: AsRef<str>,
809    {
810        if !is_valid_name(stream.as_ref()) {
811            return Err(ConsumerError::with_source(
812                ConsumerErrorKind::InvalidName,
813                "invalid stream",
814            ));
815        }
816
817        if !is_valid_name(consumer.as_ref()) {
818            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
819        }
820
821        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
822
823        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
824            Response::Ok(info) => info,
825            Response::Err { error } => return Err(error.into()),
826        };
827
828        Ok(Consumer::new(
829            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
830                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
831            })?,
832            info,
833            self.clone(),
834        ))
835    }
836
837    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
838    ///
839    /// It has one less interaction with the server when binding to only one
840    /// [crate::jetstream::consumer::Consumer].
841    ///
842    /// # Examples:
843    ///
844    /// ```no_run
845    /// # #[tokio::main]
846    /// # async fn main() -> Result<(), async_nats::Error> {
847    /// use async_nats::jetstream::consumer::PullConsumer;
848    ///
849    /// let client = async_nats::connect("localhost:4222").await?;
850    /// let jetstream = async_nats::jetstream::new(client);
851    ///
852    /// jetstream
853    ///     .delete_consumer_from_stream("consumer", "stream")
854    ///     .await?;
855    ///
856    /// # Ok(())
857    /// # }
858    /// ```
859    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
860        &self,
861        consumer: C,
862        stream: S,
863    ) -> Result<DeleteStatus, ConsumerError> {
864        if !is_valid_name(consumer.as_ref()) {
865            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
866        }
867
868        if !is_valid_name(stream.as_ref()) {
869            return Err(ConsumerError::with_source(
870                ConsumerErrorKind::Other,
871                "invalid stream name",
872            ));
873        }
874
875        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
876
877        match self.request(subject, &json!({})).await? {
878            Response::Ok(delete_status) => Ok(delete_status),
879            Response::Err { error } => Err(error.into()),
880        }
881    }
882
883    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
884    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
885    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
886    ///
887    /// # Examples
888    ///
889    /// ```no_run
890    /// # #[tokio::main]
891    /// # async fn main() -> Result<(), async_nats::Error> {
892    /// use async_nats::jetstream::consumer;
893    /// let client = async_nats::connect("localhost:4222").await?;
894    /// let jetstream = async_nats::jetstream::new(client);
895    ///
896    /// let consumer: consumer::PullConsumer = jetstream
897    ///     .create_consumer_on_stream(
898    ///         consumer::pull::Config {
899    ///             durable_name: Some("pull".to_string()),
900    ///             ..Default::default()
901    ///         },
902    ///         "stream",
903    ///     )
904    ///     .await?;
905    /// # Ok(())
906    /// # }
907    /// ```
908    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
909        &self,
910        config: C,
911        stream: S,
912    ) -> Result<Consumer<C>, ConsumerError> {
913        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
914            .await
915    }
916
917    /// Update an existing consumer.
918    /// This call will fail if the consumer does not exist.
919    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
920    ///
921    /// # Examples
922    ///
923    /// ```no_run
924    /// # #[tokio::main]
925    /// # async fn main() -> Result<(), async_nats::Error> {
926    /// use async_nats::jetstream::consumer;
927    /// let client = async_nats::connect("localhost:4222").await?;
928    /// let jetstream = async_nats::jetstream::new(client);
929    ///
930    /// let consumer: consumer::PullConsumer = jetstream
931    ///     .update_consumer_on_stream(
932    ///         consumer::pull::Config {
933    ///             durable_name: Some("pull".to_string()),
934    ///             description: Some("updated pull consumer".to_string()),
935    ///             ..Default::default()
936    ///         },
937    ///         "stream",
938    ///     )
939    ///     .await?;
940    /// # Ok(())
941    /// # }
942    /// ```
943    #[cfg(feature = "server_2_10")]
944    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
945        &self,
946        config: C,
947        stream: S,
948    ) -> Result<Consumer<C>, ConsumerUpdateError> {
949        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
950            .await
951            .map_err(|err| err.into())
952    }
953
954    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
955    /// the same.
956    /// This method will fail if consumer is already present with different config.
957    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
958    ///
959    /// # Examples
960    ///
961    /// ```no_run
962    /// # #[tokio::main]
963    /// # async fn main() -> Result<(), async_nats::Error> {
964    /// use async_nats::jetstream::consumer;
965    /// let client = async_nats::connect("localhost:4222").await?;
966    /// let jetstream = async_nats::jetstream::new(client);
967    ///
968    /// let consumer: consumer::PullConsumer = jetstream
969    ///     .create_consumer_strict_on_stream(
970    ///         consumer::pull::Config {
971    ///             durable_name: Some("pull".to_string()),
972    ///             ..Default::default()
973    ///         },
974    ///         "stream",
975    ///     )
976    ///     .await?;
977    /// # Ok(())
978    /// # }
979    /// ```
980    #[cfg(feature = "server_2_10")]
981    pub async fn create_consumer_strict_on_stream<
982        C: IntoConsumerConfig + FromConsumer,
983        S: AsRef<str>,
984    >(
985        &self,
986        config: C,
987        stream: S,
988    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
989        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
990            .await
991            .map_err(|err| err.into())
992    }
993
994    async fn create_consumer_on_stream_action<
995        C: IntoConsumerConfig + FromConsumer,
996        S: AsRef<str>,
997    >(
998        &self,
999        config: C,
1000        stream: S,
1001        action: ConsumerAction,
1002    ) -> Result<Consumer<C>, ConsumerError> {
1003        let config = config.into_consumer_config();
1004
1005        let subject = {
1006            if self.client.is_server_compatible(2, 9, 0) {
1007                let filter = if config.filter_subject.is_empty() {
1008                    "".to_string()
1009                } else {
1010                    format!(".{}", config.filter_subject)
1011                };
1012                config
1013                    .name
1014                    .as_ref()
1015                    .or(config.durable_name.as_ref())
1016                    .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1017                    .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1018            } else if config.name.is_some() {
1019                return Err(ConsumerError::with_source(
1020                    ConsumerErrorKind::Other,
1021                    "can't use consumer name with server < 2.9.0",
1022                ));
1023            } else if let Some(ref durable_name) = config.durable_name {
1024                format!(
1025                    "CONSUMER.DURABLE.CREATE.{}.{}",
1026                    stream.as_ref(),
1027                    durable_name
1028                )
1029            } else {
1030                format!("CONSUMER.CREATE.{}", stream.as_ref())
1031            }
1032        };
1033
1034        match self
1035            .request(
1036                subject,
1037                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1038            )
1039            .await?
1040        {
1041            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1042            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1043                FromConsumer::try_from_consumer_config(info.clone().config)
1044                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1045                info,
1046                self.clone(),
1047            )),
1048        }
1049    }
1050
1051    /// Send a request to the jetstream JSON API.
1052    ///
1053    /// This is a low level API used mostly internally, that should be used only in
1054    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1055    ///
1056    /// # Examples
1057    ///
1058    /// ```no_run
1059    /// # use async_nats::jetstream::stream::Info;
1060    /// # use async_nats::jetstream::response::Response;
1061    /// # #[tokio::main]
1062    /// # async fn main() -> Result<(), async_nats::Error> {
1063    /// let client = async_nats::connect("localhost:4222").await?;
1064    /// let jetstream = async_nats::jetstream::new(client);
1065    ///
1066    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1067    /// # Ok(())
1068    /// # }
1069    /// ```
1070    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1071    where
1072        S: ToSubject,
1073        T: ?Sized + Serialize,
1074        V: DeserializeOwned,
1075    {
1076        let subject = subject.to_subject();
1077        let request = serde_json::to_vec(&payload)
1078            .map(Bytes::from)
1079            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1080
1081        debug!("JetStream request sent: {:?}", request);
1082
1083        let message = self
1084            .client
1085            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1086            .await;
1087        let message = message?;
1088        debug!(
1089            "JetStream request response: {:?}",
1090            from_utf8(&message.payload)
1091        );
1092        let response = serde_json::from_slice(message.payload.as_ref())
1093            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1094
1095        Ok(response)
1096    }
1097
1098    /// Creates a new object store bucket.
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```no_run
1103    /// # #[tokio::main]
1104    /// # async fn main() -> Result<(), async_nats::Error> {
1105    /// let client = async_nats::connect("demo.nats.io").await?;
1106    /// let jetstream = async_nats::jetstream::new(client);
1107    /// let bucket = jetstream
1108    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1109    ///         bucket: "bucket".to_string(),
1110    ///         ..Default::default()
1111    ///     })
1112    ///     .await?;
1113    /// # Ok(())
1114    /// # }
1115    /// ```
1116    pub async fn create_object_store(
1117        &self,
1118        config: super::object_store::Config,
1119    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1120        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1121            return Err(CreateObjectStoreError::new(
1122                CreateKeyValueErrorKind::InvalidStoreName,
1123            ));
1124        }
1125
1126        let bucket_name = config.bucket.clone();
1127        let stream_name = format!("OBJ_{bucket_name}");
1128        let chunk_subject = format!("$O.{bucket_name}.C.>");
1129        let meta_subject = format!("$O.{bucket_name}.M.>");
1130
1131        let stream = self
1132            .create_stream(super::stream::Config {
1133                name: stream_name,
1134                description: config.description.clone(),
1135                subjects: vec![chunk_subject, meta_subject],
1136                max_age: config.max_age,
1137                storage: config.storage,
1138                num_replicas: config.num_replicas,
1139                discard: DiscardPolicy::New,
1140                allow_rollup: true,
1141                allow_direct: true,
1142                #[cfg(feature = "server_2_10")]
1143                compression: if config.compression {
1144                    Some(Compression::S2)
1145                } else {
1146                    None
1147                },
1148                placement: config.placement,
1149                ..Default::default()
1150            })
1151            .await
1152            .map_err(|err| {
1153                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1154            })?;
1155
1156        Ok(ObjectStore {
1157            name: bucket_name,
1158            stream,
1159        })
1160    }
1161
1162    /// Get an existing object store bucket.
1163    ///
1164    /// # Examples
1165    ///
1166    /// ```no_run
1167    /// # #[tokio::main]
1168    /// # async fn main() -> Result<(), async_nats::Error> {
1169    /// let client = async_nats::connect("demo.nats.io").await?;
1170    /// let jetstream = async_nats::jetstream::new(client);
1171    /// let bucket = jetstream.get_object_store("bucket").await?;
1172    /// # Ok(())
1173    /// # }
1174    /// ```
1175    pub async fn get_object_store<T: AsRef<str>>(
1176        &self,
1177        bucket_name: T,
1178    ) -> Result<ObjectStore, ObjectStoreError> {
1179        let bucket_name = bucket_name.as_ref();
1180        if !is_valid_bucket_name(bucket_name) {
1181            return Err(ObjectStoreError::new(
1182                ObjectStoreErrorKind::InvalidBucketName,
1183            ));
1184        }
1185        let stream_name = format!("OBJ_{bucket_name}");
1186        let stream = self
1187            .get_stream(stream_name)
1188            .await
1189            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1190
1191        Ok(ObjectStore {
1192            name: bucket_name.to_string(),
1193            stream,
1194        })
1195    }
1196
1197    /// Delete a object store bucket.
1198    ///
1199    /// # Examples
1200    ///
1201    /// ```no_run
1202    /// # #[tokio::main]
1203    /// # async fn main() -> Result<(), async_nats::Error> {
1204    /// let client = async_nats::connect("demo.nats.io").await?;
1205    /// let jetstream = async_nats::jetstream::new(client);
1206    /// let bucket = jetstream.delete_object_store("bucket").await?;
1207    /// # Ok(())
1208    /// # }
1209    /// ```
1210    pub async fn delete_object_store<T: AsRef<str>>(
1211        &self,
1212        bucket_name: T,
1213    ) -> Result<(), DeleteObjectStore> {
1214        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1215        self.delete_stream(stream_name)
1216            .await
1217            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1218        Ok(())
1219    }
1220}
1221
1222#[derive(Clone, Copy, Debug, PartialEq)]
1223pub enum PublishErrorKind {
1224    StreamNotFound,
1225    WrongLastMessageId,
1226    WrongLastSequence,
1227    TimedOut,
1228    BrokenPipe,
1229    Other,
1230}
1231
1232impl Display for PublishErrorKind {
1233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1234        match self {
1235            Self::StreamNotFound => write!(f, "no stream found for given subject"),
1236            Self::TimedOut => write!(f, "timed out: didn't receive ack in time"),
1237            Self::Other => write!(f, "publish failed"),
1238            Self::BrokenPipe => write!(f, "broken pipe"),
1239            Self::WrongLastMessageId => write!(f, "wrong last message id"),
1240            Self::WrongLastSequence => write!(f, "wrong last sequence"),
1241        }
1242    }
1243}
1244
1245pub type PublishError = Error<PublishErrorKind>;
1246
1247#[derive(Debug)]
1248pub struct PublishAckFuture {
1249    timeout: Duration,
1250    subscription: oneshot::Receiver<Message>,
1251}
1252
1253impl PublishAckFuture {
1254    async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1255        let next = tokio::time::timeout(self.timeout, self.subscription)
1256            .await
1257            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1258        next.map_or_else(
1259            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1260            |m| {
1261                if m.status == Some(StatusCode::NO_RESPONDERS) {
1262                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1263                }
1264                let response = serde_json::from_slice(m.payload.as_ref())
1265                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1266                match response {
1267                    Response::Err { error } => match error.error_code() {
1268                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1269                            PublishErrorKind::WrongLastMessageId,
1270                            error,
1271                        )),
1272                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1273                            PublishErrorKind::WrongLastSequence,
1274                            error,
1275                        )),
1276                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1277                    },
1278                    Response::Ok(publish_ack) => Ok(publish_ack),
1279                }
1280            },
1281        )
1282    }
1283}
1284impl IntoFuture for PublishAckFuture {
1285    type Output = Result<PublishAck, PublishError>;
1286
1287    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1288
1289    fn into_future(self) -> Self::IntoFuture {
1290        Box::pin(std::future::IntoFuture::into_future(
1291            self.next_with_timeout(),
1292        ))
1293    }
1294}
1295
1296#[derive(Deserialize, Debug)]
1297struct StreamPage {
1298    total: usize,
1299    streams: Option<Vec<String>>,
1300}
1301
1302#[derive(Deserialize, Debug)]
1303struct StreamInfoPage {
1304    total: usize,
1305    streams: Option<Vec<super::stream::Info>>,
1306}
1307
1308type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1309
1310pub struct StreamNames {
1311    context: Context,
1312    offset: usize,
1313    page_request: Option<PageRequest>,
1314    streams: Vec<String>,
1315    done: bool,
1316}
1317
1318impl futures::Stream for StreamNames {
1319    type Item = Result<String, StreamsError>;
1320
1321    fn poll_next(
1322        mut self: Pin<&mut Self>,
1323        cx: &mut std::task::Context<'_>,
1324    ) -> std::task::Poll<Option<Self::Item>> {
1325        match self.page_request.as_mut() {
1326            Some(page) => match page.try_poll_unpin(cx) {
1327                std::task::Poll::Ready(page) => {
1328                    self.page_request = None;
1329                    let page = page
1330                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1331                    if let Some(streams) = page.streams {
1332                        self.offset += streams.len();
1333                        self.streams = streams;
1334                        if self.offset >= page.total {
1335                            self.done = true;
1336                        }
1337                        match self.streams.pop() {
1338                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1339                            None => Poll::Ready(None),
1340                        }
1341                    } else {
1342                        Poll::Ready(None)
1343                    }
1344                }
1345                std::task::Poll::Pending => std::task::Poll::Pending,
1346            },
1347            None => {
1348                if let Some(stream) = self.streams.pop() {
1349                    Poll::Ready(Some(Ok(stream)))
1350                } else {
1351                    if self.done {
1352                        return Poll::Ready(None);
1353                    }
1354                    let context = self.context.clone();
1355                    let offset = self.offset;
1356                    self.page_request = Some(Box::pin(async move {
1357                        match context
1358                            .request(
1359                                "STREAM.NAMES",
1360                                &json!({
1361                                    "offset": offset,
1362                                }),
1363                            )
1364                            .await?
1365                        {
1366                            Response::Err { error } => {
1367                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1368                            }
1369                            Response::Ok(page) => Ok(page),
1370                        }
1371                    }));
1372                    self.poll_next(cx)
1373                }
1374            }
1375        }
1376    }
1377}
1378
1379type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1380
1381pub type StreamsErrorKind = RequestErrorKind;
1382pub type StreamsError = RequestError;
1383
1384pub struct Streams {
1385    context: Context,
1386    offset: usize,
1387    page_request: Option<PageInfoRequest>,
1388    streams: Vec<super::stream::Info>,
1389    done: bool,
1390}
1391
1392impl futures::Stream for Streams {
1393    type Item = Result<super::stream::Info, StreamsError>;
1394
1395    fn poll_next(
1396        mut self: Pin<&mut Self>,
1397        cx: &mut std::task::Context<'_>,
1398    ) -> std::task::Poll<Option<Self::Item>> {
1399        match self.page_request.as_mut() {
1400            Some(page) => match page.try_poll_unpin(cx) {
1401                std::task::Poll::Ready(page) => {
1402                    self.page_request = None;
1403                    let page = page
1404                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1405                    if let Some(streams) = page.streams {
1406                        self.offset += streams.len();
1407                        self.streams = streams;
1408                        if self.offset >= page.total {
1409                            self.done = true;
1410                        }
1411                        match self.streams.pop() {
1412                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1413                            None => Poll::Ready(None),
1414                        }
1415                    } else {
1416                        Poll::Ready(None)
1417                    }
1418                }
1419                std::task::Poll::Pending => std::task::Poll::Pending,
1420            },
1421            None => {
1422                if let Some(stream) = self.streams.pop() {
1423                    Poll::Ready(Some(Ok(stream)))
1424                } else {
1425                    if self.done {
1426                        return Poll::Ready(None);
1427                    }
1428                    let context = self.context.clone();
1429                    let offset = self.offset;
1430                    self.page_request = Some(Box::pin(async move {
1431                        match context
1432                            .request(
1433                                "STREAM.LIST",
1434                                &json!({
1435                                    "offset": offset,
1436                                }),
1437                            )
1438                            .await?
1439                        {
1440                            Response::Err { error } => {
1441                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1442                            }
1443                            Response::Ok(page) => Ok(page),
1444                        }
1445                    }));
1446                    self.poll_next(cx)
1447                }
1448            }
1449        }
1450    }
1451}
1452/// Used for building customized `publish` message.
1453#[derive(Default, Clone, Debug)]
1454pub struct Publish {
1455    payload: Bytes,
1456    headers: Option<header::HeaderMap>,
1457}
1458impl Publish {
1459    /// Creates a new custom Publish struct to be used with.
1460    pub fn build() -> Self {
1461        Default::default()
1462    }
1463
1464    /// Sets the payload for the message.
1465    pub fn payload(mut self, payload: Bytes) -> Self {
1466        self.payload = payload;
1467        self
1468    }
1469    /// Adds headers to the message.
1470    pub fn headers(mut self, headers: HeaderMap) -> Self {
1471        self.headers = Some(headers);
1472        self
1473    }
1474    /// A shorthand to add a single header.
1475    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1476        self.headers
1477            .get_or_insert(header::HeaderMap::new())
1478            .insert(name, value);
1479        self
1480    }
1481    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
1482    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1483        self.header(header::NATS_MESSAGE_ID, id.as_ref())
1484    }
1485    /// Sets expected last message ID.
1486    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
1487    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1488        self.header(
1489            header::NATS_EXPECTED_LAST_MESSAGE_ID,
1490            last_message_id.as_ref(),
1491        )
1492    }
1493    /// Sets the last expected stream sequence.
1494    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
1495    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1496        self.header(
1497            header::NATS_EXPECTED_LAST_SEQUENCE,
1498            HeaderValue::from(last_sequence),
1499        )
1500    }
1501    /// Sets the last expected stream sequence for a subject this message will be published to.
1502    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
1503    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1504        self.header(
1505            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1506            HeaderValue::from(subject_sequence),
1507        )
1508    }
1509    /// Sets the expected stream name.
1510    /// It sets the `Nats-Expected-Stream` header with provided value.
1511    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1512        self.header(
1513            header::NATS_EXPECTED_STREAM,
1514            HeaderValue::from(stream.as_ref()),
1515        )
1516    }
1517}
1518
1519#[derive(Clone, Copy, Debug, PartialEq)]
1520pub enum RequestErrorKind {
1521    NoResponders,
1522    TimedOut,
1523    Other,
1524}
1525
1526impl Display for RequestErrorKind {
1527    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1528        match self {
1529            Self::TimedOut => write!(f, "timed out"),
1530            Self::Other => write!(f, "request failed"),
1531            Self::NoResponders => write!(f, "requested JetStream resource does not exist"),
1532        }
1533    }
1534}
1535
1536pub type RequestError = Error<RequestErrorKind>;
1537
1538impl From<crate::RequestError> for RequestError {
1539    fn from(error: crate::RequestError) -> Self {
1540        match error.kind() {
1541            crate::RequestErrorKind::TimedOut => {
1542                RequestError::with_source(RequestErrorKind::TimedOut, error)
1543            }
1544            crate::RequestErrorKind::NoResponders => {
1545                RequestError::new(RequestErrorKind::NoResponders)
1546            }
1547            crate::RequestErrorKind::Other => {
1548                RequestError::with_source(RequestErrorKind::Other, error)
1549            }
1550        }
1551    }
1552}
1553
1554impl From<super::errors::Error> for RequestError {
1555    fn from(err: super::errors::Error) -> Self {
1556        RequestError::with_source(RequestErrorKind::Other, err)
1557    }
1558}
1559
1560#[derive(Clone, Debug, PartialEq)]
1561pub enum CreateStreamErrorKind {
1562    EmptyStreamName,
1563    InvalidStreamName,
1564    DomainAndExternalSet,
1565    JetStreamUnavailable,
1566    JetStream(super::errors::Error),
1567    TimedOut,
1568    Response,
1569    ResponseParse,
1570}
1571
1572impl Display for CreateStreamErrorKind {
1573    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1574        match self {
1575            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1576            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1577            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1578            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1579            Self::TimedOut => write!(f, "jetstream request timed out"),
1580            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1581            Self::ResponseParse => write!(f, "failed to parse server response"),
1582            Self::Response => write!(f, "response error"),
1583        }
1584    }
1585}
1586
1587pub type CreateStreamError = Error<CreateStreamErrorKind>;
1588
1589impl From<super::errors::Error> for CreateStreamError {
1590    fn from(error: super::errors::Error) -> Self {
1591        CreateStreamError::new(CreateStreamErrorKind::JetStream(error))
1592    }
1593}
1594
1595impl From<RequestError> for CreateStreamError {
1596    fn from(error: RequestError) -> Self {
1597        match error.kind() {
1598            RequestErrorKind::NoResponders => {
1599                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1600            }
1601            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1602            RequestErrorKind::Other => {
1603                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1604            }
1605        }
1606    }
1607}
1608
1609#[derive(Clone, Debug, PartialEq)]
1610pub enum GetStreamErrorKind {
1611    EmptyName,
1612    Request,
1613    InvalidStreamName,
1614    JetStream(super::errors::Error),
1615}
1616
1617impl Display for GetStreamErrorKind {
1618    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1619        match self {
1620            Self::EmptyName => write!(f, "empty name cannot be empty"),
1621            Self::Request => write!(f, "request error"),
1622            Self::InvalidStreamName => write!(f, "invalid stream name"),
1623            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1624        }
1625    }
1626}
1627
1628pub type GetStreamError = Error<GetStreamErrorKind>;
1629
1630pub type UpdateStreamError = CreateStreamError;
1631pub type UpdateStreamErrorKind = CreateStreamErrorKind;
1632pub type DeleteStreamError = GetStreamError;
1633pub type DeleteStreamErrorKind = GetStreamErrorKind;
1634
1635#[derive(Clone, Copy, Debug, PartialEq)]
1636pub enum KeyValueErrorKind {
1637    InvalidStoreName,
1638    GetBucket,
1639    JetStream,
1640}
1641
1642impl Display for KeyValueErrorKind {
1643    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1644        match self {
1645            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
1646            Self::GetBucket => write!(f, "failed to get the bucket"),
1647            Self::JetStream => write!(f, "JetStream error"),
1648        }
1649    }
1650}
1651
1652pub type KeyValueError = Error<KeyValueErrorKind>;
1653
1654#[derive(Clone, Copy, Debug, PartialEq)]
1655pub enum CreateKeyValueErrorKind {
1656    InvalidStoreName,
1657    TooLongHistory,
1658    JetStream,
1659    BucketCreate,
1660    TimedOut,
1661}
1662
1663impl Display for CreateKeyValueErrorKind {
1664    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1665        match self {
1666            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
1667            Self::TooLongHistory => write!(f, "too long history"),
1668            Self::JetStream => write!(f, "JetStream error"),
1669            Self::BucketCreate => write!(f, "bucket creation failed"),
1670            Self::TimedOut => write!(f, "timed out"),
1671        }
1672    }
1673}
1674
1675pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
1676
1677pub type CreateObjectStoreError = CreateKeyValueError;
1678pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1679
1680#[derive(Clone, Copy, Debug, PartialEq)]
1681pub enum ObjectStoreErrorKind {
1682    InvalidBucketName,
1683    GetStore,
1684}
1685
1686impl Display for ObjectStoreErrorKind {
1687    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1688        match self {
1689            Self::InvalidBucketName => write!(f, "invalid Object Store bucket name"),
1690            Self::GetStore => write!(f, "failed to get Object Store"),
1691        }
1692    }
1693}
1694
1695pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
1696
1697pub type DeleteObjectStore = ObjectStoreError;
1698pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1699
1700#[derive(Clone, Debug, PartialEq)]
1701pub enum AccountErrorKind {
1702    TimedOut,
1703    JetStream(super::errors::Error),
1704    JetStreamUnavailable,
1705    Other,
1706}
1707
1708impl Display for AccountErrorKind {
1709    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1710        match self {
1711            Self::TimedOut => write!(f, "timed out"),
1712            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1713            Self::Other => write!(f, "error"),
1714            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1715        }
1716    }
1717}
1718
1719pub type AccountError = Error<AccountErrorKind>;
1720
1721impl From<RequestError> for AccountError {
1722    fn from(err: RequestError) -> Self {
1723        match err.kind {
1724            RequestErrorKind::NoResponders => {
1725                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1726            }
1727            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
1728            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
1729        }
1730    }
1731}
1732
1733#[derive(Clone, Debug, Serialize)]
1734enum ConsumerAction {
1735    #[serde(rename = "")]
1736    CreateOrUpdate,
1737    #[serde(rename = "create")]
1738    #[cfg(feature = "server_2_10")]
1739    Create,
1740    #[serde(rename = "update")]
1741    #[cfg(feature = "server_2_10")]
1742    Update,
1743}