Skip to main content

async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::jetstream::account::Account;
18use crate::jetstream::message::PublishMessage;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{is_valid_subject, jetstream, Client, Command, Message, StatusCode};
23use bytes::Bytes;
24use futures_util::future::BoxFuture;
25use futures_util::{Future, StreamExt, TryFutureExt};
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use serde_json::{self, json};
29use std::borrow::Borrow;
30use std::fmt::Debug;
31use std::fmt::Display;
32use std::future::IntoFuture;
33use std::pin::Pin;
34use std::str::from_utf8;
35use std::sync::Arc;
36use std::task::Poll;
37use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
38use tokio::time::Duration;
39use tokio_stream::wrappers::ReceiverStream;
40use tracing::debug;
41
42use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
43use super::errors::ErrorCode;
44use super::is_valid_name;
45#[cfg(feature = "kv")]
46use super::kv::{Store, MAX_HISTORY};
47#[cfg(feature = "object-store")]
48use super::object_store::{is_valid_bucket_name, ObjectStore};
49#[cfg(any(feature = "kv", feature = "object-store"))]
50use super::stream::DiscardPolicy;
51#[cfg(feature = "server_2_10")]
52use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
53use super::stream::{
54    Config, ConsumerError, ConsumerErrorKind, DeleteStatus, External, Info, Stream,
55};
56
57pub mod traits {
58    use std::{future::Future, time::Duration};
59
60    use bytes::Bytes;
61    use serde::{de::DeserializeOwned, Serialize};
62
63    use crate::{jetstream::message, subject::ToSubject, Request};
64
65    use super::RequestError;
66
67    pub trait Requester {
68        fn request<S, T, V>(
69            &self,
70            subject: S,
71            payload: &T,
72        ) -> impl Future<Output = Result<V, RequestError>>
73        where
74            S: ToSubject,
75            T: ?Sized + Serialize,
76            V: DeserializeOwned;
77    }
78
79    pub trait RequestSender {
80        fn send_request<S: ToSubject>(
81            &self,
82            subject: S,
83            request: Request,
84        ) -> impl Future<Output = Result<(), crate::PublishError>>;
85    }
86
87    pub trait Publisher {
88        fn publish<S>(
89            &self,
90            subject: S,
91            payload: Bytes,
92        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>
93        where
94            S: ToSubject;
95
96        fn publish_message(
97            &self,
98            message: message::OutboundMessage,
99        ) -> impl Future<Output = Result<super::PublishAckFuture, super::PublishError>>;
100    }
101
102    pub trait ClientProvider {
103        fn client(&self) -> crate::Client;
104    }
105
106    pub trait TimeoutProvider {
107        fn timeout(&self) -> Duration;
108    }
109}
110
/// A context which can perform jetstream scoped requests.
///
/// Cloning is derived; clones share the same ack semaphore (`Arc`) and the same
/// ack channel sender.
#[derive(Debug, Clone)]
pub struct Context {
    /// Underlying NATS client used to send commands.
    pub(crate) client: Client,
    /// JetStream API subject prefix (the builder defaults it to `$JS.API`).
    pub(crate) prefix: String,
    /// Timeout applied to JetStream API requests and to enqueueing a publish.
    pub(crate) timeout: Duration,
    /// Semaphore from which every publish takes one permit, bounding in-flight acks.
    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
    /// Sender half of the channel drained by the background acker task; it carries the
    /// pending ack receiver together with the permit guarding it.
    pub(crate) ack_sender:
        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    /// When `true`, publish waits for a permit; when `false`, it fails immediately if
    /// none is available.
    pub(crate) backpressure_on_inflight: bool,
    /// Number of permits the semaphore was created with.
    pub(crate) semaphore_capacity: usize,
}
123
124fn spawn_acker(
125    rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
126    ack_timeout: Duration,
127    concurrency: Option<usize>,
128) -> tokio::task::JoinHandle<()> {
129    tokio::spawn(async move {
130        rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
131            tokio::time::timeout(ack_timeout, subscription).await.ok();
132            drop(permit);
133        })
134        .await;
135        debug!("Acker task exited");
136    })
137}
138
139use std::marker::PhantomData;
140
/// Type-level marker used as the `PREFIX` parameter of [ContextBuilder].
/// A `ContextBuilder<Yes>` exposes `api_prefix` and `domain`.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker used as the `PREFIX` parameter of [ContextBuilder].
/// `api_prefix` and `domain` return a `ContextBuilder<No>`.
#[derive(Debug, Default)]
pub struct No;

