Skip to main content

async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::is_valid_name;
45#[cfg(feature = "kv")]
46use super::kv::{Store, MAX_HISTORY};
47#[cfg(feature = "object-store")]
48use super::object_store::{is_valid_bucket_name, ObjectStore};
49#[cfg(any(feature = "kv", feature = "object-store"))]
50use super::stream::DiscardPolicy;
51#[cfg(feature = "server_2_10")]
52use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
53use super::stream::{
54    Config, ConsumerError, ConsumerErrorKind, DeleteStatus, External, Info, Stream,
55};
56
57pub mod traits {
58    use std::{future::Future, time::Duration};
59
60    use bytes::Bytes;
61    use serde::{de::DeserializeOwned, Serialize};
62
63    use crate::{jetstream::message, subject::ToSubject, Request};
64
65    use super::RequestError;
66
67    pub trait Requester {
68        fn request<S, T, V>(
69            &self,
70            subject: S,
71            payload: &T,
72        ) -> impl Future<Output = Result<V, RequestError>>
73        where
74            S: ToSubject,
75            T: ?Sized + Serialize,
76            V: DeserializeOwned;
77    }
78
79    pub trait RequestSender {
80        fn send_request<S: ToSubject>(
81            &self,
82            subject: S,
83            request: Request,
84        ) -> impl Future<Output = Result<(), crate::PublishError>>;
85    }
86
87    pub trait Publisher {
88        fn publish<S>(
89            &self,
90            subject: S,
91            payload: Bytes,
92        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>
93        where
94            S: ToSubject;
95
96        fn publish_message(
97            &self,
98            message: message::OutboundMessage,
99        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
100    }
101
102    pub trait ClientProvider {
103        fn client(&self) -> crate::Client;
104    }
105
106    pub trait TimeoutProvider {
107        fn timeout(&self) -> Duration;
108    }
109}
110
/// A context which can perform jetstream scoped requests.
///
/// The in-flight ack semaphore is held behind an [Arc], so clones of a [Context] share it.
#[derive(Debug, Clone)]
pub struct Context {
    /// NATS client used to send publish commands; a clone is handed out by [Context::client].
    pub(crate) client: Client,
    /// JetStream API prefix (`$JS.API` unless the builder was given a prefix or a domain).
    pub(crate) prefix: String,
    /// Timeout for JetStream API requests; also bounds how long `send_publish` waits to hand a
    /// command to the client.
    pub(crate) timeout: Duration,
    /// Semaphore created with `semaphore_capacity` permits; every publish takes one permit
    /// before it is sent.
    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
    /// Sending half of the channel whose receiving half is drained by the task started in
    /// `spawn_acker`; a clone is stored in every `PublishAckFuture`.
    pub(crate) ack_sender:
        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    /// When `true`, `send_publish` waits for a free permit; when `false`, it fails right away
    /// if none is available.
    pub(crate) backpressure_on_inflight: bool,
    /// Number of permits `max_ack_semaphore` was created with; used by [Context::wait_for_acks].
    pub(crate) semaphore_capacity: usize,
}
123
124fn spawn_acker(
125    rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
126    ack_timeout: Duration,
127    concurrency: Option<usize>,
128) -> tokio::task::JoinHandle<()> {
129    tokio::spawn(async move {
130        rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
131            tokio::time::timeout(ack_timeout, subscription).await.ok();
132            drop(permit);
133        })
134        .await;
135        debug!("Acker task exited");
136    })
137}
138
139use std::marker::PhantomData;
140
/// Marker for a [ContextBuilder] on which `api_prefix` and `domain` can be called.
#[derive(Debug, Default)]
pub struct Yes;
/// Marker for the [ContextBuilder] returned by `api_prefix` and `domain`; those two setters
/// are not available on it.
#[derive(Debug, Default)]
pub struct No;

