async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::is_valid_name;
45#[cfg(feature = "kv")]
46use super::kv::{Store, MAX_HISTORY};
47#[cfg(feature = "object-store")]
48use super::object_store::{is_valid_bucket_name, ObjectStore};
49#[cfg(any(feature = "kv", feature = "object-store"))]
50use super::stream::DiscardPolicy;
51#[cfg(feature = "server_2_10")]
52use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
53use super::stream::{
54    Config, ConsumerError, ConsumerErrorKind, DeleteStatus, External, Info, Stream,
55};
56
57pub mod traits {
58    use std::{future::Future, time::Duration};
59
60    use bytes::Bytes;
61    use serde::{de::DeserializeOwned, Serialize};
62
63    use crate::{jetstream::message, subject::ToSubject, Request};
64
65    use super::RequestError;
66
67    pub trait Requester {
68        fn request<S, T, V>(
69            &self,
70            subject: S,
71            payload: &T,
72        ) -> impl Future<Output = Result<V, RequestError>>
73        where
74            S: ToSubject,
75            T: ?Sized + Serialize,
76            V: DeserializeOwned;
77    }
78
79    pub trait RequestSender {
80        fn send_request<S: ToSubject>(
81            &self,
82            subject: S,
83            request: Request,
84        ) -> impl Future<Output = Result<(), crate::PublishError>>;
85    }
86
87    pub trait Publisher {
88        fn publish<S: ToSubject>(
89            &self,
90            subject: S,
91            payload: Bytes,
92        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
93
94        fn publish_message(
95            &self,
96            message: message::OutboundMessage,
97        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
98    }
99
100    pub trait ClientProvider {
101        fn client(&self) -> crate::Client;
102    }
103
104    pub trait TimeoutProvider {
105        fn timeout(&self) -> Duration;
106    }
107}
108
109/// A context which can perform jetstream scoped requests.
110#[derive(Debug, Clone)]
111pub struct Context {
112    pub(crate) client: Client,
113    pub(crate) prefix: String,
114    pub(crate) timeout: Duration,
115    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
116    pub(crate) ack_sender:
117        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
118    pub(crate) backpressure_on_inflight: bool,
119    pub(crate) semaphore_capacity: usize,
120}
121
122fn spawn_acker(
123    rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
124    ack_timeout: Duration,
125    concurrency: Option<usize>,
126) -> tokio::task::JoinHandle<()> {
127    tokio::spawn(async move {
128        rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
129            tokio::time::timeout(ack_timeout, subscription).await.ok();
130            drop(permit);
131        })
132        .await;
133        debug!("Acker task exited");
134    })
135}
136
137use std::marker::PhantomData;
138
/// Type-state marker for a [ContextBuilder] whose API prefix can still be chosen
/// (`api_prefix` and `domain` are only available in this state).
#[derive(Default, Debug)]
pub struct Yes;

/// Type-state marker for a [ContextBuilder] returned by `api_prefix` or `domain`.
#[derive(Default, Debug)]
pub struct No;

/// Bound satisfied by the type-state markers [Yes] and [No].
pub trait ToAssign: Debug {}