/// Bound for the marker types accepted as the `PREFIX` parameter of [ContextBuilder].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
150
/// A builder for [Context]. Beyond what can be set by standard constructor, it allows tweaking
/// pending publish ack backpressure settings.
/// # Examples
/// ```no_run
/// # use async_nats::jetstream::context::ContextBuilder;
/// # use async_nats::Client;
/// # use std::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let context = ContextBuilder::new()
///     .timeout(Duration::from_secs(5))
///     .api_prefix("MY.JS.API")
///     .max_ack_inflight(1000)
///     .build(client);
/// # Ok(())
/// # }
/// ```
pub struct ContextBuilder<PREFIX: ToAssign> {
    /// JetStream API subject prefix.
    prefix: String,
    /// Timeout handed to the built [Context].
    timeout: Duration,
    /// Maximum number of in-flight publish acks; sizes both the semaphore and the
    /// ack channel created in `build`.
    semaphore_capacity: usize,
    /// How long the background acker waits for a single ack.
    ack_timeout: Duration,
    /// Whether publish waits for a free permit instead of failing.
    backpressure_on_inflight: bool,
    /// Optional concurrency limit handed to the background acker task.
    concurrency_limit: Option<usize>,
    /// Carries the `PREFIX` marker; holds no data.
    _phantom: PhantomData<PREFIX>,
}
178
179impl Default for ContextBuilder<Yes> {
180    fn default() -> Self {
181        ContextBuilder {
182            prefix: "$JS.API".to_string(),
183            timeout: Duration::from_secs(5),
184            semaphore_capacity: 5_000,
185            ack_timeout: Duration::from_secs(30),
186            backpressure_on_inflight: true,
187            concurrency_limit: None,
188            _phantom: PhantomData {},
189        }
190    }
191}
192
193impl ContextBuilder<Yes> {
194    /// Create a new [ContextBuilder] with default settings.
195    pub fn new() -> ContextBuilder<Yes> {
196        ContextBuilder::default()
197    }
198}
199
200impl ContextBuilder<Yes> {
201    /// Set the prefix for the JetStream API.
202    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
203        ContextBuilder {
204            prefix: prefix.into(),
205            timeout: self.timeout,
206            semaphore_capacity: self.semaphore_capacity,
207            ack_timeout: self.ack_timeout,
208            backpressure_on_inflight: self.backpressure_on_inflight,
209            concurrency_limit: self.concurrency_limit,
210            _phantom: PhantomData,
211        }
212    }
213
214    /// Set the domain for the JetStream API. Domain is the middle part of standard API prefix:
215    /// $JS.{domain}.API.
216    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
217        ContextBuilder {
218            prefix: format!("$JS.{}.API", domain.into()),
219            timeout: self.timeout,
220            semaphore_capacity: self.semaphore_capacity,
221            ack_timeout: self.ack_timeout,
222            backpressure_on_inflight: self.backpressure_on_inflight,
223            concurrency_limit: self.concurrency_limit,
224            _phantom: PhantomData,
225        }
226    }
227}
228
229impl<PREFIX> ContextBuilder<PREFIX>
230where
231    PREFIX: ToAssign,
232{
233    /// Set the timeout for all JetStream API requests.
234    pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
235    where
236        Yes: ToAssign,
237    {
238        ContextBuilder {
239            prefix: self.prefix,
240            timeout,
241            semaphore_capacity: self.semaphore_capacity,
242            ack_timeout: self.ack_timeout,
243            backpressure_on_inflight: self.backpressure_on_inflight,
244            concurrency_limit: self.concurrency_limit,
245            _phantom: PhantomData,
246        }
247    }
248
249    /// Sets the maximum time client waits for acks from the server when default backpressure is
250    /// used.
251    pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
252    where
253        Yes: ToAssign,
254    {
255        ContextBuilder {
256            prefix: self.prefix,
257            timeout: self.timeout,
258            semaphore_capacity: self.semaphore_capacity,
259            ack_timeout,
260            backpressure_on_inflight: self.backpressure_on_inflight,
261            concurrency_limit: self.concurrency_limit,
262            _phantom: PhantomData,
263        }
264    }
265
266    /// Sets the maximum number of pending acks that can be in flight at any given time.
267    /// If limit is reached, `publish` throws an error by default, or waits if backpressure is enabled.
268    pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
269    where
270        Yes: ToAssign,
271    {
272        ContextBuilder {
273            prefix: self.prefix,
274            timeout: self.timeout,
275            semaphore_capacity: capacity,
276            ack_timeout: self.ack_timeout,
277            backpressure_on_inflight: self.backpressure_on_inflight,
278            concurrency_limit: self.concurrency_limit,
279            _phantom: PhantomData,
280        }
281    }
282
283    /// Enable or disable backpressure when max inflight acks is reached.
284    /// When enabled, publish will wait for permits to become available instead of returning an
285    /// error.
286    /// Default is false (errors on max inflight).
287    pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
288    where
289        Yes: ToAssign,
290    {
291        ContextBuilder {
292            prefix: self.prefix,
293            timeout: self.timeout,
294            semaphore_capacity: self.semaphore_capacity,
295            ack_timeout: self.ack_timeout,
296            backpressure_on_inflight: enabled,
297            concurrency_limit: self.concurrency_limit,
298            _phantom: PhantomData,
299        }
300    }
301
302    /// Sets the concurrency limit for the ack handler task. This might be useful
303    /// in scenarios where Tokio runtime is under heavy load.
304    pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
305    where
306        Yes: ToAssign,
307    {
308        ContextBuilder {
309            prefix: self.prefix,
310            timeout: self.timeout,
311            semaphore_capacity: self.semaphore_capacity,
312            ack_timeout: self.ack_timeout,
313            backpressure_on_inflight: self.backpressure_on_inflight,
314            concurrency_limit: limit,
315            _phantom: PhantomData,
316        }
317    }
318
319    /// Build the [Context] with the given settings.
320    pub fn build(self, client: Client) -> Context {
321        let (tx, rx) = tokio::sync::mpsc::channel::<(
322            oneshot::Receiver<Message>,
323            OwnedSemaphorePermit,
324        )>(self.semaphore_capacity);
325        let stream = ReceiverStream::new(rx);
326        spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
327        Context {
328            client,
329            prefix: self.prefix,
330            timeout: self.timeout,
331            max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
332            ack_sender: tx,
333            backpressure_on_inflight: self.backpressure_on_inflight,
334            semaphore_capacity: self.semaphore_capacity,
335        }
336    }
337}
338
339impl Context {
340    pub(crate) fn new(client: Client) -> Context {
341        ContextBuilder::default().build(client)
342    }
343
344    /// Sets the timeout for all JetStream API requests.
345    pub fn set_timeout(&mut self, timeout: Duration) {
346        self.timeout = timeout
347    }
348
349    /// Return a clone of the underlying NATS client.
350    pub fn client(&self) -> Client {
351        self.client.clone()
352    }
353
354    /// Waits until all pending `acks` are received from the server.
355    /// Be aware that this is probably not the way you want to await `acks`,
356    /// as it will wait for every `ack` that is pending, including those that might
357    /// be published after you call this method.
358    /// Useful in testing, or maybe batching.
359    pub async fn wait_for_acks(&self) {
360        self.max_ack_semaphore
361            .acquire_many(self.semaphore_capacity as u32)
362            .await
363            .ok();
364    }
365
366    /// Create a new [Context] with given API prefix.
367    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
368        ContextBuilder::new()
369            .api_prefix(prefix.to_string())
370            .build(client)
371    }
372
373    /// Create a new [Context] with given domain.
374    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
375        ContextBuilder::new().domain(domain.as_ref()).build(client)
376    }
377
378    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
379    /// acknowledgment from the server that the message has been successfully delivered.
380    ///
381    /// Acknowledgment future that can be polled is returned instead.
382    ///
383    /// If the stream does not exist, `no responders` error will be returned.
384    ///
385    /// # Examples
386    ///
387    /// Publish, and after each publish, await for acknowledgment.
388    ///
389    /// ```no_run
390    /// # #[tokio::main]
391    /// # async fn main() -> Result<(), async_nats::Error> {
392    /// let client = async_nats::connect("localhost:4222").await?;
393    /// let jetstream = async_nats::jetstream::new(client);
394    ///
395    /// let ack = jetstream.publish("events", "data".into()).await?;
396    /// ack.await?;
397    /// jetstream.publish("events", "data".into()).await?.await?;
398    /// # Ok(())
399    /// # }
400    /// ```
401    ///
402    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
403    /// ignored entirely.
404    ///
405    /// ```no_run
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// let client = async_nats::connect("localhost:4222").await?;
409    /// let jetstream = async_nats::jetstream::new(client);
410    ///
411    /// let first_ack = jetstream.publish("events", "data".into()).await?;
412    /// let second_ack = jetstream.publish("events", "data".into()).await?;
413    /// first_ack.await?;
414    /// second_ack.await?;
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub async fn publish<S: ToSubject>(
419        &self,
420        subject: S,
421        payload: Bytes,
422    ) -> Result<PublishAckFuture, PublishError> {
423        self.send_publish(subject, PublishMessage::build().payload(payload))
424            .await
425    }
426
427    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
428    /// the server that the message has been successfully delivered.
429    ///
430    /// If the stream does not exist, `no responders` error will be returned.
431    ///
432    /// # Examples
433    ///
434    /// ```no_run
435    /// # #[tokio::main]
436    /// # async fn main() -> Result<(), async_nats::Error> {
437    /// let client = async_nats::connect("localhost:4222").await?;
438    /// let jetstream = async_nats::jetstream::new(client);
439    ///
440    /// let mut headers = async_nats::HeaderMap::new();
441    /// headers.append("X-key", "Value");
442    /// let ack = jetstream
443    ///     .publish_with_headers("events", headers, "data".into())
444    ///     .await?;
445    /// # Ok(())
446    /// # }
447    /// ```
448    pub async fn publish_with_headers<S: ToSubject>(
449        &self,
450        subject: S,
451        headers: crate::header::HeaderMap,
452        payload: Bytes,
453    ) -> Result<PublishAckFuture, PublishError> {
454        self.send_publish(
455            subject,
456            PublishMessage::build().payload(payload).headers(headers),
457        )
458        .await
459    }
460
461    /// Publish a message built by [Publish] and returns an acknowledgment future.
462    ///
463    /// If the stream does not exist, `no responders` error will be returned.
464    ///
465    /// # Examples
466    ///
467    /// ```no_run
468    /// # use async_nats::jetstream::context::Publish;
469    /// # #[tokio::main]
470    /// # async fn main() -> Result<(), async_nats::Error> {
471    /// let client = async_nats::connect("localhost:4222").await?;
472    /// let jetstream = async_nats::jetstream::new(client);
473    ///
474    /// let ack = jetstream
475    ///     .send_publish(
476    ///         "events",
477    ///         Publish::build().payload("data".into()).message_id("uuid"),
478    ///     )
479    ///     .await?;
480    /// # Ok(())
481    /// # }
482    /// ```
483    pub async fn send_publish<S: ToSubject>(
484        &self,
485        subject: S,
486        publish: PublishMessage,
487    ) -> Result<PublishAckFuture, PublishError> {
488        let subject = self
489            .client
490            .maybe_validate_publish_subject(subject)
491            .map_err(|e| PublishError::with_source(PublishErrorKind::Other, e))?;
492
493        // Reject oversized messages before taking an ack permit, else the
494        // publish is sent and we wait for an ack that never arrives.
495        self.client
496            .check_payload_size(publish.headers.as_ref(), publish.payload.len())
497            .map_err(|sizes| {
498                PublishError::with_source(
499                    PublishErrorKind::MaxPayloadExceeded,
500                    crate::client::max_payload_message(sizes),
501                )
502            })?;
503
504        let permit = if self.backpressure_on_inflight {
505            // When backpressure is enabled, wait for a permit to become available
506            self.max_ack_semaphore
507                .clone()
508                .acquire_owned()
509                .await
510                .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
511        } else {
512            // When backpressure is disabled, error immediately if no permits available
513            self.max_ack_semaphore
514                .clone()
515                .try_acquire_owned()
516                .map_err(|err| match err {
517                    TryAcquireError::NoPermits => {
518                        PublishError::new(PublishErrorKind::MaxAckPending)
519                    }
520                    _ => PublishError::with_source(PublishErrorKind::Other, err),
521                })?
522        };
523
524        let (sender, receiver) = oneshot::channel();
525
526        let respond = self.client.new_inbox().into();
527
528        let send_fut = self
529            .client
530            .sender
531            .send(Command::Request {
532                subject,
533                payload: publish.payload,
534                respond,
535                headers: publish.headers,
536                sender,
537            })
538            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
539
540        tokio::time::timeout(self.timeout, send_fut)
541            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
542            .await??;
543
544        Ok(PublishAckFuture {
545            timeout: self.timeout,
546            subscription: Some(receiver),
547            permit: Some(permit),
548            tx: self.ack_sender.clone(),
549        })
550    }
551
552    /// Query the server for account information
553    pub async fn query_account(&self) -> Result<Account, AccountError> {
554        let response: Response<Account> = self.request("INFO", b"").await?;
555
556        match response {
557            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
558            Response::Ok(account) => Ok(account),
559        }
560    }
561
562    /// Create a JetStream [Stream] with given config and return a handle to it.
563    /// That handle can be used to manage and use [Consumer].
564    ///
565    /// # Examples
566    ///
567    /// ```no_run
568    /// # #[tokio::main]
569    /// # async fn main() -> Result<(), async_nats::Error> {
570    /// use async_nats::jetstream::stream::Config;
571    /// use async_nats::jetstream::stream::DiscardPolicy;
572    /// let client = async_nats::connect("localhost:4222").await?;
573    /// let jetstream = async_nats::jetstream::new(client);
574    ///
575    /// let stream = jetstream
576    ///     .create_stream(Config {
577    ///         name: "events".to_string(),
578    ///         max_messages: 100_000,
579    ///         discard: DiscardPolicy::Old,
580    ///         ..Default::default()
581    ///     })
582    ///     .await?;
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn create_stream<S>(
587        &self,
588        stream_config: S,
589    ) -> Result<Stream<Info>, CreateStreamError>
590    where
591        Config: From<S>,
592    {
593        let mut config: Config = stream_config.into();
594        if config.name.is_empty() {
595            return Err(CreateStreamError::new(
596                CreateStreamErrorKind::EmptyStreamName,
597            ));
598        }
599        if !is_valid_name(config.name.as_str()) {
600            return Err(CreateStreamError::new(
601                CreateStreamErrorKind::InvalidStreamName,
602            ));
603        }
604        if let Some(ref mut mirror) = config.mirror {
605            if let Some(ref mut domain) = mirror.domain {
606                if mirror.external.is_some() {
607                    return Err(CreateStreamError::new(
608                        CreateStreamErrorKind::DomainAndExternalSet,
609                    ));
610                }
611                mirror.external = Some(External {
612                    api_prefix: format!("$JS.{domain}.API"),
613                    delivery_prefix: None,
614                })
615            }
616        }
617
618        if let Some(ref mut sources) = config.sources {
619            for source in sources {
620                if let Some(ref mut domain) = source.domain {
621                    if source.external.is_some() {
622                        return Err(CreateStreamError::new(
623                            CreateStreamErrorKind::DomainAndExternalSet,
624                        ));
625                    }
626                    source.external = Some(External {
627                        api_prefix: format!("$JS.{domain}.API"),
628                        delivery_prefix: None,
629                    })
630                }
631            }
632        }
633        let subject = format!("STREAM.CREATE.{}", config.name);
634        let response: Response<Info> = self.request(subject, &config).await?;
635
636        match response {
637            Response::Err { error } => Err(error.into()),
638            Response::Ok(info) => Ok(Stream {
639                context: self.clone(),
640                info,
641                name: config.name,
642            }),
643        }
644    }
645
646    /// Checks for [Stream] existence on the server and returns handle to it.
647    /// That handle can be used to manage and use [Consumer].
648    /// This variant does not fetch [Stream] info from the server.
649    /// It means it does not check if the stream actually exists.
650    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
651    /// If you however run single operations on many streams, this method is more efficient.
652    ///
653    /// # Examples
654    ///
655    /// ```no_run
656    /// # #[tokio::main]
657    /// # async fn main() -> Result<(), async_nats::Error> {
658    /// let client = async_nats::connect("localhost:4222").await?;
659    /// let jetstream = async_nats::jetstream::new(client);
660    ///
661    /// let stream = jetstream.get_stream_no_info("events").await?;
662    /// # Ok(())
663    /// # }
664    /// ```
665    pub async fn get_stream_no_info<T: AsRef<str>>(
666        &self,
667        stream: T,
668    ) -> Result<Stream<()>, GetStreamError> {
669        let stream = stream.as_ref();
670        if stream.is_empty() {
671            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
672        }
673
674        if !is_valid_name(stream) {
675            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
676        }
677
678        Ok(Stream {
679            context: self.clone(),
680            info: (),
681            name: stream.to_string(),
682        })
683    }
684
685    /// Checks for [Stream] existence on the server and returns handle to it.
686    /// That handle can be used to manage and use [Consumer].
687    ///
688    /// # Examples
689    ///
690    /// ```no_run
691    /// # #[tokio::main]
692    /// # async fn main() -> Result<(), async_nats::Error> {
693    /// let client = async_nats::connect("localhost:4222").await?;
694    /// let jetstream = async_nats::jetstream::new(client);
695    ///
696    /// let stream = jetstream.get_stream("events").await?;
697    /// # Ok(())
698    /// # }
699    /// ```
700    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
701        let stream = stream.as_ref();
702        if stream.is_empty() {
703            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
704        }
705
706        if !is_valid_name(stream) {
707            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
708        }
709
710        let subject = format!("STREAM.INFO.{stream}");
711        let request: Response<Info> = self
712            .request(subject, &())
713            .await
714            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
715        match request {
716            Response::Err { error } => {
717                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
718            }
719            Response::Ok(info) => Ok(Stream {
720                context: self.clone(),
721                info,
722                name: stream.to_string(),
723            }),
724        }
725    }
726
727    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
728    ///
729    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
730    ///
731    /// # Examples
732    ///
733    /// ```no_run
734    /// # #[tokio::main]
735    /// # async fn main() -> Result<(), async_nats::Error> {
736    /// use async_nats::jetstream::stream::Config;
737    /// let client = async_nats::connect("localhost:4222").await?;
738    /// let jetstream = async_nats::jetstream::new(client);
739    ///
740    /// let stream = jetstream
741    ///     .get_or_create_stream(Config {
742    ///         name: "events".to_string(),
743    ///         max_messages: 10_000,
744    ///         ..Default::default()
745    ///     })
746    ///     .await?;
747    /// # Ok(())
748    /// # }
749    /// ```
750    pub async fn get_or_create_stream<S>(
751        &self,
752        stream_config: S,
753    ) -> Result<Stream, CreateStreamError>
754    where
755        S: Into<Config>,
756    {
757        let config: Config = stream_config.into();
758
759        if config.name.is_empty() {
760            return Err(CreateStreamError::new(
761                CreateStreamErrorKind::EmptyStreamName,
762            ));
763        }
764
765        if !is_valid_name(config.name.as_str()) {
766            return Err(CreateStreamError::new(
767                CreateStreamErrorKind::InvalidStreamName,
768            ));
769        }
770        let subject = format!("STREAM.INFO.{}", config.name);
771
772        let request: Response<Info> = self.request(subject, &()).await?;
773        match request {
774            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
775            Response::Err { error } => Err(error.into()),
776            Response::Ok(info) => Ok(Stream {
777                context: self.clone(),
778                info,
779                name: config.name,
780            }),
781        }
782    }
783
784    /// Deletes a [Stream] with a given name.
785    ///
786    /// # Examples
787    ///
788    /// ```no_run
789    /// # #[tokio::main]
790    /// # async fn main() -> Result<(), async_nats::Error> {
791    /// use async_nats::jetstream::stream::Config;
792    /// let client = async_nats::connect("localhost:4222").await?;
793    /// let jetstream = async_nats::jetstream::new(client);
794    ///
795    /// let stream = jetstream.delete_stream("events").await?;
796    /// # Ok(())
797    /// # }
798    /// ```
799    pub async fn delete_stream<T: AsRef<str>>(
800        &self,
801        stream: T,
802    ) -> Result<DeleteStatus, DeleteStreamError> {
803        let stream = stream.as_ref();
804        if stream.is_empty() {
805            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
806        }
807
808        if !is_valid_name(stream) {
809            return Err(DeleteStreamError::new(
810                DeleteStreamErrorKind::InvalidStreamName,
811            ));
812        }
813
814        let subject = format!("STREAM.DELETE.{stream}");
815        match self
816            .request(subject, &json!({}))
817            .await
818            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
819        {
820            Response::Err { error } => Err(DeleteStreamError::new(
821                DeleteStreamErrorKind::JetStream(error),
822            )),
823            Response::Ok(delete_response) => Ok(delete_response),
824        }
825    }
826
827    /// Updates a [Stream] with a given config. If specific field cannot be updated,
828    /// error is returned.
829    ///
830    /// # Examples
831    ///
832    /// ```no_run
833    /// # #[tokio::main]
834    /// # async fn main() -> Result<(), async_nats::Error> {
835    /// use async_nats::jetstream::stream::Config;
836    /// use async_nats::jetstream::stream::DiscardPolicy;
837    /// let client = async_nats::connect("localhost:4222").await?;
838    /// let jetstream = async_nats::jetstream::new(client);
839    ///
840    /// let stream = jetstream
841    ///     .update_stream(Config {
842    ///         name: "events".to_string(),
843    ///         discard: DiscardPolicy::New,
844    ///         max_messages: 50_000,
845    ///         ..Default::default()
846    ///     })
847    ///     .await?;
848    /// # Ok(())
849    /// # }
850    /// ```
851    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
852    where
853        S: Borrow<Config>,
854    {
855        let config = config.borrow();
856
857        if config.name.is_empty() {
858            return Err(CreateStreamError::new(
859                CreateStreamErrorKind::EmptyStreamName,
860            ));
861        }
862
863        if !is_valid_name(config.name.as_str()) {
864            return Err(CreateStreamError::new(
865                CreateStreamErrorKind::InvalidStreamName,
866            ));
867        }
868
869        let subject = format!("STREAM.UPDATE.{}", config.name);
870        match self.request(subject, config).await? {
871            Response::Err { error } => Err(error.into()),
872            Response::Ok(info) => Ok(info),
873        }
874    }
875
876    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
877    ///
878    /// # Examples
879    ///
880    /// ```no_run
881    /// # #[tokio::main]
882    /// # async fn main() -> Result<(), async_nats::Error> {
883    /// use async_nats::jetstream::stream::Config;
884    /// use async_nats::jetstream::stream::DiscardPolicy;
885    /// let client = async_nats::connect("localhost:4222").await?;
886    /// let jetstream = async_nats::jetstream::new(client);
887    ///
888    /// let stream = jetstream
889    ///     .create_or_update_stream(Config {
890    ///         name: "events".to_string(),
891    ///         discard: DiscardPolicy::New,
892    ///         max_messages: 50_000,
893    ///         ..Default::default()
894    ///     })
895    ///     .await?;
896    /// # Ok(())
897    /// # }
898    /// ```
899    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
900        match self.update_stream(config.clone()).await {
901            Ok(stream) => Ok(stream),
902            Err(err) => match err.kind() {
903                CreateStreamErrorKind::NotFound => {
904                    let stream = self
905                        .create_stream(config)
906                        .await
907                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
908                    Ok(stream.info)
909                }
910                _ => Err(err),
911            },
912        }
913    }
914
915    /// Looks up Stream that contains provided subject.
916    ///
917    /// # Examples
918    ///
919    /// ```no_run
920    /// # #[tokio::main]
921    /// # async fn main() -> Result<(), async_nats::Error> {
922    /// use futures_util::TryStreamExt;
923    /// let client = async_nats::connect("demo.nats.io:4222").await?;
924    /// let jetstream = async_nats::jetstream::new(client);
925    /// let stream_name = jetstream.stream_by_subject("foo.>");
926    /// # Ok(())
927    /// # }
928    /// ```
929    pub async fn stream_by_subject<T: Into<String>>(
930        &self,
931        subject: T,
932    ) -> Result<String, GetStreamByNameError> {
933        let subject = subject.into();
934        if !is_valid_subject(subject.as_str()) {
935            return Err(GetStreamByNameError::new(
936                GetStreamByNameErrorKind::InvalidSubject,
937            ));
938        }
939        let mut names = StreamNames {
940            context: self.clone(),
941            offset: 0,
942            page_request: None,
943            streams: Vec::new(),
944            subject: Some(subject),
945            done: false,
946        };
947        match names.next().await {
948            Some(name) => match name {
949                Ok(name) => Ok(name),
950                Err(err) => Err(GetStreamByNameError::with_source(
951                    GetStreamByNameErrorKind::Request,
952                    err,
953                )),
954            },
955            None => Err(GetStreamByNameError::new(
956                GetStreamByNameErrorKind::NotFound,
957            )),
958        }
959    }
960
961    /// Lists names of all streams for current context.
962    ///
963    /// # Examples
964    ///
965    /// ```no_run
966    /// # #[tokio::main]
967    /// # async fn main() -> Result<(), async_nats::Error> {
968    /// use futures_util::TryStreamExt;
969    /// let client = async_nats::connect("demo.nats.io:4222").await?;
970    /// let jetstream = async_nats::jetstream::new(client);
971    /// let mut names = jetstream.stream_names();
972    /// while let Some(stream) = names.try_next().await? {
973    ///     println!("stream: {}", stream);
974    /// }
975    /// # Ok(())
976    /// # }
977    /// ```
978    pub fn stream_names(&self) -> StreamNames {
979        StreamNames {
980            context: self.clone(),
981            offset: 0,
982            page_request: None,
983            streams: Vec::new(),
984            subject: None,
985            done: false,
986        }
987    }
988
989    /// Lists all streams info for current context.
990    ///
991    /// # Examples
992    ///
993    /// ```no_run
994    /// # #[tokio::main]
995    /// # async fn main() -> Result<(), async_nats::Error> {
996    /// use futures_util::TryStreamExt;
997    /// let client = async_nats::connect("demo.nats.io:4222").await?;
998    /// let jetstream = async_nats::jetstream::new(client);
999    /// let mut streams = jetstream.streams();
1000    /// while let Some(stream) = streams.try_next().await? {
1001    ///     println!("stream: {:?}", stream);
1002    /// }
1003    /// # Ok(())
1004    /// # }
1005    /// ```
1006    pub fn streams(&self) -> Streams {
1007        Streams {
1008            context: self.clone(),
1009            offset: 0,
1010            page_request: None,
1011            streams: Vec::new(),
1012            done: false,
1013        }
1014    }
1015    /// Returns an existing key-value bucket.
1016    ///
1017    /// # Examples
1018    ///
1019    /// ```no_run
1020    /// # #[tokio::main]
1021    /// # async fn main() -> Result<(), async_nats::Error> {
1022    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1023    /// let jetstream = async_nats::jetstream::new(client);
1024    /// let kv = jetstream.get_key_value("bucket").await?;
1025    /// # Ok(())
1026    /// # }
1027    /// ```
1028    #[cfg(feature = "kv")]
1029    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1030    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
1031        let bucket: String = bucket.into();
1032        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
1033            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1034        }
1035
1036        let stream_name = format!("KV_{}", &bucket);
1037        let stream = self
1038            .get_stream(stream_name.clone())
1039            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
1040            .await?;
1041
1042        if stream.info.config.max_messages_per_subject < 1 {
1043            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1044        }
1045        let mut store = Store {
1046            prefix: format!("$KV.{}.", &bucket),
1047            name: bucket,
1048            stream_name,
1049            stream: stream.clone(),
1050            put_prefix: None,
1051            use_jetstream_prefix: self.prefix != "$JS.API",
1052        };
1053        if let Some(ref mirror) = stream.info.config.mirror {
1054            let bucket = mirror.name.trim_start_matches("KV_");
1055            if let Some(ref external) = mirror.external {
1056                if !external.api_prefix.is_empty() {
1057                    store.use_jetstream_prefix = false;
1058                    store.prefix = format!("$KV.{bucket}.");
1059                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
1060                } else {
1061                    store.put_prefix = Some(format!("$KV.{bucket}."));
1062                }
1063            }
1064        };
1065
1066        Ok(store)
1067    }
1068
1069    /// Creates a new key-value bucket.
1070    ///
1071    /// # Examples
1072    ///
1073    /// ```no_run
1074    /// # #[tokio::main]
1075    /// # async fn main() -> Result<(), async_nats::Error> {
1076    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1077    /// let jetstream = async_nats::jetstream::new(client);
1078    /// let kv = jetstream
1079    ///     .create_key_value(async_nats::jetstream::kv::Config {
1080    ///         bucket: "kv".to_string(),
1081    ///         history: 10,
1082    ///         ..Default::default()
1083    ///     })
1084    ///     .await?;
1085    /// # Ok(())
1086    /// # }
1087    /// ```
1088    #[cfg(feature = "kv")]
1089    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1090    pub async fn create_key_value(
1091        &self,
1092        config: crate::jetstream::kv::Config,
1093    ) -> Result<Store, CreateKeyValueError> {
1094        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1095            return Err(CreateKeyValueError::new(
1096                CreateKeyValueErrorKind::InvalidStoreName,
1097            ));
1098        }
1099        let info = self.query_account().await.map_err(|err| {
1100            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1101        })?;
1102
1103        let bucket_name = config.bucket.clone();
1104        let stream_config = kv_to_stream_config(config, info)?;
1105
1106        let stream = self.create_stream(stream_config).await.map_err(|err| {
1107            if err.kind() == CreateStreamErrorKind::TimedOut {
1108                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1109            } else {
1110                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1111            }
1112        })?;
1113
1114        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1115    }
1116
1117    /// Updates an existing key-value bucket.
1118    ///
1119    /// # Examples
1120    ///
1121    /// ```no_run
1122    /// # #[tokio::main]
1123    /// # async fn main() -> Result<(), async_nats::Error> {
1124    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1125    /// let jetstream = async_nats::jetstream::new(client);
1126    /// let kv = jetstream
1127    ///     .update_key_value(async_nats::jetstream::kv::Config {
1128    ///         bucket: "kv".to_string(),
1129    ///         history: 60,
1130    ///         ..Default::default()
1131    ///     })
1132    ///     .await?;
1133    /// # Ok(())
1134    /// # }
1135    /// ```
1136    #[cfg(feature = "kv")]
1137    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1138    pub async fn update_key_value(
1139        &self,
1140        config: crate::jetstream::kv::Config,
1141    ) -> Result<Store, UpdateKeyValueError> {
1142        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1143            return Err(UpdateKeyValueError::new(
1144                UpdateKeyValueErrorKind::InvalidStoreName,
1145            ));
1146        }
1147
1148        let stream_name = format!("KV_{}", config.bucket);
1149        let bucket_name = config.bucket.clone();
1150
1151        let account = self.query_account().await.map_err(|err| {
1152            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1153        })?;
1154        let stream = self
1155            .update_stream(kv_to_stream_config(config, account)?)
1156            .await
1157            .map_err(|err| match err.kind() {
1158                UpdateStreamErrorKind::NotFound => {
1159                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1160                }
1161                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1162            })?;
1163
1164        let stream = Stream {
1165            context: self.clone(),
1166            info: stream,
1167            name: stream_name,
1168        };
1169
1170        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1171    }
1172
1173    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
1174    ///
1175    /// # Examples
1176    ///
1177    /// ```no_run
1178    /// # #[tokio::main]
1179    /// # async fn main() -> Result<(), async_nats::Error> {
1180    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1181    /// let jetstream = async_nats::jetstream::new(client);
1182    /// let kv = jetstream
1183    ///     .create_or_update_key_value(async_nats::jetstream::kv::Config {
1184    ///         bucket: "kv".to_string(),
1185    ///         history: 60,
1186    ///         ..Default::default()
1187    ///     })
1188    ///     .await?;
1189    /// # Ok(())
1190    /// # }
1191    /// ```
1192    #[cfg(feature = "kv")]
1193    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1194    pub async fn create_or_update_key_value(
1195        &self,
1196        config: crate::jetstream::kv::Config,
1197    ) -> Result<Store, CreateKeyValueError> {
1198        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1199            return Err(CreateKeyValueError::new(
1200                CreateKeyValueErrorKind::InvalidStoreName,
1201            ));
1202        }
1203
1204        let bucket_name = config.bucket.clone();
1205        let stream_name = format!("KV_{}", config.bucket);
1206
1207        let account = self.query_account().await.map_err(|err| {
1208            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1209        })?;
1210        let stream = self
1211            .create_or_update_stream(kv_to_stream_config(config, account)?)
1212            .await
1213            .map_err(|err| {
1214                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1215            })?;
1216
1217        let stream = Stream {
1218            context: self.clone(),
1219            info: stream,
1220            name: stream_name,
1221        };
1222
1223        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1224    }
1225
1226    /// Deletes given key-value bucket.
1227    ///
1228    /// # Examples
1229    ///
1230    /// ```no_run
1231    /// # #[tokio::main]
1232    /// # async fn main() -> Result<(), async_nats::Error> {
1233    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1234    /// let jetstream = async_nats::jetstream::new(client);
1235    /// let kv = jetstream
1236    ///     .create_key_value(async_nats::jetstream::kv::Config {
1237    ///         bucket: "kv".to_string(),
1238    ///         history: 10,
1239    ///         ..Default::default()
1240    ///     })
1241    ///     .await?;
1242    /// # Ok(())
1243    /// # }
1244    /// ```
1245    #[cfg(feature = "kv")]
1246    #[cfg_attr(docsrs, doc(cfg(feature = "kv")))]
1247    pub async fn delete_key_value<T: AsRef<str>>(
1248        &self,
1249        bucket: T,
1250    ) -> Result<DeleteStatus, KeyValueError> {
1251        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1252            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1253        }
1254
1255        let stream_name = format!("KV_{}", bucket.as_ref());
1256        self.delete_stream(stream_name)
1257            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1258            .await
1259    }
1260
1261    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
1262    //     let config = config.borrow();
1263    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1264    //         return Err(Box::new(std::io::Error::other(
1265    //             "invalid bucket name",
1266    //         )));
1267    //     }
1268
1269    //     let stream_name = format!("KV_{}", config.bucket);
1270    //     self.update_stream()
1271    //         .await
1272    //         .and_then(|info| Ok(()))
1273    // }
1274
1275    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1276    ///
1277    /// It has one less interaction with the server when binding to only one
1278    /// [crate::jetstream::consumer::Consumer].
1279    ///
1280    /// # Examples:
1281    ///
1282    /// ```no_run
1283    /// # #[tokio::main]
1284    /// # async fn main() -> Result<(), async_nats::Error> {
1285    /// use async_nats::jetstream::consumer::PullConsumer;
1286    ///
1287    /// let client = async_nats::connect("localhost:4222").await?;
1288    /// let jetstream = async_nats::jetstream::new(client);
1289    ///
1290    /// let consumer: PullConsumer = jetstream
1291    ///     .get_consumer_from_stream("consumer", "stream")
1292    ///     .await?;
1293    ///
1294    /// # Ok(())
1295    /// # }
1296    /// ```
1297    pub async fn get_consumer_from_stream<T, C, S>(
1298        &self,
1299        consumer: C,
1300        stream: S,
1301    ) -> Result<Consumer<T>, ConsumerError>
1302    where
1303        T: FromConsumer + IntoConsumerConfig,
1304        S: AsRef<str>,
1305        C: AsRef<str>,
1306    {
1307        if !is_valid_name(stream.as_ref()) {
1308            return Err(ConsumerError::with_source(
1309                ConsumerErrorKind::InvalidName,
1310                "invalid stream",
1311            ));
1312        }
1313
1314        if !is_valid_name(consumer.as_ref()) {
1315            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1316        }
1317
1318        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1319
1320        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1321            Response::Ok(info) => info,
1322            Response::Err { error } => return Err(error.into()),
1323        };
1324
1325        Ok(Consumer::new(
1326            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1327                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1328            })?,
1329            info,
1330            self.clone(),
1331        ))
1332    }
1333
1334    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1335    ///
1336    /// It has one less interaction with the server when binding to only one
1337    /// [crate::jetstream::consumer::Consumer].
1338    ///
1339    /// # Examples:
1340    ///
1341    /// ```no_run
1342    /// # #[tokio::main]
1343    /// # async fn main() -> Result<(), async_nats::Error> {
1344    /// use async_nats::jetstream::consumer::PullConsumer;
1345    ///
1346    /// let client = async_nats::connect("localhost:4222").await?;
1347    /// let jetstream = async_nats::jetstream::new(client);
1348    ///
1349    /// jetstream
1350    ///     .delete_consumer_from_stream("consumer", "stream")
1351    ///     .await?;
1352    ///
1353    /// # Ok(())
1354    /// # }
1355    /// ```
1356    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1357        &self,
1358        consumer: C,
1359        stream: S,
1360    ) -> Result<DeleteStatus, ConsumerError> {
1361        if !is_valid_name(consumer.as_ref()) {
1362            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1363        }
1364
1365        if !is_valid_name(stream.as_ref()) {
1366            return Err(ConsumerError::with_source(
1367                ConsumerErrorKind::Other,
1368                "invalid stream name",
1369            ));
1370        }
1371
1372        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1373
1374        match self.request(subject, &json!({})).await? {
1375            Response::Ok(delete_status) => Ok(delete_status),
1376            Response::Err { error } => Err(error.into()),
1377        }
1378    }
1379
1380    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1381    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1382    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1383    ///
1384    /// # Examples
1385    ///
1386    /// ```no_run
1387    /// # #[tokio::main]
1388    /// # async fn main() -> Result<(), async_nats::Error> {
1389    /// use async_nats::jetstream::consumer;
1390    /// let client = async_nats::connect("localhost:4222").await?;
1391    /// let jetstream = async_nats::jetstream::new(client);
1392    ///
1393    /// let consumer: consumer::PullConsumer = jetstream
1394    ///     .create_consumer_on_stream(
1395    ///         consumer::pull::Config {
1396    ///             durable_name: Some("pull".to_string()),
1397    ///             ..Default::default()
1398    ///         },
1399    ///         "stream",
1400    ///     )
1401    ///     .await?;
1402    /// # Ok(())
1403    /// # }
1404    /// ```
1405    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1406        &self,
1407        config: C,
1408        stream: S,
1409    ) -> Result<Consumer<C>, ConsumerError> {
1410        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1411            .await
1412    }
1413
1414    /// Update an existing consumer.
1415    /// This call will fail if the consumer does not exist.
1416    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1417    ///
1418    /// # Examples
1419    ///
1420    /// ```no_run
1421    /// # #[tokio::main]
1422    /// # async fn main() -> Result<(), async_nats::Error> {
1423    /// use async_nats::jetstream::consumer;
1424    /// let client = async_nats::connect("localhost:4222").await?;
1425    /// let jetstream = async_nats::jetstream::new(client);
1426    ///
1427    /// let consumer: consumer::PullConsumer = jetstream
1428    ///     .update_consumer_on_stream(
1429    ///         consumer::pull::Config {
1430    ///             durable_name: Some("pull".to_string()),
1431    ///             description: Some("updated pull consumer".to_string()),
1432    ///             ..Default::default()
1433    ///         },
1434    ///         "stream",
1435    ///     )
1436    ///     .await?;
1437    /// # Ok(())
1438    /// # }
1439    /// ```
1440    #[cfg(feature = "server_2_10")]
1441    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1442        &self,
1443        config: C,
1444        stream: S,
1445    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1446        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1447            .await
1448            .map_err(|err| err.into())
1449    }
1450
1451    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1452    /// the same.
1453    /// This method will fail if consumer is already present with different config.
1454    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1455    ///
1456    /// # Examples
1457    ///
1458    /// ```no_run
1459    /// # #[tokio::main]
1460    /// # async fn main() -> Result<(), async_nats::Error> {
1461    /// use async_nats::jetstream::consumer;
1462    /// let client = async_nats::connect("localhost:4222").await?;
1463    /// let jetstream = async_nats::jetstream::new(client);
1464    ///
1465    /// let consumer: consumer::PullConsumer = jetstream
1466    ///     .create_consumer_strict_on_stream(
1467    ///         consumer::pull::Config {
1468    ///             durable_name: Some("pull".to_string()),
1469    ///             ..Default::default()
1470    ///         },
1471    ///         "stream",
1472    ///     )
1473    ///     .await?;
1474    /// # Ok(())
1475    /// # }
1476    /// ```
1477    #[cfg(feature = "server_2_10")]
1478    pub async fn create_consumer_strict_on_stream<
1479        C: IntoConsumerConfig + FromConsumer,
1480        S: AsRef<str>,
1481    >(
1482        &self,
1483        config: C,
1484        stream: S,
1485    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1486        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1487            .await
1488            .map_err(|err| err.into())
1489    }
1490
1491    async fn create_consumer_on_stream_action<
1492        C: IntoConsumerConfig + FromConsumer,
1493        S: AsRef<str>,
1494    >(
1495        &self,
1496        config: C,
1497        stream: S,
1498        action: ConsumerAction,
1499    ) -> Result<Consumer<C>, ConsumerError> {
1500        let config = config.into_consumer_config();
1501
1502        let subject = {
1503            let filter = if config.filter_subject.is_empty() {
1504                "".to_string()
1505            } else {
1506                format!(".{}", config.filter_subject)
1507            };
1508            config
1509                .name
1510                .as_ref()
1511                .or(config.durable_name.as_ref())
1512                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1513                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1514        };
1515
1516        match self
1517            .request(
1518                subject,
1519                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1520            )
1521            .await?
1522        {
1523            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1524            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1525                FromConsumer::try_from_consumer_config(info.clone().config)
1526                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1527                info,
1528                self.clone(),
1529            )),
1530        }
1531    }
1532
1533    /// Send a request to the jetstream JSON API.
1534    ///
1535    /// This is a low level API used mostly internally, that should be used only in
1536    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1537    ///
1538    /// # Examples
1539    ///
1540    /// ```no_run
1541    /// # use async_nats::jetstream::stream::Info;
1542    /// # use async_nats::jetstream::response::Response;
1543    /// # #[tokio::main]
1544    /// # async fn main() -> Result<(), async_nats::Error> {
1545    /// let client = async_nats::connect("localhost:4222").await?;
1546    /// let jetstream = async_nats::jetstream::new(client);
1547    ///
1548    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1549    /// # Ok(())
1550    /// # }
1551    /// ```
1552    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1553    where
1554        S: ToSubject,
1555        T: ?Sized + Serialize,
1556        V: DeserializeOwned,
1557    {
1558        let subject = subject.to_subject();
1559        let request = serde_json::to_vec(&payload)
1560            .map(Bytes::from)
1561            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1562
1563        debug!("JetStream request sent: {:?}", request);
1564
1565        let message = self
1566            .client
1567            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1568            .await;
1569        let message = message?;
1570        debug!(
1571            "JetStream request response: {:?}",
1572            from_utf8(&message.payload)
1573        );
1574        let response = serde_json::from_slice(message.payload.as_ref())
1575            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1576
1577        Ok(response)
1578    }
1579
1580    /// Send a JetStream API request without waiting for a response.
1581    /// The subject will be automatically prefixed with the JetStream API prefix.
1582    ///
1583    /// This is useful for operations that need custom response handling,
1584    /// such as streaming responses (batch get) where the caller manages
1585    /// their own subscription to the reply inbox.
1586    ///
1587    /// Used mostly by extension crates.
1588    pub async fn send_request<S: ToSubject>(
1589        &self,
1590        subject: S,
1591        request: crate::client::Request,
1592    ) -> Result<(), crate::PublishError> {
1593        let prefixed_subject = format!("{}.{}", self.prefix, subject.to_subject());
1594        let inbox = request.inbox.unwrap_or_else(|| self.client.new_inbox());
1595        let payload = request.payload.unwrap_or_default();
1596
1597        match request.headers {
1598            Some(headers) => {
1599                self.client
1600                    .publish_with_reply_and_headers(prefixed_subject, inbox, headers, payload)
1601                    .await
1602            }
1603            None => {
1604                self.client
1605                    .publish_with_reply(prefixed_subject, inbox, payload)
1606                    .await
1607            }
1608        }
1609    }
1610
1611    /// Creates a new object store bucket.
1612    ///
1613    /// # Examples
1614    ///
1615    /// ```no_run
1616    /// # #[tokio::main]
1617    /// # async fn main() -> Result<(), async_nats::Error> {
1618    /// let client = async_nats::connect("demo.nats.io").await?;
1619    /// let jetstream = async_nats::jetstream::new(client);
1620    /// let bucket = jetstream
1621    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1622    ///         bucket: "bucket".to_string(),
1623    ///         ..Default::default()
1624    ///     })
1625    ///     .await?;
1626    /// # Ok(())
1627    /// # }
1628    /// ```
1629    #[cfg(feature = "object-store")]
1630    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1631    pub async fn create_object_store(
1632        &self,
1633        config: super::object_store::Config,
1634    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1635        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1636            return Err(CreateObjectStoreError::new(
1637                CreateKeyValueErrorKind::InvalidStoreName,
1638            ));
1639        }
1640
1641        let bucket_name = config.bucket.clone();
1642        let stream_name = format!("OBJ_{bucket_name}");
1643        let chunk_subject = format!("$O.{bucket_name}.C.>");
1644        let meta_subject = format!("$O.{bucket_name}.M.>");
1645
1646        let stream = self
1647            .create_stream(super::stream::Config {
1648                name: stream_name,
1649                description: config.description.clone(),
1650                subjects: vec![chunk_subject, meta_subject],
1651                max_age: config.max_age,
1652                max_bytes: config.max_bytes,
1653                storage: config.storage,
1654                num_replicas: config.num_replicas,
1655                discard: DiscardPolicy::New,
1656                allow_rollup: true,
1657                allow_direct: true,
1658                #[cfg(feature = "server_2_10")]
1659                compression: if config.compression {
1660                    Some(Compression::S2)
1661                } else {
1662                    None
1663                },
1664                placement: config.placement,
1665                ..Default::default()
1666            })
1667            .await
1668            .map_err(|err| {
1669                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1670            })?;
1671
1672        Ok(ObjectStore {
1673            name: bucket_name,
1674            stream,
1675        })
1676    }
1677
1678    /// Get an existing object store bucket.
1679    ///
1680    /// # Examples
1681    ///
1682    /// ```no_run
1683    /// # #[tokio::main]
1684    /// # async fn main() -> Result<(), async_nats::Error> {
1685    /// let client = async_nats::connect("demo.nats.io").await?;
1686    /// let jetstream = async_nats::jetstream::new(client);
1687    /// let bucket = jetstream.get_object_store("bucket").await?;
1688    /// # Ok(())
1689    /// # }
1690    /// ```
1691    #[cfg(feature = "object-store")]
1692    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1693    pub async fn get_object_store<T: AsRef<str>>(
1694        &self,
1695        bucket_name: T,
1696    ) -> Result<ObjectStore, ObjectStoreError> {
1697        let bucket_name = bucket_name.as_ref();
1698        if !is_valid_bucket_name(bucket_name) {
1699            return Err(ObjectStoreError::new(
1700                ObjectStoreErrorKind::InvalidBucketName,
1701            ));
1702        }
1703        let stream_name = format!("OBJ_{bucket_name}");
1704        let stream = self
1705            .get_stream(stream_name)
1706            .await
1707            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1708
1709        Ok(ObjectStore {
1710            name: bucket_name.to_string(),
1711            stream,
1712        })
1713    }
1714
1715    /// Delete a object store bucket.
1716    ///
1717    /// # Examples
1718    ///
1719    /// ```no_run
1720    /// # #[tokio::main]
1721    /// # async fn main() -> Result<(), async_nats::Error> {
1722    /// let client = async_nats::connect("demo.nats.io").await?;
1723    /// let jetstream = async_nats::jetstream::new(client);
1724    /// let bucket = jetstream.delete_object_store("bucket").await?;
1725    /// # Ok(())
1726    /// # }
1727    /// ```
1728    #[cfg(feature = "object-store")]
1729    #[cfg_attr(docsrs, doc(cfg(feature = "object-store")))]
1730    pub async fn delete_object_store<T: AsRef<str>>(
1731        &self,
1732        bucket_name: T,
1733    ) -> Result<(), DeleteObjectStore> {
1734        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1735        self.delete_stream(stream_name)
1736            .await
1737            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1738        Ok(())
1739    }
1740}
1741
/// Core-client request support for [`Context`], forwarded to the wrapped client.
impl crate::client::traits::Requester for Context {
    /// Sends `request` on `subject` by delegating to `send_request` on `self.client`.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::Request,
    ) -> impl Future<Output = Result<Message, crate::RequestError>> {
        self.client.send_request(subject, request)
    }
}
1751
/// Core-client publish support for [`Context`], forwarded to the wrapped client.
impl crate::client::traits::Publisher for Context {
    /// Publishes `payload` on `subject` with the given `reply` subject by delegating to
    /// `publish_with_reply` on `self.client`.
    fn publish_with_reply<S, R>(
        &self,
        subject: S,
        reply: R,
        payload: Bytes,
    ) -> impl Future<Output = Result<(), crate::PublishError>>
    where
        S: ToSubject,
        R: ToSubject,
    {
        self.client.publish_with_reply(subject, reply, payload)
    }