/// Bound for the marker type parameter of [ContextBuilder]; implemented by [Yes] and [No].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
150
/// A builder for [Context]. Beyond what can be set by standard constructor, it allows tweaking
/// pending publish ack backpressure settings.
///
/// The `PREFIX` type parameter is a marker ([Yes] or [No]); `api_prefix` and `domain` are only
/// defined for `ContextBuilder<Yes>`.
/// # Examples
/// ```no_run
/// # use async_nats::jetstream::context::ContextBuilder;
/// # use async_nats::Client;
/// # use std::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let context = ContextBuilder::new()
///     .timeout(Duration::from_secs(5))
///     .api_prefix("MY.JS.API")
///     .max_ack_inflight(1000)
///     .build(client);
/// # Ok(())
/// # }
/// ```
pub struct ContextBuilder<PREFIX: ToAssign> {
    // JetStream API prefix that ends up in `Context::prefix`.
    prefix: String,
    // Timeout that ends up in `Context::timeout`.
    timeout: Duration,
    // Number of semaphore permits and capacity of the ack channel created in `build`.
    semaphore_capacity: usize,
    // Longest time the ack handler task waits for a single acknowledgment.
    ack_timeout: Duration,
    // Whether publishing waits (`true`) or errors (`false`) when no permit is available.
    backpressure_on_inflight: bool,
    // Concurrency limit handed to the ack handler task.
    concurrency_limit: Option<usize>,
    // Carries the `PREFIX` marker without storing a value of that type.
    _phantom: PhantomData<PREFIX>,
}
178
179impl Default for ContextBuilder<Yes> {
180    fn default() -> Self {
181        ContextBuilder {
182            prefix: "$JS.API".to_string(),
183            timeout: Duration::from_secs(5),
184            semaphore_capacity: 5_000,
185            ack_timeout: Duration::from_secs(30),
186            backpressure_on_inflight: true,
187            concurrency_limit: None,
188            _phantom: PhantomData {},
189        }
190    }
191}
192
193impl ContextBuilder<Yes> {
194    /// Create a new [ContextBuilder] with default settings.
195    pub fn new() -> ContextBuilder<Yes> {
196        ContextBuilder::default()
197    }
198}
199
200impl ContextBuilder<Yes> {
201    /// Set the prefix for the JetStream API.
202    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
203        ContextBuilder {
204            prefix: prefix.into(),
205            timeout: self.timeout,
206            semaphore_capacity: self.semaphore_capacity,
207            ack_timeout: self.ack_timeout,
208            backpressure_on_inflight: self.backpressure_on_inflight,
209            concurrency_limit: self.concurrency_limit,
210            _phantom: PhantomData,
211        }
212    }
213
214    /// Set the domain for the JetStream API. Domain is the middle part of standard API prefix:
215    /// $JS.{domain}.API.
216    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
217        ContextBuilder {
218            prefix: format!("$JS.{}.API", domain.into()),
219            timeout: self.timeout,
220            semaphore_capacity: self.semaphore_capacity,
221            ack_timeout: self.ack_timeout,
222            backpressure_on_inflight: self.backpressure_on_inflight,
223            concurrency_limit: self.concurrency_limit,
224            _phantom: PhantomData,
225        }
226    }
227}
228
229impl<PREFIX> ContextBuilder<PREFIX>
230where
231    PREFIX: ToAssign,
232{
233    /// Set the timeout for all JetStream API requests.
234    pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
235    where
236        Yes: ToAssign,
237    {
238        ContextBuilder {
239            prefix: self.prefix,
240            timeout,
241            semaphore_capacity: self.semaphore_capacity,
242            ack_timeout: self.ack_timeout,
243            backpressure_on_inflight: self.backpressure_on_inflight,
244            concurrency_limit: self.concurrency_limit,
245            _phantom: PhantomData,
246        }
247    }
248
249    /// Sets the maximum time client waits for acks from the server when default backpressure is
250    /// used.
251    pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
252    where
253        Yes: ToAssign,
254    {
255        ContextBuilder {
256            prefix: self.prefix,
257            timeout: self.timeout,
258            semaphore_capacity: self.semaphore_capacity,
259            ack_timeout,
260            backpressure_on_inflight: self.backpressure_on_inflight,
261            concurrency_limit: self.concurrency_limit,
262            _phantom: PhantomData,
263        }
264    }
265
266    /// Sets the maximum number of pending acks that can be in flight at any given time.
267    /// If limit is reached, `publish` throws an error by default, or waits if backpressure is enabled.
268    pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
269    where
270        Yes: ToAssign,
271    {
272        ContextBuilder {
273            prefix: self.prefix,
274            timeout: self.timeout,
275            semaphore_capacity: capacity,
276            ack_timeout: self.ack_timeout,
277            backpressure_on_inflight: self.backpressure_on_inflight,
278            concurrency_limit: self.concurrency_limit,
279            _phantom: PhantomData,
280        }
281    }
282
283    /// Enable or disable backpressure when max inflight acks is reached.
284    /// When enabled, publish will wait for permits to become available instead of returning an
285    /// error.
286    /// Default is false (errors on max inflight).
287    pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
288    where
289        Yes: ToAssign,
290    {
291        ContextBuilder {
292            prefix: self.prefix,
293            timeout: self.timeout,
294            semaphore_capacity: self.semaphore_capacity,
295            ack_timeout: self.ack_timeout,
296            backpressure_on_inflight: enabled,
297            concurrency_limit: self.concurrency_limit,
298            _phantom: PhantomData,
299        }
300    }
301
302    /// Sets the concurrency limit for the ack handler task. This might be useful
303    /// in scenarios where Tokio runtime is under heavy load.
304    pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
305    where
306        Yes: ToAssign,
307    {
308        ContextBuilder {
309            prefix: self.prefix,
310            timeout: self.timeout,
311            semaphore_capacity: self.semaphore_capacity,
312            ack_timeout: self.ack_timeout,
313            backpressure_on_inflight: self.backpressure_on_inflight,
314            concurrency_limit: limit,
315            _phantom: PhantomData,
316        }
317    }
318
319    /// Build the [Context] with the given settings.
320    pub fn build(self, client: Client) -> Context {
321        let (tx, rx) = tokio::sync::mpsc::channel::<(
322            oneshot::Receiver<Message>,
323            OwnedSemaphorePermit,
324        )>(self.semaphore_capacity);
325        let stream = ReceiverStream::new(rx);
326        spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
327        Context {
328            client,
329            prefix: self.prefix,
330            timeout: self.timeout,
331            max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
332            ack_sender: tx,
333            backpressure_on_inflight: self.backpressure_on_inflight,
334            semaphore_capacity: self.semaphore_capacity,
335        }
336    }
337}
338
339impl Context {
340    pub(crate) fn new(client: Client) -> Context {
341        ContextBuilder::default().build(client)
342    }
343
344    /// Sets the timeout for all JetStream API requests.
345    pub fn set_timeout(&mut self, timeout: Duration) {
346        self.timeout = timeout
347    }
348
349    /// Return a clone of the underlying NATS client.
350    pub fn client(&self) -> Client {
351        self.client.clone()
352    }
353
354    /// Waits until all pending `acks` are received from the server.
355    /// Be aware that this is probably not the way you want to await `acks`,
356    /// as it will wait for every `ack` that is pending, including those that might
357    /// be published after you call this method.
358    /// Useful in testing, or maybe batching.
359    pub async fn wait_for_acks(&self) {
360        self.max_ack_semaphore
361            .acquire_many(self.semaphore_capacity as u32)
362            .await
363            .ok();
364    }
365
366    /// Create a new [Context] with given API prefix.
367    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
368        ContextBuilder::new()
369            .api_prefix(prefix.to_string())
370            .build(client)
371    }
372
373    /// Create a new [Context] with given domain.
374    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
375        ContextBuilder::new().domain(domain.as_ref()).build(client)
376    }
377
378    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
379    /// acknowledgment from the server that the message has been successfully delivered.
380    ///
381    /// Acknowledgment future that can be polled is returned instead.
382    ///
383    /// If the stream does not exist, `no responders` error will be returned.
384    ///
385    /// # Examples
386    ///
387    /// Publish, and after each publish, await for acknowledgment.
388    ///
389    /// ```no_run
390    /// # #[tokio::main]
391    /// # async fn main() -> Result<(), async_nats::Error> {
392    /// let client = async_nats::connect("localhost:4222").await?;
393    /// let jetstream = async_nats::jetstream::new(client);
394    ///
395    /// let ack = jetstream.publish("events", "data".into()).await?;
396    /// ack.await?;
397    /// jetstream.publish("events", "data".into()).await?.await?;
398    /// # Ok(())
399    /// # }
400    /// ```
401    ///
402    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
403    /// ignored entirely.
404    ///
405    /// ```no_run
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// let client = async_nats::connect("localhost:4222").await?;
409    /// let jetstream = async_nats::jetstream::new(client);
410    ///
411    /// let first_ack = jetstream.publish("events", "data".into()).await?;
412    /// let second_ack = jetstream.publish("events", "data".into()).await?;
413    /// first_ack.await?;
414    /// second_ack.await?;
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub async fn publish<S: ToSubject>(
419        &self,
420        subject: S,
421        payload: Bytes,
422    ) -> Result<PublishAckFuture, PublishError> {
423        self.send_publish(subject, PublishMessage::build().payload(payload))
424            .await
425    }
426
427    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
428    /// the server that the message has been successfully delivered.
429    ///
430    /// If the stream does not exist, `no responders` error will be returned.
431    ///
432    /// # Examples
433    ///
434    /// ```no_run
435    /// # #[tokio::main]
436    /// # async fn main() -> Result<(), async_nats::Error> {
437    /// let client = async_nats::connect("localhost:4222").await?;
438    /// let jetstream = async_nats::jetstream::new(client);
439    ///
440    /// let mut headers = async_nats::HeaderMap::new();
441    /// headers.append("X-key", "Value");
442    /// let ack = jetstream
443    ///     .publish_with_headers("events", headers, "data".into())
444    ///     .await?;
445    /// # Ok(())
446    /// # }
447    /// ```
448    pub async fn publish_with_headers<S: ToSubject>(
449        &self,
450        subject: S,
451        headers: crate::header::HeaderMap,
452        payload: Bytes,
453    ) -> Result<PublishAckFuture, PublishError> {
454        self.send_publish(
455            subject,
456            PublishMessage::build().payload(payload).headers(headers),
457        )
458        .await
459    }
460
461    /// Publish a message built by [Publish] and returns an acknowledgment future.
462    ///
463    /// If the stream does not exist, `no responders` error will be returned.
464    ///
465    /// # Examples
466    ///
467    /// ```no_run
468    /// # use async_nats::jetstream::context::Publish;
469    /// # #[tokio::main]
470    /// # async fn main() -> Result<(), async_nats::Error> {
471    /// let client = async_nats::connect("localhost:4222").await?;
472    /// let jetstream = async_nats::jetstream::new(client);
473    ///
474    /// let ack = jetstream
475    ///     .send_publish(
476    ///         "events",
477    ///         Publish::build().payload("data".into()).message_id("uuid"),
478    ///     )
479    ///     .await?;
480    /// # Ok(())
481    /// # }
482    /// ```
483    pub async fn send_publish<S: ToSubject>(
484        &self,
485        subject: S,
486        publish: PublishMessage,
487    ) -> Result<PublishAckFuture, PublishError> {
488        let permit = if self.backpressure_on_inflight {
489            // When backpressure is enabled, wait for a permit to become available
490            self.max_ack_semaphore
491                .clone()
492                .acquire_owned()
493                .await
494                .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
495        } else {
496            // When backpressure is disabled, error immediately if no permits available
497            self.max_ack_semaphore
498                .clone()
499                .try_acquire_owned()
500                .map_err(|err| match err {
501                    TryAcquireError::NoPermits => {
502                        PublishError::new(PublishErrorKind::MaxAckPending)
503                    }
504                    _ => PublishError::with_source(PublishErrorKind::Other, err),
505                })?
506        };
507        let subject = self
508            .client
509            .maybe_validate_publish_subject(subject)
510            .map_err(|e| PublishError::with_source(PublishErrorKind::Other, e))?;
511        let (sender, receiver) = oneshot::channel();
512
513        let respond = self.client.new_inbox().into();
514
515        let send_fut = self
516            .client
517            .sender
518            .send(Command::Request {
519                subject,
520                payload: publish.payload,
521                respond,
522                headers: publish.headers,
523                sender,
524            })
525            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
526
527        tokio::time::timeout(self.timeout, send_fut)
528            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
529            .await??;
530
531        Ok(PublishAckFuture {
532            timeout: self.timeout,
533            subscription: Some(receiver),
534            permit: Some(permit),
535            tx: self.ack_sender.clone(),
536        })
537    }
538
539    /// Query the server for account information
540    pub async fn query_account(&self) -> Result<Account, AccountError> {
541        let response: Response<Account> = self.request("INFO", b"").await?;
542
543        match response {
544            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
545            Response::Ok(account) => Ok(account),
546        }
547    }
548
549    /// Create a JetStream [Stream] with given config and return a handle to it.
550    /// That handle can be used to manage and use [Consumer].
551    ///
552    /// # Examples
553    ///
554    /// ```no_run
555    /// # #[tokio::main]
556    /// # async fn main() -> Result<(), async_nats::Error> {
557    /// use async_nats::jetstream::stream::Config;
558    /// use async_nats::jetstream::stream::DiscardPolicy;
559    /// let client = async_nats::connect("localhost:4222").await?;
560    /// let jetstream = async_nats::jetstream::new(client);
561    ///
562    /// let stream = jetstream
563    ///     .create_stream(Config {
564    ///         name: "events".to_string(),
565    ///         max_messages: 100_000,
566    ///         discard: DiscardPolicy::Old,
567    ///         ..Default::default()
568    ///     })
569    ///     .await?;
570    /// # Ok(())
571    /// # }
572    /// ```
573    pub async fn create_stream<S>(
574        &self,
575        stream_config: S,
576    ) -> Result<Stream<Info>, CreateStreamError>
577    where
578        Config: From<S>,
579    {
580        let mut config: Config = stream_config.into();
581        if config.name.is_empty() {
582            return Err(CreateStreamError::new(
583                CreateStreamErrorKind::EmptyStreamName,
584            ));
585        }
586        if !is_valid_name(config.name.as_str()) {
587            return Err(CreateStreamError::new(
588                CreateStreamErrorKind::InvalidStreamName,
589            ));
590        }
591        if let Some(ref mut mirror) = config.mirror {
592            if let Some(ref mut domain) = mirror.domain {
593                if mirror.external.is_some() {
594                    return Err(CreateStreamError::new(
595                        CreateStreamErrorKind::DomainAndExternalSet,
596                    ));
597                }
598                mirror.external = Some(External {
599                    api_prefix: format!("$JS.{domain}.API"),
600                    delivery_prefix: None,
601                })
602            }
603        }
604
605        if let Some(ref mut sources) = config.sources {
606            for source in sources {
607                if let Some(ref mut domain) = source.domain {
608                    if source.external.is_some() {
609                        return Err(CreateStreamError::new(
610                            CreateStreamErrorKind::DomainAndExternalSet,
611                        ));
612                    }
613                    source.external = Some(External {
614                        api_prefix: format!("$JS.{domain}.API"),
615                        delivery_prefix: None,
616                    })
617                }
618            }
619        }
620        let subject = format!("STREAM.CREATE.{}", config.name);
621        let response: Response<Info> = self.request(subject, &config).await?;
622
623        match response {
624            Response::Err { error } => Err(error.into()),
625            Response::Ok(info) => Ok(Stream {
626                context: self.clone(),
627                info,
628                name: config.name,
629            }),
630        }
631    }
632
633    /// Checks for [Stream] existence on the server and returns handle to it.
634    /// That handle can be used to manage and use [Consumer].
635    /// This variant does not fetch [Stream] info from the server.
636    /// It means it does not check if the stream actually exists.
637    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
638    /// If you however run single operations on many streams, this method is more efficient.
639    ///
640    /// # Examples
641    ///
642    /// ```no_run
643    /// # #[tokio::main]
644    /// # async fn main() -> Result<(), async_nats::Error> {
645    /// let client = async_nats::connect("localhost:4222").await?;
646    /// let jetstream = async_nats::jetstream::new(client);
647    ///
648    /// let stream = jetstream.get_stream_no_info("events").await?;
649    /// # Ok(())
650    /// # }
651    /// ```
652    pub async fn get_stream_no_info<T: AsRef<str>>(
653        &self,
654        stream: T,
655    ) -> Result<Stream<()>, GetStreamError> {
656        let stream = stream.as_ref();
657        if stream.is_empty() {
658            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
659        }
660
661        if !is_valid_name(stream) {
662            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
663        }
664
665        Ok(Stream {
666            context: self.clone(),
667            info: (),
668            name: stream.to_string(),
669        })
670    }
671
672    /// Checks for [Stream] existence on the server and returns handle to it.
673    /// That handle can be used to manage and use [Consumer].
674    ///
675    /// # Examples
676    ///
677    /// ```no_run
678    /// # #[tokio::main]
679    /// # async fn main() -> Result<(), async_nats::Error> {
680    /// let client = async_nats::connect("localhost:4222").await?;
681    /// let jetstream = async_nats::jetstream::new(client);
682    ///
683    /// let stream = jetstream.get_stream("events").await?;
684    /// # Ok(())
685    /// # }
686    /// ```
687    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
688        let stream = stream.as_ref();
689        if stream.is_empty() {
690            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
691        }
692
693        if !is_valid_name(stream) {
694            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
695        }
696
697        let subject = format!("STREAM.INFO.{stream}");
698        let request: Response<Info> = self
699            .request(subject, &())
700            .await
701            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
702        match request {
703            Response::Err { error } => {
704                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
705            }
706            Response::Ok(info) => Ok(Stream {
707                context: self.clone(),
708                info,
709                name: stream.to_string(),
710            }),
711        }
712    }
713
714    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
715    ///
716    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
717    ///
718    /// # Examples
719    ///
720    /// ```no_run
721    /// # #[tokio::main]
722    /// # async fn main() -> Result<(), async_nats::Error> {
723    /// use async_nats::jetstream::stream::Config;
724    /// let client = async_nats::connect("localhost:4222").await?;
725    /// let jetstream = async_nats::jetstream::new(client);
726    ///
727    /// let stream = jetstream
728    ///     .get_or_create_stream(Config {
729    ///         name: "events".to_string(),
730    ///         max_messages: 10_000,
731    ///         ..Default::default()
732    ///     })
733    ///     .await?;
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub async fn get_or_create_stream<S>(
738        &self,
739        stream_config: S,
740    ) -> Result<Stream, CreateStreamError>
741    where
742        S: Into<Config>,
743    {
744        let config: Config = stream_config.into();
745
746        if config.name.is_empty() {
747            return Err(CreateStreamError::new(
748                CreateStreamErrorKind::EmptyStreamName,
749            ));
750        }
751
752        if !is_valid_name(config.name.as_str()) {
753            return Err(CreateStreamError::new(
754                CreateStreamErrorKind::InvalidStreamName,
755            ));
756        }
757        let subject = format!("STREAM.INFO.{}", config.name);
758
759        let request: Response<Info> = self.request(subject, &()).await?;
760        match request {
761            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
762            Response::Err { error } => Err(error.into()),
763            Response::Ok(info) => Ok(Stream {
764                context: self.clone(),
765                info,
766                name: config.name,
767            }),
768        }
769    }
770
771    /// Deletes a [Stream] with a given name.
772    ///
773    /// # Examples
774    ///
775    /// ```no_run
776    /// # #[tokio::main]
777    /// # async fn main() -> Result<(), async_nats::Error> {
778    /// use async_nats::jetstream::stream::Config;
779    /// let client = async_nats::connect("localhost:4222").await?;
780    /// let jetstream = async_nats::jetstream::new(client);
781    ///
782    /// let stream = jetstream.delete_stream("events").await?;
783    /// # Ok(())
784    /// # }
785    /// ```
786    pub async fn delete_stream<T: AsRef<str>>(
787        &self,
788        stream: T,
789    ) -> Result<DeleteStatus, DeleteStreamError> {
790        let stream = stream.as_ref();
791        if stream.is_empty() {
792            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
793        }
794
795        if !is_valid_name(stream) {
796            return Err(DeleteStreamError::new(
797                DeleteStreamErrorKind::InvalidStreamName,
798            ));
799        }
800
801        let subject = format!("STREAM.DELETE.{stream}");
802        match self
803            .request(subject, &json!({}))
804            .await
805            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
806        {
807            Response::Err { error } => Err(DeleteStreamError::new(
808                DeleteStreamErrorKind::JetStream(error),
809            )),
810            Response::Ok(delete_response) => Ok(delete_response),
811        }
812    }
813
814    /// Updates a [Stream] with a given config. If specific field cannot be updated,
815    /// error is returned.
816    ///
817    /// # Examples
818    ///
819    /// ```no_run
820    /// # #[tokio::main]
821    /// # async fn main() -> Result<(), async_nats::Error> {
822    /// use async_nats::jetstream::stream::Config;
823    /// use async_nats::jetstream::stream::DiscardPolicy;
824    /// let client = async_nats::connect("localhost:4222").await?;
825    /// let jetstream = async_nats::jetstream::new(client);
826    ///
827    /// let stream = jetstream
828    ///     .update_stream(Config {
829    ///         name: "events".to_string(),
830    ///         discard: DiscardPolicy::New,
831    ///         max_messages: 50_000,
832    ///         ..Default::default()
833    ///     })
834    ///     .await?;
835    /// # Ok(())
836    /// # }
837    /// ```
838    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
839    where
840        S: Borrow<Config>,
841    {
842        let config = config.borrow();
843
844        if config.name.is_empty() {
845            return Err(CreateStreamError::new(
846                CreateStreamErrorKind::EmptyStreamName,
847            ));
848        }
849
850        if !is_valid_name(config.name.as_str()) {
851            return Err(CreateStreamError::new(
852                CreateStreamErrorKind::InvalidStreamName,
853            ));
854        }
855
856        let subject = format!("STREAM.UPDATE.{}", config.name);
857        match self.request(subject, config).await? {
858            Response::Err { error } => Err(error.into()),
859            Response::Ok(info) => Ok(info),
860        }
861    }
862
863    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
864    ///
865    /// # Examples
866    ///
867    /// ```no_run
868    /// # #[tokio::main]
869    /// # async fn main() -> Result<(), async_nats::Error> {
870    /// use async_nats::jetstream::stream::Config;
871    /// use async_nats::jetstream::stream::DiscardPolicy;
872    /// let client = async_nats::connect("localhost:4222").await?;
873    /// let jetstream = async_nats::jetstream::new(client);
874    ///
875    /// let stream = jetstream
876    ///     .create_or_update_stream(Config {
877    ///         name: "events".to_string(),
878    ///         discard: DiscardPolicy::New,
879    ///         max_messages: 50_000,
880    ///         ..Default::default()
881    ///     })
882    ///     .await?;
883    /// # Ok(())
884    /// # }
885    /// ```
886    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
887        match self.update_stream(config.clone()).await {
888            Ok(stream) => Ok(stream),
889            Err(err) => match err.kind() {
890                CreateStreamErrorKind::NotFound => {
891                    let stream = self
892                        .create_stream(config)
893                        .await
894                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
895                    Ok(stream.info)
896                }
897                _ => Err(err),
898            },
899        }
900    }
901
902    /// Looks up Stream that contains provided subject.
903    ///
904    /// # Examples
905    ///
906    /// ```no_run
907    /// # #[tokio::main]
908    /// # async fn main() -> Result<(), async_nats::Error> {
909    /// use futures_util::TryStreamExt;
910    /// let client = async_nats::connect("demo.nats.io:4222").await?;
911    /// let jetstream = async_nats::jetstream::new(client);
    /// let stream_name = jetstream.stream_by_subject("foo.>").await?;