impl ToAssign for No {}
impl ToAssign for Yes {}
148
149/// A builder for [Context]. Beyond what can be set by standard constructor, it allows tweaking
150/// pending publish ack backpressure settings.
151/// # Examples
152/// ```no_run
153/// # use async_nats::jetstream::context::ContextBuilder;
154/// # use async_nats::Client;
155/// # use std::time::Duration;
156/// # #[tokio::main]
157/// # async fn main() -> Result<(), async_nats::Error> {
158/// let client = async_nats::connect("demo.nats.io").await?;
159/// let context = ContextBuilder::new()
160///     .timeout(Duration::from_secs(5))
161///     .api_prefix("MY.JS.API")
162///     .max_ack_inflight(1000)
163///     .build(client);
164/// # Ok(())
165/// # }
166/// ```
167pub struct ContextBuilder<PREFIX: ToAssign> {
168    prefix: String,
169    timeout: Duration,
170    semaphore_capacity: usize,
171    ack_timeout: Duration,
172    backpressure_on_inflight: bool,
173    concurrency_limit: Option<usize>,
174    _phantom: PhantomData<PREFIX>,
175}
176
177impl Default for ContextBuilder<Yes> {
178    fn default() -> Self {
179        ContextBuilder {
180            prefix: "$JS.API".to_string(),
181            timeout: Duration::from_secs(5),
182            semaphore_capacity: 5_000,
183            ack_timeout: Duration::from_secs(30),
184            backpressure_on_inflight: true,
185            concurrency_limit: None,
186            _phantom: PhantomData {},
187        }
188    }
189}
190
191impl ContextBuilder<Yes> {
192    /// Create a new [ContextBuilder] with default settings.
193    pub fn new() -> ContextBuilder<Yes> {
194        ContextBuilder::default()
195    }
196}
197
198impl ContextBuilder<Yes> {
199    /// Set the prefix for the JetStream API.
200    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
201        ContextBuilder {
202            prefix: prefix.into(),
203            timeout: self.timeout,
204            semaphore_capacity: self.semaphore_capacity,
205            ack_timeout: self.ack_timeout,
206            backpressure_on_inflight: self.backpressure_on_inflight,
207            concurrency_limit: self.concurrency_limit,
208            _phantom: PhantomData,
209        }
210    }
211
212    /// Set the domain for the JetStream API. Domain is the middle part of standard API prefix:
213    /// $JS.{domain}.API.
214    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
215        ContextBuilder {
216            prefix: format!("$JS.{}.API", domain.into()),
217            timeout: self.timeout,
218            semaphore_capacity: self.semaphore_capacity,
219            ack_timeout: self.ack_timeout,
220            backpressure_on_inflight: self.backpressure_on_inflight,
221            concurrency_limit: self.concurrency_limit,
222            _phantom: PhantomData,
223        }
224    }
225}
226
227impl<PREFIX> ContextBuilder<PREFIX>
228where
229    PREFIX: ToAssign,
230{
231    /// Set the timeout for all JetStream API requests.
232    pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
233    where
234        Yes: ToAssign,
235    {
236        ContextBuilder {
237            prefix: self.prefix,
238            timeout,
239            semaphore_capacity: self.semaphore_capacity,
240            ack_timeout: self.ack_timeout,
241            backpressure_on_inflight: self.backpressure_on_inflight,
242            concurrency_limit: self.concurrency_limit,
243            _phantom: PhantomData,
244        }
245    }
246
247    /// Sets the maximum time client waits for acks from the server when default backpressure is
248    /// used.
249    pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
250    where
251        Yes: ToAssign,
252    {
253        ContextBuilder {
254            prefix: self.prefix,
255            timeout: self.timeout,
256            semaphore_capacity: self.semaphore_capacity,
257            ack_timeout,
258            backpressure_on_inflight: self.backpressure_on_inflight,
259            concurrency_limit: self.concurrency_limit,
260            _phantom: PhantomData,
261        }
262    }
263
264    /// Sets the maximum number of pending acks that can be in flight at any given time.
265    /// If limit is reached, `publish` throws an error by default, or waits if backpressure is enabled.
266    pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
267    where
268        Yes: ToAssign,
269    {
270        ContextBuilder {
271            prefix: self.prefix,
272            timeout: self.timeout,
273            semaphore_capacity: capacity,
274            ack_timeout: self.ack_timeout,
275            backpressure_on_inflight: self.backpressure_on_inflight,
276            concurrency_limit: self.concurrency_limit,
277            _phantom: PhantomData,
278        }
279    }
280
281    /// Enable or disable backpressure when max inflight acks is reached.
282    /// When enabled, publish will wait for permits to become available instead of returning an
283    /// error.
284    /// Default is false (errors on max inflight).
285    pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
286    where
287        Yes: ToAssign,
288    {
289        ContextBuilder {
290            prefix: self.prefix,
291            timeout: self.timeout,
292            semaphore_capacity: self.semaphore_capacity,
293            ack_timeout: self.ack_timeout,
294            backpressure_on_inflight: enabled,
295            concurrency_limit: self.concurrency_limit,
296            _phantom: PhantomData,
297        }
298    }
299
300    /// Sets the concurrency limit for the ack handler task. This might be useful
301    /// in scenarios where Tokio runtime is under heavy load.
302    pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
303    where
304        Yes: ToAssign,
305    {
306        ContextBuilder {
307            prefix: self.prefix,
308            timeout: self.timeout,
309            semaphore_capacity: self.semaphore_capacity,
310            ack_timeout: self.ack_timeout,
311            backpressure_on_inflight: self.backpressure_on_inflight,
312            concurrency_limit: limit,
313            _phantom: PhantomData,
314        }
315    }
316
317    /// Build the [Context] with the given settings.
318    pub fn build(self, client: Client) -> Context {
319        let (tx, rx) = tokio::sync::mpsc::channel::<(
320            oneshot::Receiver<Message>,
321            OwnedSemaphorePermit,
322        )>(self.semaphore_capacity);
323        let stream = ReceiverStream::new(rx);
324        spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
325        Context {
326            client,
327            prefix: self.prefix,
328            timeout: self.timeout,
329            max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
330            ack_sender: tx,
331            backpressure_on_inflight: self.backpressure_on_inflight,
332            semaphore_capacity: self.semaphore_capacity,
333        }
334    }
335}
336
337impl Context {
338    pub(crate) fn new(client: Client) -> Context {
339        ContextBuilder::default().build(client)
340    }
341
342    /// Sets the timeout for all JetStream API requests.
343    pub fn set_timeout(&mut self, timeout: Duration) {
344        self.timeout = timeout
345    }
346
347    /// Return a clone of the underlying NATS client.
348    pub fn client(&self) -> Client {
349        self.client.clone()
350    }
351
352    /// Waits until all pending `acks` are received from the server.
353    /// Be aware that this is probably not the way you want to await `acks`,
354    /// as it will wait for every `ack` that is pending, including those that might
355    /// be published after you call this method.
356    /// Useful in testing, or maybe batching.
357    pub async fn wait_for_acks(&self) {
358        self.max_ack_semaphore
359            .acquire_many(self.semaphore_capacity as u32)
360            .await
361            .ok();
362    }
363
364    /// Create a new [Context] with given API prefix.
365    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
366        ContextBuilder::new()
367            .api_prefix(prefix.to_string())
368            .build(client)
369    }
370
371    /// Create a new [Context] with given domain.
372    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
373        ContextBuilder::new().domain(domain.as_ref()).build(client)
374    }
375
376    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
377    /// acknowledgment from the server that the message has been successfully delivered.
378    ///
379    /// Acknowledgment future that can be polled is returned instead.
380    ///
381    /// If the stream does not exist, `no responders` error will be returned.
382    ///
383    /// # Examples
384    ///
385    /// Publish, and after each publish, await for acknowledgment.
386    ///
387    /// ```no_run
388    /// # #[tokio::main]
389    /// # async fn main() -> Result<(), async_nats::Error> {
390    /// let client = async_nats::connect("localhost:4222").await?;
391    /// let jetstream = async_nats::jetstream::new(client);
392    ///
393    /// let ack = jetstream.publish("events", "data".into()).await?;
394    /// ack.await?;
395    /// jetstream.publish("events", "data".into()).await?.await?;
396    /// # Ok(())
397    /// # }
398    /// ```
399    ///
400    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
401    /// ignored entirely.
402    ///
403    /// ```no_run
404    /// # #[tokio::main]
405    /// # async fn main() -> Result<(), async_nats::Error> {
406    /// let client = async_nats::connect("localhost:4222").await?;
407    /// let jetstream = async_nats::jetstream::new(client);
408    ///
409    /// let first_ack = jetstream.publish("events", "data".into()).await?;
410    /// let second_ack = jetstream.publish("events", "data".into()).await?;
411    /// first_ack.await?;
412    /// second_ack.await?;
413    /// # Ok(())
414    /// # }
415    /// ```
416    pub async fn publish<S: ToSubject>(
417        &self,
418        subject: S,
419        payload: Bytes,
420    ) -> Result<PublishAckFuture, PublishError> {
421        self.send_publish(subject, PublishMessage::build().payload(payload))
422            .await
423    }
424
425    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
426    /// the server that the message has been successfully delivered.
427    ///
428    /// If the stream does not exist, `no responders` error will be returned.
429    ///
430    /// # Examples
431    ///
432    /// ```no_run
433    /// # #[tokio::main]
434    /// # async fn main() -> Result<(), async_nats::Error> {
435    /// let client = async_nats::connect("localhost:4222").await?;
436    /// let jetstream = async_nats::jetstream::new(client);
437    ///
438    /// let mut headers = async_nats::HeaderMap::new();
439    /// headers.append("X-key", "Value");
440    /// let ack = jetstream
441    ///     .publish_with_headers("events", headers, "data".into())
442    ///     .await?;
443    /// # Ok(())
444    /// # }
445    /// ```
446    pub async fn publish_with_headers<S: ToSubject>(
447        &self,
448        subject: S,
449        headers: crate::header::HeaderMap,
450        payload: Bytes,
451    ) -> Result<PublishAckFuture, PublishError> {
452        self.send_publish(
453            subject,
454            PublishMessage::build().payload(payload).headers(headers),
455        )
456        .await
457    }
458
459    /// Publish a message built by [Publish] and returns an acknowledgment future.
460    ///
461    /// If the stream does not exist, `no responders` error will be returned.
462    ///
463    /// # Examples
464    ///
465    /// ```no_run
466    /// # use async_nats::jetstream::context::Publish;
467    /// # #[tokio::main]
468    /// # async fn main() -> Result<(), async_nats::Error> {
469    /// let client = async_nats::connect("localhost:4222").await?;
470    /// let jetstream = async_nats::jetstream::new(client);
471    ///
472    /// let ack = jetstream
473    ///     .send_publish(
474    ///         "events",
475    ///         Publish::build().payload("data".into()).message_id("uuid"),
476    ///     )
477    ///     .await?;
478    /// # Ok(())
479    /// # }
480    /// ```
481    pub async fn send_publish<S: ToSubject>(
482        &self,
483        subject: S,
484        publish: PublishMessage,
485    ) -> Result<PublishAckFuture, PublishError> {
486        let permit = if self.backpressure_on_inflight {
487            // When backpressure is enabled, wait for a permit to become available
488            self.max_ack_semaphore
489                .clone()
490                .acquire_owned()
491                .await
492                .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
493        } else {
494            // When backpressure is disabled, error immediately if no permits available
495            self.max_ack_semaphore
496                .clone()
497                .try_acquire_owned()
498                .map_err(|err| match err {
499                    TryAcquireError::NoPermits => {
500                        PublishError::new(PublishErrorKind::MaxAckPending)
501                    }
502                    _ => PublishError::with_source(PublishErrorKind::Other, err),
503                })?
504        };
505        let subject = subject.to_subject();
506        let (sender, receiver) = oneshot::channel();
507
508        let respond = self.client.new_inbox().into();
509
510        let send_fut = self
511            .client
512            .sender
513            .send(Command::Request {
514                subject,
515                payload: publish.payload,
516                respond,
517                headers: publish.headers,
518                sender,
519            })
520            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
521
522        tokio::time::timeout(self.timeout, send_fut)
523            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
524            .await??;
525
526        Ok(PublishAckFuture {
527            timeout: self.timeout,
528            subscription: Some(receiver),
529            permit: Some(permit),
530            tx: self.ack_sender.clone(),
531        })
532    }
533
534    /// Query the server for account information
535    pub async fn query_account(&self) -> Result<Account, AccountError> {
536        let response: Response<Account> = self.request("INFO", b"").await?;
537
538        match response {
539            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
540            Response::Ok(account) => Ok(account),
541        }
542    }
543
544    /// Create a JetStream [Stream] with given config and return a handle to it.
545    /// That handle can be used to manage and use [Consumer].
546    ///
547    /// # Examples
548    ///
549    /// ```no_run
550    /// # #[tokio::main]
551    /// # async fn main() -> Result<(), async_nats::Error> {
552    /// use async_nats::jetstream::stream::Config;
553    /// use async_nats::jetstream::stream::DiscardPolicy;
554    /// let client = async_nats::connect("localhost:4222").await?;
555    /// let jetstream = async_nats::jetstream::new(client);
556    ///
557    /// let stream = jetstream
558    ///     .create_stream(Config {
559    ///         name: "events".to_string(),
560    ///         max_messages: 100_000,
561    ///         discard: DiscardPolicy::Old,
562    ///         ..Default::default()
563    ///     })
564    ///     .await?;
565    /// # Ok(())
566    /// # }
567    /// ```
568    pub async fn create_stream<S>(
569        &self,
570        stream_config: S,
571    ) -> Result<Stream<Info>, CreateStreamError>
572    where
573        Config: From<S>,
574    {
575        let mut config: Config = stream_config.into();
576        if config.name.is_empty() {
577            return Err(CreateStreamError::new(
578                CreateStreamErrorKind::EmptyStreamName,
579            ));
580        }
581        if !is_valid_name(config.name.as_str()) {
582            return Err(CreateStreamError::new(
583                CreateStreamErrorKind::InvalidStreamName,
584            ));
585        }
586        if let Some(ref mut mirror) = config.mirror {
587            if let Some(ref mut domain) = mirror.domain {
588                if mirror.external.is_some() {
589                    return Err(CreateStreamError::new(
590                        CreateStreamErrorKind::DomainAndExternalSet,
591                    ));
592                }
593                mirror.external = Some(External {
594                    api_prefix: format!("$JS.{domain}.API"),
595                    delivery_prefix: None,
596                })
597            }
598        }
599
600        if let Some(ref mut sources) = config.sources {
601            for source in sources {
602                if let Some(ref mut domain) = source.domain {
603                    if source.external.is_some() {
604                        return Err(CreateStreamError::new(
605                            CreateStreamErrorKind::DomainAndExternalSet,
606                        ));
607                    }
608                    source.external = Some(External {
609                        api_prefix: format!("$JS.{domain}.API"),
610                        delivery_prefix: None,
611                    })
612                }
613            }
614        }
615        let subject = format!("STREAM.CREATE.{}", config.name);
616        let response: Response<Info> = self.request(subject, &config).await?;
617
618        match response {
619            Response::Err { error } => Err(error.into()),
620            Response::Ok(info) => Ok(Stream {
621                context: self.clone(),
622                info,
623                name: config.name,
624            }),
625        }
626    }
627
628    /// Checks for [Stream] existence on the server and returns handle to it.
629    /// That handle can be used to manage and use [Consumer].
630    /// This variant does not fetch [Stream] info from the server.
631    /// It means it does not check if the stream actually exists.
632    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
633    /// If you however run single operations on many streams, this method is more efficient.
634    ///
635    /// # Examples
636    ///
637    /// ```no_run
638    /// # #[tokio::main]
639    /// # async fn main() -> Result<(), async_nats::Error> {
640    /// let client = async_nats::connect("localhost:4222").await?;
641    /// let jetstream = async_nats::jetstream::new(client);
642    ///
643    /// let stream = jetstream.get_stream_no_info("events").await?;
644    /// # Ok(())
645    /// # }
646    /// ```
647    pub async fn get_stream_no_info<T: AsRef<str>>(
648        &self,
649        stream: T,
650    ) -> Result<Stream<()>, GetStreamError> {
651        let stream = stream.as_ref();
652        if stream.is_empty() {
653            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
654        }
655
656        if !is_valid_name(stream) {
657            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
658        }
659
660        Ok(Stream {
661            context: self.clone(),
662            info: (),
663            name: stream.to_string(),
664        })
665    }
666
667    /// Checks for [Stream] existence on the server and returns handle to it.
668    /// That handle can be used to manage and use [Consumer].
669    ///
670    /// # Examples
671    ///
672    /// ```no_run
673    /// # #[tokio::main]
674    /// # async fn main() -> Result<(), async_nats::Error> {
675    /// let client = async_nats::connect("localhost:4222").await?;
676    /// let jetstream = async_nats::jetstream::new(client);
677    ///
678    /// let stream = jetstream.get_stream("events").await?;
679    /// # Ok(())
680    /// # }
681    /// ```
682    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
683        let stream = stream.as_ref();
684        if stream.is_empty() {
685            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
686        }
687
688        if !is_valid_name(stream) {
689            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
690        }
691
692        let subject = format!("STREAM.INFO.{stream}");
693        let request: Response<Info> = self
694            .request(subject, &())
695            .await
696            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
697        match request {
698            Response::Err { error } => {
699                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
700            }
701            Response::Ok(info) => Ok(Stream {
702                context: self.clone(),
703                info,
704                name: stream.to_string(),
705            }),
706        }
707    }
708
709    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
710    ///
711    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
712    ///
713    /// # Examples
714    ///
715    /// ```no_run
716    /// # #[tokio::main]
717    /// # async fn main() -> Result<(), async_nats::Error> {
718    /// use async_nats::jetstream::stream::Config;
719    /// let client = async_nats::connect("localhost:4222").await?;
720    /// let jetstream = async_nats::jetstream::new(client);
721    ///
722    /// let stream = jetstream
723    ///     .get_or_create_stream(Config {
724    ///         name: "events".to_string(),
725    ///         max_messages: 10_000,
726    ///         ..Default::default()
727    ///     })
728    ///     .await?;
729    /// # Ok(())
730    /// # }
731    /// ```
732    pub async fn get_or_create_stream<S>(
733        &self,
734        stream_config: S,
735    ) -> Result<Stream, CreateStreamError>
736    where
737        S: Into<Config>,
738    {
739        let config: Config = stream_config.into();
740
741        if config.name.is_empty() {
742            return Err(CreateStreamError::new(
743                CreateStreamErrorKind::EmptyStreamName,
744            ));
745        }
746
747        if !is_valid_name(config.name.as_str()) {
748            return Err(CreateStreamError::new(
749                CreateStreamErrorKind::InvalidStreamName,
750            ));
751        }
752        let subject = format!("STREAM.INFO.{}", config.name);
753
754        let request: Response<Info> = self.request(subject, &()).await?;
755        match request {
756            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
757            Response::Err { error } => Err(error.into()),
758            Response::Ok(info) => Ok(Stream {
759                context: self.clone(),
760                info,
761                name: config.name,
762            }),
763        }
764    }
765
766    /// Deletes a [Stream] with a given name.
767    ///
768    /// # Examples
769    ///
770    /// ```no_run
771    /// # #[tokio::main]
772    /// # async fn main() -> Result<(), async_nats::Error> {
773    /// use async_nats::jetstream::stream::Config;
774    /// let client = async_nats::connect("localhost:4222").await?;
775    /// let jetstream = async_nats::jetstream::new(client);
776    ///
777    /// let stream = jetstream.delete_stream("events").await?;
778    /// # Ok(())
779    /// # }
780    /// ```
781    pub async fn delete_stream<T: AsRef<str>>(
782        &self,
783        stream: T,
784    ) -> Result<DeleteStatus, DeleteStreamError> {
785        let stream = stream.as_ref();
786        if stream.is_empty() {
787            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
788        }
789
790        if !is_valid_name(stream) {
791            return Err(DeleteStreamError::new(
792                DeleteStreamErrorKind::InvalidStreamName,
793            ));
794        }
795
796        let subject = format!("STREAM.DELETE.{stream}");
797        match self
798            .request(subject, &json!({}))
799            .await
800            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
801        {
802            Response::Err { error } => Err(DeleteStreamError::new(
803                DeleteStreamErrorKind::JetStream(error),
804            )),
805            Response::Ok(delete_response) => Ok(delete_response),
806        }
807    }
808
809    /// Updates a [Stream] with a given config. If specific field cannot be updated,
810    /// error is returned.
811    ///
812    /// # Examples
813    ///
814    /// ```no_run
815    /// # #[tokio::main]
816    /// # async fn main() -> Result<(), async_nats::Error> {
817    /// use async_nats::jetstream::stream::Config;
818    /// use async_nats::jetstream::stream::DiscardPolicy;
819    /// let client = async_nats::connect("localhost:4222").await?;
820    /// let jetstream = async_nats::jetstream::new(client);
821    ///
822    /// let stream = jetstream
823    ///     .update_stream(Config {
824    ///         name: "events".to_string(),
825    ///         discard: DiscardPolicy::New,
826    ///         max_messages: 50_000,
827    ///         ..Default::default()
828    ///     })
829    ///     .await?;
830    /// # Ok(())
831    /// # }
832    /// ```
833    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
834    where
835        S: Borrow<Config>,
836    {
837        let config = config.borrow();
838
839        if config.name.is_empty() {
840            return Err(CreateStreamError::new(
841                CreateStreamErrorKind::EmptyStreamName,
842            ));
843        }
844
845        if !is_valid_name(config.name.as_str()) {
846            return Err(CreateStreamError::new(
847                CreateStreamErrorKind::InvalidStreamName,
848            ));
849        }
850
851        let subject = format!("STREAM.UPDATE.{}", config.name);
852        match self.request(subject, config).await? {
853            Response::Err { error } => Err(error.into()),
854            Response::Ok(info) => Ok(info),
855        }
856    }
857
858    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
859    ///
860    /// # Examples
861    ///
862    /// ```no_run
863    /// # #[tokio::main]
864    /// # async fn main() -> Result<(), async_nats::Error> {
865    /// use async_nats::jetstream::stream::Config;
866    /// use async_nats::jetstream::stream::DiscardPolicy;
867    /// let client = async_nats::connect("localhost:4222").await?;
868    /// let jetstream = async_nats::jetstream::new(client);
869    ///
870    /// let stream = jetstream
871    ///     .create_or_update_stream(Config {
872    ///         name: "events".to_string(),
873    ///         discard: DiscardPolicy::New,
874    ///         max_messages: 50_000,
875    ///         ..Default::default()
876    ///     })
877    ///     .await?;
878    /// # Ok(())
879    /// # }
880    /// ```
881    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
882        match self.update_stream(config.clone()).await {
883            Ok(stream) => Ok(stream),
884            Err(err) => match err.kind() {
885                CreateStreamErrorKind::NotFound => {
886                    let stream = self
887                        .create_stream(config)
888                        .await
889                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
890                    Ok(stream.info)
891                }
892                _ => Err(err),
893            },
894        }
895    }
896
897    /// Looks up Stream that contains provided subject.
898    ///
899    /// # Examples
900    ///
901    /// ```no_run
902    /// # #[tokio::main]
903    /// # async fn main() -> Result<(), async_nats::Error> {
904    /// use futures_util::TryStreamExt;
905    /// let client = async_nats::connect("demo.nats.io:4222").await?;
906    /// let jetstream = async_nats::jetstream::new(client);
907    /// let stream_name = jetstream.stream_by_subject("foo.>");
908    /// # Ok(())
909    /// # }
910    /// ```
911    pub async fn stream_by_subject<T: Into<String>>(
912        &self,
913        subject: T,
914    ) -> Result<String, GetStreamByNameError> {
915        let subject = subject.into();
916        if !is_valid_subject(subject.as_str()) {
917            return Err(GetStreamByNameError::new(
918                GetStreamByNameErrorKind::InvalidSubject,
919            ));
920        }
921        let mut names = StreamNames {
922            context: self.clone(),
923            offset: 0,
924            page_request: None,
925            streams: Vec::new(),
926            subject: Some(subject),
927            done: false,
928        };
929        match names.next().await {
930            Some(name) => match name {
931                Ok(name) => Ok(name),
932                Err(err) => Err(GetStreamByNameError::with_source(
933                    GetStreamByNameErrorKind::Request,
934                    err,
935                )),
936            },
937            None => Err(GetStreamByNameError::new(
938                GetStreamByNameErrorKind::NotFound,
939            )),
940        }
941    }
942
943    /// Lists names of all streams for current context.
944    ///
945    /// # Examples
946    ///
947    /// ```no_run
948    /// # #[tokio::main]
949    /// # async fn main() -> Result<(), async_nats::Error> {
950    /// use futures_util::TryStreamExt;
951    /// let client = async_nats::connect("demo.nats.io:4222").await?;
952    /// let jetstream = async_nats::jetstream::new(client);
953    /// let mut names = jetstream.stream_names();
954    /// while let Some(stream) = names.try_next().await? {
955    ///     println!("stream: {}", stream);
956    /// }
957    /// # Ok(())
958    /// # }
959    /// ```
960    pub fn stream_names(&self) -> StreamNames {
961        StreamNames {
962            context: self.clone(),
963            offset: 0,
964            page_request: None,
965            streams: Vec::new(),
966            subject: None,
967            done: false,
968        }
969    }
970
971    /// Lists all streams info for current context.
972    ///
973    /// # Examples
974    ///
975    /// ```no_run
976    /// # #[tokio::main]
977    /// # async fn main() -> Result<(), async_nats::Error> {
978    /// use futures_util::TryStreamExt;
979    /// let client = async_nats::connect("demo.nats.io:4222").await?;
980    /// let jetstream = async_nats::jetstream::new(client);
981    /// let mut streams = jetstream.streams();
982    /// while let Some(stream) = streams.try_next().await? {
983    ///     println!("stream: {:?}", stream);
984    /// }
985    /// # Ok(())
986    /// # }
987    /// ```
988    pub fn streams(&self) -> Streams {
989        Streams {
990            context: self.clone(),
991            offset: 0,
992            page_request: None,
993            streams: Vec::new(),
994            done: false,
995        }
996    }
997    /// Returns an existing key-value bucket.
998    ///
999    /// # Examples
1000    ///
1001    /// ```no_run
1002    /// # #[tokio::main]
1003    /// # async fn main() -> Result<(), async_nats::Error> {
1004    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1005    /// let jetstream = async_nats::jetstream::new(client);
1006    /// let kv = jetstream.get_key_value("bucket").await?;
1007    /// # Ok(())
1008    /// # }
1009    /// ```
1010    #[cfg(feature = "kv")]
1011    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1012    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1013        let bucket: String = bucket.into();
1014        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1015            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1016        }
1017
1018        let stream_name = format!("KV_{}", &bucket);
1019        let stream = self
1020            .get_stream(stream_name.clone())
1021            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1022            .await?;
1023
1024        if stream.info.config.max_messages_per_subject < 1 {
1025            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1026        }
1027        let mut store = Store {
1028            prefix: format!("$KV.{}.", &bucket),
1029            name: bucket,
1030            stream_name,
1031            stream: stream.clone(),
1032            put_prefix: None,
1033            use_jetstream_prefix: self.prefix != "$JS.API",
1034        };
1035        if let Some(ref mirror) = stream.info.config.mirror {
1036            let bucket = mirror.name.trim_start_matches("KV_");
1037            if let Some(ref external) = mirror.external {
1038                if !external.api_prefix.is_empty() {
1039                    store.use_jetstream_prefix = false;
1040                    store.prefix = format!("$KV.{bucket}.");
1041                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1042                } else {
1043                    store.put_prefix = Some(format!("$KV.{bucket}."));
1044                }
1045            }
1046        };
1047
1048        Ok(store)
1049    }
1050
1051    /// Creates a new key-value bucket.
1052    ///
1053    /// # Examples
1054    ///
1055    /// ```no_run
1056    /// # #[tokio::main]
1057    /// # async fn main() -> Result<(), async_nats::Error> {
1058    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1059    /// let jetstream = async_nats::jetstream::new(client);
1060    /// let kv = jetstream
1061    ///     .create_key_value(async_nats::jetstream::kv::Config {
1062    ///         bucket: "kv".to_string(),
1063    ///         history: 10,
1064    ///         ..Default::default()
1065    ///     })
1066    ///     .await?;
1067    /// # Ok(())
1068    /// # }
1069    /// ```
1070    #[cfg(feature = "kv")]
1071    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1072    pub async fn create_key_value(
1073        &self,
1074        config: crate::jetstream::kv::Config,
1075    ) -> Result<Store, CreateKeyValueError> {
1076        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1077            return Err(CreateKeyValueError::new(
1078                CreateKeyValueErrorKind::InvalidStoreName,
1079            ));
1080        }
1081        let info = self.query_account().await.map_err(|err| {
1082            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1083        })?;
1084
1085        let bucket_name = config.bucket.clone();
1086        let stream_config = kv_to_stream_config(config, info)?;
1087
1088        let stream = self.create_stream(stream_config).await.map_err(|err| {
1089            if err.kind() == CreateStreamErrorKind::TimedOut {
1090                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1091            } else {
1092                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1093            }
1094        })?;
1095
1096        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1097    }
1098
1099    /// Updates an existing key-value bucket.
1100    ///
1101    /// # Examples
1102    ///
1103    /// ```no_run
1104    /// # #[tokio::main]
1105    /// # async fn main() -> Result<(), async_nats::Error> {
1106    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1107    /// let jetstream = async_nats::jetstream::new(client);
1108    /// let kv = jetstream
1109    ///     .update_key_value(async_nats::jetstream::kv::Config {
1110    ///         bucket: "kv".to_string(),
1111    ///         history: 60,
1112    ///         ..Default::default()
1113    ///     })
1114    ///     .await?;
1115    /// # Ok(())
1116    /// # }
1117    /// ```
1118    #[cfg(feature = "kv")]
1119    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1120    pub async fn update_key_value(
1121        &self,
1122        config: crate::jetstream::kv::Config,
1123    ) -> Result<Store, UpdateKeyValueError> {
1124        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1125            return Err(UpdateKeyValueError::new(
1126                UpdateKeyValueErrorKind::InvalidStoreName,
1127            ));
1128        }
1129
1130        let stream_name = format!("KV_{}", config.bucket);
1131        let bucket_name = config.bucket.clone();
1132
1133        let account = self.query_account().await.map_err(|err| {
1134            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1135        })?;
1136        let stream = self
1137            .update_stream(kv_to_stream_config(config, account)?)
1138            .await
1139            .map_err(|err| match err.kind() {
1140                UpdateStreamErrorKind::NotFound => {
1141                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1142                }
1143                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1144            })?;
1145
1146        let stream = Stream {
1147            context: self.clone(),
1148            info: stream,
1149            name: stream_name,
1150        };
1151
1152        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1153    }
1154
1155    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
1156    ///
1157    /// # Examples
1158    ///
1159    /// ```no_run
1160    /// # #[tokio::main]
1161    /// # async fn main() -> Result<(), async_nats::Error> {
1162    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1163    /// let jetstream = async_nats::jetstream::new(client);
1164    /// let kv = jetstream
1165    ///     .update_key_value(async_nats::jetstream::kv::Config {
1166    ///         bucket: "kv".to_string(),
1167    ///         history: 60,
1168    ///         ..Default::default()
1169    ///     })
1170    ///     .await?;
1171    /// # Ok(())
1172    /// # }
1173    /// ```
1174    #[cfg(feature = "kv")]
1175    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1176    pub async fn create_or_update_key_value(
1177        &self,
1178        config: crate::jetstream::kv::Config,
1179    ) -> Result<Store, CreateKeyValueError> {
1180        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1181            return Err(CreateKeyValueError::new(
1182                CreateKeyValueErrorKind::InvalidStoreName,
1183            ));
1184        }
1185
1186        let bucket_name = config.bucket.clone();
1187        let stream_name = format!("KV_{}", config.bucket);
1188
1189        let account = self.query_account().await.map_err(|err| {
1190            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1191        })?;
1192        let stream = self
1193            .create_or_update_stream(kv_to_stream_config(config, account)?)
1194            .await
1195            .map_err(|err| {
1196                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1197            })?;
1198
1199        let stream = Stream {
1200            context: self.clone(),
1201            info: stream,
1202            name: stream_name,
1203        };
1204
1205        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1206    }
1207
1208    /// Deletes given key-value bucket.
1209    ///
1210    /// # Examples
1211    ///
1212    /// ```no_run
1213    /// # #[tokio::main]
1214    /// # async fn main() -> Result<(), async_nats::Error> {
1215    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1216    /// let jetstream = async_nats::jetstream::new(client);
1217    /// let kv = jetstream
1218    ///     .create_key_value(async_nats::jetstream::kv::Config {
1219    ///         bucket: "kv".to_string(),
1220    ///         history: 10,
1221    ///         ..Default::default()
1222    ///     })
1223    ///     .await?;
1224    /// # Ok(())
1225    /// # }
1226    /// ```
1227    #[cfg(feature = "kv")]
1228    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1229    pub async fn delete_key_value<T: AsRef<str>>(
1230        &self,
1231        bucket: T,
1232    ) -> Result<DeleteStatus, KeyValueError> {
1233        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1234            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1235        }
1236
1237        let stream_name = format!("KV_{}", bucket.as_ref());
1238        self.delete_stream(stream_name)
1239            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1240            .await
1241    }
1242
1243    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
1244    //     let config = config.borrow();
1245    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1246    //         return Err(Box::new(std::io::Error::other(
1247    //             "invalid bucket name",
1248    //         )));
1249    //     }
1250
1251    //     let stream_name = format!("KV_{}", config.bucket);
1252    //     self.update_stream()
1253    //         .await
1254    //         .and_then(|info| Ok(()))
1255    // }
1256
1257    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1258    ///
1259    /// It has one less interaction with the server when binding to only one
1260    /// [crate::jetstream::consumer::Consumer].
1261    ///
1262    /// # Examples:
1263    ///
1264    /// ```no_run
1265    /// # #[tokio::main]
1266    /// # async fn main() -> Result<(), async_nats::Error> {
1267    /// use async_nats::jetstream::consumer::PullConsumer;
1268    ///
1269    /// let client = async_nats::connect("localhost:4222").await?;
1270    /// let jetstream = async_nats::jetstream::new(client);
1271    ///
1272    /// let consumer: PullConsumer = jetstream
1273    ///     .get_consumer_from_stream("consumer", "stream")
1274    ///     .await?;
1275    ///
1276    /// # Ok(())
1277    /// # }
1278    /// ```
1279    pub async fn get_consumer_from_stream<T, C, S>(
1280        &self,
1281        consumer: C,
1282        stream: S,
1283    ) -> Result<Consumer<T>, ConsumerError>
1284    where
1285        T: FromConsumer + IntoConsumerConfig,
1286        S: AsRef<str>,
1287        C: AsRef<str>,
1288    {
1289        if !is_valid_name(stream.as_ref()) {
1290            return Err(ConsumerError::with_source(
1291                ConsumerErrorKind::InvalidName,
1292                "invalid stream",
1293            ));
1294        }
1295
1296        if !is_valid_name(consumer.as_ref()) {
1297            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1298        }
1299
1300        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1301
1302        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1303            Response::Ok(info) => info,
1304            Response::Err { error } => return Err(error.into()),
1305        };
1306
1307        Ok(Consumer::new(
1308            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1309                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1310            })?,
1311            info,
1312            self.clone(),
1313        ))
1314    }
1315
1316    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1317    ///
1318    /// It has one less interaction with the server when binding to only one
1319    /// [crate::jetstream::consumer::Consumer].
1320    ///
1321    /// # Examples:
1322    ///
1323    /// ```no_run
1324    /// # #[tokio::main]
1325    /// # async fn main() -> Result<(), async_nats::Error> {
1326    /// use async_nats::jetstream::consumer::PullConsumer;
1327    ///
1328    /// let client = async_nats::connect("localhost:4222").await?;
1329    /// let jetstream = async_nats::jetstream::new(client);
1330    ///
1331    /// jetstream
1332    ///     .delete_consumer_from_stream("consumer", "stream")
1333    ///     .await?;
1334    ///
1335    /// # Ok(())
1336    /// # }
1337    /// ```
1338    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1339        &self,
1340        consumer: C,
1341        stream: S,
1342    ) -> Result<DeleteStatus, ConsumerError> {
1343        if !is_valid_name(consumer.as_ref()) {
1344            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1345        }
1346
1347        if !is_valid_name(stream.as_ref()) {
1348            return Err(ConsumerError::with_source(
1349                ConsumerErrorKind::Other,
1350                "invalid stream name",
1351            ));
1352        }
1353
1354        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1355
1356        match self.request(subject, &json!({})).await? {
1357            Response::Ok(delete_status) => Ok(delete_status),
1358            Response::Err { error } => Err(error.into()),
1359        }
1360    }
1361
1362    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1363    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1364    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1365    ///
1366    /// # Examples
1367    ///
1368    /// ```no_run
1369    /// # #[tokio::main]
1370    /// # async fn main() -> Result<(), async_nats::Error> {
1371    /// use async_nats::jetstream::consumer;
1372    /// let client = async_nats::connect("localhost:4222").await?;
1373    /// let jetstream = async_nats::jetstream::new(client);
1374    ///
1375    /// let consumer: consumer::PullConsumer = jetstream
1376    ///     .create_consumer_on_stream(
1377    ///         consumer::pull::Config {
1378    ///             durable_name: Some("pull".to_string()),
1379    ///             ..Default::default()
1380    ///         },
1381    ///         "stream",
1382    ///     )
1383    ///     .await?;
1384    /// # Ok(())
1385    /// # }
1386    /// ```
1387    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1388        &self,
1389        config: C,
1390        stream: S,
1391    ) -> Result<Consumer<C>, ConsumerError> {
1392        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1393            .await
1394    }
1395
1396    /// Update an existing consumer.
1397    /// This call will fail if the consumer does not exist.
1398    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1399    ///
1400    /// # Examples
1401    ///
1402    /// ```no_run
1403    /// # #[tokio::main]
1404    /// # async fn main() -> Result<(), async_nats::Error> {
1405    /// use async_nats::jetstream::consumer;
1406    /// let client = async_nats::connect("localhost:4222").await?;
1407    /// let jetstream = async_nats::jetstream::new(client);
1408    ///
1409    /// let consumer: consumer::PullConsumer = jetstream
1410    ///     .update_consumer_on_stream(
1411    ///         consumer::pull::Config {
1412    ///             durable_name: Some("pull".to_string()),
1413    ///             description: Some("updated pull consumer".to_string()),
1414    ///             ..Default::default()
1415    ///         },
1416    ///         "stream",
1417    ///     )
1418    ///     .await?;
1419    /// # Ok(())
1420    /// # }
1421    /// ```
1422    #[cfg(feature = "server_2_10")]
1423    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1424        &self,
1425        config: C,
1426        stream: S,
1427    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1428        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1429            .await
1430            .map_err(|err| err.into())
1431    }
1432
1433    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1434    /// the same.
1435    /// This method will fail if consumer is already present with different config.
1436    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1437    ///
1438    /// # Examples
1439    ///
1440    /// ```no_run
1441    /// # #[tokio::main]
1442    /// # async fn main() -> Result<(), async_nats::Error> {
1443    /// use async_nats::jetstream::consumer;
1444    /// let client = async_nats::connect("localhost:4222").await?;
1445    /// let jetstream = async_nats::jetstream::new(client);
1446    ///
1447    /// let consumer: consumer::PullConsumer = jetstream
1448    ///     .create_consumer_strict_on_stream(
1449    ///         consumer::pull::Config {
1450    ///             durable_name: Some("pull".to_string()),
1451    ///             ..Default::default()
1452    ///         },
1453    ///         "stream",
1454    ///     )
1455    ///     .await?;
1456    /// # Ok(())
1457    /// # }
1458    /// ```
1459    #[cfg(feature = "server_2_10")]
1460    pub async fn create_consumer_strict_on_stream<
1461        C: IntoConsumerConfig + FromConsumer,
1462        S: AsRef<str>,
1463    >(
1464        &self,
1465        config: C,
1466        stream: S,
1467    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1468        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1469            .await
1470            .map_err(|err| err.into())
1471    }
1472
1473    async fn create_consumer_on_stream_action<
1474        C: IntoConsumerConfig + FromConsumer,
1475        S: AsRef<str>,
1476    >(
1477        &self,
1478        config: C,
1479        stream: S,
1480        action: ConsumerAction,
1481    ) -> Result<Consumer<C>, ConsumerError> {
1482        let config = config.into_consumer_config();
1483
1484        let subject = {
1485            let filter = if config.filter_subject.is_empty() {
1486                "".to_string()
1487            } else {
1488                format!(".{}", config.filter_subject)
1489            };
1490            config
1491                .name
1492                .as_ref()
1493                .or(config.durable_name.as_ref())
1494                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1495                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1496        };
1497
1498        match self
1499            .request(
1500                subject,
1501                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1502            )
1503            .await?
1504        {
1505            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1506            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1507                FromConsumer::try_from_consumer_config(info.clone().config)
1508                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1509                info,
1510                self.clone(),
1511            )),
1512        }
1513    }
1514
1515    /// Send a request to the jetstream JSON API.
1516    ///
1517    /// This is a low level API used mostly internally, that should be used only in
1518    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1519    ///
1520    /// # Examples
1521    ///
1522    /// ```no_run
1523    /// # use async_nats::jetstream::stream::Info;
1524    /// # use async_nats::jetstream::response::Response;
1525    /// # #[tokio::main]
1526    /// # async fn main() -> Result<(), async_nats::Error> {
1527    /// let client = async_nats::connect("localhost:4222").await?;
1528    /// let jetstream = async_nats::jetstream::new(client);
1529    ///
1530    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1531    /// # Ok(())
1532    /// # }
1533    /// ```
1534    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1535    where
1536        S: ToSubject,
1537        T: ?Sized + Serialize,
1538        V: DeserializeOwned,
1539    {
1540        let subject = subject.to_subject();
1541        let request = serde_json::to_vec(&payload)
1542            .map(Bytes::from)
1543            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1544
1545        debug!("JetStream request sent: {:?}", request);
1546
1547        let message = self
1548            .client
1549            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1550            .await;
1551        let message = message?;
1552        debug!(
1553            "JetStream request response: {:?}",
1554            from_utf8(&message.payload)
1555        );
1556        let response = serde_json::from_slice(message.payload.as_ref())
1557            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1558
1559        Ok(response)
1560    }
1561
1562    /// Send a JetStream API request without waiting for a response.
1563    /// The subject will be automatically prefixed with the JetStream API prefix.
1564    ///
1565    /// This is useful for operations that need custom response handling,
1566    /// such as streaming responses (batch get) where the caller manages
1567    /// their own subscription to the reply inbox.
1568    ///
1569    /// Used mostly by extension crates.
1570    pub async fn send_request<S: ToSubject>(
1571        &self,
1572        subject: S,
1573        request: crate::client::Request,
1574    ) -> Result<(), crate::PublishError> {
1575        let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1576        let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1577        let payload = request.payload.unwrap_or_default();
1578
1579        match request.headers {
1580            Some(headers) => {
1581                self.client
1582                    .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1583                    .await
1584            }
1585            None => {
1586                self.client
1587                    .publish_with_reply(prefixed_subject, inbox, payload)
1588                    .await
1589            }
1590        }
1591    }
1592
1593    /// Creates a new object store bucket.
1594    ///
1595    /// # Examples
1596    ///
1597    /// ```no_run
1598    /// # #[tokio::main]
1599    /// # async fn main() -> Result<(), async_nats::Error> {
1600    /// let client = async_nats::connect("demo.nats.io").await?;
1601    /// let jetstream = async_nats::jetstream::new(client);
1602    /// let bucket = jetstream
1603    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1604    ///         bucket: "bucket".to_string(),
1605    ///         ..Default::default()
1606    ///     })
1607    ///     .await?;
1608    /// # Ok(())
1609    /// # }
1610    /// ```
1611    #[cfg(feature = "object-store")]
1612    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1613    pub async fn create_object_store(
1614        &self,
1615        config: super::object_store::Config,
1616    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1617        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1618            return Err(CreateObjectStoreError::new(
1619                CreateKeyValueErrorKind::InvalidStoreName,
1620            ));
1621        }
1622
1623        let bucket_name = config.bucket.clone();
1624        let stream_name = format!("OBJ_{bucket_name}");
1625        let chunk_subject = format!("$O.{bucket_name}.C.>");
1626        let meta_subject = format!("$O.{bucket_name}.M.>");
1627
1628        let stream = self
1629            .create_stream(super::stream::Config {
1630                name: stream_name,
1631                description: config.description.clone(),
1632                subjects: vec![chunk_subject, meta_subject],
1633                max_age: config.max_age,
1634                max_bytes: config.max_bytes,
1635                storage: config.storage,
1636                num_replicas: config.num_replicas,
1637                discard: DiscardPolicy::New,
1638                allow_rollup: true,
1639                allow_direct: true,
1640                #[cfg(feature = "server_2_10")]
1641                compression: if config.compression {
1642                    Some(Compression::S2)
1643                } else {
1644                    None
1645                },
1646                placement: config.placement,
1647                ..Default::default()
1648            })
1649            .await
1650            .map_err(|err| {
1651                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1652            })?;
1653
1654        Ok(ObjectStore {
1655            name: bucket_name,
1656            stream,
1657        })
1658    }
1659
1660    /// Get an existing object store bucket.
1661    ///
1662    /// # Examples
1663    ///
1664    /// ```no_run
1665    /// # #[tokio::main]
1666    /// # async fn main() -> Result<(), async_nats::Error> {
1667    /// let client = async_nats::connect("demo.nats.io").await?;
1668    /// let jetstream = async_nats::jetstream::new(client);
1669    /// let bucket = jetstream.get_object_store("bucket").await?;
1670    /// # Ok(())
1671    /// # }
1672    /// ```
1673    #[cfg(feature = "object-store")]
1674    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1675    pub async fn get_object_store<T: AsRef<str>>(
1676        &self,
1677        bucket_name: T,
1678    ) -> Result<ObjectStore, ObjectStoreError> {
1679        let bucket_name = bucket_name.as_ref();
1680        if !is_valid_bucket_name(bucket_name) {
1681            return Err(ObjectStoreError::new(
1682                ObjectStoreErrorKind::InvalidBucketName,
1683            ));
1684        }
1685        let stream_name = format!("OBJ_{bucket_name}");
1686        let stream = self
1687            .get_stream(stream_name)
1688            .await
1689            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1690
1691        Ok(ObjectStore {
1692            name: bucket_name.to_string(),
1693            stream,
1694        })
1695    }
1696
1697    /// Delete a object store bucket.
1698    ///
1699    /// # Examples
1700    ///
1701    /// ```no_run
1702    /// # #[tokio::main]
1703    /// # async fn main() -> Result<(), async_nats::Error> {
1704    /// let client = async_nats::connect("demo.nats.io").await?;
1705    /// let jetstream = async_nats::jetstream::new(client);
1706    /// let bucket = jetstream.delete_object_store("bucket").await?;
1707    /// # Ok(())
1708    /// # }
1709    /// ```
1710    #[cfg(feature = "object-store")]
1711    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1712    pub async fn delete_object_store<T: AsRef<str>>(
1713        &self,
1714        bucket_name: T,
1715    ) -> Result<(), DeleteObjectStore> {
1716        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1717        self.delete_stream(stream_name)
1718            .await
1719            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1720        Ok(())
1721    }
1722}
1723
1724impl crate::client::traits::Requester for Context {
1725    fn send_request<S: ToSubject>(
1726        &self,
1727        subject: S,
1728        request: crate::Request,
1729    ) -> impl Future<Output = Result<Message, crate::RequestError>> {
1730        self.client.send_request(subject, request)
1731    }
1732}
1733
1734impl crate::client::traits::Publisher for Context {
1735    fn publish_with_reply<S: ToSubject, R: ToSubject>(
1736        &self,
1737        subject: S,
1738        reply: R,
1739        payload: Bytes,
1740    ) -> impl Future<Output = Result<(), crate::PublishError>> {
1741        self.client.publish_with_reply(subject, reply, payload)
1742    }
1743
1744    fn publish_message(
1745        &self,
1746        msg: crate::message::OutboundMessage,
1747    ) -> impl Future<Output = Result<(), crate::PublishError>> {
1748        self.client.publish_message(msg)
1749    }
1750}
1751
1752impl traits::ClientProvider for Context {
1753    fn client(&self) -> crate::Client {
1754        self.client()
1755    }
1756}
1757
1758impl traits::Requester for Context {
1759    fn request<S, T, V>(
1760        &self,
1761        subject: S,
1762        payload: &T,
1763    ) -> impl Future<Output = Result<V, RequestError>>
1764    where
1765        S: ToSubject,
1766        T: ?Sized + Serialize,
1767        V: DeserializeOwned,
1768    {
1769        self.request(subject, payload)
1770    }
1771}
1772
1773impl traits::TimeoutProvider for Context {
1774    fn timeout(&self) -> Duration {
1775        self.timeout
1776    }
1777}
1778
1779impl traits::RequestSender for Context {
1780    fn send_request<S: ToSubject>(
1781        &self,
1782        subject: S,
1783        request: crate::client::Request,
1784    ) -> impl Future<Output = Result<(), crate::PublishError>> {
1785        self.send_request(subject, request)
1786    }
1787}
1788
1789impl traits::Publisher for Context {
1790    fn publish<S: ToSubject>(
1791        &self,
1792        subject: S,
1793        payload: Bytes,
1794    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1795        self.publish(subject, payload)
1796    }
1797
1798    fn publish_message(
1799        &self,
1800        message: jetstream::message::OutboundMessage,
1801    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1802        self.send_publish(
1803            message.subject,
1804            PublishMessage {
1805                payload: message.payload,
1806                headers: message.headers,
1807            },
1808        )
1809    }
1810}
1811
/// Classifies why a JetStream publish, or waiting for its acknowledgement, failed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the published subject.
    StreamNotFound,
    /// The server reported a wrong last message id.
    WrongLastMessageId,
    /// The server reported a wrong last sequence.
    WrongLastSequence,
    /// The acknowledgement was not received in time.
    TimedOut,
    /// The channel that should deliver the acknowledgement was closed.
    BrokenPipe,
    /// The limit of pending acknowledgements was reached.
    MaxAckPending,
    /// Any other publish failure.
    Other,
}
1822
1823impl Display for PublishErrorKind {
1824    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1825        match self {
1826            Self::StreamNotFound => write!(f, "no stream found for given subject"),
1827            Self::TimedOut => write!(f, "timed out: didn't receive ack in time"),
1828            Self::Other => write!(f, "publish failed"),
1829            Self::BrokenPipe => write!(f, "broken pipe"),
1830            Self::WrongLastMessageId => write!(f, "wrong last message id"),
1831            Self::WrongLastSequence => write!(f, "wrong last sequence"),
1832            Self::MaxAckPending => write!(f, "max ack pending reached"),
1833        }
1834    }
1835}
1836
/// Error returned by JetStream publish operations, classified by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1838
/// Handle to the pending acknowledgement of a published message.
///
/// Awaiting it (through its [`IntoFuture`] implementation) yields the [`PublishAck`] or a
/// [`PublishError`]. If it is dropped while it still holds both the receiver and the permit,
/// they are handed over through `tx` instead of being discarded right away.
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the acknowledgement message.
    timeout: Duration,
    /// Receiver of the acknowledgement message; `None` once it has been taken for awaiting.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Semaphore permit tied to this publish.
    // NOTE(review): presumably bounds the number of in-flight acks — confirm at the call site.
    permit: Option<OwnedSemaphorePermit>,
    /// Channel used on drop to pass the unawaited receiver and permit on (to the "acker",
    /// per the warning logged when that fails).
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1846
1847impl Drop for PublishAckFuture {
1848    fn drop(&mut self) {
1849        if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1850            if let Err(err) = self.tx.try_send((sub, permit)) {
1851                tracing::warn!("failed to pass future permit to the acker: {}", err);
1852            }
1853        }
1854    }
1855}
1856
1857impl PublishAckFuture {
1858    async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1859        let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1860            .await
1861            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1862        next.map_or_else(
1863            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1864            |m| {
1865                if m.status == Some(StatusCode::NO_RESPONDERS) {
1866                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1867                }
1868                let response = serde_json::from_slice(m.payload.as_ref())
1869                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1870                match response {
1871                    Response::Err { error } => match error.error_code() {
1872                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1873                            PublishErrorKind::WrongLastMessageId,
1874                            error,
1875                        )),
1876                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1877                            PublishErrorKind::WrongLastSequence,
1878                            error,
1879                        )),
1880                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1881                    },
1882                    Response::Ok(publish_ack) => Ok(publish_ack),
1883                }
1884            },
1885        )
1886    }
1887}
1888impl IntoFuture for PublishAckFuture {
1889    type Output = Result<PublishAck, PublishError>;
1890
1891    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1892
1893    fn into_future(self) -> Self::IntoFuture {
1894        Box::pin(std::future::IntoFuture::into_future(
1895            self.next_with_timeout(),
1896        ))
1897    }
1898}
1899
/// One page of stream names, as deserialized from a `STREAM.NAMES` response.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total count reported by the server; compared with the running offset to detect the
    /// last page.
    total: usize,
    /// Names contained in this page; may be absent.
    streams: Option<Vec<String>>,
}
1905
/// One page of stream infos, as deserialized from a `STREAM.LIST` response.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total count reported by the server; compared with the running offset to detect the
    /// last page.
    total: usize,
    /// Stream infos contained in this page; may be absent.
    streams: Option<Vec<super::stream::Info>>,
}
1911
/// Boxed in-flight request for the next [`StreamPage`].
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1913
/// Paginated stream of stream names, fetched lazily page by page.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the `offset` of the next request.
    offset: usize,
    /// Request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Value sent as the `subject` field of every page request.
    // NOTE(review): presumably a subject filter for the listed streams — confirm.
    subject: Option<String>,
    /// Names of the most recent page that were not yielded yet (popped from the back).
    streams: Vec<String>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