    /// Publishes an already assembled outbound message by delegating to
    /// `publish_message` on `self.client`.
    fn publish_message(
        &self,
        msg: crate::message::OutboundMessage,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        self.client.publish_message(msg)
    }
}
1773
/// Exposes the client associated with this [`Context`].
impl traits::ClientProvider for Context {
    /// Returns the result of calling `client()` on `self`.
    fn client(&self) -> crate::Client {
        // NOTE(review): presumably resolves to an inherent `Context::client` defined
        // outside this chunk (otherwise this call would recurse) — confirm.
        self.client()
    }
}
1779
/// JetStream request support for [`Context`].
impl traits::Requester for Context {
    /// Sends the serializable `payload` on `subject` and yields a response deserialized
    /// into `V`, by calling `request` on `self`.
    fn request<S, T, V>(
        &self,
        subject: S,
        payload: &T,
    ) -> impl Future<Output = Result<V, RequestError>>
    where
        S: ToSubject,
        T: ?Sized + Serialize,
        V: DeserializeOwned,
    {
        // NOTE(review): presumably resolves to an inherent `Context::request` defined
        // outside this chunk (otherwise this call would recurse) — confirm.
        self.request(subject, payload)
    }
}
1794
/// Exposes the timeout configured on this [`Context`].
impl traits::TimeoutProvider for Context {
    /// Returns the value of the `timeout` field.
    fn timeout(&self) -> Duration {
        self.timeout
    }
}
1800
/// Request-sending support for [`Context`] that yields no response message.
impl traits::RequestSender for Context {
    /// Sends `request` on `subject` by calling `send_request` on `self`.
    fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: crate::client::Request,
    ) -> impl Future<Output = Result<(), crate::PublishError>> {
        // NOTE(review): several `send_request` methods exist for `Context`; this presumably
        // resolves to an inherent one defined outside this chunk — confirm.
        self.send_request(subject, request)
    }
}
1810
/// JetStream publish support for [`Context`]; both methods yield a [`PublishAckFuture`].
impl traits::Publisher for Context {
    /// Publishes `payload` on `subject` by calling `publish` on `self`.
    fn publish<S: ToSubject>(
        &self,
        subject: S,
        payload: Bytes,
    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
        // NOTE(review): presumably resolves to an inherent `Context::publish` defined
        // outside this chunk (otherwise this call would recurse) — confirm.
        self.publish(subject, payload)
    }

