async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::kv::{Store, MAX_HISTORY};
45use super::object_store::{is_valid_bucket_name, ObjectStore};
46use super::stream::{
47    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
48    Stream,
49};
50#[cfg(feature = "server_2_10")]
51use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
52use super::{is_valid_name, kv};
53
54pub mod traits {
55    use std::{future::Future, time::Duration};
56
57    use bytes::Bytes;
58    use serde::{de::DeserializeOwned, Serialize};
59
60    use crate::{jetstream::message, subject::ToSubject, Request};
61
62    use super::RequestError;
63
64    pub trait Requester {
65        fn request<S, T, V>(
66            &self,
67            subject: S,
68            payload: &T,
69        ) -> impl Future<Output = Result<V, RequestError>>
70        where
71            S: ToSubject,
72            T: ?Sized + Serialize,
73            V: DeserializeOwned;
74    }
75
76    pub trait RequestSender {
77        fn send_request<S: ToSubject>(
78            &self,
79            subject: S,
80            request: Request,
81        ) -> impl Future<Output = Result<(), crate::PublishError>>;
82    }
83
84    pub trait Publisher {
85        fn publish<S: ToSubject>(
86            &self,
87            subject: S,
88            payload: Bytes,
89        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
90
91        fn publish_message(
92            &self,
93            message: message::OutboundMessage,
94        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
95    }
96
97    pub trait ClientProvider {
98        fn client(&self) -> crate::Client;
99    }
100
101    pub trait TimeoutProvider {
102        fn timeout(&self) -> Duration;
103    }
104}
105
106/// A context which can perform jetstream scoped requests.
107#[derive(Debug, Clone)]
108pub struct Context {
109    pub(crate) client: Client,
110    pub(crate) prefix: String,
111    pub(crate) timeout: Duration,
112    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
113    pub(crate) ack_sender:
114        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
115    pub(crate) backpressure_on_inflight: bool,
116    pub(crate) semaphore_capacity: usize,
117}
118
119fn spawn_acker(
120    rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
121    ack_timeout: Duration,
122    concurrency: Option<usize>,
123) -> tokio::task::JoinHandle<()> {
124    tokio::spawn(async move {
125        rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
126            tokio::time::timeout(ack_timeout, subscription).await.ok();
127            drop(permit);
128        })
129        .await;
130        debug!("Acker task exited");
131    })
132}
133
134use std::marker::PhantomData;
135
/// Type-state marker for [ContextBuilder]: the API prefix can still be assigned.
///
/// `api_prefix` and `domain` are only implemented for `ContextBuilder<Yes>`.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-state marker for [ContextBuilder]: the API prefix has been assigned.
///
/// This is the marker carried by the builder returned from `api_prefix` and `domain`.
#[derive(Debug, Default)]
pub struct No;

/// Bound for the marker parameter of [ContextBuilder]; implemented by [Yes] and [No].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
145
146/// A builder for [Context]. Beyond what can be set by standard constructor, it allows tweaking
147/// pending publish ack backpressure settings.
148/// # Examples
149/// ```no_run
150/// # use async_nats::jetstream::context::ContextBuilder;
151/// # use async_nats::Client;
152/// # use std::time::Duration;
153/// # #[tokio::main]
154/// # async fn main() -> Result<(), async_nats::Error> {
155/// let client = async_nats::connect("demo.nats.io").await?;
156/// let context = ContextBuilder::new()
157///     .timeout(Duration::from_secs(5))
158///     .api_prefix("MY.JS.API")
159///     .max_ack_inflight(1000)
160///     .build(client);
161/// # Ok(())
162/// # }
163/// ```
164pub struct ContextBuilder<PREFIX: ToAssign> {
165    prefix: String,
166    timeout: Duration,
167    semaphore_capacity: usize,
168    ack_timeout: Duration,
169    backpressure_on_inflight: bool,
170    concurrency_limit: Option<usize>,
171    _phantom: PhantomData<PREFIX>,
172}
173
174impl Default for ContextBuilder<Yes> {
175    fn default() -> Self {
176        ContextBuilder {
177            prefix: "$JS.API".to_string(),
178            timeout: Duration::from_secs(5),
179            semaphore_capacity: 5_000,
180            ack_timeout: Duration::from_secs(30),
181            backpressure_on_inflight: true,
182            concurrency_limit: None,
183            _phantom: PhantomData {},
184        }
185    }
186}
187
188impl ContextBuilder<Yes> {
189    /// Create a new [ContextBuilder] with default settings.
190    pub fn new() -> ContextBuilder<Yes> {
191        ContextBuilder::default()
192    }
193}
194
195impl ContextBuilder<Yes> {
196    /// Set the prefix for the JetStream API.
197    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
198        ContextBuilder {
199            prefix: prefix.into(),
200            timeout: self.timeout,
201            semaphore_capacity: self.semaphore_capacity,
202            ack_timeout: self.ack_timeout,
203            backpressure_on_inflight: self.backpressure_on_inflight,
204            concurrency_limit: self.concurrency_limit,
205            _phantom: PhantomData,
206        }
207    }
208
209    /// Set the domain for the JetStream API. Domain is the middle part of standard API prefix:
210    /// $JS.{domain}.API.
211    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
212        ContextBuilder {
213            prefix: format!("$JS.{}.API", domain.into()),
214            timeout: self.timeout,
215            semaphore_capacity: self.semaphore_capacity,
216            ack_timeout: self.ack_timeout,
217            backpressure_on_inflight: self.backpressure_on_inflight,
218            concurrency_limit: self.concurrency_limit,
219            _phantom: PhantomData,
220        }
221    }
222}
223
224impl<PREFIX> ContextBuilder<PREFIX>
225where
226    PREFIX: ToAssign,
227{
228    /// Set the timeout for all JetStream API requests.
229    pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
230    where
231        Yes: ToAssign,
232    {
233        ContextBuilder {
234            prefix: self.prefix,
235            timeout,
236            semaphore_capacity: self.semaphore_capacity,
237            ack_timeout: self.ack_timeout,
238            backpressure_on_inflight: self.backpressure_on_inflight,
239            concurrency_limit: self.concurrency_limit,
240            _phantom: PhantomData,
241        }
242    }
243
244    /// Sets the maximum time client waits for acks from the server when default backpressure is
245    /// used.
246    pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
247    where
248        Yes: ToAssign,
249    {
250        ContextBuilder {
251            prefix: self.prefix,
252            timeout: self.timeout,
253            semaphore_capacity: self.semaphore_capacity,
254            ack_timeout,
255            backpressure_on_inflight: self.backpressure_on_inflight,
256            concurrency_limit: self.concurrency_limit,
257            _phantom: PhantomData,
258        }
259    }
260
261    /// Sets the maximum number of pending acks that can be in flight at any given time.
262    /// If limit is reached, `publish` throws an error by default, or waits if backpressure is enabled.
263    pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
264    where
265        Yes: ToAssign,
266    {
267        ContextBuilder {
268            prefix: self.prefix,
269            timeout: self.timeout,
270            semaphore_capacity: capacity,
271            ack_timeout: self.ack_timeout,
272            backpressure_on_inflight: self.backpressure_on_inflight,
273            concurrency_limit: self.concurrency_limit,
274            _phantom: PhantomData,
275        }
276    }
277
278    /// Enable or disable backpressure when max inflight acks is reached.
279    /// When enabled, publish will wait for permits to become available instead of returning an
280    /// error.
281    /// Default is false (errors on max inflight).
282    pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
283    where
284        Yes: ToAssign,
285    {
286        ContextBuilder {
287            prefix: self.prefix,
288            timeout: self.timeout,
289            semaphore_capacity: self.semaphore_capacity,
290            ack_timeout: self.ack_timeout,
291            backpressure_on_inflight: enabled,
292            concurrency_limit: self.concurrency_limit,
293            _phantom: PhantomData,
294        }
295    }
296
297    /// Sets the concurrency limit for the ack handler task. This might be useful
298    /// in scenarios where Tokio runtime is under heavy load.
299    pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
300    where
301        Yes: ToAssign,
302    {
303        ContextBuilder {
304            prefix: self.prefix,
305            timeout: self.timeout,
306            semaphore_capacity: self.semaphore_capacity,
307            ack_timeout: self.ack_timeout,
308            backpressure_on_inflight: self.backpressure_on_inflight,
309            concurrency_limit: limit,
310            _phantom: PhantomData,
311        }
312    }
313
314    /// Build the [Context] with the given settings.
315    pub fn build(self, client: Client) -> Context {
316        let (tx, rx) = tokio::sync::mpsc::channel::<(
317            oneshot::Receiver<Message>,
318            OwnedSemaphorePermit,
319        )>(self.semaphore_capacity);
320        let stream = ReceiverStream::new(rx);
321        spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
322        Context {
323            client,
324            prefix: self.prefix,
325            timeout: self.timeout,
326            max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
327            ack_sender: tx,
328            backpressure_on_inflight: self.backpressure_on_inflight,
329            semaphore_capacity: self.semaphore_capacity,
330        }
331    }
332}
333
334impl Context {
335    pub(crate) fn new(client: Client) -> Context {
336        ContextBuilder::default().build(client)
337    }
338
    /// Sets the timeout for all JetStream API requests.
    ///
    /// The value is stored on this [Context] only; clones made before the call keep the
    /// timeout they were cloned with.
    pub fn set_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout
    }
