async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23    header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures_util::future::BoxFuture;
27use futures_util::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Debug;
33use std::fmt::Display;
34use std::future::IntoFuture;
35use std::pin::Pin;
36use std::str::from_utf8;
37use std::sync::Arc;
38use std::task::Poll;
39use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
40use tokio::time::Duration;
41use tokio_stream::wrappers::ReceiverStream;
42use tracing::debug;
43
44use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
45use super::errors::ErrorCode;
46use super::kv::{Store, MAX_HISTORY};
47use super::object_store::{is_valid_bucket_name, ObjectStore};
48use super::stream::{
49    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
50    Stream,
51};
52#[cfg(feature = "server_2_10")]
53use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
54use super::{is_valid_name, kv};
55
56/// A context which can perform jetstream scoped requests.
57#[derive(Debug, Clone)]
58pub struct Context {
59    pub(crate) client: Client,
60    pub(crate) prefix: String,
61    pub(crate) timeout: Duration,
62    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
63    pub(crate) acker_task: Arc<tokio::task::JoinHandle<()>>,
64    pub(crate) ack_sender:
65        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
66    pub(crate) backpressure_on_inflight: bool,
67    pub(crate) semaphore_capacity: usize,
68}
69
70fn spawn_acker(
71    rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
72    ack_timeout: Duration,
73    concurrency: Option<usize>,
74) -> tokio::task::JoinHandle<()> {
75    tokio::spawn(async move {
76        rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
77            tokio::time::timeout(ack_timeout, subscription).await.ok();
78            drop(permit);
79        })
80        .await;
81    })
82}
83
84impl Drop for Context {
85    fn drop(&mut self) {
86        self.acker_task.abort();
87    }
88}
89
90use std::marker::PhantomData;
91
/// Type-state marker for [ContextBuilder]: the state in which the prefix setters
/// (`api_prefix`, `domain`) are available.
#[derive(Debug, Default)]
pub struct Yes;
/// Type-state marker for [ContextBuilder]: the state returned by the prefix setters, in which
/// they are no longer available.
#[derive(Debug, Default)]
pub struct No;