913    /// # Ok(())
914    /// # }
915    /// ```
916    pub async fn stream_by_subject<T: Into<String>>(
917        &self,
918        subject: T,
919    ) -> Result<String, GetStreamByNameError> {
920        let subject = subject.into();
921        if !is_valid_subject(subject.as_str()) {
922            return Err(GetStreamByNameError::new(
923                GetStreamByNameErrorKind::InvalidSubject,
924            ));
925        }
926        let mut names = StreamNames {
927            context: self.clone(),
928            offset: 0,
929            page_request: None,
930            streams: Vec::new(),
931            subject: Some(subject),
932            done: false,
933        };
934        match names.next().await {
935            Some(name) => match name {
936                Ok(name) => Ok(name),
937                Err(err) => Err(GetStreamByNameError::with_source(
938                    GetStreamByNameErrorKind::Request,
939                    err,
940                )),
941            },
942            None => Err(GetStreamByNameError::new(
943                GetStreamByNameErrorKind::NotFound,
944            )),
945        }
946    }
947
948    /// Lists names of all streams for current context.
949    ///
950    /// # Examples
951    ///
952    /// ```no_run
953    /// # #[tokio::main]
954    /// # async fn main() -> Result<(), async_nats::Error> {
955    /// use futures_util::TryStreamExt;
956    /// let client = async_nats::connect("demo.nats.io:4222").await?;
957    /// let jetstream = async_nats::jetstream::new(client);
958    /// let mut names = jetstream.stream_names();
959    /// while let Some(stream) = names.try_next().await? {
960    ///     println!("stream: {}", stream);
961    /// }
962    /// # Ok(())
963    /// # }
964    /// ```
965    pub fn stream_names(&self) -> StreamNames {
966        StreamNames {
967            context: self.clone(),
968            offset: 0,
969            page_request: None,
970            streams: Vec::new(),
971            subject: None,
972            done: false,
973        }
974    }
975
976    /// Lists all streams info for current context.
977    ///
978    /// # Examples
979    ///
980    /// ```no_run
981    /// # #[tokio::main]
982    /// # async fn main() -> Result<(), async_nats::Error> {
983    /// use futures_util::TryStreamExt;
984    /// let client = async_nats::connect("demo.nats.io:4222").await?;
985    /// let jetstream = async_nats::jetstream::new(client);
986    /// let mut streams = jetstream.streams();
987    /// while let Some(stream) = streams.try_next().await? {
988    ///     println!("stream: {:?}", stream);
989    /// }
990    /// # Ok(())
991    /// # }
992    /// ```
993    pub fn streams(&self) -> Streams {
994        Streams {
995            context: self.clone(),
996            offset: 0,
997            page_request: None,
998            streams: Vec::new(),
999            done: false,
1000        }
1001    }
1002    /// Returns an existing key-value bucket.
1003    ///
1004    /// # Examples
1005    ///
1006    /// ```no_run
1007    /// # #[tokio::main]
1008    /// # async fn main() -> Result<(), async_nats::Error> {
1009    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1010    /// let jetstream = async_nats::jetstream::new(client);
1011    /// let kv = jetstream.get_key_value("bucket").await?;
1012    /// # Ok(())
1013    /// # }
1014    /// ```
1015    #[cfg(feature = "kv")]
1016    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1017    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1018        let bucket: String = bucket.into();
1019        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1020            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1021        }
1022
1023        let stream_name = format!("KV_{}", &bucket);
1024        let stream = self
1025            .get_stream(stream_name.clone())
1026            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1027            .await?;
1028
1029        if stream.info.config.max_messages_per_subject < 1 {
1030            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1031        }
1032        let mut store = Store {
1033            prefix: format!("$KV.{}.", &bucket),
1034            name: bucket,
1035            stream_name,
1036            stream: stream.clone(),
1037            put_prefix: None,
1038            use_jetstream_prefix: self.prefix != "$JS.API",
1039        };
1040        if let Some(ref mirror) = stream.info.config.mirror {
1041            let bucket = mirror.name.trim_start_matches("KV_");
1042            if let Some(ref external) = mirror.external {
1043                if !external.api_prefix.is_empty() {
1044                    store.use_jetstream_prefix = false;
1045                    store.prefix = format!("$KV.{bucket}.");
1046                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1047                } else {
1048                    store.put_prefix = Some(format!("$KV.{bucket}."));
1049                }
1050            }
1051        };
1052
1053        Ok(store)
1054    }
1055
1056    /// Creates a new key-value bucket.
1057    ///
1058    /// # Examples
1059    ///
1060    /// ```no_run
1061    /// # #[tokio::main]
1062    /// # async fn main() -> Result<(), async_nats::Error> {
1063    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1064    /// let jetstream = async_nats::jetstream::new(client);
1065    /// let kv = jetstream
1066    ///     .create_key_value(async_nats::jetstream::kv::Config {
1067    ///         bucket: "kv".to_string(),
1068    ///         history: 10,
1069    ///         ..Default::default()
1070    ///     })
1071    ///     .await?;
1072    /// # Ok(())
1073    /// # }
1074    /// ```
1075    #[cfg(feature = "kv")]
1076    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1077    pub async fn create_key_value(
1078        &self,
1079        config: crate::jetstream::kv::Config,
1080    ) -> Result<Store, CreateKeyValueError> {
1081        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1082            return Err(CreateKeyValueError::new(
1083                CreateKeyValueErrorKind::InvalidStoreName,
1084            ));
1085        }
1086        let info = self.query_account().await.map_err(|err| {
1087            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1088        })?;
1089
1090        let bucket_name = config.bucket.clone();
1091        let stream_config = kv_to_stream_config(config, info)?;
1092
1093        let stream = self.create_stream(stream_config).await.map_err(|err| {
1094            if err.kind() == CreateStreamErrorKind::TimedOut {
1095                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1096            } else {
1097                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1098            }
1099        })?;
1100
1101        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1102    }
1103
1104    /// Updates an existing key-value bucket.
1105    ///
1106    /// # Examples
1107    ///
1108    /// ```no_run
1109    /// # #[tokio::main]
1110    /// # async fn main() -> Result<(), async_nats::Error> {
1111    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1112    /// let jetstream = async_nats::jetstream::new(client);
1113    /// let kv = jetstream
1114    ///     .update_key_value(async_nats::jetstream::kv::Config {
1115    ///         bucket: "kv".to_string(),
1116    ///         history: 60,
1117    ///         ..Default::default()
1118    ///     })
1119    ///     .await?;
1120    /// # Ok(())
1121    /// # }
1122    /// ```
1123    #[cfg(feature = "kv")]
1124    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1125    pub async fn update_key_value(
1126        &self,
1127        config: crate::jetstream::kv::Config,
1128    ) -> Result<Store, UpdateKeyValueError> {
1129        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1130            return Err(UpdateKeyValueError::new(
1131                UpdateKeyValueErrorKind::InvalidStoreName,
1132            ));
1133        }
1134
1135        let stream_name = format!("KV_{}", config.bucket);
1136        let bucket_name = config.bucket.clone();
1137
1138        let account = self.query_account().await.map_err(|err| {
1139            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1140        })?;
1141        let stream = self
1142            .update_stream(kv_to_stream_config(config, account)?)
1143            .await
1144            .map_err(|err| match err.kind() {
1145                UpdateStreamErrorKind::NotFound => {
1146                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1147                }
1148                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1149            })?;
1150
1151        let stream = Stream {
1152            context: self.clone(),
1153            info: stream,
1154            name: stream_name,
1155        };
1156
1157        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1158    }
1159
1160    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
1161    ///
1162    /// # Examples
1163    ///
1164    /// ```no_run
1165    /// # #[tokio::main]
1166    /// # async fn main() -> Result<(), async_nats::Error> {
1167    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1168    /// let jetstream = async_nats::jetstream::new(client);
1169    /// let kv = jetstream
    ///     .create_or_update_key_value(async_nats::jetstream::kv::Config {
