async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23    header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures_util::future::BoxFuture;
27use futures_util::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Debug;
33use std::fmt::Display;
34use std::future::IntoFuture;
35use std::pin::Pin;
36use std::str::from_utf8;
37use std::sync::Arc;
38use std::task::Poll;
39use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, TryAcquireError};
40use tokio::time::Duration;
41use tokio_stream::wrappers::ReceiverStream;
42use tracing::debug;
43
44use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
45use super::errors::ErrorCode;
46use super::kv::{Store, MAX_HISTORY};
47use super::object_store::{is_valid_bucket_name, ObjectStore};
48use super::stream::{
49    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
50    Stream,
51};
52#[cfg(feature = "server_2_10")]
53use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
54use super::{is_valid_name, kv};
55
56/// A context which can perform jetstream scoped requests.
57#[derive(Debug, Clone)]
58pub struct Context {
59    pub(crate) client: Client,
60    pub(crate) prefix: String,
61    pub(crate) timeout: Duration,
62    pub(crate) max_ack_semaphore: Arc<tokio::sync::Semaphore>,
63    pub(crate) ack_sender:
64        tokio::sync::mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
65    pub(crate) backpressure_on_inflight: bool,
66    pub(crate) semaphore_capacity: usize,
67}
68
69fn spawn_acker(
70    rx: ReceiverStream<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
71    ack_timeout: Duration,
72    concurrency: Option<usize>,
73) -> tokio::task::JoinHandle<()> {
74    tokio::spawn(async move {
75        rx.for_each_concurrent(concurrency, |(subscription, permit)| async move {
76            tokio::time::timeout(ack_timeout, subscription).await.ok();
77            drop(permit);
78        })
79        .await;
80        debug!("Acker task exited");
81    })
82}
83
84use std::marker::PhantomData;
85
86#[derive(Debug, Default)]
87pub struct Yes;
88#[derive(Debug, Default)]
89pub struct No;
90
91pub trait ToAssign: Debug {}
92
93impl ToAssign for Yes {}
94impl ToAssign for No {}
95
96/// A builder for [Context]. Beyond what can be set by standard constructor, it allows tweaking
97/// pending publish ack backpressure settings.
98/// # Examples
99/// ```no_run
100/// # use async_nats::jetstream::context::ContextBuilder;
101/// # use async_nats::Client;
102/// # use std::time::Duration;
103/// # #[tokio::main]
104/// # async fn main() -> Result<(), async_nats::Error> {
105/// let client = async_nats::connect("demo.nats.io").await?;
106/// let context = ContextBuilder::new()
107///     .timeout(Duration::from_secs(5))
108///     .api_prefix("MY.JS.API")
109///     .max_ack_inflight(1000)
110///     .build(client);
111/// # Ok(())
112/// # }
113/// ```
114pub struct ContextBuilder<PREFIX: ToAssign> {
115    prefix: String,
116    timeout: Duration,
117    semaphore_capacity: usize,
118    ack_timeout: Duration,
119    backpressure_on_inflight: bool,
120    concurrency_limit: Option<usize>,
121    _phantom: PhantomData<PREFIX>,
122}
123
124impl Default for ContextBuilder<Yes> {
125    fn default() -> Self {
126        ContextBuilder {
127            prefix: "$JS.API".to_string(),
128            timeout: Duration::from_secs(5),
129            semaphore_capacity: 5_000,
130            ack_timeout: Duration::from_secs(30),
131            backpressure_on_inflight: true,
132            concurrency_limit: None,
133            _phantom: PhantomData {},
134        }
135    }
136}
137
138impl ContextBuilder<Yes> {
139    /// Create a new [ContextBuilder] with default settings.
140    pub fn new() -> ContextBuilder<Yes> {
141        ContextBuilder::default()
142    }
143}
144
145impl ContextBuilder<Yes> {
146    /// Set the prefix for the JetStream API.
147    pub fn api_prefix<T: Into<String>>(self, prefix: T) -> ContextBuilder<No> {
148        ContextBuilder {
149            prefix: prefix.into(),
150            timeout: self.timeout,
151            semaphore_capacity: self.semaphore_capacity,
152            ack_timeout: self.ack_timeout,
153            backpressure_on_inflight: self.backpressure_on_inflight,
154            concurrency_limit: self.concurrency_limit,
155            _phantom: PhantomData,
156        }
157    }
158
159    /// Set the domain for the JetStream API. Domain is the middle part of standard API prefix:
160    /// $JS.{domain}.API.
161    pub fn domain<T: Into<String>>(self, domain: T) -> ContextBuilder<No> {
162        ContextBuilder {
163            prefix: format!("$JS.{}.API", domain.into()),
164            timeout: self.timeout,
165            semaphore_capacity: self.semaphore_capacity,
166            ack_timeout: self.ack_timeout,
167            backpressure_on_inflight: self.backpressure_on_inflight,
168            concurrency_limit: self.concurrency_limit,
169            _phantom: PhantomData,
170        }
171    }
172}
173
174impl<PREFIX> ContextBuilder<PREFIX>
175where
176    PREFIX: ToAssign,
177{
178    /// Set the timeout for all JetStream API requests.
179    pub fn timeout(self, timeout: Duration) -> ContextBuilder<Yes>
180    where
181        Yes: ToAssign,
182    {
183        ContextBuilder {
184            prefix: self.prefix,
185            timeout,
186            semaphore_capacity: self.semaphore_capacity,
187            ack_timeout: self.ack_timeout,
188            backpressure_on_inflight: self.backpressure_on_inflight,
189            concurrency_limit: self.concurrency_limit,
190            _phantom: PhantomData,
191        }
192    }
193
194    /// Sets the maximum time client waits for acks from the server when default backpressure is
195    /// used.
196    pub fn ack_timeout(self, ack_timeout: Duration) -> ContextBuilder<Yes>
197    where
198        Yes: ToAssign,
199    {
200        ContextBuilder {
201            prefix: self.prefix,
202            timeout: self.timeout,
203            semaphore_capacity: self.semaphore_capacity,
204            ack_timeout,
205            backpressure_on_inflight: self.backpressure_on_inflight,
206            concurrency_limit: self.concurrency_limit,
207            _phantom: PhantomData,
208        }
209    }
210
211    /// Sets the maximum number of pending acks that can be in flight at any given time.
212    /// If limit is reached, `publish` throws an error by default, or waits if backpressure is enabled.
213    pub fn max_ack_inflight(self, capacity: usize) -> ContextBuilder<Yes>
214    where
215        Yes: ToAssign,
216    {
217        ContextBuilder {
218            prefix: self.prefix,
219            timeout: self.timeout,
220            semaphore_capacity: capacity,
221            ack_timeout: self.ack_timeout,
222            backpressure_on_inflight: self.backpressure_on_inflight,
223            concurrency_limit: self.concurrency_limit,
224            _phantom: PhantomData,
225        }
226    }
227
228    /// Enable or disable backpressure when max inflight acks is reached.
229    /// When enabled, publish will wait for permits to become available instead of returning an
230    /// error.
231    /// Default is false (errors on max inflight).
232    pub fn backpressure_on_inflight(self, enabled: bool) -> ContextBuilder<Yes>
233    where
234        Yes: ToAssign,
235    {
236        ContextBuilder {
237            prefix: self.prefix,
238            timeout: self.timeout,
239            semaphore_capacity: self.semaphore_capacity,
240            ack_timeout: self.ack_timeout,
241            backpressure_on_inflight: enabled,
242            concurrency_limit: self.concurrency_limit,
243            _phantom: PhantomData,
244        }
245    }
246
247    /// Sets the concurrency limit for the ack handler task. This might be useful
248    /// in scenarios where Tokio runtime is under heavy load.
249    pub fn concurrency_limit(self, limit: Option<usize>) -> ContextBuilder<Yes>
250    where
251        Yes: ToAssign,
252    {
253        ContextBuilder {
254            prefix: self.prefix,
255            timeout: self.timeout,
256            semaphore_capacity: self.semaphore_capacity,
257            ack_timeout: self.ack_timeout,
258            backpressure_on_inflight: self.backpressure_on_inflight,
259            concurrency_limit: limit,
260            _phantom: PhantomData,
261        }
262    }
263
264    /// Build the [Context] with the given settings.
265    pub fn build(self, client: Client) -> Context {
266        let (tx, rx) = tokio::sync::mpsc::channel::<(
267            oneshot::Receiver<Message>,
268            OwnedSemaphorePermit,
269        )>(self.semaphore_capacity);
270        let stream = ReceiverStream::new(rx);
271        spawn_acker(stream, self.ack_timeout, self.concurrency_limit);
272        Context {
273            client,
274            prefix: self.prefix,
275            timeout: self.timeout,
276            max_ack_semaphore: Arc::new(tokio::sync::Semaphore::new(self.semaphore_capacity)),
277            ack_sender: tx,
278            backpressure_on_inflight: self.backpressure_on_inflight,
279            semaphore_capacity: self.semaphore_capacity,
280        }
281    }
282}
283
284impl Context {
285    pub(crate) fn new(client: Client) -> Context {
286        ContextBuilder::default().build(client)
287    }
288
289    /// Sets the timeout for all JetStream API requests.
290    pub fn set_timeout(&mut self, timeout: Duration) {
291        self.timeout = timeout
292    }
293
294    /// Waits until all pending `acks` are received from the server.
295    /// Be aware that this is probably not the way you want to await `acks`,
296    /// as it will wait for every `ack` that is pending, including those that might
297    /// be published after you call this method.
298    /// Useful in testing, or maybe batching.
299    pub async fn wait_for_acks(&self) {
300        self.max_ack_semaphore
301            .acquire_many(self.semaphore_capacity as u32)
302            .await
303            .ok();
304    }
305
306    /// Create a new [Context] with given API prefix.
307    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
308        ContextBuilder::new()
309            .api_prefix(prefix.to_string())
310            .build(client)
311    }
312
313    /// Create a new [Context] with given domain.
314    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
315        ContextBuilder::new().domain(domain.as_ref()).build(client)
316    }
317
318    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
319    /// acknowledgment from the server that the message has been successfully delivered.
320    ///
321    /// Acknowledgment future that can be polled is returned instead.
322    ///
323    /// If the stream does not exist, `no responders` error will be returned.
324    ///
325    /// # Examples
326    ///
327    /// Publish, and after each publish, await for acknowledgment.
328    ///
329    /// ```no_run
330    /// # #[tokio::main]
331    /// # async fn main() -> Result<(), async_nats::Error> {
332    /// let client = async_nats::connect("localhost:4222").await?;
333    /// let jetstream = async_nats::jetstream::new(client);
334    ///
335    /// let ack = jetstream.publish("events", "data".into()).await?;
336    /// ack.await?;
337    /// jetstream.publish("events", "data".into()).await?.await?;
338    /// # Ok(())
339    /// # }
340    /// ```
341    ///
342    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
343    /// ignored entirely.
344    ///
345    /// ```no_run
346    /// # #[tokio::main]
347    /// # async fn main() -> Result<(), async_nats::Error> {
348    /// let client = async_nats::connect("localhost:4222").await?;
349    /// let jetstream = async_nats::jetstream::new(client);
350    ///
351    /// let first_ack = jetstream.publish("events", "data".into()).await?;
352    /// let second_ack = jetstream.publish("events", "data".into()).await?;
353    /// first_ack.await?;
354    /// second_ack.await?;
355    /// # Ok(())
356    /// # }
357    /// ```
358    pub async fn publish<S: ToSubject>(
359        &self,
360        subject: S,
361        payload: Bytes,
362    ) -> Result<PublishAckFuture, PublishError> {
363        self.send_publish(subject, Publish::build().payload(payload))
364            .await
365    }
366
367    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
368    /// the server that the message has been successfully delivered.
369    ///
370    /// If the stream does not exist, `no responders` error will be returned.
371    ///
372    /// # Examples
373    ///
374    /// ```no_run
375    /// # #[tokio::main]
376    /// # async fn main() -> Result<(), async_nats::Error> {
377    /// let client = async_nats::connect("localhost:4222").await?;
378    /// let jetstream = async_nats::jetstream::new(client);
379    ///
380    /// let mut headers = async_nats::HeaderMap::new();
381    /// headers.append("X-key", "Value");
382    /// let ack = jetstream
383    ///     .publish_with_headers("events", headers, "data".into())
384    ///     .await?;
385    /// # Ok(())
386    /// # }
387    /// ```
388    pub async fn publish_with_headers<S: ToSubject>(
389        &self,
390        subject: S,
391        headers: crate::header::HeaderMap,
392        payload: Bytes,
393    ) -> Result<PublishAckFuture, PublishError> {
394        self.send_publish(subject, Publish::build().payload(payload).headers(headers))
395            .await
396    }
397
398    /// Publish a message built by [Publish] and returns an acknowledgment future.
399    ///
400    /// If the stream does not exist, `no responders` error will be returned.
401    ///
402    /// # Examples
403    ///
404    /// ```no_run
405    /// # use async_nats::jetstream::context::Publish;
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// let client = async_nats::connect("localhost:4222").await?;
409    /// let jetstream = async_nats::jetstream::new(client);
410    ///
411    /// let ack = jetstream
412    ///     .send_publish(
413    ///         "events",
414    ///         Publish::build().payload("data".into()).message_id("uuid"),
415    ///     )
416    ///     .await?;
417    /// # Ok(())
418    /// # }
419    /// ```
420    pub async fn send_publish<S: ToSubject>(
421        &self,
422        subject: S,
423        publish: Publish,
424    ) -> Result<PublishAckFuture, PublishError> {
425        let permit = if self.backpressure_on_inflight {
426            // When backpressure is enabled, wait for a permit to become available
427            self.max_ack_semaphore
428                .clone()
429                .acquire_owned()
430                .await
431                .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?
432        } else {
433            // When backpressure is disabled, error immediately if no permits available
434            self.max_ack_semaphore
435                .clone()
436                .try_acquire_owned()
437                .map_err(|err| match err {
438                    TryAcquireError::NoPermits => {
439                        PublishError::new(PublishErrorKind::MaxAckPending)
440                    }
441                    _ => PublishError::with_source(PublishErrorKind::Other, err),
442                })?
443        };
444        let subject = subject.to_subject();
445        let (sender, receiver) = oneshot::channel();
446
447        let respond = self.client.new_inbox().into();
448
449        let send_fut = self
450            .client
451            .sender
452            .send(Command::Request {
453                subject,
454                payload: publish.payload,
455                respond,
456                headers: publish.headers,
457                sender,
458            })
459            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
460
461        tokio::time::timeout(self.timeout, send_fut)
462            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
463            .await??;
464
465        Ok(PublishAckFuture {
466            timeout: self.timeout,
467            subscription: Some(receiver),
468            permit: Some(permit),
469            tx: self.ack_sender.clone(),
470        })
471    }
472
473    /// Query the server for account information
474    pub async fn query_account(&self) -> Result<Account, AccountError> {
475        let response: Response<Account> = self.request("INFO", b"").await?;
476
477        match response {
478            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
479            Response::Ok(account) => Ok(account),
480        }
481    }
482
483    /// Create a JetStream [Stream] with given config and return a handle to it.
484    /// That handle can be used to manage and use [Consumer].
485    ///
486    /// # Examples
487    ///
488    /// ```no_run
489    /// # #[tokio::main]
490    /// # async fn main() -> Result<(), async_nats::Error> {
491    /// use async_nats::jetstream::stream::Config;
492    /// use async_nats::jetstream::stream::DiscardPolicy;
493    /// let client = async_nats::connect("localhost:4222").await?;
494    /// let jetstream = async_nats::jetstream::new(client);
495    ///
496    /// let stream = jetstream
497    ///     .create_stream(Config {
498    ///         name: "events".to_string(),
499    ///         max_messages: 100_000,
500    ///         discard: DiscardPolicy::Old,
501    ///         ..Default::default()
502    ///     })
503    ///     .await?;
504    /// # Ok(())
505    /// # }
506    /// ```
507    pub async fn create_stream<S>(
508        &self,
509        stream_config: S,
510    ) -> Result<Stream<Info>, CreateStreamError>
511    where
512        Config: From<S>,
513    {
514        let mut config: Config = stream_config.into();
515        if config.name.is_empty() {
516            return Err(CreateStreamError::new(
517                CreateStreamErrorKind::EmptyStreamName,
518            ));
519        }
520        if !is_valid_name(config.name.as_str()) {
521            return Err(CreateStreamError::new(
522                CreateStreamErrorKind::InvalidStreamName,
523            ));
524        }
525        if let Some(ref mut mirror) = config.mirror {
526            if let Some(ref mut domain) = mirror.domain {
527                if mirror.external.is_some() {
528                    return Err(CreateStreamError::new(
529                        CreateStreamErrorKind::DomainAndExternalSet,
530                    ));
531                }
532                mirror.external = Some(External {
533                    api_prefix: format!("$JS.{domain}.API"),
534                    delivery_prefix: None,
535                })
536            }
537        }
538
539        if let Some(ref mut sources) = config.sources {
540            for source in sources {
541                if let Some(ref mut domain) = source.domain {
542                    if source.external.is_some() {
543                        return Err(CreateStreamError::new(
544                            CreateStreamErrorKind::DomainAndExternalSet,
545                        ));
546                    }
547                    source.external = Some(External {
548                        api_prefix: format!("$JS.{domain}.API"),
549                        delivery_prefix: None,
550                    })
551                }
552            }
553        }
554        let subject = format!("STREAM.CREATE.{}", config.name);
555        let response: Response<Info> = self.request(subject, &config).await?;
556
557        match response {
558            Response::Err { error } => Err(error.into()),
559            Response::Ok(info) => Ok(Stream {
560                context: self.clone(),
561                info,
562                name: config.name,
563            }),
564        }
565    }
566
567    /// Checks for [Stream] existence on the server and returns handle to it.
568    /// That handle can be used to manage and use [Consumer].
569    /// This variant does not fetch [Stream] info from the server.
570    /// It means it does not check if the stream actually exists.
571    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
572    /// If you however run single operations on many streams, this method is more efficient.
573    ///
574    /// # Examples
575    ///
576    /// ```no_run
577    /// # #[tokio::main]
578    /// # async fn main() -> Result<(), async_nats::Error> {
579    /// let client = async_nats::connect("localhost:4222").await?;
580    /// let jetstream = async_nats::jetstream::new(client);
581    ///
582    /// let stream = jetstream.get_stream_no_info("events").await?;
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn get_stream_no_info<T: AsRef<str>>(
587        &self,
588        stream: T,
589    ) -> Result<Stream<()>, GetStreamError> {
590        let stream = stream.as_ref();
591        if stream.is_empty() {
592            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
593        }
594
595        if !is_valid_name(stream) {
596            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
597        }
598
599        Ok(Stream {
600            context: self.clone(),
601            info: (),
602            name: stream.to_string(),
603        })
604    }
605
606    /// Checks for [Stream] existence on the server and returns handle to it.
607    /// That handle can be used to manage and use [Consumer].
608    ///
609    /// # Examples
610    ///
611    /// ```no_run
612    /// # #[tokio::main]
613    /// # async fn main() -> Result<(), async_nats::Error> {
614    /// let client = async_nats::connect("localhost:4222").await?;
615    /// let jetstream = async_nats::jetstream::new(client);
616    ///
617    /// let stream = jetstream.get_stream("events").await?;
618    /// # Ok(())
619    /// # }
620    /// ```
621    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
622        let stream = stream.as_ref();
623        if stream.is_empty() {
624            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
625        }
626
627        if !is_valid_name(stream) {
628            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
629        }
630
631        let subject = format!("STREAM.INFO.{stream}");
632        let request: Response<Info> = self
633            .request(subject, &())
634            .await
635            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
636        match request {
637            Response::Err { error } => {
638                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
639            }
640            Response::Ok(info) => Ok(Stream {
641                context: self.clone(),
642                info,
643                name: stream.to_string(),
644            }),
645        }
646    }
647
648    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
649    ///
650    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
651    ///
652    /// # Examples
653    ///
654    /// ```no_run
655    /// # #[tokio::main]
656    /// # async fn main() -> Result<(), async_nats::Error> {
657    /// use async_nats::jetstream::stream::Config;
658    /// let client = async_nats::connect("localhost:4222").await?;
659    /// let jetstream = async_nats::jetstream::new(client);
660    ///
661    /// let stream = jetstream
662    ///     .get_or_create_stream(Config {
663    ///         name: "events".to_string(),
664    ///         max_messages: 10_000,
665    ///         ..Default::default()
666    ///     })
667    ///     .await?;
668    /// # Ok(())
669    /// # }
670    /// ```
671    pub async fn get_or_create_stream<S>(
672        &self,
673        stream_config: S,
674    ) -> Result<Stream, CreateStreamError>
675    where
676        S: Into<Config>,
677    {
678        let config: Config = stream_config.into();
679
680        if config.name.is_empty() {
681            return Err(CreateStreamError::new(
682                CreateStreamErrorKind::EmptyStreamName,
683            ));
684        }
685
686        if !is_valid_name(config.name.as_str()) {
687            return Err(CreateStreamError::new(
688                CreateStreamErrorKind::InvalidStreamName,
689            ));
690        }
691        let subject = format!("STREAM.INFO.{}", config.name);
692
693        let request: Response<Info> = self.request(subject, &()).await?;
694        match request {
695            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
696            Response::Err { error } => Err(error.into()),
697            Response::Ok(info) => Ok(Stream {
698                context: self.clone(),
699                info,
700                name: config.name,
701            }),
702        }
703    }
704
705    /// Deletes a [Stream] with a given name.
706    ///
707    /// # Examples
708    ///
709    /// ```no_run
710    /// # #[tokio::main]
711    /// # async fn main() -> Result<(), async_nats::Error> {
712    /// use async_nats::jetstream::stream::Config;
713    /// let client = async_nats::connect("localhost:4222").await?;
714    /// let jetstream = async_nats::jetstream::new(client);
715    ///
716    /// let stream = jetstream.delete_stream("events").await?;
717    /// # Ok(())
718    /// # }
719    /// ```
720    pub async fn delete_stream<T: AsRef<str>>(
721        &self,
722        stream: T,
723    ) -> Result<DeleteStatus, DeleteStreamError> {
724        let stream = stream.as_ref();
725        if stream.is_empty() {
726            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
727        }
728
729        if !is_valid_name(stream) {
730            return Err(DeleteStreamError::new(
731                DeleteStreamErrorKind::InvalidStreamName,
732            ));
733        }
734
735        let subject = format!("STREAM.DELETE.{stream}");
736        match self
737            .request(subject, &json!({}))
738            .await
739            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
740        {
741            Response::Err { error } => Err(DeleteStreamError::new(
742                DeleteStreamErrorKind::JetStream(error),
743            )),
744            Response::Ok(delete_response) => Ok(delete_response),
745        }
746    }
747
748    /// Updates a [Stream] with a given config. If specific field cannot be updated,
749    /// error is returned.
750    ///
751    /// # Examples
752    ///
753    /// ```no_run
754    /// # #[tokio::main]
755    /// # async fn main() -> Result<(), async_nats::Error> {
756    /// use async_nats::jetstream::stream::Config;
757    /// use async_nats::jetstream::stream::DiscardPolicy;
758    /// let client = async_nats::connect("localhost:4222").await?;
759    /// let jetstream = async_nats::jetstream::new(client);
760    ///
761    /// let stream = jetstream
762    ///     .update_stream(Config {
763    ///         name: "events".to_string(),
764    ///         discard: DiscardPolicy::New,
765    ///         max_messages: 50_000,
766    ///         ..Default::default()
767    ///     })
768    ///     .await?;
769    /// # Ok(())
770    /// # }
771    /// ```
772    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
773    where
774        S: Borrow<Config>,
775    {
776        let config = config.borrow();
777
778        if config.name.is_empty() {
779            return Err(CreateStreamError::new(
780                CreateStreamErrorKind::EmptyStreamName,
781            ));
782        }
783
784        if !is_valid_name(config.name.as_str()) {
785            return Err(CreateStreamError::new(
786                CreateStreamErrorKind::InvalidStreamName,
787            ));
788        }
789
790        let subject = format!("STREAM.UPDATE.{}", config.name);
791        match self.request(subject, config).await? {
792            Response::Err { error } => Err(error.into()),
793            Response::Ok(info) => Ok(info),
794        }
795    }
796
797    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
798    ///
799    /// # Examples
800    ///
801    /// ```no_run
802    /// # #[tokio::main]
803    /// # async fn main() -> Result<(), async_nats::Error> {
804    /// use async_nats::jetstream::stream::Config;
805    /// use async_nats::jetstream::stream::DiscardPolicy;
806    /// let client = async_nats::connect("localhost:4222").await?;
807    /// let jetstream = async_nats::jetstream::new(client);
808    ///
809    /// let stream = jetstream
810    ///     .create_or_update_stream(Config {
811    ///         name: "events".to_string(),
812    ///         discard: DiscardPolicy::New,
813    ///         max_messages: 50_000,
814    ///         ..Default::default()
815    ///     })
816    ///     .await?;
817    /// # Ok(())
818    /// # }
819    /// ```
820    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
821        match self.update_stream(config.clone()).await {
822            Ok(stream) => Ok(stream),
823            Err(err) => match err.kind() {
824                CreateStreamErrorKind::NotFound => {
825                    let stream = self
826                        .create_stream(config)
827                        .await
828                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
829                    Ok(stream.info)
830                }
831                _ => Err(err),
832            },
833        }
834    }
835
836    /// Looks up Stream that contains provided subject.
837    ///
838    /// # Examples
839    ///
840    /// ```no_run
841    /// # #[tokio::main]
842    /// # async fn main() -> Result<(), async_nats::Error> {
843    /// use futures_util::TryStreamExt;
844    /// let client = async_nats::connect("demo.nats.io:4222").await?;
845    /// let jetstream = async_nats::jetstream::new(client);
846    /// let stream_name = jetstream.stream_by_subject("foo.>");
847    /// # Ok(())
848    /// # }
849    /// ```
850    pub async fn stream_by_subject<T: Into<String>>(
851        &self,
852        subject: T,
853    ) -> Result<String, GetStreamByNameError> {
854        let subject = subject.into();
855        if !is_valid_subject(subject.as_str()) {
856            return Err(GetStreamByNameError::new(
857                GetStreamByNameErrorKind::InvalidSubject,
858            ));
859        }
860        let mut names = StreamNames {
861            context: self.clone(),
862            offset: 0,
863            page_request: None,
864            streams: Vec::new(),
865            subject: Some(subject),
866            done: false,
867        };
868        match names.next().await {
869            Some(name) => match name {
870                Ok(name) => Ok(name),
871                Err(err) => Err(GetStreamByNameError::with_source(
872                    GetStreamByNameErrorKind::Request,
873                    err,
874                )),
875            },
876            None => Err(GetStreamByNameError::new(
877                GetStreamByNameErrorKind::NotFound,
878            )),
879        }
880    }
881
882    /// Lists names of all streams for current context.
883    ///
884    /// # Examples
885    ///
886    /// ```no_run
887    /// # #[tokio::main]
888    /// # async fn main() -> Result<(), async_nats::Error> {
889    /// use futures_util::TryStreamExt;
890    /// let client = async_nats::connect("demo.nats.io:4222").await?;
891    /// let jetstream = async_nats::jetstream::new(client);
892    /// let mut names = jetstream.stream_names();
893    /// while let Some(stream) = names.try_next().await? {
894    ///     println!("stream: {}", stream);
895    /// }
896    /// # Ok(())
897    /// # }
898    /// ```
899    pub fn stream_names(&self) -> StreamNames {
900        StreamNames {
901            context: self.clone(),
902            offset: 0,
903            page_request: None,
904            streams: Vec::new(),
905            subject: None,
906            done: false,
907        }
908    }
909
910    /// Lists all streams info for current context.
911    ///
912    /// # Examples
913    ///
914    /// ```no_run
915    /// # #[tokio::main]
916    /// # async fn main() -> Result<(), async_nats::Error> {
917    /// use futures_util::TryStreamExt;
918    /// let client = async_nats::connect("demo.nats.io:4222").await?;
919    /// let jetstream = async_nats::jetstream::new(client);
920    /// let mut streams = jetstream.streams();
921    /// while let Some(stream) = streams.try_next().await? {
922    ///     println!("stream: {:?}", stream);
923    /// }
924    /// # Ok(())
925    /// # }
926    /// ```
927    pub fn streams(&self) -> Streams {
928        Streams {
929            context: self.clone(),
930            offset: 0,
931            page_request: None,
932            streams: Vec::new(),
933            done: false,
934        }
935    }
936    /// Returns an existing key-value bucket.
937    ///
938    /// # Examples
939    ///
940    /// ```no_run
941    /// # #[tokio::main]
942    /// # async fn main() -> Result<(), async_nats::Error> {
943    /// let client = async_nats::connect("demo.nats.io:4222").await?;
944    /// let jetstream = async_nats::jetstream::new(client);
945    /// let kv = jetstream.get_key_value("bucket").await?;
946    /// # Ok(())
947    /// # }
948    /// ```
949    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
950        let bucket: String = bucket.into();
951        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
952            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
953        }
954
955        let stream_name = format!("KV_{}", &bucket);
956        let stream = self
957            .get_stream(stream_name.clone())
958            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
959            .await?;
960
961        if stream.info.config.max_messages_per_subject < 1 {
962            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
963        }
964        let mut store = Store {
965            prefix: format!("$KV.{}.", &bucket),
966            name: bucket,
967            stream_name,
968            stream: stream.clone(),
969            put_prefix: None,
970            use_jetstream_prefix: self.prefix != "$JS.API",
971        };
972        if let Some(ref mirror) = stream.info.config.mirror {
973            let bucket = mirror.name.trim_start_matches("KV_");
974            if let Some(ref external) = mirror.external {
975                if !external.api_prefix.is_empty() {
976                    store.use_jetstream_prefix = false;
977                    store.prefix = format!("$KV.{bucket}.");
978                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
979                } else {
980                    store.put_prefix = Some(format!("$KV.{bucket}."));
981                }
982            }
983        };
984
985        Ok(store)
986    }
987
988    /// Creates a new key-value bucket.
989    ///
990    /// # Examples
991    ///
992    /// ```no_run
993    /// # #[tokio::main]
994    /// # async fn main() -> Result<(), async_nats::Error> {
995    /// let client = async_nats::connect("demo.nats.io:4222").await?;
996    /// let jetstream = async_nats::jetstream::new(client);
997    /// let kv = jetstream
998    ///     .create_key_value(async_nats::jetstream::kv::Config {
999    ///         bucket: "kv".to_string(),
1000    ///         history: 10,
1001    ///         ..Default::default()
1002    ///     })
1003    ///     .await?;
1004    /// # Ok(())
1005    /// # }
1006    /// ```
1007    pub async fn create_key_value(
1008        &self,
1009        config: crate::jetstream::kv::Config,
1010    ) -> Result<Store, CreateKeyValueError> {
1011        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1012            return Err(CreateKeyValueError::new(
1013                CreateKeyValueErrorKind::InvalidStoreName,
1014            ));
1015        }
1016        let info = self.query_account().await.map_err(|err| {
1017            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1018        })?;
1019
1020        let bucket_name = config.bucket.clone();
1021        let stream_config = kv_to_stream_config(config, info)?;
1022
1023        let stream = self.create_stream(stream_config).await.map_err(|err| {
1024            if err.kind() == CreateStreamErrorKind::TimedOut {
1025                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
1026            } else {
1027                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1028            }
1029        })?;
1030
1031        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1032    }
1033
1034    /// Updates an existing key-value bucket.
1035    ///
1036    /// # Examples
1037    ///
1038    /// ```no_run
1039    /// # #[tokio::main]
1040    /// # async fn main() -> Result<(), async_nats::Error> {
1041    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1042    /// let jetstream = async_nats::jetstream::new(client);
1043    /// let kv = jetstream
1044    ///     .update_key_value(async_nats::jetstream::kv::Config {
1045    ///         bucket: "kv".to_string(),
1046    ///         history: 60,
1047    ///         ..Default::default()
1048    ///     })
1049    ///     .await?;
1050    /// # Ok(())
1051    /// # }
1052    /// ```
1053    pub async fn update_key_value(
1054        &self,
1055        config: crate::jetstream::kv::Config,
1056    ) -> Result<Store, UpdateKeyValueError> {
1057        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1058            return Err(UpdateKeyValueError::new(
1059                UpdateKeyValueErrorKind::InvalidStoreName,
1060            ));
1061        }
1062
1063        let stream_name = format!("KV_{}", config.bucket);
1064        let bucket_name = config.bucket.clone();
1065
1066        let account = self.query_account().await.map_err(|err| {
1067            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
1068        })?;
1069        let stream = self
1070            .update_stream(kv_to_stream_config(config, account)?)
1071            .await
1072            .map_err(|err| match err.kind() {
1073                UpdateStreamErrorKind::NotFound => {
1074                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
1075                }
1076                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
1077            })?;
1078
1079        let stream = Stream {
1080            context: self.clone(),
1081            info: stream,
1082            name: stream_name,
1083        };
1084
1085        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1086    }
1087
1088    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
1089    ///
1090    /// # Examples
1091    ///
1092    /// ```no_run
1093    /// # #[tokio::main]
1094    /// # async fn main() -> Result<(), async_nats::Error> {
1095    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1096    /// let jetstream = async_nats::jetstream::new(client);
1097    /// let kv = jetstream
1098    ///     .update_key_value(async_nats::jetstream::kv::Config {
1099    ///         bucket: "kv".to_string(),
1100    ///         history: 60,
1101    ///         ..Default::default()
1102    ///     })
1103    ///     .await?;
1104    /// # Ok(())
1105    /// # }
1106    /// ```
1107    pub async fn create_or_update_key_value(
1108        &self,
1109        config: crate::jetstream::kv::Config,
1110    ) -> Result<Store, CreateKeyValueError> {
1111        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1112            return Err(CreateKeyValueError::new(
1113                CreateKeyValueErrorKind::InvalidStoreName,
1114            ));
1115        }
1116
1117        let bucket_name = config.bucket.clone();
1118        let stream_name = format!("KV_{}", config.bucket);
1119
1120        let account = self.query_account().await.map_err(|err| {
1121            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1122        })?;
1123        let stream = self
1124            .create_or_update_stream(kv_to_stream_config(config, account)?)
1125            .await
1126            .map_err(|err| {
1127                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
1128            })?;
1129
1130        let stream = Stream {
1131            context: self.clone(),
1132            info: stream,
1133            name: stream_name,
1134        };
1135
1136        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
1137    }
1138
1139    /// Deletes given key-value bucket.
1140    ///
1141    /// # Examples
1142    ///
1143    /// ```no_run
1144    /// # #[tokio::main]
1145    /// # async fn main() -> Result<(), async_nats::Error> {
1146    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1147    /// let jetstream = async_nats::jetstream::new(client);
1148    /// let kv = jetstream
1149    ///     .create_key_value(async_nats::jetstream::kv::Config {
1150    ///         bucket: "kv".to_string(),
1151    ///         history: 10,
1152    ///         ..Default::default()
1153    ///     })
1154    ///     .await?;
1155    /// # Ok(())
1156    /// # }
1157    /// ```
1158    pub async fn delete_key_value<T: AsRef<str>>(
1159        &self,
1160        bucket: T,
1161    ) -> Result<DeleteStatus, KeyValueError> {
1162        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
1163            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
1164        }
1165
1166        let stream_name = format!("KV_{}", bucket.as_ref());
1167        self.delete_stream(stream_name)
1168            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
1169            .await
1170    }
1171
1172    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
1173    //     let config = config.borrow();
1174    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
1175    //         return Err(Box::new(std::io::Error::other(
1176    //             "invalid bucket name",
1177    //         )));
1178    //     }
1179
1180    //     let stream_name = format!("KV_{}", config.bucket);
1181    //     self.update_stream()
1182    //         .await
1183    //         .and_then(|info| Ok(()))
1184    // }
1185
1186    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1187    ///
1188    /// It has one less interaction with the server when binding to only one
1189    /// [crate::jetstream::consumer::Consumer].
1190    ///
1191    /// # Examples:
1192    ///
1193    /// ```no_run
1194    /// # #[tokio::main]
1195    /// # async fn main() -> Result<(), async_nats::Error> {
1196    /// use async_nats::jetstream::consumer::PullConsumer;
1197    ///
1198    /// let client = async_nats::connect("localhost:4222").await?;
1199    /// let jetstream = async_nats::jetstream::new(client);
1200    ///
1201    /// let consumer: PullConsumer = jetstream
1202    ///     .get_consumer_from_stream("consumer", "stream")
1203    ///     .await?;
1204    ///
1205    /// # Ok(())
1206    /// # }
1207    /// ```
1208    pub async fn get_consumer_from_stream<T, C, S>(
1209        &self,
1210        consumer: C,
1211        stream: S,
1212    ) -> Result<Consumer<T>, ConsumerError>
1213    where
1214        T: FromConsumer + IntoConsumerConfig,
1215        S: AsRef<str>,
1216        C: AsRef<str>,
1217    {
1218        if !is_valid_name(stream.as_ref()) {
1219            return Err(ConsumerError::with_source(
1220                ConsumerErrorKind::InvalidName,
1221                "invalid stream",
1222            ));
1223        }
1224
1225        if !is_valid_name(consumer.as_ref()) {
1226            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1227        }
1228
1229        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
1230
1231        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
1232            Response::Ok(info) => info,
1233            Response::Err { error } => return Err(error.into()),
1234        };
1235
1236        Ok(Consumer::new(
1237            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1238                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1239            })?,
1240            info,
1241            self.clone(),
1242        ))
1243    }
1244
1245    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
1246    ///
1247    /// It has one less interaction with the server when binding to only one
1248    /// [crate::jetstream::consumer::Consumer].
1249    ///
1250    /// # Examples:
1251    ///
1252    /// ```no_run
1253    /// # #[tokio::main]
1254    /// # async fn main() -> Result<(), async_nats::Error> {
1255    /// use async_nats::jetstream::consumer::PullConsumer;
1256    ///
1257    /// let client = async_nats::connect("localhost:4222").await?;
1258    /// let jetstream = async_nats::jetstream::new(client);
1259    ///
1260    /// jetstream
1261    ///     .delete_consumer_from_stream("consumer", "stream")
1262    ///     .await?;
1263    ///
1264    /// # Ok(())
1265    /// # }
1266    /// ```
1267    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1268        &self,
1269        consumer: C,
1270        stream: S,
1271    ) -> Result<DeleteStatus, ConsumerError> {
1272        if !is_valid_name(consumer.as_ref()) {
1273            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1274        }
1275
1276        if !is_valid_name(stream.as_ref()) {
1277            return Err(ConsumerError::with_source(
1278                ConsumerErrorKind::Other,
1279                "invalid stream name",
1280            ));
1281        }
1282
1283        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1284
1285        match self.request(subject, &json!({})).await? {
1286            Response::Ok(delete_status) => Ok(delete_status),
1287            Response::Err { error } => Err(error.into()),
1288        }
1289    }
1290
1291    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1292    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1293    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1294    ///
1295    /// # Examples
1296    ///
1297    /// ```no_run
1298    /// # #[tokio::main]
1299    /// # async fn main() -> Result<(), async_nats::Error> {
1300    /// use async_nats::jetstream::consumer;
1301    /// let client = async_nats::connect("localhost:4222").await?;
1302    /// let jetstream = async_nats::jetstream::new(client);
1303    ///
1304    /// let consumer: consumer::PullConsumer = jetstream
1305    ///     .create_consumer_on_stream(
1306    ///         consumer::pull::Config {
1307    ///             durable_name: Some("pull".to_string()),
1308    ///             ..Default::default()
1309    ///         },
1310    ///         "stream",
1311    ///     )
1312    ///     .await?;
1313    /// # Ok(())
1314    /// # }
1315    /// ```
1316    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1317        &self,
1318        config: C,
1319        stream: S,
1320    ) -> Result<Consumer<C>, ConsumerError> {
1321        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1322            .await
1323    }
1324
1325    /// Update an existing consumer.
1326    /// This call will fail if the consumer does not exist.
1327    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1328    ///
1329    /// # Examples
1330    ///
1331    /// ```no_run
1332    /// # #[tokio::main]
1333    /// # async fn main() -> Result<(), async_nats::Error> {
1334    /// use async_nats::jetstream::consumer;
1335    /// let client = async_nats::connect("localhost:4222").await?;
1336    /// let jetstream = async_nats::jetstream::new(client);
1337    ///
1338    /// let consumer: consumer::PullConsumer = jetstream
1339    ///     .update_consumer_on_stream(
1340    ///         consumer::pull::Config {
1341    ///             durable_name: Some("pull".to_string()),
1342    ///             description: Some("updated pull consumer".to_string()),
1343    ///             ..Default::default()
1344    ///         },
1345    ///         "stream",
1346    ///     )
1347    ///     .await?;
1348    /// # Ok(())
1349    /// # }
1350    /// ```
1351    #[cfg(feature = "server_2_10")]
1352    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1353        &self,
1354        config: C,
1355        stream: S,
1356    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1357        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1358            .await
1359            .map_err(|err| err.into())
1360    }
1361
1362    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1363    /// the same.
1364    /// This method will fail if consumer is already present with different config.
1365    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1366    ///
1367    /// # Examples
1368    ///
1369    /// ```no_run
1370    /// # #[tokio::main]
1371    /// # async fn main() -> Result<(), async_nats::Error> {
1372    /// use async_nats::jetstream::consumer;
1373    /// let client = async_nats::connect("localhost:4222").await?;
1374    /// let jetstream = async_nats::jetstream::new(client);
1375    ///
1376    /// let consumer: consumer::PullConsumer = jetstream
1377    ///     .create_consumer_strict_on_stream(
1378    ///         consumer::pull::Config {
1379    ///             durable_name: Some("pull".to_string()),
1380    ///             ..Default::default()
1381    ///         },
1382    ///         "stream",
1383    ///     )
1384    ///     .await?;
1385    /// # Ok(())
1386    /// # }
1387    /// ```
1388    #[cfg(feature = "server_2_10")]
1389    pub async fn create_consumer_strict_on_stream<
1390        C: IntoConsumerConfig + FromConsumer,
1391        S: AsRef<str>,
1392    >(
1393        &self,
1394        config: C,
1395        stream: S,
1396    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1397        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1398            .await
1399            .map_err(|err| err.into())
1400    }
1401
1402    async fn create_consumer_on_stream_action<
1403        C: IntoConsumerConfig + FromConsumer,
1404        S: AsRef<str>,
1405    >(
1406        &self,
1407        config: C,
1408        stream: S,
1409        action: ConsumerAction,
1410    ) -> Result<Consumer<C>, ConsumerError> {
1411        let config = config.into_consumer_config();
1412
1413        let subject = {
1414            let filter = if config.filter_subject.is_empty() {
1415                "".to_string()
1416            } else {
1417                format!(".{}", config.filter_subject)
1418            };
1419            config
1420                .name
1421                .as_ref()
1422                .or(config.durable_name.as_ref())
1423                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1424                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1425        };
1426
1427        match self
1428            .request(
1429                subject,
1430                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1431            )
1432            .await?
1433        {
1434            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1435            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1436                FromConsumer::try_from_consumer_config(info.clone().config)
1437                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1438                info,
1439                self.clone(),
1440            )),
1441        }
1442    }
1443
1444    /// Send a request to the jetstream JSON API.
1445    ///
1446    /// This is a low level API used mostly internally, that should be used only in
1447    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1448    ///
1449    /// # Examples
1450    ///
1451    /// ```no_run
1452    /// # use async_nats::jetstream::stream::Info;
1453    /// # use async_nats::jetstream::response::Response;
1454    /// # #[tokio::main]
1455    /// # async fn main() -> Result<(), async_nats::Error> {
1456    /// let client = async_nats::connect("localhost:4222").await?;
1457    /// let jetstream = async_nats::jetstream::new(client);
1458    ///
1459    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1460    /// # Ok(())
1461    /// # }
1462    /// ```
1463    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1464    where
1465        S: ToSubject,
1466        T: ?Sized + Serialize,
1467        V: DeserializeOwned,
1468    {
1469        let subject = subject.to_subject();
1470        let request = serde_json::to_vec(&payload)
1471            .map(Bytes::from)
1472            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1473
1474        debug!("JetStream request sent: {:?}", request);
1475
1476        let message = self
1477            .client
1478            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1479            .await;
1480        let message = message?;
1481        debug!(
1482            "JetStream request response: {:?}",
1483            from_utf8(&message.payload)
1484        );
1485        let response = serde_json::from_slice(message.payload.as_ref())
1486            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1487
1488        Ok(response)
1489    }
1490
1491    /// Creates a new object store bucket.
1492    ///
1493    /// # Examples
1494    ///
1495    /// ```no_run
1496    /// # #[tokio::main]
1497    /// # async fn main() -> Result<(), async_nats::Error> {
1498    /// let client = async_nats::connect("demo.nats.io").await?;
1499    /// let jetstream = async_nats::jetstream::new(client);
1500    /// let bucket = jetstream
1501    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1502    ///         bucket: "bucket".to_string(),
1503    ///         ..Default::default()
1504    ///     })
1505    ///     .await?;
1506    /// # Ok(())
1507    /// # }
1508    /// ```
1509    pub async fn create_object_store(
1510        &self,
1511        config: super::object_store::Config,
1512    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1513        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1514            return Err(CreateObjectStoreError::new(
1515                CreateKeyValueErrorKind::InvalidStoreName,
1516            ));
1517        }
1518
1519        let bucket_name = config.bucket.clone();
1520        let stream_name = format!("OBJ_{bucket_name}");
1521        let chunk_subject = format!("$O.{bucket_name}.C.>");
1522        let meta_subject = format!("$O.{bucket_name}.M.>");
1523
1524        let stream = self
1525            .create_stream(super::stream::Config {
1526                name: stream_name,
1527                description: config.description.clone(),
1528                subjects: vec![chunk_subject, meta_subject],
1529                max_age: config.max_age,
1530                max_bytes: config.max_bytes,
1531                storage: config.storage,
1532                num_replicas: config.num_replicas,
1533                discard: DiscardPolicy::New,
1534                allow_rollup: true,
1535                allow_direct: true,
1536                #[cfg(feature = "server_2_10")]
1537                compression: if config.compression {
1538                    Some(Compression::S2)
1539                } else {
1540                    None
1541                },
1542                placement: config.placement,
1543                ..Default::default()
1544            })
1545            .await
1546            .map_err(|err| {
1547                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1548            })?;
1549
1550        Ok(ObjectStore {
1551            name: bucket_name,
1552            stream,
1553        })
1554    }
1555
1556    /// Get an existing object store bucket.
1557    ///
1558    /// # Examples
1559    ///
1560    /// ```no_run
1561    /// # #[tokio::main]
1562    /// # async fn main() -> Result<(), async_nats::Error> {
1563    /// let client = async_nats::connect("demo.nats.io").await?;
1564    /// let jetstream = async_nats::jetstream::new(client);
1565    /// let bucket = jetstream.get_object_store("bucket").await?;
1566    /// # Ok(())
1567    /// # }
1568    /// ```
1569    pub async fn get_object_store<T: AsRef<str>>(
1570        &self,
1571        bucket_name: T,
1572    ) -> Result<ObjectStore, ObjectStoreError> {
1573        let bucket_name = bucket_name.as_ref();
1574        if !is_valid_bucket_name(bucket_name) {
1575            return Err(ObjectStoreError::new(
1576                ObjectStoreErrorKind::InvalidBucketName,
1577            ));
1578        }
1579        let stream_name = format!("OBJ_{bucket_name}");
1580        let stream = self
1581            .get_stream(stream_name)
1582            .await
1583            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1584
1585        Ok(ObjectStore {
1586            name: bucket_name.to_string(),
1587            stream,
1588        })
1589    }
1590
1591    /// Delete a object store bucket.
1592    ///
1593    /// # Examples
1594    ///
1595    /// ```no_run
1596    /// # #[tokio::main]
1597    /// # async fn main() -> Result<(), async_nats::Error> {
1598    /// let client = async_nats::connect("demo.nats.io").await?;
1599    /// let jetstream = async_nats::jetstream::new(client);
1600    /// let bucket = jetstream.delete_object_store("bucket").await?;
1601    /// # Ok(())
1602    /// # }
1603    /// ```
1604    pub async fn delete_object_store<T: AsRef<str>>(
1605        &self,
1606        bucket_name: T,
1607    ) -> Result<(), DeleteObjectStore> {
1608        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1609        self.delete_stream(stream_name)
1610            .await
1611            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1612        Ok(())
1613    }
1614}
1615
1616#[derive(Clone, Copy, Debug, PartialEq)]
1617pub enum PublishErrorKind {
1618    StreamNotFound,
1619    WrongLastMessageId,
1620    WrongLastSequence,
1621    TimedOut,
1622    BrokenPipe,
1623    MaxAckPending,
1624    Other,
1625}
1626
1627impl Display for PublishErrorKind {
1628    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1629        match self {
1630            Self::StreamNotFound => write!(f, "no stream found for given subject"),
1631            Self::TimedOut => write!(f, "timed out: didn't receive ack in time"),
1632            Self::Other => write!(f, "publish failed"),
1633            Self::BrokenPipe => write!(f, "broken pipe"),
1634            Self::WrongLastMessageId => write!(f, "wrong last message id"),
1635            Self::WrongLastSequence => write!(f, "wrong last sequence"),
1636            Self::MaxAckPending => write!(f, "max ack pending reached"),
1637        }
1638    }
1639}
1640
1641pub type PublishError = Error<PublishErrorKind>;
1642
1643#[derive(Debug)]
1644pub struct PublishAckFuture {
1645    timeout: Duration,
1646    subscription: Option<oneshot::Receiver<Message>>,
1647    permit: Option<OwnedSemaphorePermit>,
1648    tx: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
1649}
1650
1651impl Drop for PublishAckFuture {
1652    fn drop(&mut self) {
1653        if let (Some(sub), Some(permit)) = (self.subscription.take(), self.permit.take()) {
1654            if let Err(err) = self.tx.try_send((sub, permit)) {
1655                tracing::warn!("failed to pass future permit to the acker: {}", err);
1656            }
1657        }
1658    }
1659}
1660
1661impl PublishAckFuture {
1662    async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
1663        let next = tokio::time::timeout(self.timeout, self.subscription.take().unwrap())
1664            .await
1665            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1666        next.map_or_else(
1667            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1668            |m| {
1669                if m.status == Some(StatusCode::NO_RESPONDERS) {
1670                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1671                }
1672                let response = serde_json::from_slice(m.payload.as_ref())
1673                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1674                match response {
1675                    Response::Err { error } => match error.error_code() {
1676                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1677                            PublishErrorKind::WrongLastMessageId,
1678                            error,
1679                        )),
1680                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1681                            PublishErrorKind::WrongLastSequence,
1682                            error,
1683                        )),
1684                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1685                    },
1686                    Response::Ok(publish_ack) => Ok(publish_ack),
1687                }
1688            },
1689        )
1690    }
1691}
1692impl IntoFuture for PublishAckFuture {
1693    type Output = Result<PublishAck, PublishError>;
1694
1695    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1696
1697    fn into_future(self) -> Self::IntoFuture {
1698        Box::pin(std::future::IntoFuture::into_future(
1699            self.next_with_timeout(),
1700        ))
1701    }
1702}
1703
1704#[derive(Deserialize, Debug)]
1705struct StreamPage {
1706    total: usize,
1707    streams: Option<Vec<String>>,
1708}
1709
1710#[derive(Deserialize, Debug)]
1711struct StreamInfoPage {
1712    total: usize,
1713    streams: Option<Vec<super::stream::Info>>,
1714}
1715
1716type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1717
1718pub struct StreamNames {
1719    context: Context,
1720    offset: usize,
1721    page_request: Option<PageRequest>,
1722    subject: Option<String>,
1723    streams: Vec<String>,
1724    done: bool,
1725}
1726
1727impl futures_util::Stream for StreamNames {
1728    type Item = Result<String, StreamsError>;
1729
1730    fn poll_next(
1731        mut self: Pin<&mut Self>,
1732        cx: &mut std::task::Context<'_>,
1733    ) -> std::task::Poll<Option<Self::Item>> {
1734        match self.page_request.as_mut() {
1735            Some(page) => match page.try_poll_unpin(cx) {
1736                std::task::Poll::Ready(page) => {
1737                    self.page_request = None;
1738                    let page = page
1739                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1740                    if let Some(streams) = page.streams {
1741                        self.offset += streams.len();
1742                        self.streams = streams;
1743                        if self.offset >= page.total {
1744                            self.done = true;
1745                        }
1746                        match self.streams.pop() {
1747                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1748                            None => Poll::Ready(None),
1749                        }
1750                    } else {
1751                        Poll::Ready(None)
1752                    }
1753                }
1754                std::task::Poll::Pending => std::task::Poll::Pending,
1755            },
1756            None => {
1757                if let Some(stream) = self.streams.pop() {
1758                    Poll::Ready(Some(Ok(stream)))
1759                } else {
1760                    if self.done {
1761                        return Poll::Ready(None);
1762                    }
1763                    let context = self.context.clone();
1764                    let offset = self.offset;
1765                    let subject = self.subject.clone();
1766                    self.page_request = Some(Box::pin(async move {
1767                        match context
1768                            .request(
1769                                "STREAM.NAMES",
1770                                &json!({
1771                                    "offset": offset,
1772                                    "subject": subject
1773                                }),
1774                            )
1775                            .await?
1776                        {
1777                            Response::Err { error } => {
1778                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1779                            }
1780                            Response::Ok(page) => Ok(page),
1781                        }
1782                    }));
1783                    self.poll_next(cx)
1784                }
1785            }
1786        }
1787    }
1788}
1789
1790type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1791
1792pub type StreamsErrorKind = RequestErrorKind;
1793pub type StreamsError = RequestError;
1794
1795pub struct Streams {
1796    context: Context,
1797    offset: usize,
1798    page_request: Option<PageInfoRequest>,
1799    streams: Vec<super::stream::Info>,
1800    done: bool,
1801}
1802
1803impl futures_util::Stream for Streams {
1804    type Item = Result<super::stream::Info, StreamsError>;
1805
1806    fn poll_next(
1807        mut self: Pin<&mut Self>,
1808        cx: &mut std::task::Context<'_>,
1809    ) -> std::task::Poll<Option<Self::Item>> {
1810        match self.page_request.as_mut() {
1811            Some(page) => match page.try_poll_unpin(cx) {
1812                std::task::Poll::Ready(page) => {
1813                    self.page_request = None;
1814                    let page = page
1815                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1816                    if let Some(streams) = page.streams {
1817                        self.offset += streams.len();
1818                        self.streams = streams;
1819                        if self.offset >= page.total {
1820                            self.done = true;
1821                        }
1822                        match self.streams.pop() {
1823                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1824                            None => Poll::Ready(None),
1825                        }
1826                    } else {
1827                        Poll::Ready(None)
1828                    }
1829                }
1830                std::task::Poll::Pending => std::task::Poll::Pending,
1831            },
1832            None => {
1833                if let Some(stream) = self.streams.pop() {
1834                    Poll::Ready(Some(Ok(stream)))
1835                } else {
1836                    if self.done {
1837                        return Poll::Ready(None);
1838                    }
1839                    let context = self.context.clone();
1840                    let offset = self.offset;
1841                    self.page_request = Some(Box::pin(async move {
1842                        match context
1843                            .request(
1844                                "STREAM.LIST",
1845                                &json!({
1846                                    "offset": offset,
1847                                }),
1848                            )
1849                            .await?
1850                        {
1851                            Response::Err { error } => {
1852                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1853                            }
1854                            Response::Ok(page) => Ok(page),
1855                        }
1856                    }));
1857                    self.poll_next(cx)
1858                }
1859            }
1860        }
1861    }
1862}
1863/// Used for building customized `publish` message.
1864#[derive(Default, Clone, Debug)]
1865pub struct Publish {
1866    payload: Bytes,
1867    headers: Option<header::HeaderMap>,
1868}
1869impl Publish {
1870    /// Creates a new custom Publish struct to be used with.
1871    pub fn build() -> Self {
1872        Default::default()
1873    }
1874
1875    /// Sets the payload for the message.
1876    pub fn payload(mut self, payload: Bytes) -> Self {
1877        self.payload = payload;
1878        self
1879    }
1880    /// Adds headers to the message.
1881    pub fn headers(mut self, headers: HeaderMap) -> Self {
1882        self.headers = Some(headers);
1883        self
1884    }
1885    /// A shorthand to add a single header.
1886    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1887        self.headers
1888            .get_or_insert(header::HeaderMap::new())
1889            .insert(name, value);
1890        self
1891    }
1892    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
1893    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1894        self.header(header::NATS_MESSAGE_ID, id.as_ref())
1895    }
1896    /// Sets expected last message ID.
1897    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
1898    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1899        self.header(
1900            header::NATS_EXPECTED_LAST_MESSAGE_ID,
1901            last_message_id.as_ref(),
1902        )
1903    }
1904    /// Sets the last expected stream sequence.
1905    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
1906    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1907        self.header(
1908            header::NATS_EXPECTED_LAST_SEQUENCE,
1909            HeaderValue::from(last_sequence),
1910        )
1911    }
1912    /// Sets the last expected stream sequence for a subject this message will be published to.
1913    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
1914    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1915        self.header(
1916            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1917            HeaderValue::from(subject_sequence),
1918        )
1919    }
1920    /// Sets the expected stream name.
1921    /// It sets the `Nats-Expected-Stream` header with provided value.
1922    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1923        self.header(
1924            header::NATS_EXPECTED_STREAM,
1925            HeaderValue::from(stream.as_ref()),
1926        )
1927    }
1928
1929    #[cfg(feature = "server_2_11")]
1930    /// Sets TTL for a single message.
1931    /// It sets the `Nats-TTL` header with provided value.
1932    pub fn ttl(self, ttl: Duration) -> Self {
1933        self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1934    }
1935}
1936
1937#[derive(Clone, Copy, Debug, PartialEq)]
1938pub enum RequestErrorKind {
1939    NoResponders,
1940    TimedOut,
1941    Other,
1942}
1943
1944impl Display for RequestErrorKind {
1945    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1946        match self {
1947            Self::TimedOut => write!(f, "timed out"),
1948            Self::Other => write!(f, "request failed"),
1949            Self::NoResponders => write!(f, "requested JetStream resource does not exist"),
1950        }
1951    }
1952}
1953
1954pub type RequestError = Error<RequestErrorKind>;
1955
1956impl From<crate::RequestError> for RequestError {
1957    fn from(error: crate::RequestError) -> Self {
1958        match error.kind() {
1959            crate::RequestErrorKind::TimedOut => {
1960                RequestError::with_source(RequestErrorKind::TimedOut, error)
1961            }
1962            crate::RequestErrorKind::NoResponders => {
1963                RequestError::new(RequestErrorKind::NoResponders)
1964            }
1965            crate::RequestErrorKind::Other => {
1966                RequestError::with_source(RequestErrorKind::Other, error)
1967            }
1968        }
1969    }
1970}
1971
1972impl From<super::errors::Error> for RequestError {
1973    fn from(err: super::errors::Error) -> Self {
1974        RequestError::with_source(RequestErrorKind::Other, err)
1975    }
1976}
1977
1978pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1979
1980#[derive(Clone, Debug, PartialEq)]
1981pub enum ConsumerInfoErrorKind {
1982    InvalidName,
1983    Offline,
1984    NotFound,
1985    StreamNotFound,
1986    Request,
1987    JetStream(super::errors::Error),
1988    TimedOut,
1989    NoResponders,
1990}
1991
1992impl Display for ConsumerInfoErrorKind {
1993    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1994        match self {
1995            Self::InvalidName => write!(f, "invalid consumer name"),
1996            Self::Offline => write!(f, "consumer is offline"),
1997            Self::NotFound => write!(f, "consumer not found"),
1998            Self::StreamNotFound => write!(f, "stream not found"),
1999            Self::Request => write!(f, "request error"),
2000            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2001            Self::TimedOut => write!(f, "timed out"),
2002            Self::NoResponders => write!(f, "no responders"),
2003        }
2004    }
2005}
2006
2007impl From<super::errors::Error> for ConsumerInfoError {
2008    fn from(error: super::errors::Error) -> Self {
2009        match error.error_code() {
2010            ErrorCode::CONSUMER_NOT_FOUND => {
2011                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
2012            }
2013            ErrorCode::STREAM_NOT_FOUND => {
2014                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
2015            }
2016            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
2017            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
2018        }
2019    }
2020}
2021
2022impl From<RequestError> for ConsumerInfoError {
2023    fn from(error: RequestError) -> Self {
2024        match error.kind() {
2025            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
2026            RequestErrorKind::Other => {
2027                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
2028            }
2029            RequestErrorKind::NoResponders => {
2030                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
2031            }
2032        }
2033    }
2034}
2035
2036#[derive(Clone, Debug, PartialEq)]
2037pub enum CreateStreamErrorKind {
2038    EmptyStreamName,
2039    InvalidStreamName,
2040    DomainAndExternalSet,
2041    JetStreamUnavailable,
2042    JetStream(super::errors::Error),
2043    TimedOut,
2044    Response,
2045    NotFound,
2046    ResponseParse,
2047}
2048
2049impl Display for CreateStreamErrorKind {
2050    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2051        match self {
2052            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
2053            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
2054            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
2055            Self::NotFound => write!(f, "stream not found"),
2056            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2057            Self::TimedOut => write!(f, "jetstream request timed out"),
2058            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
2059            Self::ResponseParse => write!(f, "failed to parse server response"),
2060            Self::Response => write!(f, "response error"),
2061        }
2062    }
2063}
2064
2065pub type CreateStreamError = Error<CreateStreamErrorKind>;
2066
2067impl From<super::errors::Error> for CreateStreamError {
2068    fn from(error: super::errors::Error) -> Self {
2069        match error.kind() {
2070            super::errors::ErrorCode::STREAM_NOT_FOUND => {
2071                CreateStreamError::new(CreateStreamErrorKind::NotFound)
2072            }
2073            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
2074        }
2075    }
2076}
2077
2078impl From<RequestError> for CreateStreamError {
2079    fn from(error: RequestError) -> Self {
2080        match error.kind() {
2081            RequestErrorKind::NoResponders => {
2082                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
2083            }
2084            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
2085            RequestErrorKind::Other => {
2086                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
2087            }
2088        }
2089    }
2090}
2091
2092#[derive(Clone, Debug, PartialEq)]
2093pub enum GetStreamErrorKind {
2094    EmptyName,
2095    Request,
2096    InvalidStreamName,
2097    JetStream(super::errors::Error),
2098}
2099
2100impl Display for GetStreamErrorKind {
2101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2102        match self {
2103            Self::EmptyName => write!(f, "empty name cannot be empty"),
2104            Self::Request => write!(f, "request error"),
2105            Self::InvalidStreamName => write!(f, "invalid stream name"),
2106            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2107        }
2108    }
2109}
2110
2111#[derive(Clone, Debug, PartialEq)]
2112pub enum GetStreamByNameErrorKind {
2113    Request,
2114    NotFound,
2115    InvalidSubject,
2116    JetStream(super::errors::Error),
2117}
2118
2119impl Display for GetStreamByNameErrorKind {
2120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2121        match self {
2122            Self::Request => write!(f, "request error"),
2123            Self::NotFound => write!(f, "stream not found"),
2124            Self::InvalidSubject => write!(f, "invalid subject"),
2125            Self::JetStream(err) => write!(f, "jetstream error: {err}"),
2126        }
2127    }
2128}
2129
2130pub type GetStreamError = Error<GetStreamErrorKind>;
2131pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;
2132
2133pub type UpdateStreamError = CreateStreamError;
2134pub type UpdateStreamErrorKind = CreateStreamErrorKind;
2135pub type DeleteStreamError = GetStreamError;
2136pub type DeleteStreamErrorKind = GetStreamErrorKind;
2137
2138#[derive(Clone, Copy, Debug, PartialEq)]
2139pub enum KeyValueErrorKind {
2140    InvalidStoreName,
2141    GetBucket,
2142    JetStream,
2143}
2144
2145impl Display for KeyValueErrorKind {
2146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2147        match self {
2148            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
2149            Self::GetBucket => write!(f, "failed to get the bucket"),
2150            Self::JetStream => write!(f, "JetStream error"),
2151        }
2152    }
2153}
2154
2155pub type KeyValueError = Error<KeyValueErrorKind>;
2156
2157#[derive(Clone, Copy, Debug, PartialEq)]
2158pub enum CreateKeyValueErrorKind {
2159    InvalidStoreName,
2160    TooLongHistory,
2161    JetStream,
2162    BucketCreate,
2163    TimedOut,
2164    LimitMarkersNotSupported,
2165}
2166
2167impl Display for CreateKeyValueErrorKind {
2168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2169        match self {
2170            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
2171            Self::TooLongHistory => write!(f, "too long history"),
2172            Self::JetStream => write!(f, "JetStream error"),
2173            Self::BucketCreate => write!(f, "bucket creation failed"),
2174            Self::TimedOut => write!(f, "timed out"),
2175            Self::LimitMarkersNotSupported => {
2176                write!(f, "limit markers not supported")
2177            }
2178        }
2179    }
2180}
2181
2182#[derive(Clone, Copy, Debug, PartialEq)]
2183pub enum UpdateKeyValueErrorKind {
2184    InvalidStoreName,
2185    TooLongHistory,
2186    JetStream,
2187    BucketUpdate,
2188    TimedOut,
2189    LimitMarkersNotSupported,
2190    NotFound,
2191}
2192
2193impl Display for UpdateKeyValueErrorKind {
2194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2195        match self {
2196            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
2197            Self::TooLongHistory => write!(f, "too long history"),
2198            Self::JetStream => write!(f, "JetStream error"),
2199            Self::BucketUpdate => write!(f, "bucket creation failed"),
2200            Self::TimedOut => write!(f, "timed out"),
2201            Self::LimitMarkersNotSupported => {
2202                write!(f, "limit markers not supported")
2203            }
2204            Self::NotFound => write!(f, "bucket does not exist"),
2205        }
2206    }
2207}
2208pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
2209pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;
2210
2211pub type CreateObjectStoreError = CreateKeyValueError;
2212pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
2213
2214#[derive(Clone, Copy, Debug, PartialEq)]
2215pub enum ObjectStoreErrorKind {
2216    InvalidBucketName,
2217    GetStore,
2218}
2219
2220impl Display for ObjectStoreErrorKind {
2221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2222        match self {
2223            Self::InvalidBucketName => write!(f, "invalid Object Store bucket name"),
2224            Self::GetStore => write!(f, "failed to get Object Store"),
2225        }
2226    }
2227}
2228
2229pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
2230
2231pub type DeleteObjectStore = ObjectStoreError;
2232pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
2233
2234#[derive(Clone, Debug, PartialEq)]
2235pub enum AccountErrorKind {
2236    TimedOut,
2237    JetStream(super::errors::Error),
2238    JetStreamUnavailable,
2239    Other,
2240}
2241
2242impl Display for AccountErrorKind {
2243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2244        match self {
2245            Self::TimedOut => write!(f, "timed out"),
2246            Self::JetStream(err) => write!(f, "JetStream error: {err}"),
2247            Self::Other => write!(f, "error"),
2248            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
2249        }
2250    }
2251}
2252
2253pub type AccountError = Error<AccountErrorKind>;
2254
2255impl From<RequestError> for AccountError {
2256    fn from(err: RequestError) -> Self {
2257        match err.kind {
2258            RequestErrorKind::NoResponders => {
2259                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
2260            }
2261            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2262            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2263        }
2264    }
2265}
2266
2267#[derive(Clone, Debug, Serialize)]
2268enum ConsumerAction {
2269    #[serde(rename = "")]
2270    CreateOrUpdate,
2271    #[serde(rename = "create")]
2272    #[cfg(feature = "server_2_10")]
2273    Create,
2274    #[serde(rename = "update")]
2275    #[cfg(feature = "server_2_10")]
2276    Update,
2277}
2278
2279// Maps a Stream config to KV Store.
2280fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2281    let mut store = Store {
2282        prefix: format!("$KV.{}.", bucket.as_str()),
2283        name: bucket,
2284        stream: stream.clone(),
2285        stream_name: stream.info.config.name.clone(),
2286        put_prefix: None,
2287        use_jetstream_prefix: prefix != "$JS.API",
2288    };
2289    if let Some(ref mirror) = stream.info.config.mirror {
2290        let bucket = mirror.name.trim_start_matches("KV_");
2291        if let Some(ref external) = mirror.external {
2292            if !external.api_prefix.is_empty() {
2293                store.use_jetstream_prefix = false;
2294                store.prefix = format!("$KV.{bucket}.");
2295                store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2296            } else {
2297                store.put_prefix = Some(format!("$KV.{bucket}."));
2298            }
2299        }
2300    };
2301    store
2302}
2303
2304enum KvToStreamConfigError {
2305    TooLongHistory,
2306    #[allow(dead_code)]
2307    LimitMarkersNotSupported,
2308}
2309
2310impl From<KvToStreamConfigError> for CreateKeyValueError {
2311    fn from(err: KvToStreamConfigError) -> Self {
2312        match err {
2313            KvToStreamConfigError::TooLongHistory => {
2314                CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2315            }
2316            KvToStreamConfigError::LimitMarkersNotSupported => {
2317                CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2318            }
2319        }
2320    }
2321}
2322
2323impl From<KvToStreamConfigError> for UpdateKeyValueError {
2324    fn from(err: KvToStreamConfigError) -> Self {
2325        match err {
2326            KvToStreamConfigError::TooLongHistory => {
2327                UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2328            }
2329            KvToStreamConfigError::LimitMarkersNotSupported => {
2330                UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2331            }
2332        }
2333    }
2334}
2335
2336// Maps the KV config to Stream config.
2337fn kv_to_stream_config(
2338    config: kv::Config,
2339    _account: Account,
2340) -> Result<super::stream::Config, KvToStreamConfigError> {
2341    let history = if config.history > 0 {
2342        if config.history > MAX_HISTORY {
2343            return Err(KvToStreamConfigError::TooLongHistory);
2344        }
2345        config.history
2346    } else {
2347        1
2348    };
2349
2350    let num_replicas = if config.num_replicas == 0 {
2351        1
2352    } else {
2353        config.num_replicas
2354    };
2355
2356    #[cfg(feature = "server_2_11")]
2357    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2358
2359    #[cfg(feature = "server_2_11")]
2360    if let Some(duration) = config.limit_markers {
2361        if _account.requests.level < 1 {
2362            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2363        }
2364        allow_message_ttl = true;
2365        subject_delete_marker_ttl = Some(duration);
2366    }
2367
2368    let mut mirror = config.mirror.clone();
2369    let mut sources = config.sources.clone();
2370    let mut mirror_direct = config.mirror_direct;
2371
2372    let mut subjects = Vec::new();
2373    if let Some(ref mut mirror) = mirror {
2374        if !mirror.name.starts_with("KV_") {
2375            mirror.name = format!("KV_{}", mirror.name);
2376        }
2377        mirror_direct = true;
2378    } else if let Some(ref mut sources) = sources {
2379        for source in sources {
2380            if !source.name.starts_with("KV_") {
2381                source.name = format!("KV_{}", source.name);
2382            }
2383        }
2384    } else {
2385        subjects = vec![format!("$KV.{}.>", config.bucket)];
2386    }
2387
2388    Ok(stream::Config {
2389        name: format!("KV_{}", config.bucket),
2390        description: Some(config.description),
2391        subjects,
2392        max_messages_per_subject: history,
2393        max_bytes: config.max_bytes,
2394        max_age: config.max_age,
2395        max_message_size: config.max_value_size,
2396        storage: config.storage,
2397        republish: config.republish,
2398        allow_rollup: true,
2399        deny_delete: true,
2400        deny_purge: false,
2401        allow_direct: true,
2402        sources,
2403        mirror,
2404        num_replicas,
2405        discard: stream::DiscardPolicy::New,
2406        mirror_direct,
2407        #[cfg(feature = "server_2_10")]
2408        compression: if config.compression {
2409            Some(stream::Compression::S2)
2410        } else {
2411            None
2412        },
2413        placement: config.placement,
2414        #[cfg(feature = "server_2_11")]
2415        allow_message_ttl,
2416        #[cfg(feature = "server_2_11")]
2417        subject_delete_marker_ttl,
2418        ..Default::default()
2419    })
2420}