343
    /// Return a clone of the underlying NATS client.
    ///
    /// The returned [Client] is an owned value produced by `Client::clone`.
    pub fn client(&self) -> Client {
        self.client.clone()
    }
348
349    /// Waits until all pending `acks` are received from the server.
350    /// Be aware that this is probably not the way you want to await `acks`,
351    /// as it will wait for every `ack` that is pending, including those that might
352    /// be published after you call this method.
353    /// Useful in testing, or maybe batching.
354    pub async fn wait_for_acks(&self) {
355        self.max_ack_semaphore
356            .acquire_many(self.semaphore_capacity as u32)
357            .await
358            .ok();
359    }
360
361    /// Create a new [Context] with given API prefix.
362    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
363        ContextBuilder::new()
364            .api_prefix(prefix.to_string())
365            .build(client)
366    }
367
368    /// Create a new [Context] with given domain.
369    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
370        ContextBuilder::new().domain(domain.as_ref()).build(client)
371    }
372
373    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
374    /// acknowledgment from the server that the message has been successfully delivered.
375    ///
376    /// Acknowledgment future that can be polled is returned instead.
377    ///
378    /// If the stream does not exist, `no responders` error will be returned.
379    ///
380    /// # Examples
381    ///
382    /// Publish, and after each publish, await for acknowledgment.
383    ///
384    /// ```no_run
385    /// # #[tokio::main]
386    /// # async fn main() -> Result<(), async_nats::Error> {
387    /// let client = async_nats::connect("localhost:4222").await?;
388    /// let jetstream = async_nats::jetstream::new(client);
389    ///
390    /// let ack = jetstream.publish("events", "data".into()).await?;
391    /// ack.await?;
392    /// jetstream.publish("events", "data".into()).await?.await?;
393    /// # Ok(())
394    /// # }
395    /// ```
396    ///
397    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
398    /// ignored entirely.
399    ///
400    /// ```no_run
401    /// # #[tokio::main]
402    /// # async fn main() -> Result<(), async_nats::Error> {
403    /// let client = async_nats::connect("localhost:4222").await?;
404    /// let jetstream = async_nats::jetstream::new(client);
405    ///
406    /// let first_ack = jetstream.publish("events", "data".into()).await?;
407    /// let second_ack = jetstream.publish("events", "data".into()).await?;
408    /// first_ack.await?;
409    /// second_ack.await?;
410    /// # Ok(())
411    /// # }
412    /// ```
413    pub async fn publish<S: ToSubject>(
414        &self,
415        subject: S,
416        payload: Bytes,
417    ) -> Result<PublishAckFuture, PublishError> {
418        self.send_publish(subject, PublishMessage::build().payload(payload))
419            .await
420    }
421
422    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
423    /// the server that the message has been successfully delivered.
424    ///
425    /// If the stream does not exist, `no responders` error will be returned.
426    ///
427    /// # Examples
428    ///
429    /// ```no_run
430    /// # #[tokio::main]
431    /// # async fn main() -> Result<(), async_nats::Error> {
432    /// let client = async_nats::connect("localhost:4222").await?;
433    /// let jetstream = async_nats::jetstream::new(client);
434    ///
435    /// let mut headers = async_nats::HeaderMap::new();
436    /// headers.append("X-key", "Value");
437    /// let ack = jetstream
438    ///     .publish_with_headers("events", headers, "data".into())
439    ///     .await?;
440    /// # Ok(())
441    /// # }
442    /// ```
443    pub async fn publish_with_headers<S: ToSubject>(
444        &self,
445        subject: S,
446        headers: crate::header::HeaderMap,
447        payload: Bytes,
448    ) -> Result<PublishAckFuture, PublishError> {
449        self.send_publish(
450            subject,
451            PublishMessage::build().payload(payload).headers(headers),
452        )
453        .await
454    }
455
456    /// Publish a message built by [Publish] and returns an acknowledgment future.
457    ///
458    /// If the stream does not exist, `no responders` error will be returned.
459    ///
460    /// # Examples
461    ///
462    /// ```no_run
463    /// # use async_nats::jetstream::context::Publish;
464    /// # #[tokio::main]
465    /// # async fn main() -> Result<(), async_nats::Error> {
466    /// let client = async_nats::connect("localhost:4222").await?;
467    /// let jetstream = async_nats::jetstream::new(client);
468    ///
469    /// let ack = jetstream
470    ///     .send_publish(
471    ///         "events",
472    ///         Publish::build().payload("data".into()).message_id("uuid"),
473    ///     )
474    ///     .await?;
475    /// # Ok(())
476    /// # }
477    /// ```
478    pub async fn send_publish<S: ToSubject>(
479        &self,
480        subject: S,
481        publish: PublishMessage,
482    ) -> Result<PublishAckFuture, PublishError> {
483        let permit = if self.backpressure_on_inflight {
484            // When backpressure is enabled, wait for a permit to become available
485            self.max_ack_semaphore
486                .clone()
487                .acquire_owned()
488                .await
489                .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
490        } else {
491            // When backpressure is disabled, error immediately if no permits available
492            self.max_ack_semaphore
493                .clone()
494                .try_acquire_owned()
495                .map_err(|err| match err {
496                    TryAcquireError::NoPermits => {
497                        PublishError::new(PublishErrorKind::MaxAckPending)
498                    }
499                    _ => PublishError::with_source(PublishErrorKind::Other, err),
500                })?
501        };
502        let subject = subject.to_subject();
503        let (sender, receiver) = oneshot::channel();
504
505        let respond = self.client.new_inbox().into();
506
507        let send_fut = self
508            .client
509            .sender
510            .send(Command::Request {
511                subject,
512                payload: publish.payload,
513                respond,
514                headers: publish.headers,
515                sender,
516            })
517            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
518
519        tokio::time::timeout(self.timeout, send_fut)
520            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
521            .await??;
522
523        Ok(PublishAckFuture {
524            timeout: self.timeout,
525            subscription: Some(receiver),
526            permit: Some(permit),
527            tx: self.ack_sender.clone(),
528        })
529    }
530
531    /// Query the server for account information
532    pub async fn query_account(&self) -> Result<Account, AccountError> {
533        let response: Response<Account> = self.request("INFO", b"").await?;
534
535        match response {
536            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
537            Response::Ok(account) => Ok(account),
538        }
539    }
540
541    /// Create a JetStream [Stream] with given config and return a handle to it.
542    /// That handle can be used to manage and use [Consumer].
543    ///
544    /// # Examples
545    ///
546    /// ```no_run
547    /// # #[tokio::main]
548    /// # async fn main() -> Result<(), async_nats::Error> {
549    /// use async_nats::jetstream::stream::Config;
550    /// use async_nats::jetstream::stream::DiscardPolicy;
551    /// let client = async_nats::connect("localhost:4222").await?;
552    /// let jetstream = async_nats::jetstream::new(client);
553    ///
554    /// let stream = jetstream
555    ///     .create_stream(Config {
556    ///         name: "events".to_string(),
557    ///         max_messages: 100_000,
558    ///         discard: DiscardPolicy::Old,
559    ///         ..Default::default()
560    ///     })
561    ///     .await?;
562    /// # Ok(())
563    /// # }
564    /// ```
565    pub async fn create_stream<S>(
566        &self,
567        stream_config: S,
568    ) -> Result<Stream<Info>, CreateStreamError>
569    where
570        Config: From<S>,
571    {
572        let mut config: Config = stream_config.into();
573        if config.name.is_empty() {
574            return Err(CreateStreamError::new(
575                CreateStreamErrorKind::EmptyStreamName,
576            ));
577        }
578        if !is_valid_name(config.name.as_str()) {
579            return Err(CreateStreamError::new(
580                CreateStreamErrorKind::InvalidStreamName,
581            ));
582        }
583        if let Some(ref mut mirror) = config.mirror {
584            if let Some(ref mut domain) = mirror.domain {
585                if mirror.external.is_some() {
586                    return Err(CreateStreamError::new(
587                        CreateStreamErrorKind::DomainAndExternalSet,
588                    ));
589                }
590                mirror.external = Some(External {
591                    api_prefix: format!("$JS.{domain}.API"),
592                    delivery_prefix: None,
593                })
594            }
595        }
596
597        if let Some(ref mut sources) = config.sources {
598            for source in sources {
599                if let Some(ref mut domain) = source.domain {
600                    if source.external.is_some() {
601                        return Err(CreateStreamError::new(
602                            CreateStreamErrorKind::DomainAndExternalSet,
603                        ));
604                    }
605                    source.external = Some(External {
606                        api_prefix: format!("$JS.{domain}.API"),
607                        delivery_prefix: None,
608                    })
609                }
610            }
611        }
612        let subject = format!("STREAM.CREATE.{}", config.name);
613        let response: Response<Info> = self.request(subject, &config).await?;
614
615        match response {
616            Response::Err { error } => Err(error.into()),
617            Response::Ok(info) => Ok(Stream {
618                context: self.clone(),
619                info,
620                name: config.name,
621            }),
622        }
623    }
624
625    /// Checks for [Stream] existence on the server and returns handle to it.
626    /// That handle can be used to manage and use [Consumer].
627    /// This variant does not fetch [Stream] info from the server.
628    /// It means it does not check if the stream actually exists.
629    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
630    /// If you however run single operations on many streams, this method is more efficient.
631    ///
632    /// # Examples
633    ///
634    /// ```no_run
635    /// # #[tokio::main]
636    /// # async fn main() -> Result<(), async_nats::Error> {
637    /// let client = async_nats::connect("localhost:4222").await?;
638    /// let jetstream = async_nats::jetstream::new(client);
639    ///
640    /// let stream = jetstream.get_stream_no_info("events").await?;
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub async fn get_stream_no_info<T: AsRef<str>>(
645        &self,
646        stream: T,
647    ) -> Result<Stream<()>, GetStreamError> {
648        let stream = stream.as_ref();
649        if stream.is_empty() {
650            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
651        }
652
653        if !is_valid_name(stream) {
654            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
655        }
656
657        Ok(Stream {
658            context: self.clone(),
659            info: (),
660            name: stream.to_string(),
661        })
662    }
663
664    /// Checks for [Stream] existence on the server and returns handle to it.
665    /// That handle can be used to manage and use [Consumer].
666    ///
667    /// # Examples
668    ///
669    /// ```no_run
670    /// # #[tokio::main]
671    /// # async fn main() -> Result<(), async_nats::Error> {
672    /// let client = async_nats::connect("localhost:4222").await?;
673    /// let jetstream = async_nats::jetstream::new(client);
674    ///
675    /// let stream = jetstream.get_stream("events").await?;
676    /// # Ok(())
677    /// # }
678    /// ```
679    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
680        let stream = stream.as_ref();
681        if stream.is_empty() {
682            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
683        }
684
685        if !is_valid_name(stream) {
686            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
687        }
688
689        let subject = format!("STREAM.INFO.{stream}");
690        let request: Response<Info> = self
691            .request(subject, &())
692            .await
693            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
694        match request {
695            Response::Err { error } => {
696                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
697            }
698            Response::Ok(info) => Ok(Stream {
699                context: self.clone(),
700                info,
701                name: stream.to_string(),
702            }),
703        }
704    }
705
706    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
707    ///
708    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
709    ///
710    /// # Examples
711    ///
712    /// ```no_run
713    /// # #[tokio::main]
714    /// # async fn main() -> Result<(), async_nats::Error> {
715    /// use async_nats::jetstream::stream::Config;
716    /// let client = async_nats::connect("localhost:4222").await?;
717    /// let jetstream = async_nats::jetstream::new(client);
718    ///
719    /// let stream = jetstream
720    ///     .get_or_create_stream(Config {
721    ///         name: "events".to_string(),
722    ///         max_messages: 10_000,
723    ///         ..Default::default()
724    ///     })
725    ///     .await?;
726    /// # Ok(())
727    /// # }
728    /// ```
729    pub async fn get_or_create_stream<S>(
730        &self,
731        stream_config: S,
732    ) -> Result<Stream, CreateStreamError>
733    where
734        S: Into<Config>,
735    {
736        let config: Config = stream_config.into();
737
738        if config.name.is_empty() {
739            return Err(CreateStreamError::new(
740                CreateStreamErrorKind::EmptyStreamName,
741            ));
742        }
743
744        if !is_valid_name(config.name.as_str()) {
745            return Err(CreateStreamError::new(
746                CreateStreamErrorKind::InvalidStreamName,
747            ));
748        }
749        let subject = format!("STREAM.INFO.{}", config.name);
750
751        let request: Response<Info> = self.request(subject, &()).await?;
752        match request {
753            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
754            Response::Err { error } => Err(error.into()),
755            Response::Ok(info) => Ok(Stream {
756                context: self.clone(),
757                info,
758                name: config.name,
759            }),
760        }
761    }
762
763    /// Deletes a [Stream] with a given name.
764    ///
765    /// # Examples
766    ///
767    /// ```no_run
768    /// # #[tokio::main]
769    /// # async fn main() -> Result<(), async_nats::Error> {
770    /// use async_nats::jetstream::stream::Config;
771    /// let client = async_nats::connect("localhost:4222").await?;
772    /// let jetstream = async_nats::jetstream::new(client);
773    ///
774    /// let stream = jetstream.delete_stream("events").await?;
775    /// # Ok(())
776    /// # }
777    /// ```
778    pub async fn delete_stream<T: AsRef<str>>(
779        &self,
780        stream: T,
781    ) -> Result<DeleteStatus, DeleteStreamError> {
782        let stream = stream.as_ref();
783        if stream.is_empty() {
784            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
785        }
786
787        if !is_valid_name(stream) {
788            return Err(DeleteStreamError::new(
789                DeleteStreamErrorKind::InvalidStreamName,
790            ));
791        }
792
793        let subject = format!("STREAM.DELETE.{stream}");
794        match self
795            .request(subject, &json!({}))
796            .await
797            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
798        {
799            Response::Err { error } => Err(DeleteStreamError::new(
800                DeleteStreamErrorKind::JetStream(error),
801            )),
802            Response::Ok(delete_response) => Ok(delete_response),
803        }
804    }
805
806    /// Updates a [Stream] with a given config. If specific field cannot be updated,
807    /// error is returned.
808    ///
809    /// # Examples
810    ///
811    /// ```no_run
812    /// # #[tokio::main]
813    /// # async fn main() -> Result<(), async_nats::Error> {
814    /// use async_nats::jetstream::stream::Config;
815    /// use async_nats::jetstream::stream::DiscardPolicy;
816    /// let client = async_nats::connect("localhost:4222").await?;
817    /// let jetstream = async_nats::jetstream::new(client);
818    ///
819    /// let stream = jetstream
820    ///     .update_stream(Config {
821    ///         name: "events".to_string(),
822    ///         discard: DiscardPolicy::New,
823    ///         max_messages: 50_000,
824    ///         ..Default::default()
825    ///     })
826    ///     .await?;
827    /// # Ok(())
828    /// # }
829    /// ```
830    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
831    where
832        S: Borrow<Config>,
833    {
834        let config = config.borrow();
835
836        if config.name.is_empty() {
837            return Err(CreateStreamError::new(
838                CreateStreamErrorKind::EmptyStreamName,
839            ));
840        }
841
842        if !is_valid_name(config.name.as_str()) {
843            return Err(CreateStreamError::new(
844                CreateStreamErrorKind::InvalidStreamName,
845            ));
846        }
847
848        let subject = format!("STREAM.UPDATE.{}", config.name);
849        match self.request(subject, config).await? {
850            Response::Err { error } => Err(error.into()),
851            Response::Ok(info) => Ok(info),
852        }
853    }
854
855    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
856    ///
857    /// # Examples
858    ///
859    /// ```no_run
860    /// # #[tokio::main]
861    /// # async fn main() -> Result<(), async_nats::Error> {
862    /// use async_nats::jetstream::stream::Config;
863    /// use async_nats::jetstream::stream::DiscardPolicy;
864    /// let client = async_nats::connect("localhost:4222").await?;
865    /// let jetstream = async_nats::jetstream::new(client);
866    ///
867    /// let stream = jetstream
868    ///     .create_or_update_stream(Config {
869    ///         name: "events".to_string(),
870    ///         discard: DiscardPolicy::New,
871    ///         max_messages: 50_000,
872    ///         ..Default::default()
873    ///     })
874    ///     .await?;
875    /// # Ok(())
876    /// # }
877    /// ```
878    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
879        match self.update_stream(config.clone()).await {
880            Ok(stream) => Ok(stream),
881            Err(err) => match err.kind() {
882                CreateStreamErrorKind::NotFound => {
883                    let stream = self
884                        .create_stream(config)
885                        .await
886                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
887                    Ok(stream.info)
888                }
889                _ => Err(err),
890            },
891        }
892    }
893
894    /// Looks up Stream that contains provided subject.
895    ///
896    /// # Examples
897    ///
898    /// ```no_run
899    /// # #[tokio::main]
900    /// # async fn main() -> Result<(), async_nats::Error> {
901    /// use futures_util::TryStreamExt;
902    /// let client = async_nats::connect("demo.nats.io:4222").await?;
903    /// let jetstream = async_nats::jetstream::new(client);
904    /// let stream_name = jetstream.stream_by_subject("foo.>");
905    /// # Ok(())
906    /// # }
907    /// ```
908    pub async fn stream_by_subject<T: Into<String>>(
909        &self,
910        subject: T,
911    ) -> Result<String, GetStreamByNameError> {
912        let subject = subject.into();
913        if !is_valid_subject(subject.as_str()) {
914            return Err(GetStreamByNameError::new(
915                GetStreamByNameErrorKind::InvalidSubject,
916            ));
917        }
918        let mut names = StreamNames {
919            context: self.clone(),
920            offset: 0,
921            page_request: None,
922            streams: Vec::new(),
923            subject: Some(subject),
924            done: false,
925        };
926        match names.next().await {
927            Some(name) => match name {
928                Ok(name) => Ok(name),
929                Err(err) => Err(GetStreamByNameError::with_source(
930                    GetStreamByNameErrorKind::Request,
931                    err,
932                )),
933            },
934            None => Err(GetStreamByNameError::new(
935                GetStreamByNameErrorKind::NotFound,
936            )),
937        }
938    }
939
940    /// Lists names of all streams for current context.
941    ///
942    /// # Examples
943    ///
944    /// ```no_run
945    /// # #[tokio::main]
946    /// # async fn main() -> Result<(), async_nats::Error> {
947    /// use futures_util::TryStreamExt;
948    /// let client = async_nats::connect("demo.nats.io:4222").await?;
949    /// let jetstream = async_nats::jetstream::new(client);
950    /// let mut names = jetstream.stream_names();
951    /// while let Some(stream) = names.try_next().await? {
952    ///     println!("stream: {}", stream);
953    /// }
954    /// # Ok(())
955    /// # }
956    /// ```
957    pub fn stream_names(&self) -> StreamNames {
958        StreamNames {
959            context: self.clone(),
960            offset: 0,
961            page_request: None,
962            streams: Vec::new(),
963            subject: None,
964            done: false,
965        }
966    }
967
968    /// Lists all streams info for current context.
969    ///
970    /// # Examples
971    ///
972    /// ```no_run
973    /// # #[tokio::main]
974    /// # async fn main() -> Result<(), async_nats::Error> {
975    /// use futures_util::TryStreamExt;
976    /// let client = async_nats::connect("demo.nats.io:4222").await?;
977    /// let jetstream = async_nats::jetstream::new(client);
978    /// let mut streams = jetstream.streams();
979    /// while let Some(stream) = streams.try_next().await? {
980    ///     println!("stream: {:?}", stream);
981    /// }
982    /// # Ok(())
983    /// # }
984    /// ```
985    pub fn streams(&self) -> Streams {
986        Streams {
987            context: self.clone(),
988            offset: 0,
989            page_request: None,
990            streams: Vec::new(),
991            done: false,
992        }
993    }
994    /// Returns an existing key-value bucket.
995    ///
996    /// # Examples
997    ///
998    /// ```no_run
999    /// # #[tokio::main]
1000    /// # async fn main() -> Result<(), async_nats::Error> {
1001    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1002    /// let jetstream = async_nats::jetstream::new(client);
1003    /// let kv = jetstream.get_key_value("bucket").await?;
1004    /// # Ok(())
1005    /// # }
1006    /// ```
1007    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1008        let bucket: String = bucket.into();
1009        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1010            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1011        }
1012
1013        let stream_name = format!("KV_{}", &bucket);
1014        let stream = self
1015            .get_stream(stream_name.clone())
1016            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1017            .await?;
1018
1019        if stream.info.config.max_messages_per_subject < 1 {
1020            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1021        }
1022        let mut store = Store {
1023            prefix: format!("$KV.{}.", &bucket),
1024            name: bucket,
1025            stream_name,
1026            stream: stream.clone(),
1027            put_prefix: None,
1028            use_jetstream_prefix: self.prefix != "$JS.API",
1029        };
1030        if let Some(ref mirror) = stream.info.config.mirror {
1031            let bucket = mirror.name.trim_start_matches("KV_");
1032            if let Some(ref external) = mirror.external {
1033                if !external.api_prefix.is_empty() {
1034                    store.use_jetstream_prefix = false;
1035                    store.prefix = format!("$KV.{bucket}.");
1036                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1037                } else {
1038                    store.put_prefix = Some(format!("$KV.{bucket}."));
1039                }
1040            }
1041        };
1042
1043        Ok(store)
1044    }
1045
1046    /// Creates a new key-value bucket.
1047    ///
1048    /// # Examples
1049    ///
1050    /// ```no_run
1051    /// # #[tokio::main]
1052    /// # async fn main() -> Result<(), async_nats::Error> {
1053    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1054    /// let jetstream = async_nats::jetstream::new(client);
1055    /// let kv = jetstream
1056    ///     .create_key_value(async_nats::jetstream::kv::Config {
1057    ///         bucket: "kv".to_string(),
1058    ///         history: 10,
1059    ///         ..Default::default()
1060    ///     })
1061    ///     .await?;
1062    /// # Ok(())
1063    /// # }
1064    /// ```
1065    pub async fn create_key_value(
1066        &self,
1067        config: crate::jetstream::kv::Config,
1068    ) -> Result<Store, CreateKeyValueError> {
1069        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1070            return Err(CreateKeyValueError::new(
1071                CreateKeyValueErrorKind::InvalidStoreName,
1072            ));
1073        }
1074        let info = self.query_account().await.map_err(|err| {
1075            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1076        })?;
1077
1078        let bucket_name = config.bucket.clone();
1079        let stream_config = kv_to_stream_config(config, info)?;
1080
1081        let stream = self.create_stream(stream_config).await.map_err(|err| {
1082            if err.kind() == CreateStreamErrorKind::TimedOut {
1083                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1084            } else {
1085                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1086            }
1087        })?;
1088
1089        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1090    }
1091
1092    /// Updates an existing key-value bucket.
1093    ///
1094    /// # Examples
1095    ///
1096    /// ```no_run
1097    /// # #[tokio::main]
1098    /// # async fn main() -> Result<(), async_nats::Error> {
1099    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1100    /// let jetstream = async_nats::jetstream::new(client);
1101    /// let kv = jetstream
1102    ///     .update_key_value(async_nats::jetstream::kv::Config {
1103    ///         bucket: "kv".to_string(),
1104    ///         history: 60,
1105    ///         ..Default::default()
1106    ///     })
1107    ///     .await?;
1108    /// # Ok(())
1109    /// # }
1110    /// ```
1111    pub async fn update_key_value(
1112        &self,
1113        config: crate::jetstream::kv::Config,
1114    ) -> Result<Store, UpdateKeyValueError> {
1115        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1116            return Err(UpdateKeyValueError::new(
1117                UpdateKeyValueErrorKind::InvalidStoreName,
1118            ));
1119        }
1120
1121        let stream_name = format!("KV_{}", config.bucket);
1122        let bucket_name = config.bucket.clone();
1123
1124        let account = self.query_account().await.map_err(|err| {
1125            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1126        })?;
1127        let stream = self
1128            .update_stream(kv_to_stream_config(config, account)?)
1129            .await
1130            .map_err(|err| match err.kind() {
1131                UpdateStreamErrorKind::NotFound => {
1132                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1133                }
1134                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1135            })?;
1136
1137        let stream = Stream {
1138            context: self.clone(),
1139            info: stream,
1140            name: stream_name,
1141        };
1142
1143        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1144    }
1145
1146    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
1147    ///
1148    /// # Examples
1149    ///
1150    /// ```no_run
1151    /// # #[tokio::main]
1152    /// # async fn main() -> Result<(), async_nats::Error> {
1153    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1154    /// let jetstream = async_nats::jetstream::new(client);
1155    /// let kv = jetstream
1156    ///     .update_key_value(async_nats::jetstream::kv::Config {
1157    ///         bucket: "kv".to_string(),
1158    ///         history: 60,
1159    ///         ..Default::default()
1160    ///     })
1161    ///     .await?;
1162    /// # Ok(())
1163    /// # }
1164    /// ```
1165    pub async fn create_or_update_key_value(
1166        &self,
1167        config: crate::jetstream::kv::Config,
1168    ) -> Result<Store, CreateKeyValueError> {
1169        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1170            return Err(CreateKeyValueError::new(
1171                CreateKeyValueErrorKind::InvalidStoreName,
1172            ));
1173        }
1174
1175        let bucket_name = config.bucket.clone();
1176        let stream_name = format!("KV_{}", config.bucket);
1177
1178        let account = self.query_account().await.map_err(|err| {
1179            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1180        })?;
1181        let stream = self
1182            .create_or_update_stream(kv_to_stream_config(config, account)?)
1183            .await
1184            .map_err(|err| {
1185                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1186            })?;
1187
1188        let stream = Stream {
1189            context: self.clone(),
1190            info: stream,
1191            name: stream_name,
1192        };
1193
1194        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1195    }
1196
1197    /// Deletes given key-value bucket.
1198    ///
1199    /// # Examples
1200    ///
1201    /// ```no_run
1202    /// # #[tokio::main]
1203    /// # async fn main() -> Result<(), async_nats::Error> {
1204    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1205    /// let jetstream = async_nats::jetstream::new(client);
1206    /// let kv = jetstream
1207    ///     .create_key_value(async_nats::jetstream::kv::Config {
1208    ///         bucket: "kv".to_string(),
1209    ///         history: 10,
1210    ///         ..Default::default()
1211    ///     })
1212    ///     .await?;
1213    /// # Ok(())
1214    /// # }
1215    /// ```
1216    pub async fn delete_key_value<T: AsRef<str>>(
1217        &self,
1218        bucket: T,
1219    ) -> Result<DeleteStatus, KeyValueError> {
1220        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1221            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1222        }
1223
1224        let stream_name = format!("KV_{}", bucket.as_ref());
1225        self.delete_stream(stream_name)
1226            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1227            .await
1228    }
1229
1230    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
1231    //     let config = config.borrow();
1232    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1233    //         return Err(Box::new(std::io::Error::other(
1234    //             "invalid bucket name",
1235    //         )));
1236    //     }
1237
1238    //     let stream_name = format!("KV_{}", config.bucket);
1239    //     self.update_stream()
1240    //         .await
1241    //         .and_then(|info| Ok(()))
1242    // }
1243
1244    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1245    ///
1246    /// It has one less interaction with the server when binding to only one
1247    /// [crate::jetstream::consumer::Consumer].
1248    ///
1249    /// # Examples:
1250    ///
1251    /// ```no_run
1252    /// # #[tokio::main]
1253    /// # async fn main() -> Result<(), async_nats::Error> {
1254    /// use async_nats::jetstream::consumer::PullConsumer;
1255    ///
1256    /// let client = async_nats::connect("localhost:4222").await?;
1257    /// let jetstream = async_nats::jetstream::new(client);
1258    ///
1259    /// let consumer: PullConsumer = jetstream
1260    ///     .get_consumer_from_stream("consumer", "stream")
1261    ///     .await?;
1262    ///
1263    /// # Ok(())
1264    /// # }
1265    /// ```
1266    pub async fn get_consumer_from_stream<T, C, S>(
1267        &self,
1268        consumer: C,
1269        stream: S,
1270    ) -> Result<Consumer<T>, ConsumerError>
1271    where
1272        T: FromConsumer + IntoConsumerConfig,
1273        S: AsRef<str>,
1274        C: AsRef<str>,
1275    {
1276        if !is_valid_name(stream.as_ref()) {
1277            return Err(ConsumerError::with_source(
1278                ConsumerErrorKind::InvalidName,
1279                "invalid stream",
1280            ));
1281        }
1282
1283        if !is_valid_name(consumer.as_ref()) {
1284            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1285        }
1286
1287        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1288
1289        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1290            Response::Ok(info) => info,
1291            Response::Err { error } => return Err(error.into()),
1292        };
1293
1294        Ok(Consumer::new(
1295            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1296                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1297            })?,
1298            info,
1299            self.clone(),
1300        ))
1301    }
1302
1303    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1304    ///
1305    /// It has one less interaction with the server when binding to only one
1306    /// [crate::jetstream::consumer::Consumer].
1307    ///
1308    /// # Examples:
1309    ///
1310    /// ```no_run
1311    /// # #[tokio::main]
1312    /// # async fn main() -> Result<(), async_nats::Error> {
1313    /// use async_nats::jetstream::consumer::PullConsumer;
1314    ///
1315    /// let client = async_nats::connect("localhost:4222").await?;
1316    /// let jetstream = async_nats::jetstream::new(client);
1317    ///
1318    /// jetstream
1319    ///     .delete_consumer_from_stream("consumer", "stream")
1320    ///     .await?;
1321    ///
1322    /// # Ok(())
1323    /// # }
1324    /// ```
1325    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1326        &self,
1327        consumer: C,
1328        stream: S,
1329    ) -> Result<DeleteStatus, ConsumerError> {
1330        if !is_valid_name(consumer.as_ref()) {
1331            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1332        }
1333
1334        if !is_valid_name(stream.as_ref()) {
1335            return Err(ConsumerError::with_source(
1336                ConsumerErrorKind::Other,
1337                "invalid stream name",
1338            ));
1339        }
1340
1341        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1342
1343        match self.request(subject, &json!({})).await? {
1344            Response::Ok(delete_status) => Ok(delete_status),
1345            Response::Err { error } => Err(error.into()),
1346        }
1347    }
1348
1349    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1350    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1351    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1352    ///
1353    /// # Examples
1354    ///
1355    /// ```no_run
1356    /// # #[tokio::main]
1357    /// # async fn main() -> Result<(), async_nats::Error> {
1358    /// use async_nats::jetstream::consumer;
1359    /// let client = async_nats::connect("localhost:4222").await?;
1360    /// let jetstream = async_nats::jetstream::new(client);
1361    ///
1362    /// let consumer: consumer::PullConsumer = jetstream
1363    ///     .create_consumer_on_stream(
1364    ///         consumer::pull::Config {
1365    ///             durable_name: Some("pull".to_string()),
1366    ///             ..Default::default()
1367    ///         },
1368    ///         "stream",
1369    ///     )
1370    ///     .await?;
1371    /// # Ok(())
1372    /// # }
1373    /// ```
1374    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1375        &self,
1376        config: C,
1377        stream: S,
1378    ) -> Result<Consumer<C>, ConsumerError> {
1379        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1380            .await
1381    }
1382
1383    /// Update an existing consumer.
1384    /// This call will fail if the consumer does not exist.
1385    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1386    ///
1387    /// # Examples
1388    ///
1389    /// ```no_run
1390    /// # #[tokio::main]
1391    /// # async fn main() -> Result<(), async_nats::Error> {
1392    /// use async_nats::jetstream::consumer;
1393    /// let client = async_nats::connect("localhost:4222").await?;
1394    /// let jetstream = async_nats::jetstream::new(client);
1395    ///
1396    /// let consumer: consumer::PullConsumer = jetstream
1397    ///     .update_consumer_on_stream(
1398    ///         consumer::pull::Config {
1399    ///             durable_name: Some("pull".to_string()),
1400    ///             description: Some("updated pull consumer".to_string()),
1401    ///             ..Default::default()
1402    ///         },
1403    ///         "stream",
1404    ///     )
1405    ///     .await?;
1406    /// # Ok(())
1407    /// # }
1408    /// ```
1409    #[cfg(feature = "server_2_10")]
1410    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1411        &self,
1412        config: C,
1413        stream: S,
1414    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1415        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1416            .await
1417            .map_err(|err| err.into())
1418    }
1419
1420    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1421    /// the same.
1422    /// This method will fail if consumer is already present with different config.
1423    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1424    ///
1425    /// # Examples
1426    ///
1427    /// ```no_run
1428    /// # #[tokio::main]
1429    /// # async fn main() -> Result<(), async_nats::Error> {
1430    /// use async_nats::jetstream::consumer;
1431    /// let client = async_nats::connect("localhost:4222").await?;
1432    /// let jetstream = async_nats::jetstream::new(client);
1433    ///
1434    /// let consumer: consumer::PullConsumer = jetstream
1435    ///     .create_consumer_strict_on_stream(
1436    ///         consumer::pull::Config {
1437    ///             durable_name: Some("pull".to_string()),
1438    ///             ..Default::default()
1439    ///         },
1440    ///         "stream",
1441    ///     )
1442    ///     .await?;
1443    /// # Ok(())
1444    /// # }
1445    /// ```
1446    #[cfg(feature = "server_2_10")]
1447    pub async fn create_consumer_strict_on_stream<
1448        C: IntoConsumerConfig + FromConsumer,
1449        S: AsRef<str>,
1450    >(
1451        &self,
1452        config: C,
1453        stream: S,
1454    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1455        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1456            .await
1457            .map_err(|err| err.into())
1458    }
1459
1460    async fn create_consumer_on_stream_action<
1461        C: IntoConsumerConfig + FromConsumer,
1462        S: AsRef<str>,
1463    >(
1464        &self,
1465        config: C,
1466        stream: S,
1467        action: ConsumerAction,
1468    ) -> Result<Consumer<C>, ConsumerError> {
1469        let config = config.into_consumer_config();
1470
1471        let subject = {
1472            let filter = if config.filter_subject.is_empty() {
1473                "".to_string()
1474            } else {
1475                format!(".{}", config.filter_subject)
1476            };
1477            config
1478                .name
1479                .as_ref()
1480                .or(config.durable_name.as_ref())
1481                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1482                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1483        };
1484
1485        match self
1486            .request(
1487                subject,
1488                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1489            )
1490            .await?
1491        {
1492            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1493            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1494                FromConsumer::try_from_consumer_config(info.clone().config)
1495                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1496                info,
1497                self.clone(),
1498            )),
1499        }
1500    }
1501
1502    /// Send a request to the jetstream JSON API.
1503    ///
1504    /// This is a low level API used mostly internally, that should be used only in
1505    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1506    ///
1507    /// # Examples
1508    ///
1509    /// ```no_run
1510    /// # use async_nats::jetstream::stream::Info;
1511    /// # use async_nats::jetstream::response::Response;
1512    /// # #[tokio::main]
1513    /// # async fn main() -> Result<(), async_nats::Error> {
1514    /// let client = async_nats::connect("localhost:4222").await?;
1515    /// let jetstream = async_nats::jetstream::new(client);
1516    ///
1517    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1518    /// # Ok(())
1519    /// # }
1520    /// ```
1521    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1522    where
1523        S: ToSubject,
1524        T: ?Sized + Serialize,
1525        V: DeserializeOwned,
1526    {
1527        let subject = subject.to_subject();
1528        let request = serde_json::to_vec(&payload)
1529            .map(Bytes::from)
1530            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1531
1532        debug!("JetStream request sent: {:?}", request);
1533
1534        let message = self
1535            .client
1536            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1537            .await;
1538        let message = message?;
1539        debug!(
1540            "JetStream request response: {:?}",
1541            from_utf8(&message.payload)
1542        );
1543        let response = serde_json::from_slice(message.payload.as_ref())
1544            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1545
1546        Ok(response)
1547    }
1548
1549    /// Send a JetStream API request without waiting for a response.
1550    /// The subject will be automatically prefixed with the JetStream API prefix.
1551    ///
1552    /// This is useful for operations that need custom response handling,
1553    /// such as streaming responses (batch get) where the caller manages
1554    /// their own subscription to the reply inbox.
1555    ///
1556    /// Used mostly by extension crates.
1557    pub async fn send_request<S: ToSubject>(
1558        &self,
1559        subject: S,
1560        request: crate::client::Request,
1561    ) -> Result<(), crate::PublishError> {
1562        let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1563        let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1564        let payload = request.payload.unwrap_or_default();
1565
1566        match request.headers {
1567            Some(headers) => {
1568                self.client
1569                    .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1570                    .await
1571            }
1572            None => {
1573                self.client
1574                    .publish_with_reply(prefixed_subject, inbox, payload)
1575                    .await
1576            }
1577        }
1578    }
1579
1580    /// Creates a new object store bucket.
1581    ///
1582    /// # Examples
1583    ///
1584    /// ```no_run
1585    /// # #[tokio::main]
1586    /// # async fn main() -> Result<(), async_nats::Error> {
1587    /// let client = async_nats::connect("demo.nats.io").await?;
1588    /// let jetstream = async_nats::jetstream::new(client);
1589    /// let bucket = jetstream
1590    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1591    ///         bucket: "bucket".to_string(),
1592    ///         ..Default::default()
1593    ///     })
1594    ///     .await?;
1595    /// # Ok(())
1596    /// # }
1597    /// ```
1598    pub async fn create_object_store(
1599        &self,
1600        config: super::object_store::Config,
1601    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1602        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1603            return Err(CreateObjectStoreError::new(
1604                CreateKeyValueErrorKind::InvalidStoreName,
1605            ));
1606        }
1607
1608        let bucket_name = config.bucket.clone();
1609        let stream_name = format!("OBJ_{bucket_name}");
1610        let chunk_subject = format!("$O.{bucket_name}.C.>");
1611        let meta_subject = format!("$O.{bucket_name}.M.>");
1612
1613        let stream = self
1614            .create_stream(super::stream::Config {
1615                name: stream_name,
1616                description: config.description.clone(),
1617                subjects: vec![chunk_subject, meta_subject],
1618                max_age: config.max_age,
1619                max_bytes: config.max_bytes,
1620                storage: config.storage,
1621                num_replicas: config.num_replicas,
1622                discard: DiscardPolicy::New,
1623                allow_rollup: true,
1624                allow_direct: true,
1625                #[cfg(feature = "server_2_10")]
1626                compression: if config.compression {
1627                    Some(Compression::S2)
1628                } else {
1629                    None
1630                },
1631                placement: config.placement,
1632                ..Default::default()
1633            })
1634            .await
1635            .map_err(|err| {
1636                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1637            })?;
1638
1639        Ok(ObjectStore {
1640            name: bucket_name,
1641            stream,
1642        })
1643    }
1644
1645    /// Get an existing object store bucket.
1646    ///
1647    /// # Examples
1648    ///
1649    /// ```no_run
1650    /// # #[tokio::main]
1651    /// # async fn main() -> Result<(), async_nats::Error> {
1652    /// let client = async_nats::connect("demo.nats.io").await?;
1653    /// let jetstream = async_nats::jetstream::new(client);
1654    /// let bucket = jetstream.get_object_store("bucket").await?;
1655    /// # Ok(())
1656    /// # }
1657    /// ```
1658    pub async fn get_object_store<T: AsRef<str>>(
1659        &self,
1660        bucket_name: T,
1661    ) -> Result<ObjectStore, ObjectStoreError> {
1662        let bucket_name = bucket_name.as_ref();
1663        if !is_valid_bucket_name(bucket_name) {
1664            return Err(ObjectStoreError::new(
1665                ObjectStoreErrorKind::InvalidBucketName,
1666            ));
1667        }
1668        let stream_name = format!("OBJ_{bucket_name}");
1669        let stream = self
1670            .get_stream(stream_name)
1671            .await
1672            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1673
1674        Ok(ObjectStore {
1675            name: bucket_name.to_string(),
1676            stream,
1677        })
1678    }
1679
1680    /// Delete a object store bucket.
1681    ///
1682    /// # Examples
1683    ///
1684    /// ```no_run
1685    /// # #[tokio::main]
1686    /// # async fn main() -> Result<(), async_nats::Error> {
1687    /// let client = async_nats::connect("demo.nats.io").await?;
1688    /// let jetstream = async_nats::jetstream::new(client);
1689    /// let bucket = jetstream.delete_object_store("bucket").await?;
1690    /// # Ok(())
1691    /// # }
1692    /// ```
1693    pub async fn delete_object_store<T: AsRef<str>>(
1694        &self,
1695        bucket_name: T,
1696    ) -> Result<(), DeleteObjectStore> {
1697        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1698        self.delete_stream(stream_name)
1699            .await
1700            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1701        Ok(())
1702    }
1703}
1704
/// Lets a JetStream [`Context`] be used wherever the core client `Requester` trait is expected,
/// by forwarding to the client the context wraps.
impl crate::client::traits::Requester for Context {
    /// Forwards `request` on `subject` to the underlying client's `send_request` unchanged.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::Request,
    ) -> impl Future<Output = Result<Message, crate::RequestError>> {
        self.client.send_request(subject, request)
    }
}
1714
/// Lets a JetStream [`Context`] be used wherever the core client `Publisher` trait is expected,
/// by forwarding to the client the context wraps.
impl crate::client::traits::Publisher for Context {
    /// Forwards to the underlying client's `publish_with_reply` with the same arguments.
    fn publish_with_reply<S: ToSubject, R: ToSubject>(
        &self,
        subject: S,
        reply: R,
        payload: Bytes,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.client.publish_with_reply(subject, reply, payload)
    }