    /// Publishes an outbound message: its payload and headers are repackaged into a
    /// [`PublishMessage`] and handed to `send_publish` together with the message subject.
    fn publish_message(
        &self,
        message: jetstream::message::OutboundMessage,
    ) -> impl Future<Output = Result<PublishAckFuture, PublishError>> {
        self.send_publish(
            message.subject,
            PublishMessage {
                payload: message.payload,
                headers: message.headers,
            },
        )
    }
}
1833
/// Classifies the ways a JetStream publish can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the published subject.
    StreamNotFound,
    /// The server reported a wrong last message id.
    WrongLastMessageId,
    /// The server reported a wrong last sequence.
    WrongLastSequence,
    /// The acknowledgement was not received in time.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// The maximum number of pending acknowledgements was reached.
    MaxAckPending,
    /// The message (payload plus headers) exceeds the server's `max_payload`.
    MaxPayloadExceeded,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1861
/// Error returned by JetStream publish operations, classified by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;

/// Handle to a publish acknowledgement that has not been received yet.
///
/// Converting it into a future (it implements `IntoFuture`) waits for the acknowledgement
/// for at most `timeout`. If the handle is dropped while it still owns both the receiver
/// and the permit, they are passed on through `tx` instead of being discarded.
#[derive(Debug)]
pub struct PublishAckFuture {
    // Upper bound on how long to wait for the acknowledgement message.
    timeout: Duration,
    // Receiver the acknowledgement message arrives on; `None` once it has been taken.
    subscription: Option<oneshot::Receiver<Message>>,
    // Semaphore permit owned while the acknowledgement is outstanding.
    // NOTE(review): presumably bounds the number of in-flight acks — confirm at the
    // construction site.
    permit: Option<OwnedSemaphorePermit>,
    // Channel used on drop to hand the receiver and permit over to "the acker"
    // (the consumer of this channel is defined outside this chunk).
    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
}
1871
1872impl Drop for PublishAckFuture {
1873    fn drop(&mut self) {
1874        if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1875            if let Err(err) = self.tx.try_send((sub, permit)) {
1876                tracing::warn!("failed to pass future permit to the acker: {}", err);
1877            }
1878        }
1879    }
1880}
1881
1882impl PublishAckFuture {
1883    async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1884        let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1885            .await
1886            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1887        next.map_or_else(
1888            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1889            |m| {
1890                if m.status == Some(StatusCode::NO_RESPONDERS) {
1891                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1892                }
1893                let response = serde_json::from_slice(m.payload.as_ref())
1894                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1895                match response {
1896                    Response::Err { error } => match error.error_code() {
1897                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1898                            PublishErrorKind::WrongLastMessageId,
1899                            error,
1900                        )),
1901                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1902                            PublishErrorKind::WrongLastSequence,
1903                            error,
1904                        )),
1905                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1906                    },
1907                    Response::Ok(publish_ack) => Ok(publish_ack),
1908                }
1909            },
1910        )
1911    }
1912}
1913impl IntoFuture for PublishAckFuture {
1914    type Output = Result<PublishAck, PublishError>;
1915
1916    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1917
1918    fn into_future(self) -> Self::IntoFuture {
1919        Box::pin(std::future::IntoFuture::into_future(
1920            self.next_with_timeout(),
1921        ))
1922    }
1923}
1924
/// One deserialized page of stream names, as consumed by [`StreamNames`].
#[derive(Deserialize, Debug)]
struct StreamPage {
    // Total reported with the page; paging stops once the running offset reaches it.
    total: usize,
    // Names carried by this page; may be absent in the response.
    streams: Option<Vec<String>>,
}
1930
/// One deserialized page of stream infos, as consumed by [`Streams`].
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    // Total reported with the page; paging stops once the running offset reaches it.
    total: usize,
    // Stream infos carried by this page; may be absent in the response.
    streams: Option<Vec<super::stream::Info>>,
}
1936
// Boxed in-flight request for a single page of stream names.
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;