1922
1923impl futures_util::Stream for StreamNames {
1924    type Item = Result<String, StreamsError>;
1925
1926    fn poll_next(
1927        mut self: Pin<&mut Self>,
1928        cx: &mut std::task::Context<'_>,
1929    ) -> std::task::Poll<Option<Self::Item>> {
1930        match self.page_request.as_mut() {
1931            Some(page) => match page.try_poll_unpin(cx) {
1932                std::task::Poll::Ready(page) => {
1933                    self.page_request = None;
1934                    let page = page
1935                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1936                    if let Some(streams) = page.streams {
1937                        self.offset += streams.len();
1938                        self.streams = streams;
1939                        if self.offset >= page.total {
1940                            self.done = true;
1941                        }
1942                        match self.streams.pop() {
1943                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1944                            None => Poll::Ready(None),
1945                        }
1946                    } else {
1947                        Poll::Ready(None)
1948                    }
1949                }
1950                std::task::Poll::Pending => std::task::Poll::Pending,
1951            },
1952            None => {
1953                if let Some(stream) = self.streams.pop() {
1954                    Poll::Ready(Some(Ok(stream)))
1955                } else {
1956                    if self.done {
1957                        return Poll::Ready(None);
1958                    }
1959                    let context = self.context.clone();
1960                    let offset = self.offset;
1961                    let subject = self.subject.clone();
1962                    self.page_request = Some(Box::pin(async move {
1963                        match context
1964                            .request(
1965                                "STREAM.NAMES",
1966                                &json!({
1967                                    "offset": offset,
1968                                    "subject": subject
1969                                }),
1970                            )
1971                            .await?
1972                        {
1973                            Response::Err { error } => {
1974                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1975                            }
1976                            Response::Ok(page) => Ok(page),
1977                        }
1978                    }));
1979                    self.poll_next(cx)
1980                }
1981            }
1982        }
1983    }
1984}
1985
/// Boxed in-flight request for the next [`StreamInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind of the stream listing streams; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by the stream listing streams; an alias of [`RequestError`].
pub type StreamsError = RequestError;
1990
/// Paginated stream of stream infos, fetched lazily page by page.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the `offset` of the next request.
    offset: usize,
    /// Request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the most recent page that were not yielded yet (popped from the back).
    streams: Vec<super::stream::Info>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