    /// Forwards `msg` to the underlying client's `publish_message`.
    fn publish_message(
        &self,
        msg: crate::message::OutboundMessage,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.client.publish_message(msg)
    }
}
1732
/// Exposes the client behind a [`Context`] through the `ClientProvider` trait.
impl traits::ClientProvider for Context {
    /// Returns whatever `Context::client()` returns.
    // NOTE(review): this relies on `self.client()` resolving to an inherent `Context::client`
    // method rather than to this trait method — confirm that inherent method exists.
    fn client(&self) -> crate::Client {
        self.client()
    }
}
1738
/// Exposes the JetStream API request of a [`Context`] through the `Requester` trait.
impl traits::Requester for Context {
    /// Sends the serializable `payload` to `subject` and resolves to a deserialized `V`.
    ///
    /// This simply forwards to `self.request`.
    // NOTE(review): presumably `self.request` resolves to the inherent `Context::request`
    // (which would take precedence over this trait method) — confirm.
    fn request<S, T, V>(
        &self,
        subject: S,
        payload: &T,
    ) -> impl Future<Output = Result<V, RequestError>>
    where
        S: ToSubject,
        T: ?Sized + Serialize,
        V: DeserializeOwned,
    {
        self.request(subject, payload)
    }
}
1753
/// Exposes the timeout configured on a [`Context`] through the `TimeoutProvider` trait.
impl traits::TimeoutProvider for Context {
    /// Returns a copy of the context's `timeout` field.
    fn timeout(&self) -> Duration {
        self.timeout
    }
}
1759
/// Exposes the fire-and-forget request sending of a [`Context`] through the `RequestSender`
/// trait.
impl traits::RequestSender for Context {
    /// Forwards `request` on `subject` to `self.send_request`; the returned future resolves to
    /// `()` on success or to a publish error.
    // NOTE(review): presumably resolves to an inherent `Context::send_request` — confirm, since
    // `Context` also implements `crate::client::traits::Requester::send_request`.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::client::Request,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.send_request(subject, request)
    }
}
1769
/// Exposes JetStream publishing of a [`Context`] through the JetStream `Publisher` trait.
///
/// Both methods resolve to a [`PublishAckFuture`] that the caller awaits to obtain the ack.
impl traits::Publisher for Context {
    /// Publishes `payload` to `subject` by forwarding to `self.publish`.
    fn publish<S: ToSubject>(
        &self,
        subject: S,
        payload: Bytes,
    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
        self.publish(subject, payload)
    }