/// Paginated stream of stream names, fetched page by page via `STREAM.NAMES` requests.
pub struct StreamNames {
    // Context used to issue the page requests.
    context: Context,
    // Number of names received so far; sent as `offset` with the next page request.
    offset: usize,
    // Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    // Sent as the `subject` field of every page request (`null` when `None`).
    // NOTE(review): presumably a subject filter applied by the server — confirm.
    subject: Option<String>,
    // Names buffered from the most recent page; items are handed out from the back.
    streams: Vec<String>,
    // Set once the running offset has reached the total reported by the server.
    done: bool,
}
1947
1948impl futures_util::Stream for StreamNames {
1949    type Item = Result<String, StreamsError>;
1950
1951    fn poll_next(
1952        mut self: Pin<&mut Self>,
1953        cx: &mut std::task::Context<'_>,
1954    ) -> std::task::Poll<Option<Self::Item>> {
1955        match self.page_request.as_mut() {
1956            Some(page) => match page.try_poll_unpin(cx) {
1957                std::task::Poll::Ready(page) => {
1958                    self.page_request = None;
1959                    let page = page
1960                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1961                    if let Some(streams) = page.streams {
1962                        self.offset += streams.len();
1963                        self.streams = streams;
1964                        if self.offset >= page.total {
1965                            self.done = true;
1966                        }
1967                        match self.streams.pop() {
1968                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1969                            None => Poll::Ready(None),
1970                        }
1971                    } else {
1972                        Poll::Ready(None)
1973                    }
1974                }
1975                std::task::Poll::Pending => std::task::Poll::Pending,
1976            },
1977            None => {
1978                if let Some(stream) = self.streams.pop() {
1979                    Poll::Ready(Some(Ok(stream)))
1980                } else {
1981                    if self.done {
1982                        return Poll::Ready(None);
1983                    }
1984                    let context = self.context.clone();
1985                    let offset = self.offset;
1986                    let subject = self.subject.clone();
1987                    self.page_request = Some(Box::pin(async move {
1988                        match context
1989                            .request(
1990                                "STREAM.NAMES",
1991                                &json!({
1992                                    "offset": offset,
1993                                    "subject": subject
1994                                }),
1995                            )
1996                            .await?
1997                        {
1998                            Response::Err { error } => {
1999                                Err(RequestError::with_source(RequestErrorKind::Other, error))
2000                            }
2001                            Response::Ok(page) => Ok(page),
2002                        }
2003                    }));
2004                    self.poll_next(cx)
2005                }
2006            }
2007        }
2008    }
2009}
2010
// Boxed in-flight request for a single page of stream infos.
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind used by the stream listing streams; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error type used by the stream listing streams; an alias of [`RequestError`].
pub type StreamsError = RequestError;