/// Marker trait implemented by the [ContextBuilder] type-state markers [Yes] and [No].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
101
/// A builder for [Context]. Beyond what can be set by standard constructor, it allows tweaking
/// pending publish ack backpressure settings.
///
/// The `PREFIX` type parameter is a type-state marker ([Yes] or [No]) that controls whether the
/// prefix setters (`api_prefix`, `domain`) are available.
/// # Examples
/// ```no_run
/// # use async_nats::jetstream::context::ContextBuilder;
/// # use async_nats::Client;
/// # use std::time::Duration;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let context = ContextBuilder::new()
///     .timeout(Duration::from_secs(5))
///     .api_prefix("MY.JS.API")
///     .max_ack_inflight(1000)
///     .build(client);
/// # Ok(())
/// # }
/// ```
pub struct ContextBuilder<PREFIX: ToAssign> {
    // JetStream API subject prefix.
    prefix: String,
    // Timeout handed to the built `Context`.
    timeout: Duration,
    // Number of permits of the in-flight ack semaphore; also used as the acker channel capacity.
    semaphore_capacity: usize,
    // How long the acker task waits for each pending ack.
    ack_timeout: Duration,
    // Whether publishing waits for a free permit instead of failing.
    backpressure_on_inflight: bool,
    // Concurrency limit handed to the acker task.
    concurrency_limit: Option<usize>,
    // Carries the type-state marker; holds no data.
    _phantom: PhantomData<PREFIX>,
}
129
130impl Default for ContextBuilder<Yes> {
131    fn default() -> Self {
132        ContextBuilder {
133            prefix: "$JS.API".to_string(),
134            timeout: Duration::from_secs(5),
135            semaphore_capacity: 5_000,
136            ack_timeout: Duration::from_secs(30),
137            backpressure_on_inflight: true,
138            concurrency_limit: None,
139            _phantom: PhantomData {},
140        }
141    }
142}
143
144impl ContextBuilder<Yes> {
145    /// Create a new [ContextBuilder] with default settings.
146    pub fn new() -> ContextBuilder<Yes> {
147        ContextBuilder::default()
148    }
149}
150
151impl ContextBuilder<Yes> {
152    /// Set the prefix for the JetStream API.
153    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
154        ContextBuilder {
155            prefix: prefix.into(),
156            timeout: self.timeout,
157            semaphore_capacity: self.semaphore_capacity,
158            ack_timeout: self.ack_timeout,
159            backpressure_on_inflight: self.backpressure_on_inflight,
160            concurrency_limit: self.concurrency_limit,
161            _phantom: PhantomData,
162        }
163    }
164
165    /// Set the domain for the JetStream API. Domain is the middle part of standard API prefix:
166    /// $JS.{domain}.API.
167    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
168        ContextBuilder {
169            prefix: format!("$JS.{}.API", domain.into()),
170            timeout: self.timeout,
171            semaphore_capacity: self.semaphore_capacity,
172            ack_timeout: self.ack_timeout,
173            backpressure_on_inflight: self.backpressure_on_inflight,
174            concurrency_limit: self.concurrency_limit,
175            _phantom: PhantomData,
176        }
177    }
178}
179
180impl<PREFIX> ContextBuilder<PREFIX>
181where
182    PREFIX: ToAssign,
183{
184    /// Set the timeout for all JetStream API requests.
185    pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
186    where
187        Yes: ToAssign,
188    {
189        ContextBuilder {
190            prefix: self.prefix,
191            timeout,
192            semaphore_capacity: self.semaphore_capacity,
193            ack_timeout: self.ack_timeout,
194            backpressure_on_inflight: self.backpressure_on_inflight,
195            concurrency_limit: self.concurrency_limit,
196            _phantom: PhantomData,
197        }
198    }
199
200    /// Sets the maximum time client waits for acks from the server when default backpressure is
201    /// used.
202    pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
203    where
204        Yes: ToAssign,
205    {
206        ContextBuilder {
207            prefix: self.prefix,
208            timeout: self.timeout,
209            semaphore_capacity: self.semaphore_capacity,
210            ack_timeout,
211            backpressure_on_inflight: self.backpressure_on_inflight,
212            concurrency_limit: self.concurrency_limit,
213            _phantom: PhantomData,
214        }
215    }
216
217    /// Sets the maximum number of pending acks that can be in flight at any given time.
218    /// If limit is reached, `publish` throws an error by default, or waits if backpressure is enabled.
219    pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
220    where
221        Yes: ToAssign,
222    {
223        ContextBuilder {
224            prefix: self.prefix,
225            timeout: self.timeout,
226            semaphore_capacity: capacity,
227            ack_timeout: self.ack_timeout,
228            backpressure_on_inflight: self.backpressure_on_inflight,
229            concurrency_limit: self.concurrency_limit,
230            _phantom: PhantomData,
231        }
232    }
233
234    /// Enable or disable backpressure when max inflight acks is reached.
235    /// When enabled, publish will wait for permits to become available instead of returning an
236    /// error.
237    /// Default is false (errors on max inflight).
238    pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
239    where
240        Yes: ToAssign,
241    {
242        ContextBuilder {
243            prefix: self.prefix,
244            timeout: self.timeout,
245            semaphore_capacity: self.semaphore_capacity,
246            ack_timeout: self.ack_timeout,
247            backpressure_on_inflight: enabled,
248            concurrency_limit: self.concurrency_limit,
249            _phantom: PhantomData,
250        }
251    }
252
253    /// Sets the concurrency limit for the ack handler task. This might be useful
254    /// in scenarios where Tokio runtime is under heavy load.
255    pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
256    where
257        Yes: ToAssign,
258    {
259        ContextBuilder {
260            prefix: self.prefix,
261            timeout: self.timeout,
262            semaphore_capacity: self.semaphore_capacity,
263            ack_timeout: self.ack_timeout,
264            backpressure_on_inflight: self.backpressure_on_inflight,
265            concurrency_limit: limit,
266            _phantom: PhantomData,
267        }
268    }
269
270    /// Build the [Context] with the given settings.
271    pub fn build(self, client: Client) -> Context {
272        let acker_channel_capacity = self.semaphore_capacity;
273        let (tx, rx) = tokio::sync::mpsc::channel::<(
274            oneshot::Receiver<Message>,
275            OwnedSemaphorePermit,
276        )>(acker_channel_capacity);
277        let stream = ReceiverStream::new(rx);
278        let acker_task = Arc::new(spawn_acker(
279            stream,
280            self.ack_timeout,
281            self.concurrency_limit,
282        ));
283        Context {
284            client,
285            prefix: self.prefix,
286            timeout: self.timeout,
287            max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
288            acker_task,
289            ack_sender: tx,
290            backpressure_on_inflight: self.backpressure_on_inflight,
291            semaphore_capacity: self.semaphore_capacity,
292        }
293    }
294}
295
296impl Context {
297    pub(crate) fn new(client: Client) -> Context {
298        ContextBuilder::default().build(client)
299    }
300
301    /// Sets the timeout for all JetStream API requests.
302    pub fn set_timeout(&mut self, timeout: Duration) {
303        self.timeout = timeout
304    }
305
306    /// Waits until all pending `acks` are received from the server.
307    /// Be aware that this is probably not the way you want to await `acks`,
308    /// as it will wait for every `ack` that is pending, including those that might
309    /// be published after you call this method.
310    /// Useful in testing, or maybe batching.
311    pub async fn wait_for_acks(&self) {
312        self.max_ack_semaphore
313            .acquire_many(self.semaphore_capacity as u32)
314            .await
315            .ok();
316    }
317
318    /// Create a new [Context] with given API prefix.
319    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
320        ContextBuilder::new()
321            .api_prefix(prefix.to_string())
322            .build(client)
323    }
324
325    /// Create a new [Context] with given domain.
326    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
327        ContextBuilder::new().domain(domain.as_ref()).build(client)
328    }
329
330    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
331    /// acknowledgment from the server that the message has been successfully delivered.
332    ///
333    /// Acknowledgment future that can be polled is returned instead.
334    ///
335    /// If the stream does not exist, `no responders` error will be returned.
336    ///
337    /// # Examples
338    ///
339    /// Publish, and after each publish, await for acknowledgment.
340    ///
341    /// ```no_run
342    /// # #[tokio::main]
343    /// # async fn main() -> Result<(), async_nats::Error> {
344    /// let client = async_nats::connect("localhost:4222").await?;
345    /// let jetstream = async_nats::jetstream::new(client);
346    ///
347    /// let ack = jetstream.publish("events", "data".into()).await?;
348    /// ack.await?;
349    /// jetstream.publish("events", "data".into()).await?.await?;
350    /// # Ok(())
351    /// # }
352    /// ```
353    ///
354    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
355    /// ignored entirely.
356    ///
357    /// ```no_run
358    /// # #[tokio::main]
359    /// # async fn main() -> Result<(), async_nats::Error> {
360    /// let client = async_nats::connect("localhost:4222").await?;
361    /// let jetstream = async_nats::jetstream::new(client);
362    ///
363    /// let first_ack = jetstream.publish("events", "data".into()).await?;
364    /// let second_ack = jetstream.publish("events", "data".into()).await?;
365    /// first_ack.await?;
366    /// second_ack.await?;
367    /// # Ok(())
368    /// # }
369    /// ```
370    pub async fn publish<S: ToSubject>(
371        &self,
372        subject: S,
373        payload: Bytes,
374    ) -> Result<PublishAckFuture, PublishError> {
375        self.send_publish(subject, Publish::build().payload(payload))
376            .await
377    }
378
379    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
380    /// the server that the message has been successfully delivered.
381    ///
382    /// If the stream does not exist, `no responders` error will be returned.
383    ///
384    /// # Examples
385    ///
386    /// ```no_run
387    /// # #[tokio::main]
388    /// # async fn main() -> Result<(), async_nats::Error> {
389    /// let client = async_nats::connect("localhost:4222").await?;
390    /// let jetstream = async_nats::jetstream::new(client);
391    ///
392    /// let mut headers = async_nats::HeaderMap::new();
393    /// headers.append("X-key", "Value");
394    /// let ack = jetstream
395    ///     .publish_with_headers("events", headers, "data".into())
396    ///     .await?;
397    /// # Ok(())
398    /// # }
399    /// ```
400    pub async fn publish_with_headers<S: ToSubject>(
401        &self,
402        subject: S,
403        headers: crate::header::HeaderMap,
404        payload: Bytes,
405    ) -> Result<PublishAckFuture, PublishError> {
406        self.send_publish(subject, Publish::build().payload(payload).headers(headers))
407            .await
408    }
409
410    /// Publish a message built by [Publish] and returns an acknowledgment future.
411    ///
412    /// If the stream does not exist, `no responders` error will be returned.
413    ///
414    /// # Examples
415    ///
416    /// ```no_run
417    /// # use async_nats::jetstream::context::Publish;
418    /// # #[tokio::main]
419    /// # async fn main() -> Result<(), async_nats::Error> {
420    /// let client = async_nats::connect("localhost:4222").await?;
421    /// let jetstream = async_nats::jetstream::new(client);
422    ///
423    /// let ack = jetstream
424    ///     .send_publish(
425    ///         "events",
426    ///         Publish::build().payload("data".into()).message_id("uuid"),
427    ///     )
428    ///     .await?;
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub async fn send_publish<S: ToSubject>(
433        &self,
434        subject: S,
435        publish: Publish,
436    ) -> Result<PublishAckFuture, PublishError> {
437        let permit = if self.backpressure_on_inflight {
438            // When backpressure is enabled, wait for a permit to become available
439            self.max_ack_semaphore
440                .clone()
441                .acquire_owned()
442                .await
443                .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
444        } else {
445            // When backpressure is disabled, error immediately if no permits available
446            self.max_ack_semaphore
447                .clone()
448                .try_acquire_owned()
449                .map_err(|err| match err {
450                    TryAcquireError::NoPermits => {
451                        PublishError::new(PublishErrorKind::MaxAckPending)
452                    }
453                    _ => PublishError::with_source(PublishErrorKind::Other, err),
454                })?
455        };
456        let subject = subject.to_subject();
457        let (sender, receiver) = oneshot::channel();
458
459        let respond = self.client.new_inbox().into();
460
461        let send_fut = self
462            .client
463            .sender
464            .send(Command::Request {
465                subject,
466                payload: publish.payload,
467                respond,
468                headers: publish.headers,
469                sender,
470            })
471            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
472
473        tokio::time::timeout(self.timeout, send_fut)
474            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
475            .await??;
476
477        Ok(PublishAckFuture {
478            timeout: self.timeout,
479            subscription: Some(receiver),
480            permit: Some(permit),
481            tx: self.ack_sender.clone(),
482        })
483    }
484
485    /// Query the server for account information
486    pub async fn query_account(&self) -> Result<Account, AccountError> {
487        let response: Response<Account> = self.request("INFO", b"").await?;
488
489        match response {
490            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
491            Response::Ok(account) => Ok(account),
492        }
493    }
494
495    /// Create a JetStream [Stream] with given config and return a handle to it.
496    /// That handle can be used to manage and use [Consumer].
497    ///
498    /// # Examples
499    ///
500    /// ```no_run
501    /// # #[tokio::main]
502    /// # async fn main() -> Result<(), async_nats::Error> {
503    /// use async_nats::jetstream::stream::Config;
504    /// use async_nats::jetstream::stream::DiscardPolicy;
505    /// let client = async_nats::connect("localhost:4222").await?;
506    /// let jetstream = async_nats::jetstream::new(client);
507    ///
508    /// let stream = jetstream
509    ///     .create_stream(Config {
510    ///         name: "events".to_string(),
511    ///         max_messages: 100_000,
512    ///         discard: DiscardPolicy::Old,
513    ///         ..Default::default()
514    ///     })
515    ///     .await?;
516    /// # Ok(())
517    /// # }
518    /// ```
519    pub async fn create_stream<S>(
520        &self,
521        stream_config: S,
522    ) -> Result<Stream<Info>, CreateStreamError>
523    where
524        Config: From<S>,
525    {
526        let mut config: Config = stream_config.into();
527        if config.name.is_empty() {
528            return Err(CreateStreamError::new(
529                CreateStreamErrorKind::EmptyStreamName,
530            ));
531        }
532        if !is_valid_name(config.name.as_str()) {
533            return Err(CreateStreamError::new(
534                CreateStreamErrorKind::InvalidStreamName,
535            ));
536        }
537        if let Some(ref mut mirror) = config.mirror {
538            if let Some(ref mut domain) = mirror.domain {
539                if mirror.external.is_some() {
540                    return Err(CreateStreamError::new(
541                        CreateStreamErrorKind::DomainAndExternalSet,
542                    ));
543                }
544                mirror.external = Some(External {
545                    api_prefix: format!("$JS.{domain}.API"),
546                    delivery_prefix: None,
547                })
548            }
549        }
550
551        if let Some(ref mut sources) = config.sources {
552            for source in sources {
553                if let Some(ref mut domain) = source.domain {
554                    if source.external.is_some() {
555                        return Err(CreateStreamError::new(
556                            CreateStreamErrorKind::DomainAndExternalSet,
557                        ));
558                    }
559                    source.external = Some(External {
560                        api_prefix: format!("$JS.{domain}.API"),
561                        delivery_prefix: None,
562                    })
563                }
564            }
565        }
566        let subject = format!("STREAM.CREATE.{}", config.name);
567        let response: Response<Info> = self.request(subject, &config).await?;
568
569        match response {
570            Response::Err { error } => Err(error.into()),
571            Response::Ok(info) => Ok(Stream {
572                context: self.clone(),
573                info,
574                name: config.name,
575            }),
576        }
577    }
578
579    /// Checks for [Stream] existence on the server and returns handle to it.
580    /// That handle can be used to manage and use [Consumer].
581    /// This variant does not fetch [Stream] info from the server.
582    /// It means it does not check if the stream actually exists.
583    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
584    /// If you however run single operations on many streams, this method is more efficient.
585    ///
586    /// # Examples
587    ///
588    /// ```no_run
589    /// # #[tokio::main]
590    /// # async fn main() -> Result<(), async_nats::Error> {
591    /// let client = async_nats::connect("localhost:4222").await?;
592    /// let jetstream = async_nats::jetstream::new(client);
593    ///
594    /// let stream = jetstream.get_stream_no_info("events").await?;
595    /// # Ok(())
596    /// # }
597    /// ```
598    pub async fn get_stream_no_info<T: AsRef<str>>(
599        &self,
600        stream: T,
601    ) -> Result<Stream<()>, GetStreamError> {
602        let stream = stream.as_ref();
603        if stream.is_empty() {
604            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
605        }
606
607        if !is_valid_name(stream) {
608            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
609        }
610
611        Ok(Stream {
612            context: self.clone(),
613            info: (),
614            name: stream.to_string(),
615        })
616    }
617
618    /// Checks for [Stream] existence on the server and returns handle to it.
619    /// That handle can be used to manage and use [Consumer].
620    ///
621    /// # Examples
622    ///
623    /// ```no_run
624    /// # #[tokio::main]
625    /// # async fn main() -> Result<(), async_nats::Error> {
626    /// let client = async_nats::connect("localhost:4222").await?;
627    /// let jetstream = async_nats::jetstream::new(client);
628    ///
629    /// let stream = jetstream.get_stream("events").await?;
630    /// # Ok(())
631    /// # }
632    /// ```
633    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
634        let stream = stream.as_ref();
635        if stream.is_empty() {
636            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
637        }
638
639        if !is_valid_name(stream) {
640            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
641        }
642
643        let subject = format!("STREAM.INFO.{stream}");
644        let request: Response<Info> = self
645            .request(subject, &())
646            .await
647            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
648        match request {
649            Response::Err { error } => {
650                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
651            }
652            Response::Ok(info) => Ok(Stream {
653                context: self.clone(),
654                info,
655                name: stream.to_string(),
656            }),
657        }
658    }
659
660    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
661    ///
662    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
663    ///
664    /// # Examples
665    ///
666    /// ```no_run
667    /// # #[tokio::main]
668    /// # async fn main() -> Result<(), async_nats::Error> {
669    /// use async_nats::jetstream::stream::Config;
670    /// let client = async_nats::connect("localhost:4222").await?;
671    /// let jetstream = async_nats::jetstream::new(client);
672    ///
673    /// let stream = jetstream
674    ///     .get_or_create_stream(Config {
675    ///         name: "events".to_string(),
676    ///         max_messages: 10_000,
677    ///         ..Default::default()
678    ///     })
679    ///     .await?;
680    /// # Ok(())
681    /// # }
682    /// ```
683    pub async fn get_or_create_stream<S>(
684        &self,
685        stream_config: S,
686    ) -> Result<Stream, CreateStreamError>
687    where
688        S: Into<Config>,
689    {
690        let config: Config = stream_config.into();
691
692        if config.name.is_empty() {
693            return Err(CreateStreamError::new(
694                CreateStreamErrorKind::EmptyStreamName,
695            ));
696        }
697
698        if !is_valid_name(config.name.as_str()) {
699            return Err(CreateStreamError::new(
700                CreateStreamErrorKind::InvalidStreamName,
701            ));
702        }
703        let subject = format!("STREAM.INFO.{}", config.name);
704
705        let request: Response<Info> = self.request(subject, &()).await?;
706        match request {
707            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
708            Response::Err { error } => Err(error.into()),
709            Response::Ok(info) => Ok(Stream {
710                context: self.clone(),
711                info,
712                name: config.name,
713            }),
714        }
715    }
716
717    /// Deletes a [Stream] with a given name.
718    ///
719    /// # Examples
720    ///
721    /// ```no_run
722    /// # #[tokio::main]
723    /// # async fn main() -> Result<(), async_nats::Error> {
724    /// use async_nats::jetstream::stream::Config;
725    /// let client = async_nats::connect("localhost:4222").await?;
726    /// let jetstream = async_nats::jetstream::new(client);
727    ///
728    /// let stream = jetstream.delete_stream("events").await?;
729    /// # Ok(())
730    /// # }
731    /// ```
732    pub async fn delete_stream<T: AsRef<str>>(
733        &self,
734        stream: T,
735    ) -> Result<DeleteStatus, DeleteStreamError> {
736        let stream = stream.as_ref();
737        if stream.is_empty() {
738            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
739        }
740
741        if !is_valid_name(stream) {
742            return Err(DeleteStreamError::new(
743                DeleteStreamErrorKind::InvalidStreamName,
744            ));
745        }
746
747        let subject = format!("STREAM.DELETE.{stream}");
748        match self
749            .request(subject, &json!({}))
750            .await
751            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
752        {
753            Response::Err { error } => Err(DeleteStreamError::new(
754                DeleteStreamErrorKind::JetStream(error),
755            )),
756            Response::Ok(delete_response) => Ok(delete_response),
757        }
758    }
759
760    /// Updates a [Stream] with a given config. If specific field cannot be updated,
761    /// error is returned.
762    ///
763    /// # Examples
764    ///
765    /// ```no_run
766    /// # #[tokio::main]
767    /// # async fn main() -> Result<(), async_nats::Error> {
768    /// use async_nats::jetstream::stream::Config;
769    /// use async_nats::jetstream::stream::DiscardPolicy;
770    /// let client = async_nats::connect("localhost:4222").await?;
771    /// let jetstream = async_nats::jetstream::new(client);
772    ///
773    /// let stream = jetstream
774    ///     .update_stream(Config {
775    ///         name: "events".to_string(),
776    ///         discard: DiscardPolicy::New,
777    ///         max_messages: 50_000,
778    ///         ..Default::default()
779    ///     })
780    ///     .await?;
781    /// # Ok(())
782    /// # }
783    /// ```
784    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
785    where
786        S: Borrow<Config>,
787    {
788        let config = config.borrow();
789
790        if config.name.is_empty() {
791            return Err(CreateStreamError::new(
792                CreateStreamErrorKind::EmptyStreamName,
793            ));
794        }
795
796        if !is_valid_name(config.name.as_str()) {
797            return Err(CreateStreamError::new(
798                CreateStreamErrorKind::InvalidStreamName,
799            ));
800        }
801
802        let subject = format!("STREAM.UPDATE.{}", config.name);
803        match self.request(subject, config).await? {
804            Response::Err { error } => Err(error.into()),
805            Response::Ok(info) => Ok(info),
806        }
807    }
808
809    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
810    ///
811    /// # Examples
812    ///
813    /// ```no_run
814    /// # #[tokio::main]
815    /// # async fn main() -> Result<(), async_nats::Error> {
816    /// use async_nats::jetstream::stream::Config;
817    /// use async_nats::jetstream::stream::DiscardPolicy;
818    /// let client = async_nats::connect("localhost:4222").await?;
819    /// let jetstream = async_nats::jetstream::new(client);
820    ///
821    /// let stream = jetstream
822    ///     .create_or_update_stream(Config {
823    ///         name: "events".to_string(),
824    ///         discard: DiscardPolicy::New,
825    ///         max_messages: 50_000,
826    ///         ..Default::default()
827    ///     })
828    ///     .await?;
829    /// # Ok(())
830    /// # }
831    /// ```
832    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
833        match self.update_stream(config.clone()).await {
834            Ok(stream) => Ok(stream),
835            Err(err) => match err.kind() {
836                CreateStreamErrorKind::NotFound => {
837                    let stream = self
838                        .create_stream(config)
839                        .await
840                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
841                    Ok(stream.info)
842                }
843                _ => Err(err),
844            },
845        }
846    }
847
848    /// Looks up Stream that contains provided subject.
849    ///
850    /// # Examples
851    ///
852    /// ```no_run
853    /// # #[tokio::main]
854    /// # async fn main() -> Result<(), async_nats::Error> {
855    /// use futures_util::TryStreamExt;
856    /// let client = async_nats::connect("demo.nats.io:4222").await?;
857    /// let jetstream = async_nats::jetstream::new(client);
858    /// let stream_name = jetstream.stream_by_subject("foo.>");
859    /// # Ok(())
860    /// # }
861    /// ```
862    pub async fn stream_by_subject<T: Into<String>>(
863        &self,
864        subject: T,
865    ) -> Result<String, GetStreamByNameError> {
866        let subject = subject.into();
867        if !is_valid_subject(subject.as_str()) {
868            return Err(GetStreamByNameError::new(
869                GetStreamByNameErrorKind::InvalidSubject,
870            ));
871        }
872        let mut names = StreamNames {
873            context: self.clone(),
874            offset: 0,
875            page_request: None,
876            streams: Vec::new(),
877            subject: Some(subject),
878            done: false,
879        };
880        match names.next().await {
881            Some(name) => match name {
882                Ok(name) => Ok(name),
883                Err(err) => Err(GetStreamByNameError::with_source(
884                    GetStreamByNameErrorKind::Request,
885                    err,
886                )),
887            },
888            None => Err(GetStreamByNameError::new(
889                GetStreamByNameErrorKind::NotFound,
890            )),
891        }
892    }
893
894    /// Lists names of all streams for current context.
895    ///
896    /// # Examples
897    ///
898    /// ```no_run
899    /// # #[tokio::main]
900    /// # async fn main() -> Result<(), async_nats::Error> {
901    /// use futures_util::TryStreamExt;
902    /// let client = async_nats::connect("demo.nats.io:4222").await?;
903    /// let jetstream = async_nats::jetstream::new(client);
904    /// let mut names = jetstream.stream_names();
905    /// while let Some(stream) = names.try_next().await? {
906    ///     println!("stream: {}", stream);
907    /// }
908    /// # Ok(())
909    /// # }
910    /// ```
911    pub fn stream_names(&self) -> StreamNames {
912        StreamNames {
913            context: self.clone(),
914            offset: 0,
915            page_request: None,
916            streams: Vec::new(),
917            subject: None,
918            done: false,
919        }
920    }
921
922    /// Lists all streams info for current context.
923    ///
924    /// # Examples
925    ///
926    /// ```no_run
927    /// # #[tokio::main]
928    /// # async fn main() -> Result<(), async_nats::Error> {
929    /// use futures_util::TryStreamExt;
930    /// let client = async_nats::connect("demo.nats.io:4222").await?;
931    /// let jetstream = async_nats::jetstream::new(client);
932    /// let mut streams = jetstream.streams();
933    /// while let Some(stream) = streams.try_next().await? {
934    ///     println!("stream: {:?}", stream);
935    /// }
936    /// # Ok(())
937    /// # }
938    /// ```
939    pub fn streams(&self) -> Streams {
940        Streams {
941            context: self.clone(),
942            offset: 0,
943            page_request: None,
944            streams: Vec::new(),
945            done: false,
946        }
947    }
948    /// Returns an existing key-value bucket.
949    ///
950    /// # Examples
951    ///
952    /// ```no_run
953    /// # #[tokio::main]
954    /// # async fn main() -> Result<(), async_nats::Error> {
955    /// let client = async_nats::connect("demo.nats.io:4222").await?;
956    /// let jetstream = async_nats::jetstream::new(client);
957    /// let kv = jetstream.get_key_value("bucket").await?;
958    /// # Ok(())
959    /// # }
960    /// ```
961    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
962        let bucket: String = bucket.into();
963        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
964            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
965        }
966
967        let stream_name = format!("KV_{}", &bucket);
968        let stream = self
969            .get_stream(stream_name.clone())
970            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
971            .await?;
972
973        if stream.info.config.max_messages_per_subject < 1 {
974            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
975        }
976        let mut store = Store {
977            prefix: format!("$KV.{}.", &bucket),
978            name: bucket,
979            stream_name,
980            stream: stream.clone(),
981            put_prefix: None,
982            use_jetstream_prefix: self.prefix != "$JS.API",
983        };
984        if let Some(ref mirror) = stream.info.config.mirror {
985            let bucket = mirror.name.trim_start_matches("KV_");
986            if let Some(ref external) = mirror.external {
987                if !external.api_prefix.is_empty() {
988                    store.use_jetstream_prefix = false;
989                    store.prefix = format!("$KV.{bucket}.");
990                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
991                } else {
992                    store.put_prefix = Some(format!("$KV.{bucket}."));
993                }
994            }
995        };
996
997        Ok(store)
998    }
999
1000    /// Creates a new key-value bucket.
1001    ///
1002    /// # Examples
1003    ///
1004    /// ```no_run
1005    /// # #[tokio::main]
1006    /// # async fn main() -> Result<(), async_nats::Error> {
1007    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1008    /// let jetstream = async_nats::jetstream::new(client);
1009    /// let kv = jetstream
1010    ///     .create_key_value(async_nats::jetstream::kv::Config {
1011    ///         bucket: "kv".to_string(),
1012    ///         history: 10,
1013    ///         ..Default::default()
1014    ///     })
1015    ///     .await?;
1016    /// # Ok(())
1017    /// # }
1018    /// ```
1019    pub async fn create_key_value(
1020        &self,
1021        config: crate::jetstream::kv::Config,
1022    ) -> Result<Store, CreateKeyValueError> {
1023        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1024            return Err(CreateKeyValueError::new(
1025                CreateKeyValueErrorKind::InvalidStoreName,
1026            ));
1027        }
1028        let info = self.query_account().await.map_err(|err| {
1029            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1030        })?;
1031
1032        let bucket_name = config.bucket.clone();
1033        let stream_config = kv_to_stream_config(config, info)?;
1034
1035        let stream = self.create_stream(stream_config).await.map_err(|err| {
1036            if err.kind() == CreateStreamErrorKind::TimedOut {
1037                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1038            } else {
1039                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1040            }
1041        })?;
1042
1043        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1044    }
1045
1046    /// Updates an existing key-value bucket.
1047    ///
1048    /// # Examples
1049    ///
1050    /// ```no_run
1051    /// # #[tokio::main]
1052    /// # async fn main() -> Result<(), async_nats::Error> {
1053    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1054    /// let jetstream = async_nats::jetstream::new(client);
1055    /// let kv = jetstream
1056    ///     .update_key_value(async_nats::jetstream::kv::Config {
1057    ///         bucket: "kv".to_string(),
1058    ///         history: 60,
1059    ///         ..Default::default()
1060    ///     })
1061    ///     .await?;
1062    /// # Ok(())
1063    /// # }
1064    /// ```
1065    pub async fn update_key_value(
1066        &self,
1067        config: crate::jetstream::kv::Config,
1068    ) -> Result<Store, UpdateKeyValueError> {
1069        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1070            return Err(UpdateKeyValueError::new(
1071                UpdateKeyValueErrorKind::InvalidStoreName,
1072            ));
1073        }
1074
1075        let stream_name = format!("KV_{}", config.bucket);
1076        let bucket_name = config.bucket.clone();
1077
1078        let account = self.query_account().await.map_err(|err| {
1079            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1080        })?;
1081        let stream = self
1082            .update_stream(kv_to_stream_config(config, account)?)
1083            .await
1084            .map_err(|err| match err.kind() {
1085                UpdateStreamErrorKind::NotFound => {
1086                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1087                }
1088                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1089            })?;
1090
1091        let stream = Stream {
1092            context: self.clone(),
1093            info: stream,
1094            name: stream_name,
1095        };
1096
1097        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1098    }
1099
1100    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
1101    ///
1102    /// # Examples
1103    ///
1104    /// ```no_run
1105    /// # #[tokio::main]
1106    /// # async fn main() -> Result<(), async_nats::Error> {
1107    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1108    /// let jetstream = async_nats::jetstream::new(client);
1109    /// let kv = jetstream
1110    ///     .update_key_value(async_nats::jetstream::kv::Config {
1111    ///         bucket: "kv".to_string(),
1112    ///         history: 60,
1113    ///         ..Default::default()
1114    ///     })
1115    ///     .await?;
1116    /// # Ok(())
1117    /// # }
1118    /// ```
1119    pub async fn create_or_update_key_value(
1120        &self,
1121        config: crate::jetstream::kv::Config,
1122    ) -> Result<Store, CreateKeyValueError> {
1123        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1124            return Err(CreateKeyValueError::new(
1125                CreateKeyValueErrorKind::InvalidStoreName,
1126            ));
1127        }
1128
1129        let bucket_name = config.bucket.clone();
1130        let stream_name = format!("KV_{}", config.bucket);
1131
1132        let account = self.query_account().await.map_err(|err| {
1133            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1134        })?;
1135        let stream = self
1136            .create_or_update_stream(kv_to_stream_config(config, account)?)
1137            .await
1138            .map_err(|err| {
1139                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1140            })?;
1141
1142        let stream = Stream {
1143            context: self.clone(),
1144            info: stream,
1145            name: stream_name,
1146        };
1147
1148        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1149    }
1150
1151    /// Deletes given key-value bucket.
1152    ///
1153    /// # Examples
1154    ///
1155    /// ```no_run
1156    /// # #[tokio::main]
1157    /// # async fn main() -> Result<(), async_nats::Error> {
1158    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1159    /// let jetstream = async_nats::jetstream::new(client);
1160    /// let kv = jetstream
1161    ///     .create_key_value(async_nats::jetstream::kv::Config {
1162    ///         bucket: "kv".to_string(),
1163    ///         history: 10,
1164    ///         ..Default::default()
1165    ///     })
1166    ///     .await?;
1167    /// # Ok(())
1168    /// # }
1169    /// ```
1170    pub async fn delete_key_value<T: AsRef<str>>(
1171        &self,
1172        bucket: T,
1173    ) -> Result<DeleteStatus, KeyValueError> {
1174        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1175            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1176        }
1177
1178        let stream_name = format!("KV_{}", bucket.as_ref());
1179        self.delete_stream(stream_name)
1180            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1181            .await
1182    }
1183
1184    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
1185    //     let config = config.borrow();
1186    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1187    //         return Err(Box::new(std::io::Error::other(
1188    //             "invalid bucket name",
1189    //         )));
1190    //     }
1191
1192    //     let stream_name = format!("KV_{}", config.bucket);
1193    //     self.update_stream()
1194    //         .await
1195    //         .and_then(|info| Ok(()))
1196    // }
1197
1198    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1199    ///
1200    /// It has one less interaction with the server when binding to only one
1201    /// [crate::jetstream::consumer::Consumer].
1202    ///
1203    /// # Examples:
1204    ///
1205    /// ```no_run
1206    /// # #[tokio::main]
1207    /// # async fn main() -> Result<(), async_nats::Error> {
1208    /// use async_nats::jetstream::consumer::PullConsumer;
1209    ///
1210    /// let client = async_nats::connect("localhost:4222").await?;
1211    /// let jetstream = async_nats::jetstream::new(client);
1212    ///
1213    /// let consumer: PullConsumer = jetstream
1214    ///     .get_consumer_from_stream("consumer", "stream")
1215    ///     .await?;
1216    ///
1217    /// # Ok(())
1218    /// # }
1219    /// ```
1220    pub async fn get_consumer_from_stream<T, C, S>(
1221        &self,
1222        consumer: C,
1223        stream: S,
1224    ) -> Result<Consumer<T>, ConsumerError>
1225    where
1226        T: FromConsumer + IntoConsumerConfig,
1227        S: AsRef<str>,
1228        C: AsRef<str>,
1229    {
1230        if !is_valid_name(stream.as_ref()) {
1231            return Err(ConsumerError::with_source(
1232                ConsumerErrorKind::InvalidName,
1233                "invalid stream",
1234            ));
1235        }
1236
1237        if !is_valid_name(consumer.as_ref()) {
1238            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1239        }
1240
1241        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1242
1243        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1244            Response::Ok(info) => info,
1245            Response::Err { error } => return Err(error.into()),
1246        };
1247
1248        Ok(Consumer::new(
1249            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1250                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1251            })?,
1252            info,
1253            self.clone(),
1254        ))
1255    }
1256
1257    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1258    ///
1259    /// It has one less interaction with the server when binding to only one
1260    /// [crate::jetstream::consumer::Consumer].
1261    ///
1262    /// # Examples:
1263    ///
1264    /// ```no_run
1265    /// # #[tokio::main]
1266    /// # async fn main() -> Result<(), async_nats::Error> {
1267    /// use async_nats::jetstream::consumer::PullConsumer;
1268    ///
1269    /// let client = async_nats::connect("localhost:4222").await?;
1270    /// let jetstream = async_nats::jetstream::new(client);
1271    ///
1272    /// jetstream
1273    ///     .delete_consumer_from_stream("consumer", "stream")
1274    ///     .await?;
1275    ///
1276    /// # Ok(())
1277    /// # }
1278    /// ```
1279    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1280        &self,
1281        consumer: C,
1282        stream: S,
1283    ) -> Result<DeleteStatus, ConsumerError> {
1284        if !is_valid_name(consumer.as_ref()) {
1285            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1286        }
1287
1288        if !is_valid_name(stream.as_ref()) {
1289            return Err(ConsumerError::with_source(
1290                ConsumerErrorKind::Other,
1291                "invalid stream name",
1292            ));
1293        }
1294
1295        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1296
1297        match self.request(subject, &json!({})).await? {
1298            Response::Ok(delete_status) => Ok(delete_status),
1299            Response::Err { error } => Err(error.into()),
1300        }
1301    }
1302
1303    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1304    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1305    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1306    ///
1307    /// # Examples
1308    ///
1309    /// ```no_run
1310    /// # #[tokio::main]
1311    /// # async fn main() -> Result<(), async_nats::Error> {
1312    /// use async_nats::jetstream::consumer;
1313    /// let client = async_nats::connect("localhost:4222").await?;
1314    /// let jetstream = async_nats::jetstream::new(client);
1315    ///
1316    /// let consumer: consumer::PullConsumer = jetstream
1317    ///     .create_consumer_on_stream(
1318    ///         consumer::pull::Config {
1319    ///             durable_name: Some("pull".to_string()),
1320    ///             ..Default::default()
1321    ///         },
1322    ///         "stream",
1323    ///     )
1324    ///     .await?;
1325    /// # Ok(())
1326    /// # }
1327    /// ```
1328    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1329        &self,
1330        config: C,
1331        stream: S,
1332    ) -> Result<Consumer<C>, ConsumerError> {
1333        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1334            .await
1335    }
1336
1337    /// Update an existing consumer.
1338    /// This call will fail if the consumer does not exist.
1339    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1340    ///
1341    /// # Examples
1342    ///
1343    /// ```no_run
1344    /// # #[tokio::main]
1345    /// # async fn main() -> Result<(), async_nats::Error> {
1346    /// use async_nats::jetstream::consumer;
1347    /// let client = async_nats::connect("localhost:4222").await?;
1348    /// let jetstream = async_nats::jetstream::new(client);
1349    ///
1350    /// let consumer: consumer::PullConsumer = jetstream
1351    ///     .update_consumer_on_stream(
1352    ///         consumer::pull::Config {
1353    ///             durable_name: Some("pull".to_string()),
1354    ///             description: Some("updated pull consumer".to_string()),
1355    ///             ..Default::default()
1356    ///         },
1357    ///         "stream",
1358    ///     )
1359    ///     .await?;
1360    /// # Ok(())
1361    /// # }
1362    /// ```
1363    #[cfg(feature = "server_2_10")]
1364    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1365        &self,
1366        config: C,
1367        stream: S,
1368    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1369        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1370            .await
1371            .map_err(|err| err.into())
1372    }
1373
1374    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1375    /// the same.
1376    /// This method will fail if consumer is already present with different config.
1377    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1378    ///
1379    /// # Examples
1380    ///
1381    /// ```no_run
1382    /// # #[tokio::main]
1383    /// # async fn main() -> Result<(), async_nats::Error> {
1384    /// use async_nats::jetstream::consumer;
1385    /// let client = async_nats::connect("localhost:4222").await?;
1386    /// let jetstream = async_nats::jetstream::new(client);
1387    ///
1388    /// let consumer: consumer::PullConsumer = jetstream
1389    ///     .create_consumer_strict_on_stream(
1390    ///         consumer::pull::Config {
1391    ///             durable_name: Some("pull".to_string()),
1392    ///             ..Default::default()
1393    ///         },
1394    ///         "stream",
1395    ///     )
1396    ///     .await?;
1397    /// # Ok(())
1398    /// # }
1399    /// ```
1400    #[cfg(feature = "server_2_10")]
1401    pub async fn create_consumer_strict_on_stream<
1402        C: IntoConsumerConfig + FromConsumer,
1403        S: AsRef<str>,
1404    >(
1405        &self,
1406        config: C,
1407        stream: S,
1408    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1409        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1410            .await
1411            .map_err(|err| err.into())
1412    }
1413
1414    async fn create_consumer_on_stream_action<
1415        C: IntoConsumerConfig + FromConsumer,
1416        S: AsRef<str>,
1417    >(
1418        &self,
1419        config: C,
1420        stream: S,
1421        action: ConsumerAction,
1422    ) -> Result<Consumer<C>, ConsumerError> {
1423        let config = config.into_consumer_config();
1424
1425        let subject = {
1426            let filter = if config.filter_subject.is_empty() {
1427                "".to_string()
1428            } else {
1429                format!(".{}", config.filter_subject)
1430            };
1431            config
1432                .name
1433                .as_ref()
1434                .or(config.durable_name.as_ref())
1435                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1436                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1437        };
1438
1439        match self
1440            .request(
1441                subject,
1442                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1443            )
1444            .await?
1445        {
1446            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1447            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1448                FromConsumer::try_from_consumer_config(info.clone().config)
1449                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1450                info,
1451                self.clone(),
1452            )),
1453        }
1454    }
1455
1456    /// Send a request to the jetstream JSON API.
1457    ///
1458    /// This is a low level API used mostly internally, that should be used only in
1459    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1460    ///
1461    /// # Examples
1462    ///
1463    /// ```no_run
1464    /// # use async_nats::jetstream::stream::Info;
1465    /// # use async_nats::jetstream::response::Response;
1466    /// # #[tokio::main]
1467    /// # async fn main() -> Result<(), async_nats::Error> {
1468    /// let client = async_nats::connect("localhost:4222").await?;
1469    /// let jetstream = async_nats::jetstream::new(client);
1470    ///
1471    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1472    /// # Ok(())
1473    /// # }
1474    /// ```
1475    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1476    where
1477        S: ToSubject,
1478        T: ?Sized + Serialize,
1479        V: DeserializeOwned,
1480    {
1481        let subject = subject.to_subject();
1482        let request = serde_json::to_vec(&payload)
1483            .map(Bytes::from)
1484            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1485
1486        debug!("JetStream request sent: {:?}", request);
1487
1488        let message = self
1489            .client
1490            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1491            .await;
1492        let message = message?;
1493        debug!(
1494            "JetStream request response: {:?}",
1495            from_utf8(&message.payload)
1496        );
1497        let response = serde_json::from_slice(message.payload.as_ref())
1498            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1499
1500        Ok(response)
1501    }
1502
1503    /// Creates a new object store bucket.
1504    ///
1505    /// # Examples
1506    ///
1507    /// ```no_run
1508    /// # #[tokio::main]
1509    /// # async fn main() -> Result<(), async_nats::Error> {
1510    /// let client = async_nats::connect("demo.nats.io").await?;
1511    /// let jetstream = async_nats::jetstream::new(client);
1512    /// let bucket = jetstream
1513    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1514    ///         bucket: "bucket".to_string(),
1515    ///         ..Default::default()
1516    ///     })
1517    ///     .await?;
1518    /// # Ok(())
1519    /// # }
1520    /// ```
1521    pub async fn create_object_store(
1522        &self,
1523        config: super::object_store::Config,
1524    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1525        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1526            return Err(CreateObjectStoreError::new(
1527                CreateKeyValueErrorKind::InvalidStoreName,
1528            ));
1529        }
1530
1531        let bucket_name = config.bucket.clone();
1532        let stream_name = format!("OBJ_{bucket_name}");
1533        let chunk_subject = format!("$O.{bucket_name}.C.>");
1534        let meta_subject = format!("$O.{bucket_name}.M.>");
1535
1536        let stream = self
1537            .create_stream(super::stream::Config {
1538                name: stream_name,
1539                description: config.description.clone(),
1540                subjects: vec![chunk_subject, meta_subject],
1541                max_age: config.max_age,
1542                max_bytes: config.max_bytes,
1543                storage: config.storage,
1544                num_replicas: config.num_replicas,
1545                discard: DiscardPolicy::New,
1546                allow_rollup: true,
1547                allow_direct: true,
1548                #[cfg(feature = "server_2_10")]
1549                compression: if config.compression {
1550                    Some(Compression::S2)
1551                } else {
1552                    None
1553                },
1554                placement: config.placement,
1555                ..Default::default()
1556            })
1557            .await
1558            .map_err(|err| {
1559                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1560            })?;
1561
1562        Ok(ObjectStore {
1563            name: bucket_name,
1564            stream,
1565        })
1566    }
1567
1568    /// Get an existing object store bucket.
1569    ///
1570    /// # Examples
1571    ///
1572    /// ```no_run
1573    /// # #[tokio::main]
1574    /// # async fn main() -> Result<(), async_nats::Error> {
1575    /// let client = async_nats::connect("demo.nats.io").await?;
1576    /// let jetstream = async_nats::jetstream::new(client);
1577    /// let bucket = jetstream.get_object_store("bucket").await?;
1578    /// # Ok(())
1579    /// # }
1580    /// ```
1581    pub async fn get_object_store<T: AsRef<str>>(
1582        &self,
1583        bucket_name: T,
1584    ) -> Result<ObjectStore, ObjectStoreError> {
1585        let bucket_name = bucket_name.as_ref();
1586        if !is_valid_bucket_name(bucket_name) {
1587            return Err(ObjectStoreError::new(
1588                ObjectStoreErrorKind::InvalidBucketName,
1589            ));
1590        }
1591        let stream_name = format!("OBJ_{bucket_name}");
1592        let stream = self
1593            .get_stream(stream_name)
1594            .await
1595            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1596
1597        Ok(ObjectStore {
1598            name: bucket_name.to_string(),
1599            stream,
1600        })
1601    }
1602
1603    /// Delete a object store bucket.
1604    ///
1605    /// # Examples
1606    ///
1607    /// ```no_run
1608    /// # #[tokio::main]
1609    /// # async fn main() -> Result<(), async_nats::Error> {
1610    /// let client = async_nats::connect("demo.nats.io").await?;
1611    /// let jetstream = async_nats::jetstream::new(client);
1612    /// let bucket = jetstream.delete_object_store("bucket").await?;
1613    /// # Ok(())
1614    /// # }
1615    /// ```
1616    pub async fn delete_object_store<T: AsRef<str>>(
1617        &self,
1618        bucket_name: T,
1619    ) -> Result<(), DeleteObjectStore> {
1620        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1621        self.delete_stream(stream_name)
1622            .await
1623            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1624        Ok(())
1625    }
1626}
1627
/// Kinds of failures reported while waiting for a publish acknowledgement.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PublishErrorKind {
    /// No stream found for the subject the message was published to.
    StreamNotFound,
    /// The server rejected the message because of a wrong last message id.
    WrongLastMessageId,
    /// The server rejected the message because of a wrong last sequence.
    WrongLastSequence,
    /// The acknowledgement was not received in time.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// The maximum number of pending acknowledgements was reached.
    MaxAckPending,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::MaxAckPending => "max ack pending reached",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1652