    /// Publishes an already assembled outbound message.
    ///
    /// The message is split into its subject and a [`PublishMessage`] built from its payload
    /// and headers, which are then handed to `self.send_publish`.
    fn publish_message(
        &self,
        message: jetstream::message::OutboundMessage,
    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
        self.send_publish(
            message.subject,
            PublishMessage {
                payload: message.payload,
                headers: message.headers,
            },
        )
    }
}
1792
/// Classifies the ways a JetStream publish can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the subject that was published to.
    StreamNotFound,
    /// The server reported a "wrong last message id" error.
    WrongLastMessageId,
    /// The server reported a "wrong last sequence" error.
    WrongLastSequence,
    /// No ack was received in time.
    TimedOut,
    /// The channel delivering the ack was closed.
    BrokenPipe,
    /// The limit of pending acks was reached.
    MaxAckPending,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1817
/// Error returned by JetStream publish operations; its kind is a [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1819
/// Handle to the pending acknowledgement of a published message.
///
/// Converting it into a future (it implements [`IntoFuture`]) waits for the ack. If it is
/// dropped while it still holds both the receiver and the permit, the pair is handed over
/// through `tx` instead (see the `Drop` implementation).
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the ack message.
    timeout: Duration,
    /// Receiver on which the ack message arrives; `None` once it has been taken.
    subscription: Option<oneshot::Receiver<Message>>,
    /// Semaphore permit held while the ack is outstanding; `None` once it has been taken.
    permit: Option<OwnedSemaphorePermit>,
    /// Channel used on drop to pass the un-awaited receiver and its permit on.
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1827
1828impl Drop for PublishAckFuture {
1829    fn drop(&mut self) {
1830        if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1831            if let Err(err) = self.tx.try_send((sub, permit)) {
1832                tracing::warn!("failed to pass future permit to the acker: {}", err);
1833            }
1834        }
1835    }
1836}
1837
1838impl PublishAckFuture {
1839    async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1840        let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1841            .await
1842            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1843        next.map_or_else(
1844            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1845            |m| {
1846                if m.status == Some(StatusCode::NO_RESPONDERS) {
1847                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1848                }
1849                let response = serde_json::from_slice(m.payload.as_ref())
1850                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1851                match response {
1852                    Response::Err { error } => match error.error_code() {
1853                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1854                            PublishErrorKind::WrongLastMessageId,
1855                            error,
1856                        )),
1857                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1858                            PublishErrorKind::WrongLastSequence,
1859                            error,
1860                        )),
1861                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1862                    },
1863                    Response::Ok(publish_ack) => Ok(publish_ack),
1864                }
1865            },
1866        )
1867    }
1868}
1869impl IntoFuture for PublishAckFuture {
1870    type Output = Result<PublishAck, PublishError>;
1871
1872    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1873
1874    fn into_future(self) -> Self::IntoFuture {
1875        Box::pin(std::future::IntoFuture::into_future(
1876            self.next_with_timeout(),
1877        ))
1878    }
1879}
1880
/// One deserialized page of a stream names listing.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Stream names contained in this page, if the response carried any.
    streams: Option<Vec<String>>,
}
1886
/// One deserialized page of a stream info listing.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Stream infos contained in this page, if the response carried any.
    streams: Option<Vec<super::stream::Info>>,
}
1892
/// Boxed in-flight request for one page of stream names.
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;