/// Paginated stream of stream infos, fetched page by page via `STREAM.LIST` requests.
pub struct Streams {
    // Context used to issue the page requests.
    context: Context,
    // Number of infos received so far; sent as `offset` with the next page request.
    offset: usize,
    // Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    // Infos buffered from the most recent page; items are handed out from the back.
    streams: Vec<super::stream::Info>,
    // Set once the running offset has reached the total reported by the server.
    done: bool,
}
2023
2024impl futures_util::Stream for Streams {
2025    type Item = Result<super::stream::Info, StreamsError>;
2026
2027    fn poll_next(
2028        mut self: Pin<&mut Self>,
2029        cx: &mut std::task::Context<'_>,
2030    ) -> std::task::Poll<Option<Self::Item>> {
2031        match self.page_request.as_mut() {
2032            Some(page) => match page.try_poll_unpin(cx) {
2033                std::task::Poll::Ready(page) => {
2034                    self.page_request = None;
2035                    let page = page
2036                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
2037                    if let Some(streams) = page.streams {
2038                        self.offset += streams.len();
2039                        self.streams = streams;
2040                        if self.offset >= page.total {
2041                            self.done = true;
2042                        }
2043                        match self.streams.pop() {
2044                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2045                            None => Poll::Ready(None),
2046                        }
2047                    } else {
2048                        Poll::Ready(None)
2049                    }
2050                }
2051                std::task::Poll::Pending => std::task::Poll::Pending,
2052            },
2053            None => {
2054                if let Some(stream) = self.streams.pop() {
2055                    Poll::Ready(Some(Ok(stream)))
2056                } else {
2057                    if self.done {
2058                        return Poll::Ready(None);
2059                    }
2060                    let context = self.context.clone();
2061                    let offset = self.offset;
2062                    self.page_request = Some(Box::pin(async move {
2063                        match context
2064                            .request(
2065                                "STREAM.LIST",
2066                                &json!({
2067                                    "offset": offset,
2068                                }),
2069                            )
2070                            .await?
2071                        {
2072                            Response::Err { error } => {
2073                                Err(RequestError::with_source(RequestErrorKind::Other, error))
2074                            }
2075                            Response::Ok(page) => Ok(page),
2076                        }
2077                    }));
2078                    self.poll_next(cx)
2079                }
2080            }
2081        }
2082    }
2083}
2084
/// Deprecated alias of `jetstream::message::PublishMessage`, kept to avoid breaking
/// existing code.
///
/// Please use `jetstream::message::PublishMessage` instead.
#[deprecated(
    note = "use jetstream::message::PublishMessage instead",
    since = "0.44.0"
)]
pub type Publish = super::message::PublishMessage;
2092
/// Classifies the ways a JetStream API request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nothing responded: the requested JetStream resource does not exist.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// The request subject was invalid.
    InvalidSubject,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::InvalidSubject => "invalid subject",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
