async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23    header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use futures::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Display;
33use std::future::IntoFuture;
34use std::pin::Pin;
35use std::str::from_utf8;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::sync::oneshot;
39use tracing::debug;
40
41use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
42use super::errors::ErrorCode;
43use super::kv::{Store, MAX_HISTORY};
44use super::object_store::{is_valid_bucket_name, ObjectStore};
45use super::stream::{
46    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
47    Stream,
48};
49#[cfg(feature = "server_2_10")]
50use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
51use super::{is_valid_name, kv};
52
/// A context which can perform jetstream scoped requests.
///
/// `Clone` is derived; handles created from a context store a clone of it.
#[derive(Debug, Clone)]
pub struct Context {
    /// NATS client through which this context sends its requests.
    pub(crate) client: Client,
    /// JetStream API subject prefix: `$JS.API` by default, `$JS.<domain>.API` for a domain.
    pub(crate) prefix: String,
    /// Time limit applied to requests; five seconds unless changed with `set_timeout`.
    pub(crate) timeout: Duration,
}
60
61impl Context {
62    pub(crate) fn new(client: Client) -> Context {
63        Context {
64            client,
65            prefix: "$JS.API".to_string(),
66            timeout: Duration::from_secs(5),
67        }
68    }
69
70    pub fn set_timeout(&mut self, timeout: Duration) {
71        self.timeout = timeout
72    }
73
74    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
75        Context {
76            client,
77            prefix: prefix.to_string(),
78            timeout: Duration::from_secs(5),
79        }
80    }
81
82    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
83        Context {
84            client,
85            prefix: format!("$JS.{}.API", domain.as_ref()),
86            timeout: Duration::from_secs(5),
87        }
88    }
89
90    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
91    /// acknowledgment from the server that the message has been successfully delivered.
92    ///
93    /// Acknowledgment future that can be polled is returned instead.
94    ///
95    /// If the stream does not exist, `no responders` error will be returned.
96    ///
97    /// # Examples
98    ///
99    /// Publish, and after each publish, await for acknowledgment.
100    ///
101    /// ```no_run
102    /// # #[tokio::main]
103    /// # async fn main() -> Result<(), async_nats::Error> {
104    /// let client = async_nats::connect("localhost:4222").await?;
105    /// let jetstream = async_nats::jetstream::new(client);
106    ///
107    /// let ack = jetstream.publish("events", "data".into()).await?;
108    /// ack.await?;
109    /// jetstream.publish("events", "data".into()).await?.await?;
110    /// # Ok(())
111    /// # }
112    /// ```
113    ///
114    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
115    /// ignored entirely.
116    ///
117    /// ```no_run
118    /// # #[tokio::main]
119    /// # async fn main() -> Result<(), async_nats::Error> {
120    /// let client = async_nats::connect("localhost:4222").await?;
121    /// let jetstream = async_nats::jetstream::new(client);
122    ///
123    /// let first_ack = jetstream.publish("events", "data".into()).await?;
124    /// let second_ack = jetstream.publish("events", "data".into()).await?;
125    /// first_ack.await?;
126    /// second_ack.await?;
127    /// # Ok(())
128    /// # }
129    /// ```
130    pub async fn publish<S: ToSubject>(
131        &self,
132        subject: S,
133        payload: Bytes,
134    ) -> Result<PublishAckFuture, PublishError> {
135        self.send_publish(subject, Publish::build().payload(payload))
136            .await
137    }
138
    /// Publishes a message with headers to a given subject associated with a stream and returns
    /// an acknowledgment future that can be awaited for the server's acknowledgment.
141    ///
142    /// If the stream does not exist, `no responders` error will be returned.
143    ///
144    /// # Examples
145    ///
146    /// ```no_run
147    /// # #[tokio::main]
148    /// # async fn main() -> Result<(), async_nats::Error> {
149    /// let client = async_nats::connect("localhost:4222").await?;
150    /// let jetstream = async_nats::jetstream::new(client);
151    ///
152    /// let mut headers = async_nats::HeaderMap::new();
153    /// headers.append("X-key", "Value");
154    /// let ack = jetstream
155    ///     .publish_with_headers("events", headers, "data".into())
156    ///     .await?;
157    /// # Ok(())
158    /// # }
159    /// ```
160    pub async fn publish_with_headers<S: ToSubject>(
161        &self,
162        subject: S,
163        headers: crate::header::HeaderMap,
164        payload: Bytes,
165    ) -> Result<PublishAckFuture, PublishError> {
166        self.send_publish(subject, Publish::build().payload(payload).headers(headers))
167            .await
168    }
169
170    /// Publish a message built by [Publish] and returns an acknowledgment future.
171    ///
172    /// If the stream does not exist, `no responders` error will be returned.
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// # use async_nats::jetstream::context::Publish;
178    /// # #[tokio::main]
179    /// # async fn main() -> Result<(), async_nats::Error> {
180    /// let client = async_nats::connect("localhost:4222").await?;
181    /// let jetstream = async_nats::jetstream::new(client);
182    ///
183    /// let ack = jetstream
184    ///     .send_publish(
185    ///         "events",
186    ///         Publish::build().payload("data".into()).message_id("uuid"),
187    ///     )
188    ///     .await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub async fn send_publish<S: ToSubject>(
193        &self,
194        subject: S,
195        publish: Publish,
196    ) -> Result<PublishAckFuture, PublishError> {
197        let subject = subject.to_subject();
198        let (sender, receiver) = oneshot::channel();
199
200        let respond = self.client.new_inbox().into();
201
202        let send_fut = self
203            .client
204            .sender
205            .send(Command::Request {
206                subject,
207                payload: publish.payload,
208                respond,
209                headers: publish.headers,
210                sender,
211            })
212            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
213
214        tokio::time::timeout(self.timeout, send_fut)
215            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
216            .await??;
217
218        Ok(PublishAckFuture {
219            timeout: self.timeout,
220            subscription: receiver,
221        })
222    }
223
224    /// Query the server for account information
225    pub async fn query_account(&self) -> Result<Account, AccountError> {
226        let response: Response<Account> = self.request("INFO", b"").await?;
227
228        match response {
229            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
230            Response::Ok(account) => Ok(account),
231        }
232    }
233
234    /// Create a JetStream [Stream] with given config and return a handle to it.
235    /// That handle can be used to manage and use [Consumer].
236    ///
237    /// # Examples
238    ///
239    /// ```no_run
240    /// # #[tokio::main]
241    /// # async fn main() -> Result<(), async_nats::Error> {
242    /// use async_nats::jetstream::stream::Config;
243    /// use async_nats::jetstream::stream::DiscardPolicy;
244    /// let client = async_nats::connect("localhost:4222").await?;
245    /// let jetstream = async_nats::jetstream::new(client);
246    ///
247    /// let stream = jetstream
248    ///     .create_stream(Config {
249    ///         name: "events".to_string(),
250    ///         max_messages: 100_000,
251    ///         discard: DiscardPolicy::Old,
252    ///         ..Default::default()
253    ///     })
254    ///     .await?;
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn create_stream<S>(
259        &self,
260        stream_config: S,
261    ) -> Result<Stream<Info>, CreateStreamError>
262    where
263        Config: From<S>,
264    {
265        let mut config: Config = stream_config.into();
266        if config.name.is_empty() {
267            return Err(CreateStreamError::new(
268                CreateStreamErrorKind::EmptyStreamName,
269            ));
270        }
271        if !is_valid_name(config.name.as_str()) {
272            return Err(CreateStreamError::new(
273                CreateStreamErrorKind::InvalidStreamName,
274            ));
275        }
276        if let Some(ref mut mirror) = config.mirror {
277            if let Some(ref mut domain) = mirror.domain {
278                if mirror.external.is_some() {
279                    return Err(CreateStreamError::new(
280                        CreateStreamErrorKind::DomainAndExternalSet,
281                    ));
282                }
283                mirror.external = Some(External {
284                    api_prefix: format!("$JS.{domain}.API"),
285                    delivery_prefix: None,
286                })
287            }
288        }
289
290        if let Some(ref mut sources) = config.sources {
291            for source in sources {
292                if let Some(ref mut domain) = source.domain {
293                    if source.external.is_some() {
294                        return Err(CreateStreamError::new(
295                            CreateStreamErrorKind::DomainAndExternalSet,
296                        ));
297                    }
298                    source.external = Some(External {
299                        api_prefix: format!("$JS.{domain}.API"),
300                        delivery_prefix: None,
301                    })
302                }
303            }
304        }
305        let subject = format!("STREAM.CREATE.{}", config.name);
306        let response: Response<Info> = self.request(subject, &config).await?;
307
308        match response {
309            Response::Err { error } => Err(error.into()),
310            Response::Ok(info) => Ok(Stream {
311                context: self.clone(),
312                info,
313                name: config.name,
314            }),
315        }
316    }
317
    /// Returns a handle to a [Stream] without contacting the server.
    /// That handle can be used to manage and use [Consumer].
    /// This variant does not fetch [Stream] info from the server,
    /// which means it does not check if the stream actually exists.
    /// If you run many operations on a few streams, it is better to use [Context::get_stream] instead.
    /// If you however run single operations on many streams, this method is more efficient.