/// Asynchronous stream of stream names, fetched page by page from the server.
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the `offset` of the next page request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Optional value sent as the `subject` field of each page request.
    subject: Option<String>,
    /// Names of the current page that have not been yielded yet.
    streams: Vec<String>,
    /// Set once no further page needs to be requested.
    done: bool,
}
1903
1904impl futures_util::Stream for StreamNames {
1905    type Item = Result<String, StreamsError>;
1906
1907    fn poll_next(
1908        mut self: Pin<&mut Self>,
1909        cx: &mut std::task::Context<'_>,
1910    ) -> std::task::Poll<Option<Self::Item>> {
1911        match self.page_request.as_mut() {
1912            Some(page) => match page.try_poll_unpin(cx) {
1913                std::task::Poll::Ready(page) => {
1914                    self.page_request = None;
1915                    let page = page
1916                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1917                    if let Some(streams) = page.streams {
1918                        self.offset += streams.len();
1919                        self.streams = streams;
1920                        if self.offset >= page.total {
1921                            self.done = true;
1922                        }
1923                        match self.streams.pop() {
1924                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1925                            None => Poll::Ready(None),
1926                        }
1927                    } else {
1928                        Poll::Ready(None)
1929                    }
1930                }
1931                std::task::Poll::Pending => std::task::Poll::Pending,
1932            },
1933            None => {
1934                if let Some(stream) = self.streams.pop() {
1935                    Poll::Ready(Some(Ok(stream)))
1936                } else {
1937                    if self.done {
1938                        return Poll::Ready(None);
1939                    }
1940                    let context = self.context.clone();
1941                    let offset = self.offset;
1942                    let subject = self.subject.clone();
1943                    self.page_request = Some(Box::pin(async move {
1944                        match context
1945                            .request(
1946                                "STREAM.NAMES",
1947                                &json!({
1948                                    "offset": offset,
1949                                    "subject": subject
1950                                }),
1951                            )
1952                            .await?
1953                        {
1954                            Response::Err { error } => {
1955                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1956                            }
1957                            Response::Ok(page) => Ok(page),
1958                        }
1959                    }));
1960                    self.poll_next(cx)
1961                }
1962            }
1963        }
1964    }
1965}
1966
/// Boxed in-flight request for one page of stream infos.
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind produced by the stream listings; identical to [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error produced by the stream listings; identical to [`RequestError`].
pub type StreamsError = RequestError;

/// Asynchronous stream of stream infos, fetched page by page from the server.
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the `offset` of the next page request.
    offset: usize,
    /// The page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    streams: Vec<super::stream::Info>,
    /// Set once no further page needs to be requested.
    done: bool,
}
1979
1980impl futures_util::Stream for Streams {
1981    type Item = Result<super::stream::Info, StreamsError>;
1982
1983    fn poll_next(
1984        mut self: Pin<&mut Self>,
1985        cx: &mut std::task::Context<'_>,
1986    ) -> std::task::Poll<Option<Self::Item>> {
1987        match self.page_request.as_mut() {
1988            Some(page) => match page.try_poll_unpin(cx) {
1989                std::task::Poll::Ready(page) => {
1990                    self.page_request = None;
1991                    let page = page
1992                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1993                    if let Some(streams) = page.streams {
1994                        self.offset += streams.len();
1995                        self.streams = streams;
1996                        if self.offset >= page.total {
1997                            self.done = true;
1998                        }
1999                        match self.streams.pop() {
2000                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2001                            None => Poll::Ready(None),
2002                        }
2003                    } else {
2004                        Poll::Ready(None)
2005                    }
2006                }
2007                std::task::Poll::Pending => std::task::Poll::Pending,
2008            },
2009            None => {
2010                if let Some(stream) = self.streams.pop() {
2011                    Poll::Ready(Some(Ok(stream)))
2012                } else {
2013                    if self.done {
2014                        return Poll::Ready(None);
2015                    }
2016                    let context = self.context.clone();
2017                    let offset = self.offset;
2018                    self.page_request = Some(Box::pin(async move {
2019                        match context
2020                            .request(
2021                                "STREAM.LIST",
2022                                &json!({
2023                                    "offset": offset,
2024                                }),
2025                            )
2026                            .await?
2027                        {
2028                            Response::Err { error } => {
2029                                Err(RequestError::with_source(RequestErrorKind::Other, error))
2030                            }
2031                            Response::Ok(page) => Ok(page),
2032                        }
2033                    }));
2034                    self.poll_next(cx)
2035                }
2036            }
2037        }
2038    }
2039}
2040
/// Deprecated alias for `jetstream::message::PublishMessage`, kept to avoid breaking changes.
///
/// Please use `jetstream::message::PublishMessage` directly instead.
#[deprecated(
    note = "use jetstream::message::PublishMessage instead",
    since = "0.44.0"
)]
pub type Publish = super::message::PublishMessage;
2048
/// Classifies the ways a JetStream API request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered the request; the requested JetStream resource does not exist.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
2065
/// Error returned by JetStream API requests; its kind is a [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
2067
2068impl From<crate::RequestError> for RequestError {
2069    fn from(error: crate::RequestError) -> Self {
2070        match error.kind() {
2071            crate::RequestErrorKind::TimedOut => {
2072                RequestError::with_source(RequestErrorKind::TimedOut, error)
2073            }
2074            crate::RequestErrorKind::NoResponders => {
2075                RequestError::new(RequestErrorKind::NoResponders)
2076            }
2077            crate::RequestErrorKind::Other => {
2078                RequestError::with_source(RequestErrorKind::Other, error)
2079            }
2080        }
2081    }
2082}
2083
/// Wraps a JetStream API error into a [`RequestError`] of kind `Other`, keeping the API error
/// as its source.
impl From<super::errors::Error> for RequestError {
    fn from(err: super::errors::Error) -> Self {
        RequestError::with_source(RequestErrorKind::Other, err)
    }
}
2089
/// Error returned when fetching consumer info; its kind is a [`ConsumerInfoErrorKind`].
pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;