1171    ///         bucket: "kv".to_string(),
1172    ///         history: 60,
1173    ///         ..Default::default()
1174    ///     })
1175    ///     .await?;
1176    /// # Ok(())
1177    /// # }
1178    /// ```
1179    #[cfg(feature = "kv")]
1180    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1181    pub async fn create_or_update_key_value(
1182        &self,
1183        config: crate::jetstream::kv::Config,
1184    ) -> Result<Store, CreateKeyValueError> {
1185        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1186            return Err(CreateKeyValueError::new(
1187                CreateKeyValueErrorKind::InvalidStoreName,
1188            ));
1189        }
1190
1191        let bucket_name = config.bucket.clone();
1192        let stream_name = format!("KV_{}", config.bucket);
1193
1194        let account = self.query_account().await.map_err(|err| {
1195            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1196        })?;
1197        let stream = self
1198            .create_or_update_stream(kv_to_stream_config(config, account)?)
1199            .await
1200            .map_err(|err| {
1201                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1202            })?;
1203
1204        let stream = Stream {
1205            context: self.clone(),
1206            info: stream,
1207            name: stream_name,
1208        };
1209
1210        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1211    }
1212
1213    /// Deletes given key-value bucket.
1214    ///
1215    /// # Examples
1216    ///
1217    /// ```no_run
1218    /// # #[tokio::main]
1219    /// # async fn main() -> Result<(), async_nats::Error> {
1220    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1221    /// let jetstream = async_nats::jetstream::new(client);
1222    /// let kv = jetstream
1223    ///     .create_key_value(async_nats::jetstream::kv::Config {
1224    ///         bucket: "kv".to_string(),
1225    ///         history: 10,
1226    ///         ..Default::default()
1227    ///     })
    ///     .await?;
    /// jetstream.delete_key_value("kv").await?;