324    ///
325    /// # Examples
326    ///
327    /// ```no_run
328    /// # #[tokio::main]
329    /// # async fn main() -> Result<(), async_nats::Error> {
330    /// let client = async_nats::connect("localhost:4222").await?;
331    /// let jetstream = async_nats::jetstream::new(client);
332    ///
333    /// let stream = jetstream.get_stream_no_info("events").await?;
334    /// # Ok(())
335    /// # }
336    /// ```
337    pub async fn get_stream_no_info<T: AsRef<str>>(
338        &self,
339        stream: T,
340    ) -> Result<Stream<()>, GetStreamError> {
341        let stream = stream.as_ref();
342        if stream.is_empty() {
343            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
344        }
345
346        if !is_valid_name(stream) {
347            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
348        }
349
350        Ok(Stream {
351            context: self.clone(),
352            info: (),
353            name: stream.to_string(),
354        })
355    }
356
357    /// Checks for [Stream] existence on the server and returns handle to it.
358    /// That handle can be used to manage and use [Consumer].
359    ///
360    /// # Examples
361    ///
362    /// ```no_run
363    /// # #[tokio::main]
364    /// # async fn main() -> Result<(), async_nats::Error> {
365    /// let client = async_nats::connect("localhost:4222").await?;
366    /// let jetstream = async_nats::jetstream::new(client);
367    ///
368    /// let stream = jetstream.get_stream("events").await?;
369    /// # Ok(())
370    /// # }
371    /// ```
372    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
373        let stream = stream.as_ref();
374        if stream.is_empty() {
375            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
376        }
377
378        if !is_valid_name(stream) {
379            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
380        }
381
382        let subject = format!("STREAM.INFO.{stream}");
383        let request: Response<Info> = self
384            .request(subject, &())
385            .await
386            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
387        match request {
388            Response::Err { error } => {
389                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
390            }
391            Response::Ok(info) => Ok(Stream {
392                context: self.clone(),
393                info,
394                name: stream.to_string(),
395            }),
396        }
397    }
398
399    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
400    ///
401    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
402    ///
403    /// # Examples
404    ///
405    /// ```no_run
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// use async_nats::jetstream::stream::Config;
409    /// let client = async_nats::connect("localhost:4222").await?;
410    /// let jetstream = async_nats::jetstream::new(client);
411    ///
412    /// let stream = jetstream
413    ///     .get_or_create_stream(Config {
414    ///         name: "events".to_string(),
415    ///         max_messages: 10_000,
416    ///         ..Default::default()
417    ///     })
418    ///     .await?;
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn get_or_create_stream<S>(
423        &self,
424        stream_config: S,
425    ) -> Result<Stream, CreateStreamError>
426    where
427        S: Into<Config>,
428    {
429        let config: Config = stream_config.into();
430
431        if config.name.is_empty() {
432            return Err(CreateStreamError::new(
433                CreateStreamErrorKind::EmptyStreamName,
434            ));
435        }
436
437        if !is_valid_name(config.name.as_str()) {
438            return Err(CreateStreamError::new(
439                CreateStreamErrorKind::InvalidStreamName,
440            ));
441        }
442        let subject = format!("STREAM.INFO.{}", config.name);
443
444        let request: Response<Info> = self.request(subject, &()).await?;
445        match request {
446            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
447            Response::Err { error } => Err(error.into()),
448            Response::Ok(info) => Ok(Stream {
449                context: self.clone(),
450                info,
451                name: config.name,
452            }),
453        }
454    }
455
456    /// Deletes a [Stream] with a given name.
457    ///
458    /// # Examples
459    ///
460    /// ```no_run
461    /// # #[tokio::main]
462    /// # async fn main() -> Result<(), async_nats::Error> {
463    /// use async_nats::jetstream::stream::Config;
464    /// let client = async_nats::connect("localhost:4222").await?;
465    /// let jetstream = async_nats::jetstream::new(client);
466    ///
467    /// let stream = jetstream.delete_stream("events").await?;
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub async fn delete_stream<T: AsRef<str>>(
472        &self,
473        stream: T,
474    ) -> Result<DeleteStatus, DeleteStreamError> {
475        let stream = stream.as_ref();
476        if stream.is_empty() {
477            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
478        }
479
480        if !is_valid_name(stream) {
481            return Err(DeleteStreamError::new(
482                DeleteStreamErrorKind::InvalidStreamName,
483            ));
484        }
485
486        let subject = format!("STREAM.DELETE.{stream}");
487        match self
488            .request(subject, &json!({}))
489            .await
490            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
491        {
492            Response::Err { error } => Err(DeleteStreamError::new(
493                DeleteStreamErrorKind::JetStream(error),
494            )),
495            Response::Ok(delete_response) => Ok(delete_response),
496        }
497    }
498
499    /// Updates a [Stream] with a given config. If specific field cannot be updated,
500    /// error is returned.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// # #[tokio::main]
506    /// # async fn main() -> Result<(), async_nats::Error> {
507    /// use async_nats::jetstream::stream::Config;
508    /// use async_nats::jetstream::stream::DiscardPolicy;
509    /// let client = async_nats::connect("localhost:4222").await?;
510    /// let jetstream = async_nats::jetstream::new(client);
511    ///
512    /// let stream = jetstream
513    ///     .update_stream(Config {
514    ///         name: "events".to_string(),
515    ///         discard: DiscardPolicy::New,
516    ///         max_messages: 50_000,
517    ///         ..Default::default()
518    ///     })
519    ///     .await?;
520    /// # Ok(())
521    /// # }
522    /// ```
523    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
524    where
525        S: Borrow<Config>,
526    {
527        let config = config.borrow();
528
529        if config.name.is_empty() {
530            return Err(CreateStreamError::new(
531                CreateStreamErrorKind::EmptyStreamName,
532            ));
533        }
534
535        if !is_valid_name(config.name.as_str()) {
536            return Err(CreateStreamError::new(
537                CreateStreamErrorKind::InvalidStreamName,
538            ));
539        }
540
541        let subject = format!("STREAM.UPDATE.{}", config.name);
542        match self.request(subject, config).await? {
543            Response::Err { error } => Err(error.into()),
544            Response::Ok(info) => Ok(info),
545        }
546    }
547
548    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
549    ///
550    /// # Examples
551    ///
552    /// ```no_run
553    /// # #[tokio::main]
554    /// # async fn main() -> Result<(), async_nats::Error> {
555    /// use async_nats::jetstream::stream::Config;
556    /// use async_nats::jetstream::stream::DiscardPolicy;
557    /// let client = async_nats::connect("localhost:4222").await?;
558    /// let jetstream = async_nats::jetstream::new(client);
559    ///
560    /// let stream = jetstream
561    ///     .create_or_update_stream(Config {
562    ///         name: "events".to_string(),
563    ///         discard: DiscardPolicy::New,
564    ///         max_messages: 50_000,
565    ///         ..Default::default()
566    ///     })
567    ///     .await?;
568    /// # Ok(())
569    /// # }
570    /// ```
571    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
572        match self.update_stream(config.clone()).await {
573            Ok(stream) => Ok(stream),
574            Err(err) => match err.kind() {
575                CreateStreamErrorKind::NotFound => {
576                    let stream = self
577                        .create_stream(config)
578                        .await
579                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
580                    Ok(stream.info)
581                }
582                _ => Err(err),
583            },
584        }
585    }
586
587    /// Looks up Stream that contains provided subject.
588    ///
589    /// # Examples
590    ///
591    /// ```no_run
592    /// # #[tokio::main]
593    /// # async fn main() -> Result<(), async_nats::Error> {
594    /// use futures::TryStreamExt;
595    /// let client = async_nats::connect("demo.nats.io:4222").await?;
596    /// let jetstream = async_nats::jetstream::new(client);
    /// let stream_name = jetstream.stream_by_subject("foo.>").await?;