1653pub type PublishError = Error<PublishErrorKind>;
1654
1655#[derive(Debug)]
1656pub struct PublishAckFuture {
1657    timeout: Duration,
1658    subscription: Option<oneshot::Receiver<Message>>,
1659    permit: Option<OwnedSemaphorePermit>,
1660    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
1661}
1662
1663impl Drop for PublishAckFuture {
1664    fn drop(&mut self) {
1665        if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1666            if let Err(err) = self.tx.try_send((sub, permit)) {
1667                tracing::warn!("failed to pass future permit to the acker: {}", err);
1668            }
1669        }
1670    }
1671}
1672
1673impl PublishAckFuture {
1674    async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1675        let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1676            .await
1677            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1678        next.map_or_else(
1679            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1680            |m| {
1681                if m.status == Some(StatusCode::NO_RESPONDERS) {
1682                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1683                }
1684                let response = serde_json::from_slice(m.payload.as_ref())
1685                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1686                match response {
1687                    Response::Err { error } => match error.error_code() {
1688                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1689                            PublishErrorKind::WrongLastMessageId,
1690                            error,
1691                        )),
1692                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1693                            PublishErrorKind::WrongLastSequence,
1694                            error,
1695                        )),
1696                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1697                    },
1698                    Response::Ok(publish_ack) => Ok(publish_ack),
1699                }
1700            },
1701        )
1702    }
1703}
1704impl IntoFuture for PublishAckFuture {
1705    type Output = Result<PublishAck, PublishError>;
1706
1707    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1708
1709    fn into_future(self) -> Self::IntoFuture {
1710        Box::pin(std::future::IntoFuture::into_future(
1711            self.next_with_timeout(),
1712        ))
1713    }
1714}
1715
1716#[derive(Deserialize, Debug)]
1717struct StreamPage {
1718    total: usize,
1719    streams: Option<Vec<String>>,
1720}
1721
1722#[derive(Deserialize, Debug)]
1723struct StreamInfoPage {
1724    total: usize,
1725    streams: Option<Vec<super::stream::Info>>,
1726}
1727
1728type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1729
1730pub struct StreamNames {
1731    context: Context,
1732    offset: usize,
1733    page_request: Option<PageRequest>,
1734    subject: Option<String>,
1735    streams: Vec<String>,
1736    done: bool,
1737}
1738
1739impl futures_util::Stream for StreamNames {
1740    type Item = Result<String, StreamsError>;
1741
1742    fn poll_next(
1743        mut self: Pin<&mut Self>,
1744        cx: &mut std::task::Context<'_>,
1745    ) -> std::task::Poll<Option<Self::Item>> {
1746        match self.page_request.as_mut() {
1747            Some(page) => match page.try_poll_unpin(cx) {
1748                std::task::Poll::Ready(page) => {
1749                    self.page_request = None;
1750                    let page = page
1751                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1752                    if let Some(streams) = page.streams {
1753                        self.offset += streams.len();
1754                        self.streams = streams;
1755                        if self.offset >= page.total {
1756                            self.done = true;
1757                        }
1758                        match self.streams.pop() {
1759                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1760                            None => Poll::Ready(None),
1761                        }
1762                    } else {
1763                        Poll::Ready(None)
1764                    }
1765                }
1766                std::task::Poll::Pending => std::task::Poll::Pending,
1767            },
1768            None => {
1769                if let Some(stream) = self.streams.pop() {
1770                    Poll::Ready(Some(Ok(stream)))
1771                } else {
1772                    if self.done {
1773                        return Poll::Ready(None);
1774                    }
1775                    let context = self.context.clone();
1776                    let offset = self.offset;
1777                    let subject = self.subject.clone();
1778                    self.page_request = Some(Box::pin(async move {
1779                        match context
1780                            .request(
1781                                "STREAM.NAMES",
1782                                &json!({
1783                                    "offset": offset,
1784                                    "subject": subject
1785                                }),
1786                            )
1787                            .await?
1788                        {
1789                            Response::Err { error } => {
1790                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1791                            }
1792                            Response::Ok(page) => Ok(page),
1793                        }
1794                    }));
1795                    self.poll_next(cx)
1796                }
1797            }
1798        }
1799    }
1800}
1801
1802type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1803
1804pub type StreamsErrorKind = RequestErrorKind;
1805pub type StreamsError = RequestError;
1806
1807pub struct Streams {
1808    context: Context,
1809    offset: usize,
1810    page_request: Option<PageInfoRequest>,
1811    streams: Vec<super::stream::Info>,
1812    done: bool,
1813}
1814
1815impl futures_util::Stream for Streams {
1816    type Item = Result<super::stream::Info, StreamsError>;
1817
1818    fn poll_next(
1819        mut self: Pin<&mut Self>,
1820        cx: &mut std::task::Context<'_>,
1821    ) -> std::task::Poll<Option<Self::Item>> {
1822        match self.page_request.as_mut() {
1823            Some(page) => match page.try_poll_unpin(cx) {
1824                std::task::Poll::Ready(page) => {
1825                    self.page_request = None;
1826                    let page = page
1827                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1828                    if let Some(streams) = page.streams {
1829                        self.offset += streams.len();
1830                        self.streams = streams;
1831                        if self.offset >= page.total {
1832                            self.done = true;
1833                        }
1834                        match self.streams.pop() {
1835                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1836                            None => Poll::Ready(None),
1837                        }
1838                    } else {
1839                        Poll::Ready(None)
1840                    }
1841                }
1842                std::task::Poll::Pending => std::task::Poll::Pending,
1843            },
1844            None => {
1845                if let Some(stream) = self.streams.pop() {
1846                    Poll::Ready(Some(Ok(stream)))
1847                } else {
1848                    if self.done {
1849                        return Poll::Ready(None);
1850                    }
1851                    let context = self.context.clone();
1852                    let offset = self.offset;
1853                    self.page_request = Some(Box::pin(async move {
1854                        match context
1855                            .request(
1856                                "STREAM.LIST",
1857                                &json!({
1858                                    "offset": offset,
1859                                }),
1860                            )
1861                            .await?
1862                        {
1863                            Response::Err { error } => {
1864                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1865                            }
1866                            Response::Ok(page) => Ok(page),
1867                        }
1868                    }));
1869                    self.poll_next(cx)
1870                }
1871            }
1872        }
1873    }
1874}
1875/// Used for building customized `publish` message.
1876#[derive(Default, Clone, Debug)]
1877pub struct Publish {
1878    payload: Bytes,
1879    headers: Option<header::HeaderMap>,
1880}
1881impl Publish {
1882    /// Creates a new custom Publish struct to be used with.
1883    pub fn build() -> Self {
1884        Default::default()
1885    }
1886
1887    /// Sets the payload for the message.
1888    pub fn payload(mut self, payload: Bytes) -> Self {
1889        self.payload = payload;
1890        self
1891    }
1892    /// Adds headers to the message.
1893    pub fn headers(mut self, headers: HeaderMap) -> Self {
1894        self.headers = Some(headers);
1895        self
1896    }
1897    /// A shorthand to add a single header.
1898    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1899        self.headers
1900            .get_or_insert(header::HeaderMap::new())
1901            .insert(name, value);
1902        self
1903    }
1904    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
1905    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1906        self.header(header::NATS_MESSAGE_ID, id.as_ref())
1907    }
1908    /// Sets expected last message ID.
1909    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
1910    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1911        self.header(
1912            header::NATS_EXPECTED_LAST_MESSAGE_ID,
1913            last_message_id.as_ref(),
1914        )
1915    }
1916    /// Sets the last expected stream sequence.
1917    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
1918    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1919        self.header(
1920            header::NATS_EXPECTED_LAST_SEQUENCE,
1921            HeaderValue::from(last_sequence),
1922        )
1923    }
1924    /// Sets the last expected stream sequence for a subject this message will be published to.
1925    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
1926    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1927        self.header(
1928            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1929            HeaderValue::from(subject_sequence),
1930        )
1931    }
1932    /// Sets the expected stream name.
1933    /// It sets the `Nats-Expected-Stream` header with provided value.
1934    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1935        self.header(
1936            header::NATS_EXPECTED_STREAM,
1937            HeaderValue::from(stream.as_ref()),
1938        )
1939    }
1940
1941    #[cfg(feature = "server_2_11")]
1942    /// Sets TTL for a single message.
1943    /// It sets the `Nats-TTL` header with provided value.
1944    pub fn ttl(self, ttl: Duration) -> Self {
1945        self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1946    }
1947}
1948
/// Classification of failures of a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nothing answered the request.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(message)
    }
}
1965
1966pub type RequestError = Error<RequestErrorKind>;
1967
1968impl From<crate::RequestError> for RequestError {
1969    fn from(error: crate::RequestError) -> Self {
1970        match error.kind() {
1971            crate::RequestErrorKind::TimedOut => {
1972                RequestError::with_source(RequestErrorKind::TimedOut, error)
1973            }
1974            crate::RequestErrorKind::NoResponders => {
1975                RequestError::new(RequestErrorKind::NoResponders)
1976            }
1977            crate::RequestErrorKind::Other => {
1978                RequestError::with_source(RequestErrorKind::Other, error)
1979            }
1980        }
1981    }
1982}
1983
1984impl From<super::errors::Error> for RequestError {
1985    fn from(err: super::errors::Error) -> Self {
1986        RequestError::with_source(RequestErrorKind::Other, err)
1987    }
1988}
1989
1990pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1991
1992#[derive(Clone, Debug, PartialEq)]
1993pub enum ConsumerInfoErrorKind {
1994    InvalidName,
1995    Offline,
1996    NotFound,
1997    StreamNotFound,
1998    Request,
1999    JetStream(super::errors::Error),
2000    TimedOut,
2001    NoResponders,
2002}
2003
2004impl Display for ConsumerInfoErrorKind {
2005    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2006        match self {
2007            Self::InvalidName => write!(f, "invalid consumer name"),
2008            Self::Offline => write!(f, "consumer is offline"),
2009            Self::NotFound => write!(f, "consumer not found"),
2010            Self::StreamNotFound => write!(f, "stream not found"),
2011            Self::Request => write!(f, "request error"),
2012            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2013            Self::TimedOut => write!(f, "timed out"),
2014            Self::NoResponders => write!(f, "no responders"),
2015        }
2016    }
2017}
2018
2019impl From<super::errors::Error> for ConsumerInfoError {
2020    fn from(error: super::errors::Error) -> Self {
2021        match error.error_code() {
2022            ErrorCode::CONSUMER_NOT_FOUND => {
2023                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2024            }
2025            ErrorCode::STREAM_NOT_FOUND => {
2026                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2027            }
2028            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2029            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2030        }
2031    }
2032}
2033
2034impl From<RequestError> for ConsumerInfoError {
2035    fn from(error: RequestError) -> Self {
2036        match error.kind() {
2037            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2038            RequestErrorKind::Other => {
2039                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2040            }
2041            RequestErrorKind::NoResponders => {
2042                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2043            }
2044        }
2045    }
2046}
2047
2048#[derive(Clone, Debug, PartialEq)]
2049pub enum CreateStreamErrorKind {
2050    EmptyStreamName,
2051    InvalidStreamName,
2052    DomainAndExternalSet,
2053    JetStreamUnavailable,
2054    JetStream(super::errors::Error),
2055    TimedOut,
2056    Response,
2057    NotFound,
2058    ResponseParse,
2059}
2060
2061impl Display for CreateStreamErrorKind {
2062    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2063        match self {
2064            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2065            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2066            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2067            Self::NotFound => write!(f, "stream not found"),
2068            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2069            Self::TimedOut => write!(f, "jetstream request timed out"),
2070            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2071            Self::ResponseParse => write!(f, "failed to parse server response"),
2072            Self::Response => write!(f, "response error"),
2073        }
2074    }
2075}
2076
2077pub type CreateStreamError = Error<CreateStreamErrorKind>;
2078
2079impl From<super::errors::Error> for CreateStreamError {
2080    fn from(error: super::errors::Error) -> Self {
2081        match error.kind() {
2082            super::errors::ErrorCode::STREAM_NOT_FOUND => {
2083                CreateStreamError::new(CreateStreamErrorKind::NotFound)
2084            }
2085            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2086        }
2087    }
2088}
2089
2090impl From<RequestError> for CreateStreamError {
2091    fn from(error: RequestError) -> Self {
2092        match error.kind() {
2093            RequestErrorKind::NoResponders => {
2094                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2095            }
2096            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2097            RequestErrorKind::Other => {
2098                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2099            }
2100        }
2101    }
2102}
2103
2104#[derive(Clone, Debug, PartialEq)]
2105pub enum GetStreamErrorKind {
2106    EmptyName,
2107    Request,
2108    InvalidStreamName,
2109    JetStream(super::errors::Error),
2110}
2111
2112impl Display for GetStreamErrorKind {
2113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2114        match self {
2115            Self::EmptyName => write!(f, "empty name cannot be empty"),
2116            Self::Request => write!(f, "request error"),
2117            Self::InvalidStreamName => write!(f, "invalid stream name"),
2118            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2119        }
2120    }
2121}
2122
2123#[derive(Clone, Debug, PartialEq)]
2124pub enum GetStreamByNameErrorKind {
2125    Request,
2126    NotFound,
2127    InvalidSubject,
2128    JetStream(super::errors::Error),
2129}
2130
2131impl Display for GetStreamByNameErrorKind {
2132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2133        match self {
2134            Self::Request => write!(f, "request error"),
2135            Self::NotFound => write!(f, "stream not found"),
2136            Self::InvalidSubject => write!(f, "invalid subject"),
2137            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2138        }
2139    }
2140}
2141
2142pub type GetStreamError = Error<GetStreamErrorKind>;
2143pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;
2144
2145pub type UpdateStreamError = CreateStreamError;
2146pub type UpdateStreamErrorKind = CreateStreamErrorKind;
2147pub type DeleteStreamError = GetStreamError;
2148pub type DeleteStreamErrorKind = GetStreamErrorKind;
2149
/// Classification of failures when accessing a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// JetStream reported an error.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(message)
    }
}
2166
2167pub type KeyValueError = Error<KeyValueErrorKind>;
2168
/// Classification of failures when creating a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// JetStream reported an error.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The request timed out.
    TimedOut,
    /// Limit markers are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(message)
    }
}
2193
/// Classification of failures when updating a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// JetStream reported an error.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The request timed out.
    TimedOut,
    /// Limit markers are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            // Previously printed the copy-pasted "bucket creation failed".
            Self::BucketUpdate => "bucket update failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
            Self::NotFound => "bucket does not exist",
        };
        f.write_str(message)
    }
}
2220pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
2221pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;
2222
2223pub type CreateObjectStoreError = CreateKeyValueError;
2224pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2225
/// Classification of failures when accessing an Object Store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The Object Store could not be fetched.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(message)
    }
}
2240
2241pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
2242
2243pub type DeleteObjectStore = ObjectStoreError;
2244pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2245
2246#[derive(Clone, Debug, PartialEq)]
2247pub enum AccountErrorKind {
2248    TimedOut,
2249    JetStream(super::errors::Error),
2250    JetStreamUnavailable,
2251    Other,
2252}
2253
2254impl Display for AccountErrorKind {
2255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2256        match self {
2257            Self::TimedOut => write!(f, "timed out"),
2258            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2259            Self::Other => write!(f, "error"),
2260            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2261        }
2262    }
2263}
2264
2265pub type AccountError = Error<AccountErrorKind>;
2266
2267impl From<RequestError> for AccountError {
2268    fn from(err: RequestError) -> Self {
2269        match err.kind {
2270            RequestErrorKind::NoResponders => {
2271                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2272            }
2273            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2274            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2275        }
2276    }
2277}
2278
2279#[derive(Clone, Debug, Serialize)]
2280enum ConsumerAction {
2281    #[serde(rename = "")]
2282    CreateOrUpdate,
2283    #[serde(rename = "create")]
2284    #[cfg(feature = "server_2_10")]
2285    Create,
2286    #[serde(rename = "update")]
2287    #[cfg(feature = "server_2_10")]
2288    Update,
2289}
2290
2291// Maps a Stream config to KV Store.
2292fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2293    let mut store = Store {
2294        prefix: format!("$KV.{}.", bucket.as_str()),
2295        name: bucket,
2296        stream: stream.clone(),
2297        stream_name: stream.info.config.name.clone(),
2298        put_prefix: None,
2299        use_jetstream_prefix: prefix != "$JS.API",
2300    };
2301    if let Some(ref mirror) = stream.info.config.mirror {
2302        let bucket = mirror.name.trim_start_matches("KV_");
2303        if let Some(ref external) = mirror.external {
2304            if !external.api_prefix.is_empty() {
2305                store.use_jetstream_prefix = false;
2306                store.prefix = format!("$KV.{bucket}.");
2307                store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2308            } else {
2309                store.put_prefix = Some(format!("$KV.{bucket}."));
2310            }
2311        }
2312    };
2313    store
2314}
2315
/// Reasons why a KV config cannot be turned into a stream config.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    /// Limit markers were requested but are not supported.
    // Only constructed when the `server_2_11` feature is enabled.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2321