1998
1999impl futures_util::Stream for Streams {
2000    type Item = Result<super::stream::Info, StreamsError>;
2001
2002    fn poll_next(
2003        mut self: Pin<&mut Self>,
2004        cx: &mut std::task::Context<'_>,
2005    ) -> std::task::Poll<Option<Self::Item>> {
2006        match self.page_request.as_mut() {
2007            Some(page) => match page.try_poll_unpin(cx) {
2008                std::task::Poll::Ready(page) => {
2009                    self.page_request = None;
2010                    let page = page
2011                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
2012                    if let Some(streams) = page.streams {
2013                        self.offset += streams.len();
2014                        self.streams = streams;
2015                        if self.offset >= page.total {
2016                            self.done = true;
2017                        }
2018                        match self.streams.pop() {
2019                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2020                            None => Poll::Ready(None),
2021                        }
2022                    } else {
2023                        Poll::Ready(None)
2024                    }
2025                }
2026                std::task::Poll::Pending => std::task::Poll::Pending,
2027            },
2028            None => {
2029                if let Some(stream) = self.streams.pop() {
2030                    Poll::Ready(Some(Ok(stream)))
2031                } else {
2032                    if self.done {
2033                        return Poll::Ready(None);
2034                    }
2035                    let context = self.context.clone();
2036                    let offset = self.offset;
2037                    self.page_request = Some(Box::pin(async move {
2038                        match context
2039                            .request(
2040                                "STREAM.LIST",
2041                                &json!({
2042                                    "offset": offset,
2043                                }),
2044                            )
2045                            .await?
2046                        {
2047                            Response::Err { error } => {
2048                                Err(RequestError::with_source(RequestErrorKind::Other, error))
2049                            }
2050                            Response::Ok(page) => Ok(page),
2051                        }
2052                    }));
2053                    self.poll_next(cx)
2054                }
2055            }
2056        }
2057    }
2058}
2059
/// Deprecated alias of `jetstream::message::PublishMessage`, kept to avoid breaking changes.
///
/// Please use `jetstream::message::PublishMessage` instead.
#[deprecated(
    note = "use jetstream::message::PublishMessage instead",
    since = "0.44.0"
)]
pub type Publish = super::message::PublishMessage;
2067
/// Classifies why a JetStream API request failed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody responded to the request; reported as "requested JetStream resource does not
    /// exist".
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}
2074
2075impl Display for RequestErrorKind {
2076    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2077        match self {
2078            Self::TimedOut => write!(f, "timed out"),
2079            Self::Other => write!(f, "request failed"),
2080            Self::NoResponders => write!(f, "requested JetStream resource does not exist"),
2081        }
2082    }
2083}
2084
/// Error returned by JetStream API requests, classified by [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
2086
2087impl From<crate::RequestError> for RequestError {
2088    fn from(error: crate::RequestError) -> Self {
2089        match error.kind() {
2090            crate::RequestErrorKind::TimedOut => {
2091                RequestError::with_source(RequestErrorKind::TimedOut, error)
2092            }
2093            crate::RequestErrorKind::NoResponders => {
2094                RequestError::new(RequestErrorKind::NoResponders)
2095            }
2096            crate::RequestErrorKind::Other => {
2097                RequestError::with_source(RequestErrorKind::Other, error)
2098            }
2099        }
2100    }
2101}
2102
2103impl From<super::errors::Error> for RequestError {
2104    fn from(err: super::errors::Error) -> Self {
2105        RequestError::with_source(RequestErrorKind::Other, err)
2106    }
2107}
2108
/// Error returned when fetching consumer information, classified by [`ConsumerInfoErrorKind`].
pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2110
/// Classifies why fetching consumer information failed.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerInfoErrorKind {
    /// The consumer name is invalid.
    InvalidName,
    /// The server reported the consumer as offline.
    Offline,
    /// The server reported that the consumer does not exist.
    NotFound,
    /// The server reported that the stream does not exist.
    StreamNotFound,
    /// The request failed for a reason not covered by the other variants.
    Request,
    /// Any other JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The request timed out.
    TimedOut,
    /// Nobody responded to the request.
    NoResponders,
}
2122
2123impl Display for ConsumerInfoErrorKind {
2124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2125        match self {
2126            Self::InvalidName => write!(f, "invalid consumer name"),
2127            Self::Offline => write!(f, "consumer is offline"),
2128            Self::NotFound => write!(f, "consumer not found"),
2129            Self::StreamNotFound => write!(f, "stream not found"),
2130            Self::Request => write!(f, "request error"),
2131            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2132            Self::TimedOut => write!(f, "timed out"),
2133            Self::NoResponders => write!(f, "no responders"),
2134        }
2135    }
2136}
2137
2138impl From<super::errors::Error> for ConsumerInfoError {
2139    fn from(error: super::errors::Error) -> Self {
2140        match error.error_code() {
2141            ErrorCode::CONSUMER_NOT_FOUND => {
2142                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2143            }
2144            ErrorCode::STREAM_NOT_FOUND => {
2145                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2146            }
2147            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2148            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2149        }
2150    }
2151}
2152
2153impl From<RequestError> for ConsumerInfoError {
2154    fn from(error: RequestError) -> Self {
2155        match error.kind() {
2156            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2157            RequestErrorKind::Other => {
2158                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2159            }
2160            RequestErrorKind::NoResponders => {
2161                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2162            }
2163        }
2164    }
2165}
2166
/// Classifies why creating (or updating) a stream failed.
#[derive(Clone, Debug, PartialEq)]
pub enum CreateStreamErrorKind {
    /// The stream name is empty.
    EmptyStreamName,
    /// The stream name is not valid.
    InvalidStreamName,
    /// Both a domain and an external configuration were set.
    DomainAndExternalSet,
    /// JetStream is unavailable (nobody responded to the request).
    JetStreamUnavailable,
    /// Any other JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason not covered by the other variants.
    Response,
    /// The server reported that the stream does not exist.
    NotFound,
    /// The server response could not be parsed.
    ResponseParse,
}
2179
2180impl Display for CreateStreamErrorKind {
2181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2182        match self {
2183            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2184            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2185            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2186            Self::NotFound => write!(f, "stream not found"),
2187            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2188            Self::TimedOut => write!(f, "jetstream request timed out"),
2189            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2190            Self::ResponseParse => write!(f, "failed to parse server response"),
2191            Self::Response => write!(f, "response error"),
2192        }
2193    }
2194}
2195
/// Error returned when creating a stream, classified by [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
2197
2198impl From<super::errors::Error> for CreateStreamError {
2199    fn from(error: super::errors::Error) -> Self {
2200        match error.kind() {
2201            super::errors::ErrorCode::STREAM_NOT_FOUND => {
2202                CreateStreamError::new(CreateStreamErrorKind::NotFound)
2203            }
2204            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2205        }
2206    }
2207}
2208
2209impl From<RequestError> for CreateStreamError {
2210    fn from(error: RequestError) -> Self {
2211        match error.kind() {
2212            RequestErrorKind::NoResponders => {
2213                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2214            }
2215            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2216            RequestErrorKind::Other => {
2217                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2218            }
2219        }
2220    }
2221}
2222
/// Classifies why getting (or deleting) a stream failed.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamErrorKind {
    /// The stream name is empty.
    EmptyName,
    /// The request itself failed.
    Request,
    /// The stream name is not valid.
    InvalidStreamName,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
}
2230
2231impl Display for GetStreamErrorKind {
2232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2233        match self {
2234            Self::EmptyName => write!(f, "empty name cannot be empty"),
2235            Self::Request => write!(f, "request error"),
2236            Self::InvalidStreamName => write!(f, "invalid stream name"),
2237            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2238        }
2239    }
2240}
2241
/// Classifies failures of the lookup that reports errors as [`GetStreamByNameError`].
// NOTE(review): despite the name this kind has an `InvalidSubject` variant, so the lookup is
// presumably by subject — confirm against the method that returns it.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
    /// The request itself failed.
    Request,
    /// No matching stream was found.
    NotFound,
    /// The provided subject is invalid.
    InvalidSubject,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
}
2249
2250impl Display for GetStreamByNameErrorKind {
2251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2252        match self {
2253            Self::Request => write!(f, "request error"),
2254            Self::NotFound => write!(f, "stream not found"),
2255            Self::InvalidSubject => write!(f, "invalid subject"),
2256            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2257        }
2258    }
2259}
2260
/// Error returned when getting a stream, classified by [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error classified by [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Stream updates report the same errors as stream creation.
pub type UpdateStreamError = CreateStreamError;
/// Stream updates use the same error kinds as stream creation.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Stream deletion reports the same errors as getting a stream.
pub type DeleteStreamError = GetStreamError;
/// Stream deletion uses the same error kinds as getting a stream.
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2268
/// Classifies why accessing a Key Value store failed.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The Key Value store name is invalid.
    InvalidStoreName,
    /// The bucket could not be retrieved.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}