598    /// # Ok(())
599    /// # }
600    /// ```
601    pub async fn stream_by_subject<T: Into<String>>(
602        &self,
603        subject: T,
604    ) -> Result<String, GetStreamByNameError> {
605        let subject = subject.into();
606        if !is_valid_subject(subject.as_str()) {
607            return Err(GetStreamByNameError::new(
608                GetStreamByNameErrorKind::InvalidSubject,
609            ));
610        }
611        let mut names = StreamNames {
612            context: self.clone(),
613            offset: 0,
614            page_request: None,
615            streams: Vec::new(),
616            subject: Some(subject),
617            done: false,
618        };
619        match names.next().await {
620            Some(name) => match name {
621                Ok(name) => Ok(name),
622                Err(err) => Err(GetStreamByNameError::with_source(
623                    GetStreamByNameErrorKind::Request,
624                    err,
625                )),
626            },
627            None => Err(GetStreamByNameError::new(
628                GetStreamByNameErrorKind::NotFound,
629            )),
630        }
631    }
632
633    /// Lists names of all streams for current context.
634    ///
635    /// # Examples
636    ///
637    /// ```no_run
638    /// # #[tokio::main]
639    /// # async fn main() -> Result<(), async_nats::Error> {
640    /// use futures::TryStreamExt;
641    /// let client = async_nats::connect("demo.nats.io:4222").await?;
642    /// let jetstream = async_nats::jetstream::new(client);
643    /// let mut names = jetstream.stream_names();
644    /// while let Some(stream) = names.try_next().await? {
645    ///     println!("stream: {}", stream);
646    /// }
647    /// # Ok(())
648    /// # }
649    /// ```
650    pub fn stream_names(&self) -> StreamNames {
651        StreamNames {
652            context: self.clone(),
653            offset: 0,
654            page_request: None,
655            streams: Vec::new(),
656            subject: None,
657            done: false,
658        }
659    }
660
661    /// Lists all streams info for current context.
662    ///
663    /// # Examples
664    ///
665    /// ```no_run
666    /// # #[tokio::main]
667    /// # async fn main() -> Result<(), async_nats::Error> {
668    /// use futures::TryStreamExt;
669    /// let client = async_nats::connect("demo.nats.io:4222").await?;
670    /// let jetstream = async_nats::jetstream::new(client);
671    /// let mut streams = jetstream.streams();
672    /// while let Some(stream) = streams.try_next().await? {
673    ///     println!("stream: {:?}", stream);
674    /// }
675    /// # Ok(())
676    /// # }
677    /// ```
678    pub fn streams(&self) -> Streams {
679        Streams {
680            context: self.clone(),
681            offset: 0,
682            page_request: None,
683            streams: Vec::new(),
684            done: false,
685        }
686    }
687    /// Returns an existing key-value bucket.
688    ///
689    /// # Examples
690    ///
691    /// ```no_run
692    /// # #[tokio::main]
693    /// # async fn main() -> Result<(), async_nats::Error> {
694    /// let client = async_nats::connect("demo.nats.io:4222").await?;
695    /// let jetstream = async_nats::jetstream::new(client);
696    /// let kv = jetstream.get_key_value("bucket").await?;
697    /// # Ok(())
698    /// # }
699    /// ```
700    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
701        let bucket: String = bucket.into();
702        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
703            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
704        }
705
706        let stream_name = format!("KV_{}", &bucket);
707        let stream = self
708            .get_stream(stream_name.clone())
709            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
710            .await?;
711
712        if stream.info.config.max_messages_per_subject < 1 {
713            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
714        }
715        let mut store = Store {
716            prefix: format!("$KV.{}.", &bucket),
717            name: bucket,
718            stream_name,
719            stream: stream.clone(),
720            put_prefix: None,
721            use_jetstream_prefix: self.prefix != "$JS.API",
722        };
723        if let Some(ref mirror) = stream.info.config.mirror {
724            let bucket = mirror.name.trim_start_matches("KV_");
725            if let Some(ref external) = mirror.external {
726                if !external.api_prefix.is_empty() {
727                    store.use_jetstream_prefix = false;
728                    store.prefix = format!("$KV.{bucket}.");
729                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
730                } else {
731                    store.put_prefix = Some(format!("$KV.{bucket}."));
732                }
733            }
734        };
735
736        Ok(store)
737    }
738
739    /// Creates a new key-value bucket.
740    ///
741    /// # Examples
742    ///
743    /// ```no_run
744    /// # #[tokio::main]
745    /// # async fn main() -> Result<(), async_nats::Error> {
746    /// let client = async_nats::connect("demo.nats.io:4222").await?;
747    /// let jetstream = async_nats::jetstream::new(client);
748    /// let kv = jetstream
749    ///     .create_key_value(async_nats::jetstream::kv::Config {
750    ///         bucket: "kv".to_string(),
751    ///         history: 10,
752    ///         ..Default::default()
753    ///     })
754    ///     .await?;
755    /// # Ok(())
756    /// # }
757    /// ```
758    pub async fn create_key_value(
759        &self,
760        config: crate::jetstream::kv::Config,
761    ) -> Result<Store, CreateKeyValueError> {
762        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
763            return Err(CreateKeyValueError::new(
764                CreateKeyValueErrorKind::InvalidStoreName,
765            ));
766        }
767        let info = self.query_account().await.map_err(|err| {
768            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
769        })?;
770
771        let bucket_name = config.bucket.clone();
772        let stream_config = kv_to_stream_config(config, info)?;
773
774        let stream = self.create_stream(stream_config).await.map_err(|err| {
775            if err.kind() == CreateStreamErrorKind::TimedOut {
776                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
777            } else {
778                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
779            }
780        })?;
781
782        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
783    }
784
785    /// Updates an existing key-value bucket.
786    ///
787    /// # Examples
788    ///
789    /// ```no_run
790    /// # #[tokio::main]
791    /// # async fn main() -> Result<(), async_nats::Error> {
792    /// let client = async_nats::connect("demo.nats.io:4222").await?;
793    /// let jetstream = async_nats::jetstream::new(client);
794    /// let kv = jetstream
795    ///     .update_key_value(async_nats::jetstream::kv::Config {
796    ///         bucket: "kv".to_string(),
797    ///         history: 60,
798    ///         ..Default::default()
799    ///     })
800    ///     .await?;
801    /// # Ok(())
802    /// # }
803    /// ```
804    pub async fn update_key_value(
805        &self,
806        config: crate::jetstream::kv::Config,
807    ) -> Result<Store, UpdateKeyValueError> {
808        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
809            return Err(UpdateKeyValueError::new(
810                UpdateKeyValueErrorKind::InvalidStoreName,
811            ));
812        }
813
814        let stream_name = format!("KV_{}", config.bucket);
815        let bucket_name = config.bucket.clone();
816
817        let account = self.query_account().await.map_err(|err| {
818            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
819        })?;
820        let stream = self
821            .update_stream(kv_to_stream_config(config, account)?)
822            .await
823            .map_err(|err| match err.kind() {
824                UpdateStreamErrorKind::NotFound => {
825                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
826                }
827                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
828            })?;
829
830        let stream = Stream {
831            context: self.clone(),
832            info: stream,
833            name: stream_name,
834        };
835
836        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
837    }
838
839    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
840    ///
841    /// # Examples
842    ///
843    /// ```no_run
844    /// # #[tokio::main]
845    /// # async fn main() -> Result<(), async_nats::Error> {
846    /// let client = async_nats::connect("demo.nats.io:4222").await?;
847    /// let jetstream = async_nats::jetstream::new(client);
848    /// let kv = jetstream
    ///     .create_or_update_key_value(async_nats::jetstream::kv::Config {