1229    /// # Ok(())
1230    /// # }
1231    /// ```
1232    #[cfg(feature = "kv")]
1233    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1234    pub async fn delete_key_value<T: AsRef<str>>(
1235        &self,
1236        bucket: T,
1237    ) -> Result<DeleteStatus, KeyValueError> {
1238        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1239            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1240        }
1241
1242        let stream_name = format!("KV_{}", bucket.as_ref());
1243        self.delete_stream(stream_name)
1244            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1245            .await
1246    }
1247
1248    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
1249    //     let config = config.borrow();
1250    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1251    //         return Err(Box::new(std::io::Error::other(
1252    //             "invalid bucket name",
1253    //         )));
1254    //     }
1255
1256    //     let stream_name = format!("KV_{}", config.bucket);
1257    //     self.update_stream()
1258    //         .await
1259    //         .and_then(|info| Ok(()))
1260    // }
1261
1262    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1263    ///
1264    /// It has one less interaction with the server when binding to only one
1265    /// [crate::jetstream::consumer::Consumer].
1266    ///
1267    /// # Examples:
1268    ///
1269    /// ```no_run
1270    /// # #[tokio::main]
1271    /// # async fn main() -> Result<(), async_nats::Error> {
1272    /// use async_nats::jetstream::consumer::PullConsumer;
1273    ///
1274    /// let client = async_nats::connect("localhost:4222").await?;
1275    /// let jetstream = async_nats::jetstream::new(client);
1276    ///
1277    /// let consumer: PullConsumer = jetstream
1278    ///     .get_consumer_from_stream("consumer", "stream")
1279    ///     .await?;
1280    ///
1281    /// # Ok(())
1282    /// # }
1283    /// ```
1284    pub async fn get_consumer_from_stream<T, C, S>(
1285        &self,
1286        consumer: C,
1287        stream: S,
1288    ) -> Result<Consumer<T>, ConsumerError>
1289    where
1290        T: FromConsumer + IntoConsumerConfig,
1291        S: AsRef<str>,
1292        C: AsRef<str>,
1293    {
1294        if !is_valid_name(stream.as_ref()) {
1295            return Err(ConsumerError::with_source(
1296                ConsumerErrorKind::InvalidName,
1297                "invalid stream",
1298            ));
1299        }
1300
1301        if !is_valid_name(consumer.as_ref()) {
1302            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1303        }
1304
1305        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1306
1307        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1308            Response::Ok(info) => info,
1309            Response::Err { error } => return Err(error.into()),
1310        };
1311
1312        Ok(Consumer::new(
1313            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1314                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1315            })?,
1316            info,
1317            self.clone(),
1318        ))
1319    }
1320
1321    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1322    ///
1323    /// It has one less interaction with the server when binding to only one
1324    /// [crate::jetstream::consumer::Consumer].
1325    ///
1326    /// # Examples:
1327    ///
1328    /// ```no_run
1329    /// # #[tokio::main]
1330    /// # async fn main() -> Result<(), async_nats::Error> {
1331    /// use async_nats::jetstream::consumer::PullConsumer;
1332    ///
1333    /// let client = async_nats::connect("localhost:4222").await?;
1334    /// let jetstream = async_nats::jetstream::new(client);
1335    ///
1336    /// jetstream
1337    ///     .delete_consumer_from_stream("consumer", "stream")
1338    ///     .await?;
1339    ///
1340    /// # Ok(())
1341    /// # }
1342    /// ```
1343    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1344        &self,
1345        consumer: C,
1346        stream: S,
1347    ) -> Result<DeleteStatus, ConsumerError> {
1348        if !is_valid_name(consumer.as_ref()) {
1349            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1350        }
1351
1352        if !is_valid_name(stream.as_ref()) {
1353            return Err(ConsumerError::with_source(
1354                ConsumerErrorKind::Other,
1355                "invalid stream name",
1356            ));
1357        }
1358
1359        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1360
1361        match self.request(subject, &json!({})).await? {
1362            Response::Ok(delete_status) => Ok(delete_status),
1363            Response::Err { error } => Err(error.into()),
1364        }
1365    }
1366
1367    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1368    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1369    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1370    ///
1371    /// # Examples
1372    ///
1373    /// ```no_run
1374    /// # #[tokio::main]
1375    /// # async fn main() -> Result<(), async_nats::Error> {
1376    /// use async_nats::jetstream::consumer;
1377    /// let client = async_nats::connect("localhost:4222").await?;
1378    /// let jetstream = async_nats::jetstream::new(client);
1379    ///
1380    /// let consumer: consumer::PullConsumer = jetstream
1381    ///     .create_consumer_on_stream(
1382    ///         consumer::pull::Config {
1383    ///             durable_name: Some("pull".to_string()),
1384    ///             ..Default::default()
1385    ///         },
1386    ///         "stream",
1387    ///     )
1388    ///     .await?;
1389    /// # Ok(())
1390    /// # }
1391    /// ```
1392    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1393        &self,
1394        config: C,
1395        stream: S,
1396    ) -> Result<Consumer<C>, ConsumerError> {
1397        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1398            .await
1399    }
1400
1401    /// Update an existing consumer.
1402    /// This call will fail if the consumer does not exist.
1403    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1404    ///
1405    /// # Examples
1406    ///
1407    /// ```no_run
1408    /// # #[tokio::main]
1409    /// # async fn main() -> Result<(), async_nats::Error> {
1410    /// use async_nats::jetstream::consumer;
1411    /// let client = async_nats::connect("localhost:4222").await?;
1412    /// let jetstream = async_nats::jetstream::new(client);
1413    ///
1414    /// let consumer: consumer::PullConsumer = jetstream
1415    ///     .update_consumer_on_stream(
1416    ///         consumer::pull::Config {
1417    ///             durable_name: Some("pull".to_string()),
1418    ///             description: Some("updated pull consumer".to_string()),
1419    ///             ..Default::default()
1420    ///         },
1421    ///         "stream",
1422    ///     )
1423    ///     .await?;
1424    /// # Ok(())
1425    /// # }
1426    /// ```
1427    #[cfg(feature = "server_2_10")]
1428    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1429        &self,
1430        config: C,
1431        stream: S,
1432    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1433        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1434            .await
1435            .map_err(|err| err.into())
1436    }
1437
1438    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1439    /// the same.
1440    /// This method will fail if consumer is already present with different config.
1441    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1442    ///
1443    /// # Examples
1444    ///
1445    /// ```no_run
1446    /// # #[tokio::main]
1447    /// # async fn main() -> Result<(), async_nats::Error> {
1448    /// use async_nats::jetstream::consumer;
1449    /// let client = async_nats::connect("localhost:4222").await?;
1450    /// let jetstream = async_nats::jetstream::new(client);
1451    ///
1452    /// let consumer: consumer::PullConsumer = jetstream
1453    ///     .create_consumer_strict_on_stream(
1454    ///         consumer::pull::Config {
1455    ///             durable_name: Some("pull".to_string()),
1456    ///             ..Default::default()
1457    ///         },
1458    ///         "stream",
1459    ///     )
1460    ///     .await?;
1461    /// # Ok(())
1462    /// # }
1463    /// ```
1464    #[cfg(feature = "server_2_10")]
1465    pub async fn create_consumer_strict_on_stream<
1466        C: IntoConsumerConfig + FromConsumer,
1467        S: AsRef<str>,
1468    >(
1469        &self,
1470        config: C,
1471        stream: S,
1472    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1473        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1474            .await
1475            .map_err(|err| err.into())
1476    }
1477
1478    async fn create_consumer_on_stream_action<
1479        C: IntoConsumerConfig + FromConsumer,
1480        S: AsRef<str>,
1481    >(
1482        &self,
1483        config: C,
1484        stream: S,
1485        action: ConsumerAction,
1486    ) -> Result<Consumer<C>, ConsumerError> {
1487        let config = config.into_consumer_config();
1488
1489        let subject = {
1490            let filter = if config.filter_subject.is_empty() {
1491                "".to_string()
1492            } else {
1493                format!(".{}", config.filter_subject)
1494            };
1495            config
1496                .name
1497                .as_ref()
1498                .or(config.durable_name.as_ref())
1499                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1500                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1501        };
1502
1503        match self
1504            .request(
1505                subject,
1506                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1507            )
1508            .await?
1509        {
1510            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1511            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1512                FromConsumer::try_from_consumer_config(info.clone().config)
1513                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1514                info,
1515                self.clone(),
1516            )),
1517        }
1518    }
1519
1520    /// Send a request to the jetstream JSON API.
1521    ///
1522    /// This is a low level API used mostly internally, that should be used only in
1523    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1524    ///
1525    /// # Examples
1526    ///
1527    /// ```no_run
1528    /// # use async_nats::jetstream::stream::Info;
1529    /// # use async_nats::jetstream::response::Response;
1530    /// # #[tokio::main]
1531    /// # async fn main() -> Result<(), async_nats::Error> {
1532    /// let client = async_nats::connect("localhost:4222").await?;
1533    /// let jetstream = async_nats::jetstream::new(client);
1534    ///
1535    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1536    /// # Ok(())
1537    /// # }
1538    /// ```
1539    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1540    where
1541        S: ToSubject,
1542        T: ?Sized + Serialize,
1543        V: DeserializeOwned,
1544    {
1545        let subject = subject.to_subject();
1546        let request = serde_json::to_vec(&payload)
1547            .map(Bytes::from)
1548            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1549
1550        debug!("JetStream request sent: {:?}", request);
1551
1552        let message = self
1553            .client
1554            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1555            .await;
1556        let message = message?;
1557        debug!(
1558            "JetStream request response: {:?}",
1559            from_utf8(&message.payload)
1560        );
1561        let response = serde_json::from_slice(message.payload.as_ref())
1562            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1563
1564        Ok(response)
1565    }
1566
1567    /// Send a JetStream API request without waiting for a response.
1568    /// The subject will be automatically prefixed with the JetStream API prefix.
1569    ///
1570    /// This is useful for operations that need custom response handling,
1571    /// such as streaming responses (batch get) where the caller manages
1572    /// their own subscription to the reply inbox.
1573    ///
1574    /// Used mostly by extension crates.
1575    pub async fn send_request<S: ToSubject>(
1576        &self,
1577        subject: S,
1578        request: crate::client::Request,
1579    ) -> Result<(), crate::PublishError> {
1580        let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1581        let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1582        let payload = request.payload.unwrap_or_default();
1583
1584        match request.headers {
1585            Some(headers) => {
1586                self.client
1587                    .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1588                    .await
1589            }
1590            None => {
1591                self.client
1592                    .publish_with_reply(prefixed_subject, inbox, payload)
1593                    .await
1594            }
1595        }
1596    }
1597
1598    /// Creates a new object store bucket.
1599    ///
1600    /// # Examples
1601    ///
1602    /// ```no_run
1603    /// # #[tokio::main]
1604    /// # async fn main() -> Result<(), async_nats::Error> {
1605    /// let client = async_nats::connect("demo.nats.io").await?;
1606    /// let jetstream = async_nats::jetstream::new(client);
1607    /// let bucket = jetstream
1608    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1609    ///         bucket: "bucket".to_string(),
1610    ///         ..Default::default()
1611    ///     })
1612    ///     .await?;
1613    /// # Ok(())
1614    /// # }
1615    /// ```
1616    #[cfg(feature = "object-store")]
1617    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1618    pub async fn create_object_store(
1619        &self,
1620        config: super::object_store::Config,
1621    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1622        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1623            return Err(CreateObjectStoreError::new(
1624                CreateKeyValueErrorKind::InvalidStoreName,
1625            ));
1626        }
1627
1628        let bucket_name = config.bucket.clone();
1629        let stream_name = format!("OBJ_{bucket_name}");
1630        let chunk_subject = format!("$O.{bucket_name}.C.>");
1631        let meta_subject = format!("$O.{bucket_name}.M.>");
1632
1633        let stream = self
1634            .create_stream(super::stream::Config {
1635                name: stream_name,
1636                description: config.description.clone(),
1637                subjects: vec![chunk_subject, meta_subject],
1638                max_age: config.max_age,
1639                max_bytes: config.max_bytes,
1640                storage: config.storage,
1641                num_replicas: config.num_replicas,
1642                discard: DiscardPolicy::New,
1643                allow_rollup: true,
1644                allow_direct: true,
1645                #[cfg(feature = "server_2_10")]
1646                compression: if config.compression {
1647                    Some(Compression::S2)
1648                } else {
1649                    None
1650                },
1651                placement: config.placement,
1652                ..Default::default()
1653            })
1654            .await
1655            .map_err(|err| {
1656                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1657            })?;
1658
1659        Ok(ObjectStore {
1660            name: bucket_name,
1661            stream,
1662        })
1663    }
1664
1665    /// Get an existing object store bucket.
1666    ///
1667    /// # Examples
1668    ///
1669    /// ```no_run
1670    /// # #[tokio::main]
1671    /// # async fn main() -> Result<(), async_nats::Error> {
1672    /// let client = async_nats::connect("demo.nats.io").await?;
1673    /// let jetstream = async_nats::jetstream::new(client);
1674    /// let bucket = jetstream.get_object_store("bucket").await?;
1675    /// # Ok(())
1676    /// # }
1677    /// ```
1678    #[cfg(feature = "object-store")]
1679    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1680    pub async fn get_object_store<T: AsRef<str>>(
1681        &self,
1682        bucket_name: T,
1683    ) -> Result<ObjectStore, ObjectStoreError> {
1684        let bucket_name = bucket_name.as_ref();
1685        if !is_valid_bucket_name(bucket_name) {
1686            return Err(ObjectStoreError::new(
1687                ObjectStoreErrorKind::InvalidBucketName,
1688            ));
1689        }
1690        let stream_name = format!("OBJ_{bucket_name}");
1691        let stream = self
1692            .get_stream(stream_name)
1693            .await
1694            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1695
1696        Ok(ObjectStore {
1697            name: bucket_name.to_string(),
1698            stream,
1699        })
1700    }
1701
1702    /// Delete a object store bucket.
1703    ///
1704    /// # Examples
1705    ///
1706    /// ```no_run
1707    /// # #[tokio::main]
1708    /// # async fn main() -> Result<(), async_nats::Error> {
1709    /// let client = async_nats::connect("demo.nats.io").await?;
1710    /// let jetstream = async_nats::jetstream::new(client);
1711    /// let bucket = jetstream.delete_object_store("bucket").await?;
1712    /// # Ok(())
1713    /// # }
1714    /// ```
1715    #[cfg(feature = "object-store")]
1716    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1717    pub async fn delete_object_store<T: AsRef<str>>(
1718        &self,
1719        bucket_name: T,
1720    ) -> Result<(), DeleteObjectStore> {
1721        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1722        self.delete_stream(stream_name)
1723            .await
1724            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1725        Ok(())
1726    }
1727}
1728
1729impl crate::client::traits::Requester for Context {
1730    fn send_request<S: ToSubject>(
1731        &self,
1732        subject: S,
1733        request: crate::Request,
1734    ) -> impl Future<Output = Result<Message, crate::RequestError>> {
1735        self.client.send_request(subject, request)
1736    }
1737}
1738
1739impl crate::client::traits::Publisher for Context {
1740    fn publish_with_reply<S, R>(
1741        &self,
1742        subject: S,
1743        reply: R,
1744        payload: Bytes,
1745    ) -> impl Future<Output = Result<(), crate::PublishError>>
1746    where
1747        S: ToSubject,
1748        R: ToSubject,
1749    {
1750        self.client.publish_with_reply(subject, reply, payload)
1751    }
1752
1753    fn publish_message(
1754        &self,
1755        msg: crate::message::OutboundMessage,
1756    ) -> impl Future<Output = Result<(), crate::PublishError>> {
1757        self.client.publish_message(msg)
1758    }
1759}
1760
1761impl traits::ClientProvider for Context {
1762    fn client(&self) -> crate::Client {
1763        self.client()
1764    }
1765}
1766
1767impl traits::Requester for Context {
1768    fn request<S, T, V>(
1769        &self,
1770        subject: S,
1771        payload: &T,
1772    ) -> impl Future<Output = Result<V, RequestError>>
1773    where
1774        S: ToSubject,
1775        T: ?Sized + Serialize,
1776        V: DeserializeOwned,
1777    {
1778        self.request(subject, payload)
1779    }
1780}
1781
1782impl traits::TimeoutProvider for Context {
1783    fn timeout(&self) -> Duration {
1784        self.timeout
1785    }
1786}
1787
1788impl traits::RequestSender for Context {
1789    fn send_request<S: ToSubject>(
1790        &self,
1791        subject: S,
1792        request: crate::client::Request,
1793    ) -> impl Future<Output = Result<(), crate::PublishError>> {
1794        self.send_request(subject, request)
1795    }
1796}
1797
1798impl traits::Publisher for Context {
1799    fn publish<S: ToSubject>(
1800        &self,
1801        subject: S,
1802        payload: Bytes,
1803    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1804        self.publish(subject, payload)
1805    }
1806
1807    fn publish_message(
1808        &self,
1809        message: jetstream::message::OutboundMessage,
1810    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
1811        self.send_publish(
1812            message.subject,
1813            PublishMessage {
1814                payload: message.payload,
1815                headers: message.headers,
1816            },
1817        )
1818    }
1819}
1820
/// Categories of failure reported when publishing to JetStream.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the subject published to.
    StreamNotFound,
    /// The server rejected the publish because of a wrong last message id.
    WrongLastMessageId,
    /// The server rejected the publish because of a wrong last sequence.
    WrongLastSequence,
    /// No acknowledgement arrived in time.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// The maximum number of pending acknowledgements was reached.
    MaxAckPending,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1845