2276
/// Human-readable description of each Key Value store failure kind.
#[cfg(feature = "kv")]
impl Display for KeyValueErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}
2287
/// Error returned when accessing a Key Value store, classified by [`KeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type KeyValueError = Error<KeyValueErrorKind>;
2290
/// Classifies why creating a Key Value store failed.
///
/// Also compiled for the `object-store` feature, where it doubles as the object store
/// creation error kind (see the `CreateObjectStoreErrorKind` alias).
#[cfg(any(feature = "kv", feature = "object-store"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}
2301
/// Human-readable description of each store creation failure kind.
#[cfg(any(feature = "kv", feature = "object-store"))]
impl Display for CreateKeyValueErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
2317
/// Classifies why updating a Key Value store failed.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}
2329
/// Human-readable description of each store update failure kind.
#[cfg(feature = "kv")]
impl Display for UpdateKeyValueErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // Previously printed "bucket creation failed", copied from the create error.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error returned when creating a Key Value store.
#[cfg(feature = "kv")]
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error returned when updating a Key Value store.
#[cfg(feature = "kv")]
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Error returned when creating an object store; shares its kinds with Key Value creation.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreError = Error<CreateKeyValueErrorKind>;
/// Object store creation error kind; an alias of [`CreateKeyValueErrorKind`].
#[cfg(feature = "object-store")]
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2355
/// Classifies why getting (or deleting) an object store failed.
#[cfg(feature = "object-store")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The stream operation behind the object store failed.
    GetStore,
}
2362
/// Human-readable description of each object store failure kind.
#[cfg(feature = "object-store")]
impl Display for ObjectStoreErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}
2372
/// Error returned when getting an object store, classified by [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Error returned when deleting an object store; an alias of [`ObjectStoreError`].
#[cfg(feature = "object-store")]
pub type DeleteObjectStore = ObjectStoreError;
/// Error kind for deleting an object store; an alias of [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2380
/// Classifies why an account-related request failed.
#[derive(Clone, Debug, PartialEq)]
pub enum AccountErrorKind {
    /// The request timed out.
    TimedOut,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// JetStream is unavailable (nobody responded to the request).
    JetStreamUnavailable,
    /// Any other failure.
    Other,
}
2388
2389impl Display for AccountErrorKind {
2390    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2391        match self {
2392            Self::TimedOut => write!(f, "timed out"),
2393            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2394            Self::Other => write!(f, "error"),
2395            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2396        }
2397    }
2398}
2399
/// Error returned by account-related requests, classified by [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
2401
2402impl From<RequestError> for AccountError {
2403    fn from(err: RequestError) -> Self {
2404        match err.kind {
2405            RequestErrorKind::NoResponders => {
2406                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2407            }
2408            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2409            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2410        }
2411    }
2412}
2413
/// Action selector for consumer requests, serialized as a string.
// NOTE(review): presumably sent as the `action` field of consumer create/update requests —
// confirm at the use site.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only available with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only available with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2425
/// Maps a Stream config to KV Store.
///
/// * `stream` - stream backing the bucket; moved into the returned [`Store`].
/// * `prefix` - API prefix of the context; anything other than `$JS.API` turns on
///   `use_jetstream_prefix`.
/// * `bucket` - bucket name, used as the store name and for the `$KV.<bucket>.` prefix.
///
/// When the stream mirrors another bucket through an external configuration, the put prefix
/// (and, for a non-empty external API prefix, the read prefix as well) is derived from the
/// mirrored bucket instead.
#[cfg(feature = "kv")]
fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
    let mut kv_prefix = format!("$KV.{bucket}.");
    let mut use_jetstream_prefix = prefix != "$JS.API";
    let mut put_prefix = None;

    if let Some(mirror) = &stream.info.config.mirror {
        // Strip exactly one `KV_` stream prefix: a bucket that is itself named `KV_foo` is
        // backed by the stream `KV_KV_foo` and must map back to `KV_foo`, not `foo`.
        let mirrored_bucket = mirror.name.strip_prefix("KV_").unwrap_or(&mirror.name);
        if let Some(external) = &mirror.external {
            if external.api_prefix.is_empty() {
                put_prefix = Some(format!("$KV.{mirrored_bucket}."));
            } else {
                use_jetstream_prefix = false;
                kv_prefix = format!("$KV.{mirrored_bucket}.");
                put_prefix = Some(format!(
                    "{}.$KV.{}.",
                    external.api_prefix, mirrored_bucket
                ));
            }
        }
    }

    Store {
        prefix: kv_prefix,
        name: bucket,
        // Read the name before `stream` is moved into the store (fields are evaluated in
        // order), which avoids cloning the whole stream handle.
        stream_name: stream.info.config.name.clone(),
        stream,
        put_prefix,
        use_jetstream_prefix,
    }
}
2451
/// Reasons why a KV config cannot be turned into a stream config.
#[cfg(feature = "kv")]
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but the account's API level is too low.
    // Only constructed when the `server_2_11` feature is enabled, hence the allow.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2458