/// Classifies the ways fetching consumer info can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerInfoErrorKind {
    /// The consumer name is invalid.
    InvalidName,
    /// The consumer is offline.
    Offline,
    /// The consumer was not found.
    NotFound,
    /// The stream was not found.
    StreamNotFound,
    /// The request failed for a reason other than a timeout or missing responders.
    Request,
    /// The server answered with a JetStream error that has no dedicated variant.
    JetStream(super::errors::Error),
    /// The request timed out.
    TimedOut,
    /// Nobody answered the request.
    NoResponders,
}
2103
2104impl Display for ConsumerInfoErrorKind {
2105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2106        match self {
2107            Self::InvalidName => write!(f, "invalid consumer name"),
2108            Self::Offline => write!(f, "consumer is offline"),
2109            Self::NotFound => write!(f, "consumer not found"),
2110            Self::StreamNotFound => write!(f, "stream not found"),
2111            Self::Request => write!(f, "request error"),
2112            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2113            Self::TimedOut => write!(f, "timed out"),
2114            Self::NoResponders => write!(f, "no responders"),
2115        }
2116    }
2117}
2118
2119impl From<super::errors::Error> for ConsumerInfoError {
2120    fn from(error: super::errors::Error) -> Self {
2121        match error.error_code() {
2122            ErrorCode::CONSUMER_NOT_FOUND => {
2123                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2124            }
2125            ErrorCode::STREAM_NOT_FOUND => {
2126                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2127            }
2128            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2129            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2130        }
2131    }
2132}
2133
2134impl From<RequestError> for ConsumerInfoError {
2135    fn from(error: RequestError) -> Self {
2136        match error.kind() {
2137            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2138            RequestErrorKind::Other => {
2139                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2140            }
2141            RequestErrorKind::NoResponders => {
2142                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2143            }
2144        }
2145    }
2146}
2147
/// Classifies the ways creating (or updating) a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum CreateStreamErrorKind {
    /// The stream name is empty.
    EmptyStreamName,
    /// The stream name contains characters that are not allowed.
    InvalidStreamName,
    /// Both a domain and an external configuration are set.
    DomainAndExternalSet,
    /// JetStream is unavailable (nobody answered the request).
    JetStreamUnavailable,
    /// The server answered with a JetStream error that has no dedicated variant.
    JetStream(super::errors::Error),
    /// The JetStream request timed out.
    TimedOut,
    /// The request failed for another reason; the request error is attached as source.
    Response,
    /// The stream was not found.
    NotFound,
    /// The server response could not be parsed.
    ResponseParse,
}
2160
2161impl Display for CreateStreamErrorKind {
2162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2163        match self {
2164            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2165            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2166            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2167            Self::NotFound => write!(f, "stream not found"),
2168            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2169            Self::TimedOut => write!(f, "jetstream request timed out"),
2170            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2171            Self::ResponseParse => write!(f, "failed to parse server response"),
2172            Self::Response => write!(f, "response error"),
2173        }
2174    }
2175}
2176
/// Error returned when creating a stream; its kind is a [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
2178
2179impl From<super::errors::Error> for CreateStreamError {
2180    fn from(error: super::errors::Error) -> Self {
2181        match error.kind() {
2182            super::errors::ErrorCode::STREAM_NOT_FOUND => {
2183                CreateStreamError::new(CreateStreamErrorKind::NotFound)
2184            }
2185            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2186        }
2187    }
2188}
2189
2190impl From<RequestError> for CreateStreamError {
2191    fn from(error: RequestError) -> Self {
2192        match error.kind() {
2193            RequestErrorKind::NoResponders => {
2194                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2195            }
2196            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2197            RequestErrorKind::Other => {
2198                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2199            }
2200        }
2201    }
2202}
2203
/// Classifies the ways getting (or, through the aliases, deleting) a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamErrorKind {
    /// The given name is empty.
    EmptyName,
    /// The request itself failed.
    Request,
    /// The stream name is invalid.
    InvalidStreamName,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