850    ///         bucket: "kv".to_string(),
851    ///         history: 60,
852    ///         ..Default::default()
853    ///     })
854    ///     .await?;
855    /// # Ok(())
856    /// # }
857    /// ```
858    pub async fn create_or_update_key_value(
859        &self,
860        config: crate::jetstream::kv::Config,
861    ) -> Result<Store, CreateKeyValueError> {
862        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
863            return Err(CreateKeyValueError::new(
864                CreateKeyValueErrorKind::InvalidStoreName,
865            ));
866        }
867
868        let bucket_name = config.bucket.clone();
869        let stream_name = format!("KV_{}", config.bucket);
870
871        let account = self.query_account().await.map_err(|err| {
872            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
873        })?;
874        let stream = self
875            .create_or_update_stream(kv_to_stream_config(config, account)?)
876            .await
877            .map_err(|err| {
878                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
879            })?;
880
881        let stream = Stream {
882            context: self.clone(),
883            info: stream,
884            name: stream_name,
885        };
886
887        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
888    }
889
890    /// Deletes given key-value bucket.
891    ///
892    /// # Examples
893    ///
894    /// ```no_run
895    /// # #[tokio::main]
896    /// # async fn main() -> Result<(), async_nats::Error> {
897    /// let client = async_nats::connect("demo.nats.io:4222").await?;
898    /// let jetstream = async_nats::jetstream::new(client);
899    /// let kv = jetstream
900    ///     .create_key_value(async_nats::jetstream::kv::Config {
901    ///         bucket: "kv".to_string(),
902    ///         history: 10,
903    ///         ..Default::default()
904    ///     })
905    ///     .await?;
906    /// # Ok(())
907    /// # }
908    /// ```
909    pub async fn delete_key_value<T: AsRef<str>>(
910        &self,
911        bucket: T,
912    ) -> Result<DeleteStatus, KeyValueError> {
913        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
914            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
915        }
916
917        let stream_name = format!("KV_{}", bucket.as_ref());
918        self.delete_stream(stream_name)
919            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
920            .await
921    }
922
923    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
924    //     let config = config.borrow();
925    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
926    //         return Err(Box::new(std::io::Error::new(
927    //             ErrorKind::Other,
928    //             "invalid bucket name",
929    //         )));
930    //     }
931
932    //     let stream_name = format!("KV_{}", config.bucket);
933    //     self.update_stream()
934    //         .await
935    //         .and_then(|info| Ok(()))
936    // }
937
938    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
939    ///
940    /// It has one less interaction with the server when binding to only one
941    /// [crate::jetstream::consumer::Consumer].
942    ///
943    /// # Examples:
944    ///
945    /// ```no_run
946    /// # #[tokio::main]
947    /// # async fn main() -> Result<(), async_nats::Error> {
948    /// use async_nats::jetstream::consumer::PullConsumer;
949    ///
950    /// let client = async_nats::connect("localhost:4222").await?;
951    /// let jetstream = async_nats::jetstream::new(client);
952    ///
953    /// let consumer: PullConsumer = jetstream
954    ///     .get_consumer_from_stream("consumer", "stream")
955    ///     .await?;
956    ///
957    /// # Ok(())
958    /// # }
959    /// ```
960    pub async fn get_consumer_from_stream<T, C, S>(
961        &self,
962        consumer: C,
963        stream: S,
964    ) -> Result<Consumer<T>, ConsumerError>
965    where
966        T: FromConsumer + IntoConsumerConfig,
967        S: AsRef<str>,
968        C: AsRef<str>,
969    {
970        if !is_valid_name(stream.as_ref()) {
971            return Err(ConsumerError::with_source(
972                ConsumerErrorKind::InvalidName,
973                "invalid stream",
974            ));
975        }
976
977        if !is_valid_name(consumer.as_ref()) {
978            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
979        }
980
981        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
982
983        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
984            Response::Ok(info) => info,
985            Response::Err { error } => return Err(error.into()),
986        };
987
988        Ok(Consumer::new(
989            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
990                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
991            })?,
992            info,
993            self.clone(),
994        ))
995    }
996
997    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
998    ///
999    /// It has one less interaction with the server when binding to only one
1000    /// [crate::jetstream::consumer::Consumer].
1001    ///
1002    /// # Examples:
1003    ///
1004    /// ```no_run
1005    /// # #[tokio::main]
1006    /// # async fn main() -> Result<(), async_nats::Error> {
1007    /// use async_nats::jetstream::consumer::PullConsumer;
1008    ///
1009    /// let client = async_nats::connect("localhost:4222").await?;
1010    /// let jetstream = async_nats::jetstream::new(client);
1011    ///
1012    /// jetstream
1013    ///     .delete_consumer_from_stream("consumer", "stream")
1014    ///     .await?;
1015    ///
1016    /// # Ok(())
1017    /// # }
1018    /// ```
1019    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1020        &self,
1021        consumer: C,
1022        stream: S,
1023    ) -> Result<DeleteStatus, ConsumerError> {
1024        if !is_valid_name(consumer.as_ref()) {
1025            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1026        }
1027
1028        if !is_valid_name(stream.as_ref()) {
1029            return Err(ConsumerError::with_source(
1030                ConsumerErrorKind::Other,
1031                "invalid stream name",
1032            ));
1033        }
1034
1035        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1036
1037        match self.request(subject, &json!({})).await? {
1038            Response::Ok(delete_status) => Ok(delete_status),
1039            Response::Err { error } => Err(error.into()),
1040        }
1041    }
1042
1043    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1044    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1045    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1046    ///
1047    /// # Examples
1048    ///
1049    /// ```no_run
1050    /// # #[tokio::main]
1051    /// # async fn main() -> Result<(), async_nats::Error> {
1052    /// use async_nats::jetstream::consumer;
1053    /// let client = async_nats::connect("localhost:4222").await?;
1054    /// let jetstream = async_nats::jetstream::new(client);
1055    ///
1056    /// let consumer: consumer::PullConsumer = jetstream
1057    ///     .create_consumer_on_stream(
1058    ///         consumer::pull::Config {
1059    ///             durable_name: Some("pull".to_string()),
1060    ///             ..Default::default()
1061    ///         },
1062    ///         "stream",
1063    ///     )
1064    ///     .await?;
1065    /// # Ok(())
1066    /// # }
1067    /// ```
1068    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1069        &self,
1070        config: C,
1071        stream: S,
1072    ) -> Result<Consumer<C>, ConsumerError> {
1073        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1074            .await
1075    }
1076
1077    /// Update an existing consumer.
1078    /// This call will fail if the consumer does not exist.
1079    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1080    ///
1081    /// # Examples
1082    ///
1083    /// ```no_run
1084    /// # #[tokio::main]
1085    /// # async fn main() -> Result<(), async_nats::Error> {
1086    /// use async_nats::jetstream::consumer;
1087    /// let client = async_nats::connect("localhost:4222").await?;
1088    /// let jetstream = async_nats::jetstream::new(client);
1089    ///
1090    /// let consumer: consumer::PullConsumer = jetstream
1091    ///     .update_consumer_on_stream(
1092    ///         consumer::pull::Config {
1093    ///             durable_name: Some("pull".to_string()),
1094    ///             description: Some("updated pull consumer".to_string()),
1095    ///             ..Default::default()
1096    ///         },
1097    ///         "stream",
1098    ///     )
1099    ///     .await?;
1100    /// # Ok(())
1101    /// # }
1102    /// ```
1103    #[cfg(feature = "server_2_10")]
1104    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1105        &self,
1106        config: C,
1107        stream: S,
1108    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1109        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1110            .await
1111            .map_err(|err| err.into())
1112    }
1113
1114    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1115    /// the same.
1116    /// This method will fail if consumer is already present with different config.
1117    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1118    ///
1119    /// # Examples
1120    ///
1121    /// ```no_run
1122    /// # #[tokio::main]
1123    /// # async fn main() -> Result<(), async_nats::Error> {
1124    /// use async_nats::jetstream::consumer;
1125    /// let client = async_nats::connect("localhost:4222").await?;
1126    /// let jetstream = async_nats::jetstream::new(client);
1127    ///
1128    /// let consumer: consumer::PullConsumer = jetstream
1129    ///     .create_consumer_strict_on_stream(
1130    ///         consumer::pull::Config {
1131    ///             durable_name: Some("pull".to_string()),
1132    ///             ..Default::default()
1133    ///         },
1134    ///         "stream",
1135    ///     )
1136    ///     .await?;
1137    /// # Ok(())
1138    /// # }
1139    /// ```
1140    #[cfg(feature = "server_2_10")]
1141    pub async fn create_consumer_strict_on_stream<
1142        C: IntoConsumerConfig + FromConsumer,
1143        S: AsRef<str>,
1144    >(
1145        &self,
1146        config: C,
1147        stream: S,
1148    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1149        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1150            .await
1151            .map_err(|err| err.into())
1152    }
1153
1154    async fn create_consumer_on_stream_action<
1155        C: IntoConsumerConfig + FromConsumer,
1156        S: AsRef<str>,
1157    >(
1158        &self,
1159        config: C,
1160        stream: S,
1161        action: ConsumerAction,
1162    ) -> Result<Consumer<C>, ConsumerError> {
1163        let config = config.into_consumer_config();
1164
1165        let subject = {
1166            let filter = if config.filter_subject.is_empty() {
1167                "".to_string()
1168            } else {
1169                format!(".{}", config.filter_subject)
1170            };
1171            config
1172                .name
1173                .as_ref()
1174                .or(config.durable_name.as_ref())
1175                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1176                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1177        };
1178
1179        match self
1180            .request(
1181                subject,
1182                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1183            )
1184            .await?
1185        {
1186            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1187            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1188                FromConsumer::try_from_consumer_config(info.clone().config)
1189                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1190                info,
1191                self.clone(),
1192            )),
1193        }
1194    }
1195
1196    /// Send a request to the jetstream JSON API.
1197    ///
1198    /// This is a low level API used mostly internally, that should be used only in
1199    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1200    ///
1201    /// # Examples
1202    ///
1203    /// ```no_run
1204    /// # use async_nats::jetstream::stream::Info;
1205    /// # use async_nats::jetstream::response::Response;
1206    /// # #[tokio::main]
1207    /// # async fn main() -> Result<(), async_nats::Error> {
1208    /// let client = async_nats::connect("localhost:4222").await?;
1209    /// let jetstream = async_nats::jetstream::new(client);
1210    ///
1211    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1212    /// # Ok(())
1213    /// # }
1214    /// ```
1215    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1216    where
1217        S: ToSubject,
1218        T: ?Sized + Serialize,
1219        V: DeserializeOwned,
1220    {
1221        let subject = subject.to_subject();
1222        let request = serde_json::to_vec(&payload)
1223            .map(Bytes::from)
1224            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1225
1226        debug!("JetStream request sent: {:?}", request);
1227
1228        let message = self
1229            .client
1230            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1231            .await;
1232        let message = message?;
1233        debug!(
1234            "JetStream request response: {:?}",
1235            from_utf8(&message.payload)
1236        );
1237        let response = serde_json::from_slice(message.payload.as_ref())
1238            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1239
1240        Ok(response)
1241    }
1242
1243    /// Creates a new object store bucket.
1244    ///
1245    /// # Examples
1246    ///
1247    /// ```no_run
1248    /// # #[tokio::main]
1249    /// # async fn main() -> Result<(), async_nats::Error> {
1250    /// let client = async_nats::connect("demo.nats.io").await?;
1251    /// let jetstream = async_nats::jetstream::new(client);
1252    /// let bucket = jetstream
1253    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1254    ///         bucket: "bucket".to_string(),
1255    ///         ..Default::default()
1256    ///     })
1257    ///     .await?;
1258    /// # Ok(())
1259    /// # }
1260    /// ```
1261    pub async fn create_object_store(
1262        &self,
1263        config: super::object_store::Config,
1264    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1265        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1266            return Err(CreateObjectStoreError::new(
1267                CreateKeyValueErrorKind::InvalidStoreName,
1268            ));
1269        }
1270
1271        let bucket_name = config.bucket.clone();
1272        let stream_name = format!("OBJ_{bucket_name}");
1273        let chunk_subject = format!("$O.{bucket_name}.C.>");
1274        let meta_subject = format!("$O.{bucket_name}.M.>");
1275
1276        let stream = self
1277            .create_stream(super::stream::Config {
1278                name: stream_name,
1279                description: config.description.clone(),
1280                subjects: vec![chunk_subject, meta_subject],
1281                max_age: config.max_age,
1282                max_bytes: config.max_bytes,
1283                storage: config.storage,
1284                num_replicas: config.num_replicas,
1285                discard: DiscardPolicy::New,
1286                allow_rollup: true,
1287                allow_direct: true,
1288                #[cfg(feature = "server_2_10")]
1289                compression: if config.compression {
1290                    Some(Compression::S2)
1291                } else {
1292                    None
1293                },
1294                placement: config.placement,
1295                ..Default::default()
1296            })
1297            .await
1298            .map_err(|err| {
1299                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1300            })?;
1301
1302        Ok(ObjectStore {
1303            name: bucket_name,
1304            stream,
1305        })
1306    }
1307
1308    /// Get an existing object store bucket.
1309    ///
1310    /// # Examples
1311    ///
1312    /// ```no_run
1313    /// # #[tokio::main]
1314    /// # async fn main() -> Result<(), async_nats::Error> {
1315    /// let client = async_nats::connect("demo.nats.io").await?;
1316    /// let jetstream = async_nats::jetstream::new(client);
1317    /// let bucket = jetstream.get_object_store("bucket").await?;
1318    /// # Ok(())
1319    /// # }
1320    /// ```
1321    pub async fn get_object_store<T: AsRef<str>>(
1322        &self,
1323        bucket_name: T,
1324    ) -> Result<ObjectStore, ObjectStoreError> {
1325        let bucket_name = bucket_name.as_ref();
1326        if !is_valid_bucket_name(bucket_name) {
1327            return Err(ObjectStoreError::new(
1328                ObjectStoreErrorKind::InvalidBucketName,
1329            ));
1330        }
1331        let stream_name = format!("OBJ_{bucket_name}");
1332        let stream = self
1333            .get_stream(stream_name)
1334            .await
1335            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1336
1337        Ok(ObjectStore {
1338            name: bucket_name.to_string(),
1339            stream,
1340        })
1341    }
1342
1343    /// Delete a object store bucket.
1344    ///
1345    /// # Examples
1346    ///
1347    /// ```no_run
1348    /// # #[tokio::main]
1349    /// # async fn main() -> Result<(), async_nats::Error> {
1350    /// let client = async_nats::connect("demo.nats.io").await?;
1351    /// let jetstream = async_nats::jetstream::new(client);
1352    /// let bucket = jetstream.delete_object_store("bucket").await?;
1353    /// # Ok(())
1354    /// # }
1355    /// ```
1356    pub async fn delete_object_store<T: AsRef<str>>(
1357        &self,
1358        bucket_name: T,
1359    ) -> Result<(), DeleteObjectStore> {
1360        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1361        self.delete_stream(stream_name)
1362            .await
1363            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1364        Ok(())
1365    }
1366}
1367
/// Kinds of errors that can occur while waiting for a JetStream publish acknowledgment.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PublishErrorKind {
    /// No stream was found for the given subject.
    StreamNotFound,
    /// The server rejected the publish because of a wrong last message id.
    WrongLastMessageId,
    /// The server rejected the publish because of a wrong last sequence.
    WrongLastSequence,
    /// The acknowledgment was not received in time.
    TimedOut,
    /// The channel delivering the acknowledgment was closed.
    BrokenPipe,
    /// Any other publish failure.
    Other,
}
1377
1378impl Display for PublishErrorKind {
1379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1380        match self {
1381            Self::StreamNotFound => write!(f, "no stream found for given subject"),
1382            Self::TimedOut => write!(f, "timed out: didn't receive ack in time"),
1383            Self::Other => write!(f, "publish failed"),
1384            Self::BrokenPipe => write!(f, "broken pipe"),
1385            Self::WrongLastMessageId => write!(f, "wrong last message id"),
1386            Self::WrongLastSequence => write!(f, "wrong last sequence"),
1387        }
1388    }
1389}
1390
1391pub type PublishError = Error<PublishErrorKind>;
1392
1393#[derive(Debug)]
1394pub struct PublishAckFuture {
1395    timeout: Duration,
1396    subscription: oneshot::Receiver<Message>,
1397}
1398
1399impl PublishAckFuture {
1400    async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1401        let next = tokio::time::timeout(self.timeout, self.subscription)
1402            .await
1403            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1404        next.map_or_else(
1405            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1406            |m| {
1407                if m.status == Some(StatusCode::NO_RESPONDERS) {
1408                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1409                }
1410                let response = serde_json::from_slice(m.payload.as_ref())
1411                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1412                match response {
1413                    Response::Err { error } => match error.error_code() {
1414                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1415                            PublishErrorKind::WrongLastMessageId,
1416                            error,
1417                        )),
1418                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1419                            PublishErrorKind::WrongLastSequence,
1420                            error,
1421                        )),
1422                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1423                    },
1424                    Response::Ok(publish_ack) => Ok(publish_ack),
1425                }
1426            },
1427        )
1428    }
1429}
1430impl IntoFuture for PublishAckFuture {
1431    type Output = Result<PublishAck, PublishError>;
1432
1433    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1434
1435    fn into_future(self) -> Self::IntoFuture {
1436        Box::pin(std::future::IntoFuture::into_future(
1437            self.next_with_timeout(),
1438        ))
1439    }
1440}
1441
/// One page of the `STREAM.NAMES` API response, as consumed by `StreamNames`.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total count reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Stream names contained in this page, if any were returned.
    streams: Option<Vec<String>>,
}
1447
/// One page of the `STREAM.LIST` API response, as consumed by `Streams`.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total count reported by the server; paging stops once the running offset reaches it.
    total: usize,
    /// Stream infos contained in this page, if any were returned.
    streams: Option<Vec<super::stream::Info>>,
}
1453
1454type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1455
1456pub struct StreamNames {
1457    context: Context,
1458    offset: usize,
1459    page_request: Option<PageRequest>,
1460    subject: Option<String>,
1461    streams: Vec<String>,
1462    done: bool,
1463}
1464
1465impl futures::Stream for StreamNames {
1466    type Item = Result<String, StreamsError>;
1467
1468    fn poll_next(
1469        mut self: Pin<&mut Self>,
1470        cx: &mut std::task::Context<'_>,
1471    ) -> std::task::Poll<Option<Self::Item>> {
1472        match self.page_request.as_mut() {
1473            Some(page) => match page.try_poll_unpin(cx) {
1474                std::task::Poll::Ready(page) => {
1475                    self.page_request = None;
1476                    let page = page
1477                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1478                    if let Some(streams) = page.streams {
1479                        self.offset += streams.len();
1480                        self.streams = streams;
1481                        if self.offset >= page.total {
1482                            self.done = true;
1483                        }
1484                        match self.streams.pop() {
1485                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1486                            None => Poll::Ready(None),
1487                        }
1488                    } else {
1489                        Poll::Ready(None)
1490                    }
1491                }
1492                std::task::Poll::Pending => std::task::Poll::Pending,
1493            },
1494            None => {
1495                if let Some(stream) = self.streams.pop() {
1496                    Poll::Ready(Some(Ok(stream)))
1497                } else {
1498                    if self.done {
1499                        return Poll::Ready(None);
1500                    }
1501                    let context = self.context.clone();
1502                    let offset = self.offset;
1503                    let subject = self.subject.clone();
1504                    self.page_request = Some(Box::pin(async move {
1505                        match context
1506                            .request(
1507                                "STREAM.NAMES",
1508                                &json!({
1509                                    "offset": offset,
1510                                    "subject": subject
1511                                }),
1512                            )
1513                            .await?
1514                        {
1515                            Response::Err { error } => {
1516                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1517                            }
1518                            Response::Ok(page) => Ok(page),
1519                        }
1520                    }));
1521                    self.poll_next(cx)
1522                }
1523            }
1524        }
1525    }
1526}
1527
1528type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1529
1530pub type StreamsErrorKind = RequestErrorKind;
1531pub type StreamsError = RequestError;
1532
1533pub struct Streams {
1534    context: Context,
1535    offset: usize,
1536    page_request: Option<PageInfoRequest>,
1537    streams: Vec<super::stream::Info>,
1538    done: bool,
1539}
1540
1541impl futures::Stream for Streams {
1542    type Item = Result<super::stream::Info, StreamsError>;
1543
1544    fn poll_next(
1545        mut self: Pin<&mut Self>,
1546        cx: &mut std::task::Context<'_>,
1547    ) -> std::task::Poll<Option<Self::Item>> {
1548        match self.page_request.as_mut() {
1549            Some(page) => match page.try_poll_unpin(cx) {
1550                std::task::Poll::Ready(page) => {
1551                    self.page_request = None;
1552                    let page = page
1553                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1554                    if let Some(streams) = page.streams {
1555                        self.offset += streams.len();
1556                        self.streams = streams;
1557                        if self.offset >= page.total {
1558                            self.done = true;
1559                        }
1560                        match self.streams.pop() {
1561                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1562                            None => Poll::Ready(None),
1563                        }
1564                    } else {
1565                        Poll::Ready(None)
1566                    }
1567                }
1568                std::task::Poll::Pending => std::task::Poll::Pending,
1569            },
1570            None => {
1571                if let Some(stream) = self.streams.pop() {
1572                    Poll::Ready(Some(Ok(stream)))
1573                } else {
1574                    if self.done {
1575                        return Poll::Ready(None);
1576                    }
1577                    let context = self.context.clone();
1578                    let offset = self.offset;
1579                    self.page_request = Some(Box::pin(async move {
1580                        match context
1581                            .request(
1582                                "STREAM.LIST",
1583                                &json!({
1584                                    "offset": offset,
1585                                }),
1586                            )
1587                            .await?
1588                        {
1589                            Response::Err { error } => {
1590                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1591                            }
1592                            Response::Ok(page) => Ok(page),
1593                        }
1594                    }));
1595                    self.poll_next(cx)
1596                }
1597            }
1598        }
1599    }
1600}
1601/// Used for building customized `publish` message.
1602#[derive(Default, Clone, Debug)]
1603pub struct Publish {
1604    payload: Bytes,
1605    headers: Option<header::HeaderMap>,
1606}
1607impl Publish {
1608    /// Creates a new custom Publish struct to be used with.
1609    pub fn build() -> Self {
1610        Default::default()
1611    }
1612
1613    /// Sets the payload for the message.
1614    pub fn payload(mut self, payload: Bytes) -> Self {
1615        self.payload = payload;
1616        self
1617    }
1618    /// Adds headers to the message.
1619    pub fn headers(mut self, headers: HeaderMap) -> Self {
1620        self.headers = Some(headers);
1621        self
1622    }
1623    /// A shorthand to add a single header.
1624    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1625        self.headers
1626            .get_or_insert(header::HeaderMap::new())
1627            .insert(name, value);
1628        self
1629    }
1630    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
1631    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1632        self.header(header::NATS_MESSAGE_ID, id.as_ref())
1633    }
1634    /// Sets expected last message ID.
1635    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
1636    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1637        self.header(
1638            header::NATS_EXPECTED_LAST_MESSAGE_ID,
1639            last_message_id.as_ref(),
1640        )
1641    }
1642    /// Sets the last expected stream sequence.
1643    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
1644    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1645        self.header(
1646            header::NATS_EXPECTED_LAST_SEQUENCE,
1647            HeaderValue::from(last_sequence),
1648        )
1649    }
1650    /// Sets the last expected stream sequence for a subject this message will be published to.
1651    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
1652    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1653        self.header(
1654            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1655            HeaderValue::from(subject_sequence),
1656        )
1657    }
1658    /// Sets the expected stream name.
1659    /// It sets the `Nats-Expected-Stream` header with provided value.
1660    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1661        self.header(
1662            header::NATS_EXPECTED_STREAM,
1663            HeaderValue::from(stream.as_ref()),
1664        )
1665    }
1666
1667    #[cfg(feature = "server_2_11")]
1668    /// Sets TTL for a single message.
1669    /// It sets the `Nats-TTL` header with provided value.
1670    pub fn ttl(self, ttl: Duration) -> Self {
1671        self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1672    }
1673}
1674
/// Kinds of errors that can occur when sending a request to the JetStream API.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RequestErrorKind {
    /// The requested JetStream resource does not exist (no responders).
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}
1681
1682impl Display for RequestErrorKind {
1683    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1684        match self {
1685            Self::TimedOut => write!(f, "timed out"),
1686            Self::Other => write!(f, "request failed"),
1687            Self::NoResponders => write!(f, "requested JetStream resource does not exist"),
1688        }
1689    }
1690}
1691
/// Error type whose kind is a [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
1693
1694impl From<crate::RequestError> for RequestError {
1695    fn from(error: crate::RequestError) -> Self {
1696        match error.kind() {
1697            crate::RequestErrorKind::TimedOut => {
1698                RequestError::with_source(RequestErrorKind::TimedOut, error)
1699            }
1700            crate::RequestErrorKind::NoResponders => {
1701                RequestError::new(RequestErrorKind::NoResponders)
1702            }
1703            crate::RequestErrorKind::Other => {
1704                RequestError::with_source(RequestErrorKind::Other, error)
1705            }
1706        }
1707    }
1708}
1709
1710impl From<super::errors::Error> for RequestError {
1711    fn from(err: super::errors::Error) -> Self {
1712        RequestError::with_source(RequestErrorKind::Other, err)
1713    }
1714}
1715
/// Error type whose kind is a [`ConsumerInfoErrorKind`].
pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1717
1718#[derive(Clone, Debug, PartialEq)]
1719pub enum ConsumerInfoErrorKind {
1720    InvalidName,
1721    Offline,
1722    NotFound,
1723    StreamNotFound,
1724    Request,
1725    JetStream(super::errors::Error),
1726    TimedOut,
1727    NoResponders,
1728}
1729
1730impl Display for ConsumerInfoErrorKind {
1731    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1732        match self {
1733            Self::InvalidName => write!(f, "invalid consumer name"),
1734            Self::Offline => write!(f, "consumer is offline"),
1735            Self::NotFound => write!(f, "consumer not found"),
1736            Self::StreamNotFound => write!(f, "stream not found"),
1737            Self::Request => write!(f, "request error"),
1738            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1739            Self::TimedOut => write!(f, "timed out"),
1740            Self::NoResponders => write!(f, "no responders"),
1741        }
1742    }
1743}
1744
1745impl From<super::errors::Error> for ConsumerInfoError {
1746    fn from(error: super::errors::Error) -> Self {
1747        match error.error_code() {
1748            ErrorCode::CONSUMER_NOT_FOUND => {
1749                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
1750            }
1751            ErrorCode::STREAM_NOT_FOUND => {
1752                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
1753            }
1754            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
1755            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
1756        }
1757    }
1758}
1759
1760impl From<RequestError> for ConsumerInfoError {
1761    fn from(error: RequestError) -> Self {
1762        match error.kind() {
1763            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
1764            RequestErrorKind::Other => {
1765                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
1766            }
1767            RequestErrorKind::NoResponders => {
1768                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
1769            }
1770        }
1771    }
1772}
1773
1774#[derive(Clone, Debug, PartialEq)]
1775pub enum CreateStreamErrorKind {
1776    EmptyStreamName,
1777    InvalidStreamName,
1778    DomainAndExternalSet,
1779    JetStreamUnavailable,
1780    JetStream(super::errors::Error),
1781    TimedOut,
1782    Response,
1783    NotFound,
1784    ResponseParse,
1785}
1786
1787impl Display for CreateStreamErrorKind {
1788    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1789        match self {
1790            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1791            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1792            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1793            Self::NotFound => write!(f, "stream not found"),
1794            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1795            Self::TimedOut => write!(f, "jetstream request timed out"),
1796            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1797            Self::ResponseParse => write!(f, "failed to parse server response"),
1798            Self::Response => write!(f, "response error"),
1799        }
1800    }
1801}
1802
/// Error type whose kind is a [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
1804
1805impl From<super::errors::Error> for CreateStreamError {
1806    fn from(error: super::errors::Error) -> Self {
1807        match error.kind() {
1808            super::errors::ErrorCode::STREAM_NOT_FOUND => {
1809                CreateStreamError::new(CreateStreamErrorKind::NotFound)
1810            }
1811            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
1812        }
1813    }
1814}
1815
1816impl From<RequestError> for CreateStreamError {
1817    fn from(error: RequestError) -> Self {
1818        match error.kind() {
1819            RequestErrorKind::NoResponders => {
1820                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1821            }
1822            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1823            RequestErrorKind::Other => {
1824                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1825            }
1826        }
1827    }
1828}
1829
1830#[derive(Clone, Debug, PartialEq)]
1831pub enum GetStreamErrorKind {
1832    EmptyName,
1833    Request,
1834    InvalidStreamName,
1835    JetStream(super::errors::Error),
1836}
1837
1838impl Display for GetStreamErrorKind {
1839    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1840        match self {
1841            Self::EmptyName => write!(f, "empty name cannot be empty"),
1842            Self::Request => write!(f, "request error"),
1843            Self::InvalidStreamName => write!(f, "invalid stream name"),
1844            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1845        }
1846    }
1847}
1848
1849#[derive(Clone, Debug, PartialEq)]
1850pub enum GetStreamByNameErrorKind {
1851    Request,
1852    NotFound,
1853    InvalidSubject,
1854    JetStream(super::errors::Error),
1855}
1856
1857impl Display for GetStreamByNameErrorKind {
1858    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1859        match self {
1860            Self::Request => write!(f, "request error"),
1861            Self::NotFound => write!(f, "stream not found"),
1862            Self::InvalidSubject => write!(f, "invalid subject"),
1863            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1864        }
1865    }
1866}
1867
/// Error type whose kind is a [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error type whose kind is a [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Alias of [`CreateStreamError`].
pub type UpdateStreamError = CreateStreamError;
/// Alias of [`CreateStreamErrorKind`].
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Alias of [`GetStreamError`].
pub type DeleteStreamError = GetStreamError;
/// Alias of [`GetStreamErrorKind`].
pub type DeleteStreamErrorKind = GetStreamErrorKind;
1875
/// Kinds of failures reported when accessing a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The bucket could not be retrieved.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(message)
    }
}
1892
/// Error type whose kind is a [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
1894
/// Kinds of failures reported when creating a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Creating the bucket failed.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(message)
    }
}
1919
/// Kinds of failures reported when updating a Key Value store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is not valid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// Updating the bucket failed.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            // Was "bucket creation failed", copied from the create-side enum;
            // this variant describes a failed update.
            Self::BucketUpdate => "bucket update failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
            Self::NotFound => "bucket does not exist",
        };
        f.write_str(message)
    }
}
/// Error type whose kind is a [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error type whose kind is an [`UpdateKeyValueErrorKind`].
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Alias of [`CreateKeyValueError`].
pub type CreateObjectStoreError = CreateKeyValueError;
/// Alias of [`CreateKeyValueErrorKind`].
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1951
/// Kinds of failures reported when accessing an Object Store.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is not valid.
    InvalidBucketName,
    /// The Object Store could not be retrieved.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(message)
    }
}
1966
/// Error type whose kind is an [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