/// Translates a config mapping failure into the matching Key Value creation error.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for CreateKeyValueError {
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => CreateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                CreateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        CreateKeyValueError::new(kind)
    }
}
2472
/// Translates a config mapping failure into the matching Key Value update error.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for UpdateKeyValueError {
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => UpdateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                UpdateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        UpdateKeyValueError::new(kind)
    }
}
2486
/// Maps the KV config to Stream config.
///
/// * History: a non-positive value means a single revision; a value above `MAX_HISTORY`
///   is rejected with `TooLongHistory`.
/// * Replicas: `0` is treated as `1`.
/// * Limit markers (`server_2_11` only): require an account API level of at least 1 and
///   enable per-message TTLs.
/// * Mirror / sources: their names get the `KV_` stream prefix when missing; a mirror forces
///   `mirror_direct`. Only a bucket with neither gets the `$KV.<bucket>.>` subject.
#[cfg(feature = "kv")]
fn kv_to_stream_config(
    config: crate::jetstream::kv::Config,
    _account: Account,
) -> Result<super::stream::Config, KvToStreamConfigError> {
    let history = match config.history {
        requested if requested > 0 && requested > MAX_HISTORY => {
            return Err(KvToStreamConfigError::TooLongHistory);
        }
        requested if requested > 0 => requested,
        _ => 1,
    };

    let num_replicas = match config.num_replicas {
        0 => 1,
        replicas => replicas,
    };

    #[cfg(feature = "server_2_11")]
    let (allow_message_ttl, subject_delete_marker_ttl) = match config.limit_markers {
        Some(_) if _account.requests.level < 1 => {
            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
        }
        limit_markers => (limit_markers.is_some(), limit_markers),
    };

    // `config` is owned, so the mirror and sources can be taken over directly.
    let mut mirror = config.mirror;
    let mut sources = config.sources;
    let mut mirror_direct = config.mirror_direct;
    let mut subjects = Vec::new();

    if let Some(mirror) = mirror.as_mut() {
        if !mirror.name.starts_with("KV_") {
            mirror.name.insert_str(0, "KV_");
        }
        mirror_direct = true;
    } else if let Some(sources) = sources.as_mut() {
        for source in sources {
            if !source.name.starts_with("KV_") {
                source.name.insert_str(0, "KV_");
            }
        }
    } else {
        subjects.push(format!("$KV.{}.>", config.bucket));
    }

    Ok(Config {
        name: format!("KV_{}", config.bucket),
        description: Some(config.description),
        subjects,
        max_messages_per_subject: history,
        max_bytes: config.max_bytes,
        max_age: config.max_age,
        max_message_size: config.max_value_size,
        storage: config.storage,
        republish: config.republish,
        allow_rollup: true,
        deny_delete: true,
        deny_purge: false,
        allow_direct: true,
        sources,
        mirror,
        num_replicas,
        discard: DiscardPolicy::New,
        mirror_direct,
        #[cfg(feature = "server_2_10")]
        compression: if config.compression {
            Some(Compression::S2)
        } else {
            None
        },
        placement: config.placement,
        #[cfg(feature = "server_2_11")]
        allow_message_ttl,
        #[cfg(feature = "server_2_11")]
        subject_delete_marker_ttl,
        ..Default::default()
    })
}