2211
2212impl Display for GetStreamErrorKind {
2213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2214        match self {
2215            Self::EmptyName => write!(f, "empty name cannot be empty"),
2216            Self::Request => write!(f, "request error"),
2217            Self::InvalidStreamName => write!(f, "invalid stream name"),
2218            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2219        }
2220    }
2221}
2222
/// Classifies the failures reported through [`GetStreamByNameError`].
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
    /// The request itself failed.
    Request,
    /// The stream was not found.
    NotFound,
    /// The given subject is invalid.
    InvalidSubject,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
}
2230
2231impl Display for GetStreamByNameErrorKind {
2232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2233        match self {
2234            Self::Request => write!(f, "request error"),
2235            Self::NotFound => write!(f, "stream not found"),
2236            Self::InvalidSubject => write!(f, "invalid subject"),
2237            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2238        }
2239    }
2240}
2241
/// Error returned when getting a stream; its kind is a [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error whose kind is a [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Updating a stream reports the same error as creating one.
pub type UpdateStreamError = CreateStreamError;
/// Updating a stream reports the same error kinds as creating one.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Deleting a stream reports the same error as getting one.
pub type DeleteStreamError = GetStreamError;
/// Deleting a stream reports the same error kinds as getting one.
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2249
/// Classifies the failures reported through [`KeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The Key Value Store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}
2266
/// Error whose kind is a [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
2268
/// Classifies the failures reported through [`CreateKeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The Key Value Store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
2293
/// Classifies the failures reported through [`UpdateKeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The Key Value Store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // This used to repeat the "bucket creation failed" text of the create error.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error whose kind is a [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error whose kind is an [`UpdateKeyValueErrorKind`].
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Creating an object store reports the same error as creating a key-value bucket.
pub type CreateObjectStoreError = CreateKeyValueError;
/// Creating an object store reports the same error kinds as creating a key-value bucket.
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2325
/// Classifies the failures reported through [`ObjectStoreError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The Object Store bucket name is invalid.
    InvalidBucketName,
    /// The Object Store could not be fetched.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}