2111
2112pub type RequestError = Error<RequestErrorKind>;
2113
2114impl From<crate::RequestError> for RequestError {
2115    fn from(error: crate::RequestError) -> Self {
2116        match error.kind() {
2117            crate::RequestErrorKind::TimedOut => {
2118                RequestError::with_source(RequestErrorKind::TimedOut, error)
2119            }
2120            crate::RequestErrorKind::NoResponders => {
2121                RequestError::new(RequestErrorKind::NoResponders)
2122            }
2123            crate::RequestErrorKind::InvalidSubject => {
2124                RequestError::with_source(RequestErrorKind::InvalidSubject, error)
2125            }
2126            crate::RequestErrorKind::MaxPayloadExceeded | crate::RequestErrorKind::Other => {
2127                RequestError::with_source(RequestErrorKind::Other, error)
2128            }
2129        }
2130    }
2131}
2132
2133impl From<super::errors::Error> for RequestError {
2134    fn from(err: super::errors::Error) -> Self {
2135        RequestError::with_source(RequestErrorKind::Other, err)
2136    }
2137}
2138
2139pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
2140
2141#[derive(Clone, Debug, PartialEq)]
2142pub enum ConsumerInfoErrorKind {
2143    InvalidName,
2144    Offline,
2145    NotFound,
2146    StreamNotFound,
2147    Request,
2148    JetStream(super::errors::Error),
2149    TimedOut,
2150    NoResponders,
2151}
2152
2153impl Display for ConsumerInfoErrorKind {
2154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2155        match self {
2156            Self::InvalidName => write!(f, "invalid consumer name"),
2157            Self::Offline => write!(f, "consumer is offline"),
2158            Self::NotFound => write!(f, "consumer not found"),
2159            Self::StreamNotFound => write!(f, "stream not found"),
2160            Self::Request => write!(f, "request error"),
2161            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2162            Self::TimedOut => write!(f, "timed out"),
2163            Self::NoResponders => write!(f, "no responders"),
2164        }
2165    }
2166}
2167
2168impl From<super::errors::Error> for ConsumerInfoError {
2169    fn from(error: super::errors::Error) -> Self {
2170        match error.error_code() {
2171            ErrorCode::CONSUMER_NOT_FOUND => {
2172                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2173            }
2174            ErrorCode::STREAM_NOT_FOUND => {
2175                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2176            }
2177            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2178            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2179        }
2180    }
2181}
2182
2183impl From<RequestError> for ConsumerInfoError {
2184    fn from(error: RequestError) -> Self {
2185        match error.kind() {
2186            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2187            RequestErrorKind::InvalidSubject => {
2188                ConsumerInfoError::with_source(ConsumerInfoErrorKind::InvalidName, error)
2189            }
2190            RequestErrorKind::Other => {
2191                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2192            }
2193            RequestErrorKind::NoResponders => {
2194                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2195            }
2196        }
2197    }
2198}
2199
2200#[derive(Clone, Debug, PartialEq)]
2201pub enum CreateStreamErrorKind {
2202    EmptyStreamName,
2203    InvalidStreamName,
2204    DomainAndExternalSet,
2205    JetStreamUnavailable,
2206    JetStream(super::errors::Error),
2207    TimedOut,
2208    Response,
2209    NotFound,
2210    ResponseParse,
2211}
2212
2213impl Display for CreateStreamErrorKind {
2214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2215        match self {
2216            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2217            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2218            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2219            Self::NotFound => write!(f, "stream not found"),
2220            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2221            Self::TimedOut => write!(f, "jetstream request timed out"),
2222            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2223            Self::ResponseParse => write!(f, "failed to parse server response"),
2224            Self::Response => write!(f, "response error"),
2225        }
2226    }
2227}
2228
2229pub type CreateStreamError = Error<CreateStreamErrorKind>;
2230
2231impl From<super::errors::Error> for CreateStreamError {
2232    fn from(error: super::errors::Error) -> Self {
2233        match error.kind() {
2234            super::errors::ErrorCode::STREAM_NOT_FOUND => {
2235                CreateStreamError::new(CreateStreamErrorKind::NotFound)
2236            }
2237            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2238        }
2239    }
2240}
2241
2242impl From<RequestError> for CreateStreamError {
2243    fn from(error: RequestError) -> Self {
2244        match error.kind() {
2245            RequestErrorKind::NoResponders => {
2246                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2247            }
2248            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2249            RequestErrorKind::InvalidSubject => {
2250                CreateStreamError::with_source(CreateStreamErrorKind::InvalidStreamName, error)
2251            }
2252            RequestErrorKind::Other => {
2253                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2254            }
2255        }
2256    }
2257}
2258
2259#[derive(Clone, Debug, PartialEq)]
2260pub enum GetStreamErrorKind {
2261    EmptyName,
2262    Request,
2263    InvalidStreamName,
2264    JetStream(super::errors::Error),
2265}
2266
2267impl Display for GetStreamErrorKind {
2268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2269        match self {
2270            Self::EmptyName => write!(f, "empty name cannot be empty"),
2271            Self::Request => write!(f, "request error"),
2272            Self::InvalidStreamName => write!(f, "invalid stream name"),
2273            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2274        }
2275    }
2276}
2277
2278#[derive(Clone, Debug, PartialEq)]
2279pub enum GetStreamByNameErrorKind {
2280    Request,
2281    NotFound,
2282    InvalidSubject,
2283    JetStream(super::errors::Error),
2284}
2285
2286impl Display for GetStreamByNameErrorKind {
2287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2288        match self {
2289            Self::Request => write!(f, "request error"),
2290            Self::NotFound => write!(f, "stream not found"),
2291            Self::InvalidSubject => write!(f, "invalid subject"),
2292            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2293        }
2294    }
2295}
2296
/// Error returned when fetching a stream, classified by [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error returned when looking up a stream name, classified by
/// [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Updating a stream reports the same errors as creating one.
pub type UpdateStreamError = CreateStreamError;
/// Updating a stream reports the same error kinds as creating one.
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Deleting a stream reports the same errors as fetching one.
pub type DeleteStreamError = GetStreamError;
/// Deleting a stream reports the same error kinds as fetching one.
pub type DeleteStreamErrorKind = GetStreamErrorKind;
2304
/// Classifies the ways obtaining a Key Value store can fail.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// JetStream reported an error.
    JetStream,
}