2322impl From<KvToStreamConfigError> for CreateKeyValueError {
2323    fn from(err: KvToStreamConfigError) -> Self {
2324        match err {
2325            KvToStreamConfigError::TooLongHistory => {
2326                CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2327            }
2328            KvToStreamConfigError::LimitMarkersNotSupported => {
2329                CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2330            }
2331        }
2332    }
2333}
2334
2335impl From<KvToStreamConfigError> for UpdateKeyValueError {
2336    fn from(err: KvToStreamConfigError) -> Self {
2337        match err {
2338            KvToStreamConfigError::TooLongHistory => {
2339                UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2340            }
2341            KvToStreamConfigError::LimitMarkersNotSupported => {
2342                UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2343            }
2344        }
2345    }
2346}
2347
2348// Maps the KV config to Stream config.
2349fn kv_to_stream_config(
2350    config: kv::Config,
2351    _account: Account,
2352) -> Result<super::stream::Config, KvToStreamConfigError> {
2353    let history = if config.history > 0 {
2354        if config.history > MAX_HISTORY {
2355            return Err(KvToStreamConfigError::TooLongHistory);
2356        }
2357        config.history
2358    } else {
2359        1
2360    };
2361
2362    let num_replicas = if config.num_replicas == 0 {
2363        1
2364    } else {
2365        config.num_replicas
2366    };
2367
2368    #[cfg(feature = "server_2_11")]
2369    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2370
2371    #[cfg(feature = "server_2_11")]
2372    if let Some(duration) = config.limit_markers {
2373        if _account.requests.level < 1 {
2374            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2375        }
2376        allow_message_ttl = true;
2377        subject_delete_marker_ttl = Some(duration);
2378    }
2379
2380    let mut mirror = config.mirror.clone();
2381    let mut sources = config.sources.clone();
2382    let mut mirror_direct = config.mirror_direct;
2383
2384    let mut subjects = Vec::new();
2385    if let Some(ref mut mirror) = mirror {
2386        if !mirror.name.starts_with("KV_") {
2387            mirror.name = format!("KV_{}", mirror.name);
2388        }
2389        mirror_direct = true;
2390    } else if let Some(ref mut sources) = sources {
2391        for source in sources {
2392            if !source.name.starts_with("KV_") {
2393                source.name = format!("KV_{}", source.name);
2394            }
2395        }
2396    } else {
2397        subjects = vec![format!("$KV.{}.>", config.bucket)];
2398    }
2399
2400    Ok(stream::Config {
2401        name: format!("KV_{}", config.bucket),
2402        description: Some(config.description),
2403        subjects,
2404        max_messages_per_subject: history,
2405        max_bytes: config.max_bytes,
2406        max_age: config.max_age,
2407        max_message_size: config.max_value_size,
2408        storage: config.storage,
2409        republish: config.republish,
2410        allow_rollup: true,
2411        deny_delete: true,
2412        deny_purge: false,
2413        allow_direct: true,
2414        sources,
2415        mirror,
2416        num_replicas,
2417        discard: stream::DiscardPolicy::New,
2418        mirror_direct,
2419        #[cfg(feature = "server_2_10")]
2420        compression: if config.compression {
2421            Some(stream::Compression::S2)
2422        } else {
2423            None
2424        },
2425        placement: config.placement,
2426        #[cfg(feature = "server_2_11")]
2427        allow_message_ttl,
2428        #[cfg(feature = "server_2_11")]
2429        subject_delete_marker_ttl,
2430        ..Default::default()
2431    })
2432}