2340
/// Error whose kind is an [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Deleting an object store reports the same error as the other object store operations.
pub type DeleteObjectStore = ObjectStoreError;
/// Deleting an object store reports the same error kinds as the other object store operations.
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2345
/// Classifies the failures reported through [`AccountError`].
#[derive(Clone, Debug, PartialEq)]
pub enum AccountErrorKind {
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error.
    JetStream(super::errors::Error),
    /// JetStream is unavailable (nobody answered the request).
    JetStreamUnavailable,
    /// Any other failure.
    Other,
}
2353
2354impl Display for AccountErrorKind {
2355    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2356        match self {
2357            Self::TimedOut => write!(f, "timed out"),
2358            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2359            Self::Other => write!(f, "error"),
2360            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2361        }
2362    }
2363}
2364
/// Error whose kind is an [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
2366
2367impl From<RequestError> for AccountError {
2368    fn from(err: RequestError) -> Self {
2369        match err.kind {
2370            RequestErrorKind::NoResponders => {
2371                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2372            }
2373            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2374            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2375        }
2376    }
2377}
2378
/// Action attached to a consumer request, serialized as a plain string.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2390
2391// Maps a Stream config to KV Store.
2392fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2393    let mut store = Store {
2394        prefix: format!("$KV.{}.", bucket.as_str()),
2395        name: bucket,
2396        stream: stream.clone(),
2397        stream_name: stream.info.config.name.clone(),
2398        put_prefix: None,
2399        use_jetstream_prefix: prefix != "$JS.API",
2400    };
2401    if let Some(ref mirror) = stream.info.config.mirror {
2402        let bucket = mirror.name.trim_start_matches("KV_");
2403        if let Some(ref external) = mirror.external {
2404            if !external.api_prefix.is_empty() {
2405                store.use_jetstream_prefix = false;
2406                store.prefix = format!("$KV.{bucket}.");
2407                store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2408            } else {
2409                store.put_prefix = Some(format!("$KV.{bucket}."));
2410            }
2411        }
2412    };
2413    store
2414}
2415
/// Failures of `kv_to_stream_config`, convertible into both the create and the update
/// key-value errors.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but are not supported. Only constructed when the
    /// `server_2_11` feature is enabled, hence the `dead_code` allowance.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2421
2422impl From<KvToStreamConfigError> for CreateKeyValueError {
2423    fn from(err: KvToStreamConfigError) -> Self {
2424        match err {
2425            KvToStreamConfigError::TooLongHistory => {
2426                CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2427            }
2428            KvToStreamConfigError::LimitMarkersNotSupported => {
2429                CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2430            }
2431        }
2432    }
2433}
2434
2435impl From<KvToStreamConfigError> for UpdateKeyValueError {
2436    fn from(err: KvToStreamConfigError) -> Self {
2437        match err {
2438            KvToStreamConfigError::TooLongHistory => {
2439                UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2440            }
2441            KvToStreamConfigError::LimitMarkersNotSupported => {
2442                UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2443            }
2444        }
2445    }
2446}
2447
2448// Maps the KV config to Stream config.
2449fn kv_to_stream_config(
2450    config: kv::Config,
2451    _account: Account,
2452) -> Result<super::stream::Config, KvToStreamConfigError> {
2453    let history = if config.history > 0 {
2454        if config.history > MAX_HISTORY {
2455            return Err(KvToStreamConfigError::TooLongHistory);
2456        }
2457        config.history
2458    } else {
2459        1
2460    };
2461
2462    let num_replicas = if config.num_replicas == 0 {
2463        1
2464    } else {
2465        config.num_replicas
2466    };
2467
2468    #[cfg(feature = "server_2_11")]
2469    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2470
2471    #[cfg(feature = "server_2_11")]
2472    if let Some(duration) = config.limit_markers {
2473        if _account.requests.level < 1 {
2474            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2475        }
2476        allow_message_ttl = true;
2477        subject_delete_marker_ttl = Some(duration);
2478    }
2479
2480    let mut mirror = config.mirror.clone();
2481    let mut sources = config.sources.clone();
2482    let mut mirror_direct = config.mirror_direct;
2483
2484    let mut subjects = Vec::new();
2485    if let Some(ref mut mirror) = mirror {
2486        if !mirror.name.starts_with("KV_") {
2487            mirror.name = format!("KV_{}", mirror.name);
2488        }
2489        mirror_direct = true;
2490    } else if let Some(ref mut sources) = sources {
2491        for source in sources {
2492            if !source.name.starts_with("KV_") {
2493                source.name = format!("KV_{}", source.name);
2494            }
2495        }
2496    } else {
2497        subjects = vec![format!("$KV.{}.>", config.bucket)];
2498    }
2499
2500    Ok(stream::Config {
2501        name: format!("KV_{}", config.bucket),
2502        description: Some(config.description),
2503        subjects,
2504        max_messages_per_subject: history,
2505        max_bytes: config.max_bytes,
2506        max_age: config.max_age,
2507        max_message_size: config.max_value_size,
2508        storage: config.storage,
2509        republish: config.republish,
2510        allow_rollup: true,
2511        deny_delete: true,
2512        deny_purge: false,
2513        allow_direct: true,
2514        sources,
2515        mirror,
2516        num_replicas,
2517        discard: stream::DiscardPolicy::New,
2518        mirror_direct,
2519        #[cfg(feature = "server_2_10")]
2520        compression: if config.compression {
2521            Some(stream::Compression::S2)
2522        } else {
2523            None
2524        },
2525        placement: config.placement,
2526        #[cfg(feature = "server_2_11")]
2527        allow_message_ttl,
2528        #[cfg(feature = "server_2_11")]
2529        subject_delete_marker_ttl,
2530        ..Default::default()
2531    })
2532}