#[cfg(feature = "kv")]
impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}
2323
/// Error returned when obtaining a Key Value store, classified by [`KeyValueErrorKind`].
#[cfg(feature = "kv")]
pub type KeyValueError = Error<KeyValueErrorKind>;
2326
/// Classifies the ways creating a Key Value store can fail; the object store creation
/// error types alias this kind as well.
#[cfg(any(feature = "kv", feature = "object-store"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// JetStream reported an error.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers are not supported.
    LimitMarkersNotSupported,
}

#[cfg(any(feature = "kv", feature = "object-store"))]
impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(description)
    }
}
2353
/// Classifies the ways updating a Key Value store can fail.
#[cfg(feature = "kv")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// JetStream reported an error.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

#[cfg(feature = "kv")]
impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // Previously printed "bucket creation failed", copied from the create variant.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error returned when creating a Key Value store.
#[cfg(feature = "kv")]
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error returned when updating a Key Value store.
#[cfg(feature = "kv")]
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Error returned when creating an object store; it shares its kinds with
/// Key Value store creation.
#[cfg(feature = "object-store")]
pub type CreateObjectStoreError = Error<CreateKeyValueErrorKind>;
/// Error kind for object store creation; an alias of [`CreateKeyValueErrorKind`].
#[cfg(feature = "object-store")]
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2391
/// Classifies the ways obtaining (or deleting) an object store can fail.
#[cfg(feature = "object-store")]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The object store could not be fetched.
    GetStore,
}

#[cfg(feature = "object-store")]
impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}
2408
/// Error returned by object store lookups, classified by [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Error returned when deleting an object store; an alias of [`ObjectStoreError`].
#[cfg(feature = "object-store")]
pub type DeleteObjectStore = ObjectStoreError;
/// Error kind for object store deletion; an alias of [`ObjectStoreErrorKind`].
#[cfg(feature = "object-store")]
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2416
2417#[derive(Clone, Debug, PartialEq)]
2418pub enum AccountErrorKind {
2419    TimedOut,
2420    JetStream(super::errors::Error),
2421    JetStreamUnavailable,
2422    Other,
2423}
2424
2425impl Display for AccountErrorKind {
2426    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2427        match self {
2428            Self::TimedOut => write!(f, "timed out"),
2429            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2430            Self::Other => write!(f, "error"),
2431            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2432        }
2433    }
2434}
2435
2436pub type AccountError = Error<AccountErrorKind>;
2437
2438impl From<RequestError> for AccountError {
2439    fn from(err: RequestError) -> Self {
2440        match err.kind {
2441            RequestErrorKind::NoResponders => {
2442                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2443            }
2444            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2445            RequestErrorKind::Other | RequestErrorKind::InvalidSubject => {
2446                AccountError::with_source(AccountErrorKind::Other, err)
2447            }
2448        }
2449    }
2450}
2451
// Action attached to a consumer request; each variant serializes to the string given
// in its `serde(rename)` attribute.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    // Serialized as the empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    // Serialized as "create"; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    // Serialized as "update"; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2463
#[cfg(feature = "kv")]
// Builds the KV `Store` for `bucket` on top of `stream`.
//
// By default keys use the `$KV.<bucket>.` prefix, there is no separate put prefix, and the
// JetStream prefix is used whenever `prefix` differs from `$JS.API`. When the stream
// mirrors another bucket through an external configuration, writes are directed at the
// mirrored bucket (its name with leading `KV_` removed):
// - empty external API prefix: only the put prefix becomes `$KV.<mirrored>.`;
// - non-empty external API prefix: the key prefix becomes `$KV.<mirrored>.`, the put
//   prefix becomes `<api_prefix>.$KV.<mirrored>.` and the JetStream prefix is disabled.
fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
    let mut key_prefix = format!("$KV.{bucket}.");
    let mut put_prefix = None;
    let mut use_jetstream_prefix = prefix != "$JS.API";

    if let Some(mirror) = stream.info.config.mirror.as_ref() {
        if let Some(external) = mirror.external.as_ref() {
            let mirrored_bucket = mirror.name.trim_start_matches("KV_");
            let mirrored_prefix = format!("$KV.{mirrored_bucket}.");
            if external.api_prefix.is_empty() {
                put_prefix = Some(mirrored_prefix);
            } else {
                use_jetstream_prefix = false;
                put_prefix = Some(format!("{}.{mirrored_prefix}", external.api_prefix));
                key_prefix = mirrored_prefix;
            }
        }
    }

    Store {
        prefix: key_prefix,
        name: bucket,
        stream: stream.clone(),
        stream_name: stream.info.config.name.clone(),
        put_prefix,
        use_jetstream_prefix,
    }
}
2489
// Failures that can occur while converting a KV config into a stream config; converted
// into the public create/update Key Value errors through `From` impls.
#[cfg(feature = "kv")]
enum KvToStreamConfigError {
    // The requested history exceeds the allowed maximum.
    TooLongHistory,
    // Limit markers were requested but are not supported.
    // NOTE(review): `allow(dead_code)` presumably covers builds where the code
    // constructing this variant is compiled out — confirm.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2496
/// Maps a KV-to-stream config conversion failure onto the matching create error kind.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for CreateKeyValueError {
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => CreateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                CreateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        CreateKeyValueError::new(kind)
    }
}
2510
/// Maps a KV-to-stream config conversion failure onto the matching update error kind.
#[cfg(feature = "kv")]
impl From<KvToStreamConfigError> for UpdateKeyValueError {
    fn from(err: KvToStreamConfigError) -> Self {
        let kind = match err {
            KvToStreamConfigError::TooLongHistory => UpdateKeyValueErrorKind::TooLongHistory,
            KvToStreamConfigError::LimitMarkersNotSupported => {
                UpdateKeyValueErrorKind::LimitMarkersNotSupported
            }
        };
        UpdateKeyValueError::new(kind)
    }
}
2524
/// Maps a key-value bucket configuration to the configuration of its
/// backing stream.
///
/// The resulting stream is named `KV_<bucket>`. Its subjects, mirror and
/// sources are derived as follows:
///
/// * if `config.mirror` is set, its name is given the `KV_` prefix (unless
///   already present), `mirror_direct` is forced to `true`, and no subjects
///   are configured;
/// * otherwise, if `config.sources` is set, every source name is given the
///   `KV_` prefix (unless already present) and no subjects are configured;
/// * otherwise the stream listens on `$KV.<bucket>.>`.
///
/// A `config.history` of zero or less is treated as `1`, and a
/// `config.num_replicas` of `0` is treated as `1`.
///
/// `_account` is only read when the `server_2_11` feature is enabled (hence
/// the underscore), where its `requests.level` is checked before limit
/// markers are enabled.
///
/// # Errors
///
/// * [`KvToStreamConfigError::TooLongHistory`] if `config.history` exceeds
///   `MAX_HISTORY`.
/// * [`KvToStreamConfigError::LimitMarkersNotSupported`] (feature
///   `server_2_11` only) if `config.limit_markers` is set while
///   `_account.requests.level` is below `1`.
#[cfg(feature = "kv")]
fn kv_to_stream_config(
    config: crate::jetstream::kv::Config,
    _account: Account,
) -> Result<super::stream::Config, KvToStreamConfigError> {
    /// Prepends the `KV_` stream-name prefix unless `name` already has it.
    fn ensure_kv_prefix(name: &mut String) {
        if !name.starts_with("KV_") {
            name.insert_str(0, "KV_");
        }
    }

    // History is validated first so its error takes precedence over the
    // limit-marker check below.
    if config.history > MAX_HISTORY {
        return Err(KvToStreamConfigError::TooLongHistory);
    }
    let history = if config.history > 0 {
        config.history
    } else {
        1
    };

    let num_replicas = match config.num_replicas {
        0 => 1,
        replicas => replicas,
    };

    // Limit markers require both per-message TTLs and a delete-marker TTL on
    // the stream; both stay disabled when no marker duration is configured.
    #[cfg(feature = "server_2_11")]
    let (allow_message_ttl, subject_delete_marker_ttl) = match config.limit_markers {
        Some(duration) => {
            if _account.requests.level < 1 {
                return Err(KvToStreamConfigError::LimitMarkersNotSupported);
            }
            (true, Some(duration))
        }
        None => (false, None),
    };

    // `config` is owned, so these fields are moved out rather than cloned.
    let mut mirror = config.mirror;
    let mut sources = config.sources;
    let mut mirror_direct = config.mirror_direct;
    let mut subjects = Vec::new();

    if let Some(mirror) = mirror.as_mut() {
        ensure_kv_prefix(&mut mirror.name);
        mirror_direct = true;
    } else if let Some(sources) = sources.as_mut() {
        for source in sources.iter_mut() {
            ensure_kv_prefix(&mut source.name);
        }
    } else {
        // Only a plain bucket (no mirror, no sources) owns its subject space.
        subjects.push(format!("$KV.{}.>", config.bucket));
    }

    Ok(Config {
        name: format!("KV_{}", config.bucket),
        description: Some(config.description),
        subjects,
        max_messages_per_subject: history,
        max_bytes: config.max_bytes,
        max_age: config.max_age,
        max_message_size: config.max_value_size,
        storage: config.storage,
        republish: config.republish,
        allow_rollup: true,
        deny_delete: true,
        deny_purge: false,
        allow_direct: true,
        sources,
        mirror,
        num_replicas,
        discard: DiscardPolicy::New,
        mirror_direct,
        #[cfg(feature = "server_2_10")]
        compression: config.compression.then_some(Compression::S2),
        placement: config.placement,
        #[cfg(feature = "server_2_11")]
        allow_message_ttl,
        #[cfg(feature = "server_2_11")]
        subject_delete_marker_ttl,
        ..Default::default()
    })
}