/// Error returned by JetStream publish operations; see [`PublishErrorKind`] for the categories.
pub type PublishError = Error<PublishErrorKind>;
1847
/// Handle for a publish acknowledgement that has not been received yet.
///
/// Awaiting it (through its `IntoFuture` impl) waits up to `timeout` for the reply message
/// and turns it into a [`PublishAck`] or a [`PublishError`]. If it is dropped while it still
/// holds both the receiver and the permit, they are handed over through `tx`.
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the reply message.
    timeout: Duration,
    /// Receiver for the reply message; `None` once it has been taken.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Semaphore permit owned by this publish; `None` once it has been taken.
    permit: Option<OwnedSemaphorePermit>,
    /// Channel used on drop to pass on the still-pending receiver together with the permit.
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1855
1856impl Drop for PublishAckFuture {
1857    fn drop(&mut self) {
1858        if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1859            if let Err(err) = self.tx.try_send((sub, permit)) {
1860                tracing::warn!("failed to pass future permit to the acker: {}", err);
1861            }
1862        }
1863    }
1864}
1865
1866impl PublishAckFuture {
1867    async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1868        let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1869            .await
1870            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1871        next.map_or_else(
1872            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1873            |m| {
1874                if m.status == Some(StatusCode::NO_RESPONDERS) {
1875                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1876                }
1877                let response = serde_json::from_slice(m.payload.as_ref())
1878                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1879                match response {
1880                    Response::Err { error } => match error.error_code() {
1881                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1882                            PublishErrorKind::WrongLastMessageId,
1883                            error,
1884                        )),
1885                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1886                            PublishErrorKind::WrongLastSequence,
1887                            error,
1888                        )),
1889                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1890                    },
1891                    Response::Ok(publish_ack) => Ok(publish_ack),
1892                }
1893            },
1894        )
1895    }
1896}
1897impl IntoFuture for PublishAckFuture {
1898    type Output = Result<PublishAck, PublishError>;
1899
1900    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1901
1902    fn into_future(self) -> Self::IntoFuture {
1903        Box::pin(std::future::IntoFuture::into_future(
1904            self.next_with_timeout(),
1905        ))
1906    }
1907}
1908
/// One page of the response to a `STREAM.NAMES` request.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Stream names contained in this page, if any.
    streams: Option<Vec<String>>,
}
1914
/// One page of the response to a `STREAM.LIST` request.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Stream infos contained in this page, if any.
    streams: Option<Vec<super::stream::Info>>,
}
1920
/// Boxed in-flight request for a single [`StreamPage`].
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;

/// Paginated stream of stream names, fetched page by page via `STREAM.NAMES` requests.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as `offset` with the next page request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Value sent as `subject` with every page request
    /// (presumably a subject filter — confirm against the caller).
    subject: Option<String>,
    /// Names of the most recent page that have not been yielded yet (popped from the back).
    streams: Vec<String>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