// NOTE(review): the two aliases below lack the `Error` suffix used by the
// sibling aliases in this file (e.g. `DeleteStreamError`); the names are public,
// so they are left as they are.
/// Alias of [`ObjectStoreError`].
pub type DeleteObjectStore = ObjectStoreError;
/// Alias of [`ObjectStoreErrorKind`].
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1971
1972#[derive(Clone, Debug, PartialEq)]
1973pub enum AccountErrorKind {
1974    TimedOut,
1975    JetStream(super::errors::Error),
1976    JetStreamUnavailable,
1977    Other,
1978}
1979
1980impl Display for AccountErrorKind {
1981    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1982        match self {
1983            Self::TimedOut => write!(f, "timed out"),
1984            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1985            Self::Other => write!(f, "error"),
1986            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1987        }
1988    }
1989}
1990
/// Error type whose kind is an [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
1992
1993impl From<RequestError> for AccountError {
1994    fn from(err: RequestError) -> Self {
1995        match err.kind {
1996            RequestErrorKind::NoResponders => {
1997                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1998            }
1999            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
2000            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2001        }
2002    }
2003}
2004
/// Action selector serialized into consumer requests.
///
/// `Create` and `Update` exist only when the `server_2_10` feature is enabled.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as an empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2016
2017// Maps a Stream config to KV Store.
2018fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2019    let mut store = Store {
2020        prefix: format!("$KV.{}.", bucket.as_str()),
2021        name: bucket,
2022        stream: stream.clone(),
2023        stream_name: stream.info.config.name.clone(),
2024        put_prefix: None,
2025        use_jetstream_prefix: prefix != "$JS.API",
2026    };
2027    if let Some(ref mirror) = stream.info.config.mirror {
2028        let bucket = mirror.name.trim_start_matches("KV_");
2029        if let Some(ref external) = mirror.external {
2030            if !external.api_prefix.is_empty() {
2031                store.use_jetstream_prefix = false;
2032                store.prefix = format!("$KV.{bucket}.");
2033                store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2034            } else {
2035                store.put_prefix = Some(format!("$KV.{bucket}."));
2036            }
2037        }
2038    };
2039    store
2040}
2041
/// Failures that `kv_to_stream_config` can report while translating a KV
/// config into a stream config.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    // Only constructed when the `server_2_11` feature is enabled, hence the
    // dead-code allowance for builds without it.
    /// Limit markers were requested but the account's API level is below 1.
    #[allow(dead_code)]
    LimitMarkersNotSupported,
}
2047
2048impl From<KvToStreamConfigError> for CreateKeyValueError {
2049    fn from(err: KvToStreamConfigError) -> Self {
2050        match err {
2051            KvToStreamConfigError::TooLongHistory => {
2052                CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2053            }
2054            KvToStreamConfigError::LimitMarkersNotSupported => {
2055                CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2056            }
2057        }
2058    }
2059}
2060
2061impl From<KvToStreamConfigError> for UpdateKeyValueError {
2062    fn from(err: KvToStreamConfigError) -> Self {
2063        match err {
2064            KvToStreamConfigError::TooLongHistory => {
2065                UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2066            }
2067            KvToStreamConfigError::LimitMarkersNotSupported => {
2068                UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2069            }
2070        }
2071    }
2072}
2073
2074// Maps the KV config to Stream config.
2075fn kv_to_stream_config(
2076    config: kv::Config,
2077    _account: Account,
2078) -> Result<super::stream::Config, KvToStreamConfigError> {
2079    let history = if config.history > 0 {
2080        if config.history > MAX_HISTORY {
2081            return Err(KvToStreamConfigError::TooLongHistory);
2082        }
2083        config.history
2084    } else {
2085        1
2086    };
2087
2088    let num_replicas = if config.num_replicas == 0 {
2089        1
2090    } else {
2091        config.num_replicas
2092    };
2093
2094    #[cfg(feature = "server_2_11")]
2095    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2096
2097    #[cfg(feature = "server_2_11")]
2098    if let Some(duration) = config.limit_markers {
2099        if _account.requests.level < 1 {
2100            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2101        }
2102        allow_message_ttl = true;
2103        subject_delete_marker_ttl = Some(duration);
2104    }
2105
2106    let mut mirror = config.mirror.clone();
2107    let mut sources = config.sources.clone();
2108    let mut mirror_direct = config.mirror_direct;
2109
2110    let mut subjects = Vec::new();
2111    if let Some(ref mut mirror) = mirror {
2112        if !mirror.name.starts_with("KV_") {
2113            mirror.name = format!("KV_{}", mirror.name);
2114        }
2115        mirror_direct = true;
2116    } else if let Some(ref mut sources) = sources {
2117        for source in sources {
2118            if !source.name.starts_with("KV_") {
2119                source.name = format!("KV_{}", source.name);
2120            }
2121        }
2122    } else {
2123        subjects = vec![format!("$KV.{}.>", config.bucket)];
2124    }
2125
2126    Ok(stream::Config {
2127        name: format!("KV_{}", config.bucket),
2128        description: Some(config.description),
2129        subjects,
2130        max_messages_per_subject: history,
2131        max_bytes: config.max_bytes,
2132        max_age: config.max_age,
2133        max_message_size: config.max_value_size,
2134        storage: config.storage,
2135        republish: config.republish,
2136        allow_rollup: true,
2137        deny_delete: true,
2138        deny_purge: false,
2139        allow_direct: true,
2140        sources,
2141        mirror,
2142        num_replicas,
2143        discard: stream::DiscardPolicy::New,
2144        mirror_direct,
2145        #[cfg(feature = "server_2_10")]
2146        compression: if config.compression {
2147            Some(stream::Compression::S2)
2148        } else {
2149            None
2150        },
2151        placement: config.placement,
2152        #[cfg(feature = "server_2_11")]
2153        allow_message_ttl,
2154        #[cfg(feature = "server_2_11")]
2155        subject_delete_marker_ttl,
2156        ..Default::default()
2157    })
2158}