1931
1932impl futures_util::Stream for StreamNames {
1933    type Item = Result<String, StreamsError>;
1934
1935    fn poll_next(
1936        mut self: Pin<&mut Self>,
1937        cx: &mut std::task::Context<'_>,
1938    ) -> std::task::Poll<Option<Self::Item>> {
1939        match self.page_request.as_mut() {
1940            Some(page) => match page.try_poll_unpin(cx) {
1941                std::task::Poll::Ready(page) => {
1942                    self.page_request = None;
1943                    let page = page
1944                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1945                    if let Some(streams) = page.streams {
1946                        self.offset += streams.len();
1947                        self.streams = streams;
1948                        if self.offset >= page.total {
1949                            self.done = true;
1950                        }
1951                        match self.streams.pop() {
1952                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1953                            None => Poll::Ready(None),
1954                        }
1955                    } else {
1956                        Poll::Ready(None)
1957                    }
1958                }
1959                std::task::Poll::Pending => std::task::Poll::Pending,
1960            },
1961            None => {
1962                if let Some(stream) = self.streams.pop() {
1963                    Poll::Ready(Some(Ok(stream)))
1964                } else {
1965                    if self.done {
1966                        return Poll::Ready(None);
1967                    }
1968                    let context = self.context.clone();
1969                    let offset = self.offset;
1970                    let subject = self.subject.clone();
1971                    self.page_request = Some(Box::pin(async move {
1972                        match context
1973                            .request(
1974                                "STREAM.NAMES",
1975                                &json!({
1976                                    "offset": offset,
1977                                    "subject": subject
1978                                }),
1979                            )
1980                            .await?
1981                        {
1982                            Response::Err { error } => {
1983                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1984                            }
1985                            Response::Ok(page) => Ok(page),
1986                        }
1987                    }));
1988                    self.poll_next(cx)
1989                }
1990            }
1991        }
1992    }
1993}
1994
/// Boxed in-flight request for a single [`StreamInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind used by the stream listing iterators; identical to [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by the stream listing iterators; identical to [`RequestError`].
pub type StreamsError = RequestError;
1999
/// Paginated stream of stream infos, fetched page by page via `STREAM.LIST` requests.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as `offset` with the next page request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the most recent page that have not been yielded yet (popped from the back).
    streams: Vec<super::stream::Info>,
    /// Set once `offset` has reached the total reported by the server.
    done: bool,
}
2007
2008impl futures_util::Stream for Streams {
2009    type Item = Result<super::stream::Info, StreamsError>;
2010
2011    fn poll_next(
2012        mut self: Pin<&mut Self>,
2013        cx: &mut std::task::Context<'_>,
2014    ) -> std::task::Poll<Option<Self::Item>> {
2015        match self.page_request.as_mut() {
2016            Some(page) => match page.try_poll_unpin(cx) {
2017                std::task::Poll::Ready(page) => {
2018                    self.page_request = None;
2019                    let page = page
2020                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
2021                    if let Some(streams) = page.streams {
2022                        self.offset += streams.len();
2023                        self.streams = streams;
2024                        if self.offset >= page.total {
2025                            self.done = true;
2026                        }
2027                        match self.streams.pop() {
2028                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2029                            None => Poll::Ready(None),
2030                        }
2031                    } else {
2032                        Poll::Ready(None)
2033                    }
2034                }
2035                std::task::Poll::Pending => std::task::Poll::Pending,
2036            },
2037            None => {
2038                if let Some(stream) = self.streams.pop() {
2039                    Poll::Ready(Some(Ok(stream)))
2040                } else {
2041                    if self.done {
2042                        return Poll::Ready(None);
2043                    }
2044                    let context = self.context.clone();
2045                    let offset = self.offset;
2046                    self.page_request = Some(Box::pin(async move {
2047                        match context
2048                            .request(
2049                                "STREAM.LIST",
2050                                &json!({
2051                                    "offset": offset,
2052                                }),
2053                            )
2054                            .await?
2055                        {
2056                            Response::Err { error } => {
2057                                Err(RequestError::with_source(RequestErrorKind::Other, error))
2058                            }
2059                            Response::Ok(page) => Ok(page),
2060                        }
2061                    }));
2062                    self.poll_next(cx)
2063                }
2064            }
2065        }
2066    }
2067}
2068
/// Backwards-compatibility alias for [`super::message::PublishMessage`].
///
/// Kept only to avoid breaking existing code; please use
/// `jetstream::message::PublishMessage` instead.
#[deprecated(
    note = "use jetstream::message::PublishMessage instead",
    since = "0.44.0"
)]
pub type Publish = super::message::PublishMessage;
2076
/// Categories of failure for a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered the request: the requested JetStream resource does not exist.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// The subject used for the request is invalid.
    InvalidSubject,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::InvalidSubject => "invalid subject",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
2095
/// Error returned by JetStream API requests; see [`RequestErrorKind`] for the categories.
pub type RequestError = Error<RequestErrorKind>;
2097
2098impl From<crate::RequestError> for RequestError {
2099    fn from(error: crate::RequestError) -> Self {
2100        match error.kind() {
2101            crate::RequestErrorKind::TimedOut => {
2102                RequestError::with_source(RequestErrorKind::TimedOut, error)
2103            }
2104            crate::RequestErrorKind::NoResponders => {
2105                RequestError::new(RequestErrorKind::NoResponders)
2106            }
2107            crate::RequestErrorKind::InvalidSubject => {
2108                RequestError::with_source(RequestErrorKind::InvalidSubject, error)
2109            }
2110            crate::RequestErrorKind::Other => {
2111                RequestError::with_source(RequestErrorKind::Other, error)
2112            }
2113        }
2114    }
2115}
2116
2117impl From<super::errors::Error> for RequestError {
2118    fn from(err: super::errors::Error) -> Self {
2119        RequestError::with_source(RequestErrorKind::Other, err)
2120    }
2121}
2122
2123pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2124
2125#[derive(Clone, Debug, PartialEq)]
2126pub enum ConsumerInfoErrorKind {
2127    InvalidName,
2128    Offline,
2129    NotFound,
2130    StreamNotFound,
2131    Request,
2132    JetStream(super::errors::Error),
2133    TimedOut,
2134    NoResponders,
2135}
2136
2137impl Display for ConsumerInfoErrorKind {
2138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2139        match self {
2140            Self::InvalidName => write!(f, "invalid consumer name"),
2141            Self::Offline => write!(f, "consumer is offline"),
2142            Self::NotFound => write!(f, "consumer not found"),
2143            Self::StreamNotFound => write!(f, "stream not found"),
2144            Self::Request => write!(f, "request error"),
2145            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2146            Self::TimedOut => write!(f, "timed out"),
2147            Self::NoResponders => write!(f, "no responders"),
2148        }
2149    }
2150}
2151
2152impl From<super::errors::Error> for ConsumerInfoError {
2153    fn from(error: super::errors::Error) -> Self {
2154        match error.error_code() {
2155            ErrorCode::CONSUMER_NOT_FOUND => {
2156                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2157            }
2158            ErrorCode::STREAM_NOT_FOUND => {
2159                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2160            }
2161            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2162            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2163        }
2164    }
2165}
2166
2167impl From<RequestError> for ConsumerInfoError {
2168    fn from(error: RequestError) -> Self {
2169        match error.kind() {
2170            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2171            RequestErrorKind::InvalidSubject => {
2172                ConsumerInfoError::with_source(ConsumerInfoErrorKind::InvalidName, error)
2173            }
2174            RequestErrorKind::Other => {
2175                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2176            }
2177            RequestErrorKind::NoResponders => {
2178                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2179            }
2180        }
2181    }
2182}
2183
2184#[derive(Clone, Debug, PartialEq)]
2185pub enum CreateStreamErrorKind {
2186    EmptyStreamName,
2187    InvalidStreamName,
2188    DomainAndExternalSet,
2189    JetStreamUnavailable,
2190    JetStream(super::errors::Error),
2191    TimedOut,
2192    Response,
2193    NotFound,
2194    ResponseParse,
2195}
2196
2197impl Display for CreateStreamErrorKind {
2198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2199        match self {
2200            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2201            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2202            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2203            Self::NotFound => write!(f, "stream not found"),
2204            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2205            Self::TimedOut => write!(f, "jetstream request timed out"),
2206            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2207            Self::ResponseParse => write!(f, "failed to parse server response"),
2208            Self::Response => write!(f, "response error"),
2209        }
2210    }
2211}
2212
2213pub type CreateStreamError = Error<CreateStreamErrorKind>;
2214
2215impl From<super::errors::Error> for CreateStreamError {
2216    fn from(error: super::errors::Error) -> Self {
2217        match error.kind() {
2218            super::errors::ErrorCode::STREAM_NOT_FOUND => {
2219                CreateStreamError::new(CreateStreamErrorKind::NotFound)
2220            }
2221            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2222        }
2223    }
2224}
2225
2226impl From<RequestError> for CreateStreamError {
2227    fn from(error: RequestError) -> Self {
2228        match error.kind() {
2229            RequestErrorKind::NoResponders => {
2230                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2231            }
2232            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2233            RequestErrorKind::InvalidSubject => {
2234                CreateStreamError::with_source(CreateStreamErrorKind::InvalidStreamName, error)
2235            }
2236            RequestErrorKind::Other => {
2237                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2238            }
2239        }
2240    }
2241}
2242
2243#[derive(Clone, Debug, PartialEq)]
2244pub enum GetStreamErrorKind {
2245    EmptyName,
2246    Request,
2247    InvalidStreamName,
2248    JetStream(super::errors::Error),
2249}
2250
2251impl Display for GetStreamErrorKind {
2252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2253        match self {
2254            Self::EmptyName => write!(f, "empty name cannot be empty"),
2255            Self::Request => write!(f, "request error"),
2256            Self::InvalidStreamName => write!(f, "invalid stream name"),
2257            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2258        }
2259    }
2260}
2261
2262#[derive(Clone, Debug, PartialEq)]
2263pub enum GetStreamByNameErrorKind {
2264    Request,
2265    NotFound,
2266    InvalidSubject,
2267    JetStream(super::errors::Error),
2268}
2269
2270impl Display for GetStreamByNameErrorKind {
2271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2272        match self {
2273            Self::Request => write!(f, "request error"),
2274            Self::NotFound => write!(f, "stream not found"),
2275            Self::InvalidSubject => write!(f, "invalid subject"),
2276            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2277        }
2278    }
2279}
2280
/// Error type built on [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error type built on [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Updating a stream reports the same errors as creating one.
pub type UpdateStreamError = CreateStreamError;
/// Updating a stream reports the same error kinds as creating one.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Deleting a stream reports the same errors as getting one.
pub type DeleteStreamError = GetStreamError;
/// Deleting a stream reports the same error kinds as getting one.
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2288
/// Categories of failure when accessing a Key Value store.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The bucket could not be retrieved.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

#[cfg(feature = "kv")]
impl Display for KeyValueErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}

/// Error type built on [`KeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type KeyValueError = Error<KeyValueErrorKind>;
2310
/// Categories of failure when creating a Key Value store; also reused for object store creation.
#[cfg(any(feature = "kv", feature = "object-store"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

#[cfg(any(feature = "kv", feature = "object-store"))]
impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
2337
/// Categories of failure when updating a Key Value store.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

#[cfg(feature = "kv")]
impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // This kind describes a failed update; the previous text ("bucket creation
            // failed") was copied from the create variant.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error type built on [`CreateKeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error type built on [`UpdateKeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Error returned when creating an object store; shares its kinds with Key Value store creation.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreError = Error<CreateKeyValueErrorKind>;
/// Error kinds for object store creation; identical to [`CreateKeyValueErrorKind`].
#[cfg(feature = "object-store")]
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2375
/// Categories of failure when accessing an object store bucket.
#[cfg(feature = "object-store")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The object store could not be retrieved.
    GetStore,
}

#[cfg(feature = "object-store")]
impl Display for ObjectStoreErrorKind {
    /// Writes the fixed human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}

/// Error type built on [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Error returned when deleting an object store; identical to [`ObjectStoreError`].
#[cfg(feature = "object-store")]
pub type DeleteObjectStore = ObjectStoreError;
/// Error kinds for object store deletion; identical to [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2400
2401#[derive(Clone, Debug, PartialEq)]
2402pub enum AccountErrorKind {
2403    TimedOut,
2404    JetStream(super::errors::Error),
2405    JetStreamUnavailable,
2406    Other,
2407}
2408
2409impl Display for AccountErrorKind {
2410    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2411        match self {
2412            Self::TimedOut => write!(f, "timed out"),
2413            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2414            Self::Other => write!(f, "error"),
2415            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2416        }
2417    }
2418}
2419
2420pub type AccountError = Error<AccountErrorKind>;
2421
2422impl From<RequestError> for AccountError {
2423    fn from(err: RequestError) -> Self {
2424        match err.kind {
2425            RequestErrorKind::NoResponders => {
2426                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2427            }
2428            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2429            RequestErrorKind::Other | RequestErrorKind::InvalidSubject => {
2430                AccountError::with_source(AccountErrorKind::Other, err)
2431            }
2432        }
2433    }
2434}
2435
/// Action attached to a consumer configuration request.
///
/// Each variant serializes to the string given in its `serde(rename)` attribute.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only available with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only available with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2447
/// Maps a stream to the KV [`Store`] view of `bucket`.
///
/// * `stream` - the stream backing the bucket; it is moved into the returned store.
/// * `prefix` - the API prefix in use; anything other than `$JS.API` turns on
///   `use_jetstream_prefix`.
/// * `bucket` - the bucket name, used for the store name and the default `$KV.<bucket>.`
///   key prefix.
///
/// When the stream mirrors another one that has an external configuration, the put prefix
/// (and, for a non-empty external API prefix, also the key prefix) is derived from the
/// mirrored bucket instead.
#[cfg(feature = "kv")]
fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
    let mut key_prefix = format!("$KV.{bucket}.");
    let mut put_prefix = None;
    let mut use_jetstream_prefix = prefix != "$JS.API";

    if let Some(mirror) = &stream.info.config.mirror {
        // Strip the `KV_` stream prefix exactly once: a bucket whose own name starts with
        // `KV_` is backed by a stream named `KV_KV_…` and must keep its inner `KV_`.
        let mirrored_bucket = mirror.name.strip_prefix("KV_").unwrap_or(&mirror.name);
        if let Some(external) = &mirror.external {
            if !external.api_prefix.is_empty() {
                use_jetstream_prefix = false;
                key_prefix = format!("$KV.{mirrored_bucket}.");
                put_prefix = Some(format!(
                    "{}.$KV.{}.",
                    external.api_prefix, mirrored_bucket
                ));
            } else {
                put_prefix = Some(format!("$KV.{mirrored_bucket}."));
            }
        }
    }

    Store {
        prefix: key_prefix,
        name: bucket,
        // Copy the name out before `stream` itself is moved into the store; this avoids
        // cloning the whole stream handle.
        stream_name: stream.info.config.name.clone(),
        stream,
        put_prefix,
        use_jetstream_prefix,
    }
}
2473
/// Reasons why a KV configuration cannot be turned into a stream configuration.
#[cfg(feature = "kv")]
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    // Only constructed when the `server_2_11` feature is enabled, so the variant is
    // unused (and would warn) in builds without that feature.
    #[allow(dead_code)]
    /// Limit markers were requested but the account's API level is below 1.
    LimitMarkersNotSupported,
}
2480
/// Translates a KV-to-stream config failure into a Key Value creation error.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for CreateKeyValueError {
    /// Each variant maps to the creation error kind of the same name.
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => CreateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                CreateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        CreateKeyValueError::new(kind)
    }
}
2494
/// Translates a KV-to-stream config failure into a Key Value update error.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for UpdateKeyValueError {
    /// Each variant maps to the update error kind of the same name.
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => UpdateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                UpdateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        UpdateKeyValueError::new(kind)
    }
}
2508
/// Maps the KV config to the config of its backing stream (named `KV_<bucket>`).
///
/// * A history of zero or less becomes 1; a history above `MAX_HISTORY` is rejected.
/// * A replica count of 0 becomes 1.
/// * A mirror, or otherwise the sources, get the `KV_` prefix added to their names when
///   missing. A mirror also forces `mirror_direct`. Only when neither is configured does
///   the stream get the `$KV.<bucket>.>` subject.
/// * With the `server_2_11` feature, limit markers enable message TTLs and set the subject
///   delete marker TTL, provided the account's API level is at least 1.
///
/// # Errors
///
/// * [`KvToStreamConfigError::TooLongHistory`] when the history exceeds `MAX_HISTORY`.
/// * [`KvToStreamConfigError::LimitMarkersNotSupported`] when limit markers are requested
///   but the account's API level is below 1 (`server_2_11` feature only).
#[cfg(feature = "kv")]
fn kv_to_stream_config(
    config: crate::jetstream::kv::Config,
    _account: Account,
) -> Result<super::stream::Config, KvToStreamConfigError> {
    let history = if config.history > 0 {
        if config.history > MAX_HISTORY {
            return Err(KvToStreamConfigError::TooLongHistory);
        }
        config.history
    } else {
        1
    };

    let num_replicas = if config.num_replicas == 0 {
        1
    } else {
        config.num_replicas
    };

    #[cfg(feature = "server_2_11")]
    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);

    #[cfg(feature = "server_2_11")]
    if let Some(duration) = config.limit_markers {
        if _account.requests.level < 1 {
            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
        }
        allow_message_ttl = true;
        subject_delete_marker_ttl = Some(duration);
    }

    // `config` is owned, so the mirror and sources can be moved out instead of cloned.
    let mut mirror = config.mirror;
    let mut sources = config.sources;
    let mut mirror_direct = config.mirror_direct;

    let mut subjects = Vec::new();
    if let Some(mirror) = mirror.as_mut() {
        if !mirror.name.starts_with("KV_") {
            mirror.name = format!("KV_{}", mirror.name);
        }
        mirror_direct = true;
    } else if let Some(sources) = sources.as_mut() {
        for source in sources.iter_mut() {
            if !source.name.starts_with("KV_") {
                source.name = format!("KV_{}", source.name);
            }
        }
    } else {
        // Only a bucket without mirror or sources listens on its own subject space.
        subjects = vec![format!("$KV.{}.>", config.bucket)];
    }

    Ok(Config {
        name: format!("KV_{}", config.bucket),
        description: Some(config.description),
        subjects,
        max_messages_per_subject: history,
        max_bytes: config.max_bytes,
        max_age: config.max_age,
        max_message_size: config.max_value_size,
        storage: config.storage,
        republish: config.republish,
        allow_rollup: true,
        deny_delete: true,
        deny_purge: false,
        allow_direct: true,
        sources,
        mirror,
        num_replicas,
        discard: DiscardPolicy::New,
        mirror_direct,
        #[cfg(feature = "server_2_10")]
        compression: if config.compression {
            Some(Compression::S2)
        } else {
            None
        },
        placement: config.placement,
        #[cfg(feature = "server_2_11")]
        allow_message_ttl,
        #[cfg(feature = "server_2_11")]
        subject_delete_marker_ttl,
        ..Default::default()
    })
}