async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23    header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use futures::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Display;
33use std::future::IntoFuture;
34use std::pin::Pin;
35use std::str::from_utf8;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::sync::oneshot;
39use tracing::debug;
40
41use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
42use super::errors::ErrorCode;
43use super::kv::{Store, MAX_HISTORY};
44use super::object_store::{is_valid_bucket_name, ObjectStore};
45use super::stream::{
46    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
47    Stream,
48};
49#[cfg(feature = "server_2_10")]
50use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
51use super::{is_valid_name, kv};
52
53/// A context which can perform jetstream scoped requests.
54#[derive(Debug, Clone)]
55pub struct Context {
56    pub(crate) client: Client,
57    pub(crate) prefix: String,
58    pub(crate) timeout: Duration,
59}
60
61impl Context {
62    pub(crate) fn new(client: Client) -> Context {
63        Context {
64            client,
65            prefix: "$JS.API".to_string(),
66            timeout: Duration::from_secs(5),
67        }
68    }
69
70    pub fn set_timeout(&mut self, timeout: Duration) {
71        self.timeout = timeout
72    }
73
74    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
75        Context {
76            client,
77            prefix: prefix.to_string(),
78            timeout: Duration::from_secs(5),
79        }
80    }
81
82    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
83        Context {
84            client,
85            prefix: format!("$JS.{}.API", domain.as_ref()),
86            timeout: Duration::from_secs(5),
87        }
88    }
89
90    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
91    /// acknowledgment from the server that the message has been successfully delivered.
92    ///
93    /// Acknowledgment future that can be polled is returned instead.
94    ///
95    /// If the stream does not exist, `no responders` error will be returned.
96    ///
97    /// # Examples
98    ///
99    /// Publish, and after each publish, await for acknowledgment.
100    ///
101    /// ```no_run
102    /// # #[tokio::main]
103    /// # async fn main() -> Result<(), async_nats::Error> {
104    /// let client = async_nats::connect("localhost:4222").await?;
105    /// let jetstream = async_nats::jetstream::new(client);
106    ///
107    /// let ack = jetstream.publish("events", "data".into()).await?;
108    /// ack.await?;
109    /// jetstream.publish("events", "data".into()).await?.await?;
110    /// # Ok(())
111    /// # }
112    /// ```
113    ///
114    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
115    /// ignored entirely.
116    ///
117    /// ```no_run
118    /// # #[tokio::main]
119    /// # async fn main() -> Result<(), async_nats::Error> {
120    /// let client = async_nats::connect("localhost:4222").await?;
121    /// let jetstream = async_nats::jetstream::new(client);
122    ///
123    /// let first_ack = jetstream.publish("events", "data".into()).await?;
124    /// let second_ack = jetstream.publish("events", "data".into()).await?;
125    /// first_ack.await?;
126    /// second_ack.await?;
127    /// # Ok(())
128    /// # }
129    /// ```
130    pub async fn publish<S: ToSubject>(
131        &self,
132        subject: S,
133        payload: Bytes,
134    ) -> Result<PublishAckFuture, PublishError> {
135        self.send_publish(subject, Publish::build().payload(payload))
136            .await
137    }
138
    /// Publishes a message with headers to a given subject associated with a stream and returns
    /// an acknowledgment future instead of waiting for the server acknowledgment itself.
141    ///
142    /// If the stream does not exist, `no responders` error will be returned.
143    ///
144    /// # Examples
145    ///
146    /// ```no_run
147    /// # #[tokio::main]
148    /// # async fn main() -> Result<(), async_nats::Error> {
149    /// let client = async_nats::connect("localhost:4222").await?;
150    /// let jetstream = async_nats::jetstream::new(client);
151    ///
152    /// let mut headers = async_nats::HeaderMap::new();
153    /// headers.append("X-key", "Value");
154    /// let ack = jetstream
155    ///     .publish_with_headers("events", headers, "data".into())
156    ///     .await?;
157    /// # Ok(())
158    /// # }
159    /// ```
160    pub async fn publish_with_headers<S: ToSubject>(
161        &self,
162        subject: S,
163        headers: crate::header::HeaderMap,
164        payload: Bytes,
165    ) -> Result<PublishAckFuture, PublishError> {
166        self.send_publish(subject, Publish::build().payload(payload).headers(headers))
167            .await
168    }
169
170    /// Publish a message built by [Publish] and returns an acknowledgment future.
171    ///
172    /// If the stream does not exist, `no responders` error will be returned.
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// # use async_nats::jetstream::context::Publish;
178    /// # #[tokio::main]
179    /// # async fn main() -> Result<(), async_nats::Error> {
180    /// let client = async_nats::connect("localhost:4222").await?;
181    /// let jetstream = async_nats::jetstream::new(client);
182    ///
183    /// let ack = jetstream
184    ///     .send_publish(
185    ///         "events",
186    ///         Publish::build().payload("data".into()).message_id("uuid"),
187    ///     )
188    ///     .await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub async fn send_publish<S: ToSubject>(
193        &self,
194        subject: S,
195        publish: Publish,
196    ) -> Result<PublishAckFuture, PublishError> {
197        let subject = subject.to_subject();
198        let (sender, receiver) = oneshot::channel();
199
200        let respond = self.client.new_inbox().into();
201
202        let send_fut = self
203            .client
204            .sender
205            .send(Command::Request {
206                subject,
207                payload: publish.payload,
208                respond,
209                headers: publish.headers,
210                sender,
211            })
212            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
213
214        tokio::time::timeout(self.timeout, send_fut)
215            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
216            .await??;
217
218        Ok(PublishAckFuture {
219            timeout: self.timeout,
220            subscription: receiver,
221        })
222    }
223
224    /// Query the server for account information
225    pub async fn query_account(&self) -> Result<Account, AccountError> {
226        let response: Response<Account> = self.request("INFO", b"").await?;
227
228        match response {
229            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
230            Response::Ok(account) => Ok(account),
231        }
232    }
233
234    /// Create a JetStream [Stream] with given config and return a handle to it.
235    /// That handle can be used to manage and use [Consumer].
236    ///
237    /// # Examples
238    ///
239    /// ```no_run
240    /// # #[tokio::main]
241    /// # async fn main() -> Result<(), async_nats::Error> {
242    /// use async_nats::jetstream::stream::Config;
243    /// use async_nats::jetstream::stream::DiscardPolicy;
244    /// let client = async_nats::connect("localhost:4222").await?;
245    /// let jetstream = async_nats::jetstream::new(client);
246    ///
247    /// let stream = jetstream
248    ///     .create_stream(Config {
249    ///         name: "events".to_string(),
250    ///         max_messages: 100_000,
251    ///         discard: DiscardPolicy::Old,
252    ///         ..Default::default()
253    ///     })
254    ///     .await?;
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn create_stream<S>(
259        &self,
260        stream_config: S,
261    ) -> Result<Stream<Info>, CreateStreamError>
262    where
263        Config: From<S>,
264    {
265        let mut config: Config = stream_config.into();
266        if config.name.is_empty() {
267            return Err(CreateStreamError::new(
268                CreateStreamErrorKind::EmptyStreamName,
269            ));
270        }
271        if !is_valid_name(config.name.as_str()) {
272            return Err(CreateStreamError::new(
273                CreateStreamErrorKind::InvalidStreamName,
274            ));
275        }
276        if let Some(ref mut mirror) = config.mirror {
277            if let Some(ref mut domain) = mirror.domain {
278                if mirror.external.is_some() {
279                    return Err(CreateStreamError::new(
280                        CreateStreamErrorKind::DomainAndExternalSet,
281                    ));
282                }
283                mirror.external = Some(External {
284                    api_prefix: format!("$JS.{domain}.API"),
285                    delivery_prefix: None,
286                })
287            }
288        }
289
290        if let Some(ref mut sources) = config.sources {
291            for source in sources {
292                if let Some(ref mut domain) = source.domain {
293                    if source.external.is_some() {
294                        return Err(CreateStreamError::new(
295                            CreateStreamErrorKind::DomainAndExternalSet,
296                        ));
297                    }
298                    source.external = Some(External {
299                        api_prefix: format!("$JS.{domain}.API"),
300                        delivery_prefix: None,
301                    })
302                }
303            }
304        }
305        let subject = format!("STREAM.CREATE.{}", config.name);
306        let response: Response<Info> = self.request(subject, &config).await?;
307
308        match response {
309            Response::Err { error } => Err(error.into()),
310            Response::Ok(info) => Ok(Stream {
311                context: self.clone(),
312                info,
313                name: config.name,
314            }),
315        }
316    }
317
    /// Returns a handle to a [Stream] without contacting the server.
319    /// That handle can be used to manage and use [Consumer].
320    /// This variant does not fetch [Stream] info from the server.
321    /// It means it does not check if the stream actually exists.
322    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
323    /// If you however run single operations on many streams, this method is more efficient.
324    ///
325    /// # Examples
326    ///
327    /// ```no_run
328    /// # #[tokio::main]
329    /// # async fn main() -> Result<(), async_nats::Error> {
330    /// let client = async_nats::connect("localhost:4222").await?;
331    /// let jetstream = async_nats::jetstream::new(client);
332    ///
333    /// let stream = jetstream.get_stream_no_info("events").await?;
334    /// # Ok(())
335    /// # }
336    /// ```
337    pub async fn get_stream_no_info<T: AsRef<str>>(
338        &self,
339        stream: T,
340    ) -> Result<Stream<()>, GetStreamError> {
341        let stream = stream.as_ref();
342        if stream.is_empty() {
343            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
344        }
345
346        if !is_valid_name(stream) {
347            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
348        }
349
350        Ok(Stream {
351            context: self.clone(),
352            info: (),
353            name: stream.to_string(),
354        })
355    }
356
357    /// Checks for [Stream] existence on the server and returns handle to it.
358    /// That handle can be used to manage and use [Consumer].
359    ///
360    /// # Examples
361    ///
362    /// ```no_run
363    /// # #[tokio::main]
364    /// # async fn main() -> Result<(), async_nats::Error> {
365    /// let client = async_nats::connect("localhost:4222").await?;
366    /// let jetstream = async_nats::jetstream::new(client);
367    ///
368    /// let stream = jetstream.get_stream("events").await?;
369    /// # Ok(())
370    /// # }
371    /// ```
372    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
373        let stream = stream.as_ref();
374        if stream.is_empty() {
375            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
376        }
377
378        if !is_valid_name(stream) {
379            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
380        }
381
382        let subject = format!("STREAM.INFO.{stream}");
383        let request: Response<Info> = self
384            .request(subject, &())
385            .await
386            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
387        match request {
388            Response::Err { error } => {
389                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
390            }
391            Response::Ok(info) => Ok(Stream {
392                context: self.clone(),
393                info,
394                name: stream.to_string(),
395            }),
396        }
397    }
398
399    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
400    ///
401    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
402    ///
403    /// # Examples
404    ///
405    /// ```no_run
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// use async_nats::jetstream::stream::Config;
409    /// let client = async_nats::connect("localhost:4222").await?;
410    /// let jetstream = async_nats::jetstream::new(client);
411    ///
412    /// let stream = jetstream
413    ///     .get_or_create_stream(Config {
414    ///         name: "events".to_string(),
415    ///         max_messages: 10_000,
416    ///         ..Default::default()
417    ///     })
418    ///     .await?;
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn get_or_create_stream<S>(
423        &self,
424        stream_config: S,
425    ) -> Result<Stream, CreateStreamError>
426    where
427        S: Into<Config>,
428    {
429        let config: Config = stream_config.into();
430
431        if config.name.is_empty() {
432            return Err(CreateStreamError::new(
433                CreateStreamErrorKind::EmptyStreamName,
434            ));
435        }
436
437        if !is_valid_name(config.name.as_str()) {
438            return Err(CreateStreamError::new(
439                CreateStreamErrorKind::InvalidStreamName,
440            ));
441        }
442        let subject = format!("STREAM.INFO.{}", config.name);
443
444        let request: Response<Info> = self.request(subject, &()).await?;
445        match request {
446            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
447            Response::Err { error } => Err(error.into()),
448            Response::Ok(info) => Ok(Stream {
449                context: self.clone(),
450                info,
451                name: config.name,
452            }),
453        }
454    }
455
456    /// Deletes a [Stream] with a given name.
457    ///
458    /// # Examples
459    ///
460    /// ```no_run
461    /// # #[tokio::main]
462    /// # async fn main() -> Result<(), async_nats::Error> {
463    /// use async_nats::jetstream::stream::Config;
464    /// let client = async_nats::connect("localhost:4222").await?;
465    /// let jetstream = async_nats::jetstream::new(client);
466    ///
467    /// let stream = jetstream.delete_stream("events").await?;
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub async fn delete_stream<T: AsRef<str>>(
472        &self,
473        stream: T,
474    ) -> Result<DeleteStatus, DeleteStreamError> {
475        let stream = stream.as_ref();
476        if stream.is_empty() {
477            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
478        }
479
480        if !is_valid_name(stream) {
481            return Err(DeleteStreamError::new(
482                DeleteStreamErrorKind::InvalidStreamName,
483            ));
484        }
485
486        let subject = format!("STREAM.DELETE.{stream}");
487        match self
488            .request(subject, &json!({}))
489            .await
490            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
491        {
492            Response::Err { error } => Err(DeleteStreamError::new(
493                DeleteStreamErrorKind::JetStream(error),
494            )),
495            Response::Ok(delete_response) => Ok(delete_response),
496        }
497    }
498
499    /// Updates a [Stream] with a given config. If specific field cannot be updated,
500    /// error is returned.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// # #[tokio::main]
506    /// # async fn main() -> Result<(), async_nats::Error> {
507    /// use async_nats::jetstream::stream::Config;
508    /// use async_nats::jetstream::stream::DiscardPolicy;
509    /// let client = async_nats::connect("localhost:4222").await?;
510    /// let jetstream = async_nats::jetstream::new(client);
511    ///
512    /// let stream = jetstream
513    ///     .update_stream(Config {
514    ///         name: "events".to_string(),
515    ///         discard: DiscardPolicy::New,
516    ///         max_messages: 50_000,
517    ///         ..Default::default()
518    ///     })
519    ///     .await?;
520    /// # Ok(())
521    /// # }
522    /// ```
523    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
524    where
525        S: Borrow<Config>,
526    {
527        let config = config.borrow();
528
529        if config.name.is_empty() {
530            return Err(CreateStreamError::new(
531                CreateStreamErrorKind::EmptyStreamName,
532            ));
533        }
534
535        if !is_valid_name(config.name.as_str()) {
536            return Err(CreateStreamError::new(
537                CreateStreamErrorKind::InvalidStreamName,
538            ));
539        }
540
541        let subject = format!("STREAM.UPDATE.{}", config.name);
542        match self.request(subject, config).await? {
543            Response::Err { error } => Err(error.into()),
544            Response::Ok(info) => Ok(info),
545        }
546    }
547
548    /// Tries to update a [Stream] with a given config, and if it does not exist, create it.
549    ///
550    /// # Examples
551    ///
552    /// ```no_run
553    /// # #[tokio::main]
554    /// # async fn main() -> Result<(), async_nats::Error> {
555    /// use async_nats::jetstream::stream::Config;
556    /// use async_nats::jetstream::stream::DiscardPolicy;
557    /// let client = async_nats::connect("localhost:4222").await?;
558    /// let jetstream = async_nats::jetstream::new(client);
559    ///
560    /// let stream = jetstream
561    ///     .create_or_update_stream(Config {
562    ///         name: "events".to_string(),
563    ///         discard: DiscardPolicy::New,
564    ///         max_messages: 50_000,
565    ///         ..Default::default()
566    ///     })
567    ///     .await?;
568    /// # Ok(())
569    /// # }
570    /// ```
571    pub async fn create_or_update_stream(&self, config: Config) -> Result<Info, CreateStreamError> {
572        match self.update_stream(config.clone()).await {
573            Ok(stream) => Ok(stream),
574            Err(err) => match err.kind() {
575                CreateStreamErrorKind::NotFound => {
576                    let stream = self
577                        .create_stream(config)
578                        .await
579                        .map_err(|err| CreateStreamError::with_source(err.kind(), err))?;
580                    Ok(stream.info)
581                }
582                _ => Err(err),
583            },
584        }
585    }
586
587    /// Looks up Stream that contains provided subject.
588    ///
589    /// # Examples
590    ///
591    /// ```no_run
592    /// # #[tokio::main]
593    /// # async fn main() -> Result<(), async_nats::Error> {
594    /// use futures::TryStreamExt;
595    /// let client = async_nats::connect("demo.nats.io:4222").await?;
596    /// let jetstream = async_nats::jetstream::new(client);
    /// let stream_name = jetstream.stream_by_subject("foo.>").await?;
598    /// # Ok(())
599    /// # }
600    /// ```
601    pub async fn stream_by_subject<T: Into<String>>(
602        &self,
603        subject: T,
604    ) -> Result<String, GetStreamByNameError> {
605        let subject = subject.into();
606        if !is_valid_subject(subject.as_str()) {
607            return Err(GetStreamByNameError::new(
608                GetStreamByNameErrorKind::InvalidSubject,
609            ));
610        }
611        let mut names = StreamNames {
612            context: self.clone(),
613            offset: 0,
614            page_request: None,
615            streams: Vec::new(),
616            subject: Some(subject),
617            done: false,
618        };
619        match names.next().await {
620            Some(name) => match name {
621                Ok(name) => Ok(name),
622                Err(err) => Err(GetStreamByNameError::with_source(
623                    GetStreamByNameErrorKind::Request,
624                    err,
625                )),
626            },
627            None => Err(GetStreamByNameError::new(
628                GetStreamByNameErrorKind::NotFound,
629            )),
630        }
631    }
632
633    /// Lists names of all streams for current context.
634    ///
635    /// # Examples
636    ///
637    /// ```no_run
638    /// # #[tokio::main]
639    /// # async fn main() -> Result<(), async_nats::Error> {
640    /// use futures::TryStreamExt;
641    /// let client = async_nats::connect("demo.nats.io:4222").await?;
642    /// let jetstream = async_nats::jetstream::new(client);
643    /// let mut names = jetstream.stream_names();
644    /// while let Some(stream) = names.try_next().await? {
645    ///     println!("stream: {}", stream);
646    /// }
647    /// # Ok(())
648    /// # }
649    /// ```
650    pub fn stream_names(&self) -> StreamNames {
651        StreamNames {
652            context: self.clone(),
653            offset: 0,
654            page_request: None,
655            streams: Vec::new(),
656            subject: None,
657            done: false,
658        }
659    }
660
661    /// Lists all streams info for current context.
662    ///
663    /// # Examples
664    ///
665    /// ```no_run
666    /// # #[tokio::main]
667    /// # async fn main() -> Result<(), async_nats::Error> {
668    /// use futures::TryStreamExt;
669    /// let client = async_nats::connect("demo.nats.io:4222").await?;
670    /// let jetstream = async_nats::jetstream::new(client);
671    /// let mut streams = jetstream.streams();
672    /// while let Some(stream) = streams.try_next().await? {
673    ///     println!("stream: {:?}", stream);
674    /// }
675    /// # Ok(())
676    /// # }
677    /// ```
678    pub fn streams(&self) -> Streams {
679        Streams {
680            context: self.clone(),
681            offset: 0,
682            page_request: None,
683            streams: Vec::new(),
684            done: false,
685        }
686    }
687    /// Returns an existing key-value bucket.
688    ///
689    /// # Examples
690    ///
691    /// ```no_run
692    /// # #[tokio::main]
693    /// # async fn main() -> Result<(), async_nats::Error> {
694    /// let client = async_nats::connect("demo.nats.io:4222").await?;
695    /// let jetstream = async_nats::jetstream::new(client);
696    /// let kv = jetstream.get_key_value("bucket").await?;
697    /// # Ok(())
698    /// # }
699    /// ```
700    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
701        let bucket: String = bucket.into();
702        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
703            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
704        }
705
706        let stream_name = format!("KV_{}", &bucket);
707        let stream = self
708            .get_stream(stream_name.clone())
709            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
710            .await?;
711
712        if stream.info.config.max_messages_per_subject < 1 {
713            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
714        }
715        let mut store = Store {
716            prefix: format!("$KV.{}.", &bucket),
717            name: bucket,
718            stream_name,
719            stream: stream.clone(),
720            put_prefix: None,
721            use_jetstream_prefix: self.prefix != "$JS.API",
722        };
723        if let Some(ref mirror) = stream.info.config.mirror {
724            let bucket = mirror.name.trim_start_matches("KV_");
725            if let Some(ref external) = mirror.external {
726                if !external.api_prefix.is_empty() {
727                    store.use_jetstream_prefix = false;
728                    store.prefix = format!("$KV.{bucket}.");
729                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
730                } else {
731                    store.put_prefix = Some(format!("$KV.{bucket}."));
732                }
733            }
734        };
735
736        Ok(store)
737    }
738
739    /// Creates a new key-value bucket.
740    ///
741    /// # Examples
742    ///
743    /// ```no_run
744    /// # #[tokio::main]
745    /// # async fn main() -> Result<(), async_nats::Error> {
746    /// let client = async_nats::connect("demo.nats.io:4222").await?;
747    /// let jetstream = async_nats::jetstream::new(client);
748    /// let kv = jetstream
749    ///     .create_key_value(async_nats::jetstream::kv::Config {
750    ///         bucket: "kv".to_string(),
751    ///         history: 10,
752    ///         ..Default::default()
753    ///     })
754    ///     .await?;
755    /// # Ok(())
756    /// # }
757    /// ```
758    pub async fn create_key_value(
759        &self,
760        config: crate::jetstream::kv::Config,
761    ) -> Result<Store, CreateKeyValueError> {
762        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
763            return Err(CreateKeyValueError::new(
764                CreateKeyValueErrorKind::InvalidStoreName,
765            ));
766        }
767        let info = self.query_account().await.map_err(|err| {
768            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
769        })?;
770
771        let bucket_name = config.bucket.clone();
772        let stream_config = kv_to_stream_config(config, info)?;
773
774        let stream = self.create_stream(stream_config).await.map_err(|err| {
775            if err.kind() == CreateStreamErrorKind::TimedOut {
776                CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
777            } else {
778                CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
779            }
780        })?;
781
782        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
783    }
784
785    /// Updates an existing key-value bucket.
786    ///
787    /// # Examples
788    ///
789    /// ```no_run
790    /// # #[tokio::main]
791    /// # async fn main() -> Result<(), async_nats::Error> {
792    /// let client = async_nats::connect("demo.nats.io:4222").await?;
793    /// let jetstream = async_nats::jetstream::new(client);
794    /// let kv = jetstream
795    ///     .update_key_value(async_nats::jetstream::kv::Config {
796    ///         bucket: "kv".to_string(),
797    ///         history: 60,
798    ///         ..Default::default()
799    ///     })
800    ///     .await?;
801    /// # Ok(())
802    /// # }
803    /// ```
804    pub async fn update_key_value(
805        &self,
806        config: crate::jetstream::kv::Config,
807    ) -> Result<Store, UpdateKeyValueError> {
808        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
809            return Err(UpdateKeyValueError::new(
810                UpdateKeyValueErrorKind::InvalidStoreName,
811            ));
812        }
813
814        let stream_name = format!("KV_{}", config.bucket);
815        let bucket_name = config.bucket.clone();
816
817        let account = self.query_account().await.map_err(|err| {
818            UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err)
819        })?;
820        let stream = self
821            .update_stream(kv_to_stream_config(config, account)?)
822            .await
823            .map_err(|err| match err.kind() {
824                UpdateStreamErrorKind::NotFound => {
825                    UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::NotFound, err)
826                }
827                _ => UpdateKeyValueError::with_source(UpdateKeyValueErrorKind::JetStream, err),
828            })?;
829
830        let stream = Stream {
831            context: self.clone(),
832            info: stream,
833            name: stream_name,
834        };
835
836        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
837    }
838
839    /// Tries to update an existing Key-Value bucket. If it does not exist, creates it.
840    ///
841    /// # Examples
842    ///
843    /// ```no_run
844    /// # #[tokio::main]
845    /// # async fn main() -> Result<(), async_nats::Error> {
846    /// let client = async_nats::connect("demo.nats.io:4222").await?;
847    /// let jetstream = async_nats::jetstream::new(client);
848    /// let kv = jetstream
    ///     .create_or_update_key_value(async_nats::jetstream::kv::Config {
850    ///         bucket: "kv".to_string(),
851    ///         history: 60,
852    ///         ..Default::default()
853    ///     })
854    ///     .await?;
855    /// # Ok(())
856    /// # }
857    /// ```
858    pub async fn create_or_update_key_value(
859        &self,
860        config: crate::jetstream::kv::Config,
861    ) -> Result<Store, CreateKeyValueError> {
862        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
863            return Err(CreateKeyValueError::new(
864                CreateKeyValueErrorKind::InvalidStoreName,
865            ));
866        }
867
868        let bucket_name = config.bucket.clone();
869        let stream_name = format!("KV_{}", config.bucket);
870
871        let account = self.query_account().await.map_err(|err| {
872            CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
873        })?;
874        let stream = self
875            .create_or_update_stream(kv_to_stream_config(config, account)?)
876            .await
877            .map_err(|err| {
878                CreateKeyValueError::with_source(CreateKeyValueErrorKind::JetStream, err)
879            })?;
880
881        let stream = Stream {
882            context: self.clone(),
883            info: stream,
884            name: stream_name,
885        };
886
887        Ok(map_to_kv(stream, self.prefix.clone(), bucket_name))
888    }
889
890    /// Deletes given key-value bucket.
891    ///
892    /// # Examples
893    ///
894    /// ```no_run
895    /// # #[tokio::main]
896    /// # async fn main() -> Result<(), async_nats::Error> {
897    /// let client = async_nats::connect("demo.nats.io:4222").await?;
898    /// let jetstream = async_nats::jetstream::new(client);
899    /// let kv = jetstream
900    ///     .create_key_value(async_nats::jetstream::kv::Config {
901    ///         bucket: "kv".to_string(),
902    ///         history: 10,
903    ///         ..Default::default()
904    ///     })
905    ///     .await?;
906    /// # Ok(())
907    /// # }
908    /// ```
909    pub async fn delete_key_value<T: AsRef<str>>(
910        &self,
911        bucket: T,
912    ) -> Result<DeleteStatus, KeyValueError> {
913        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
914            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
915        }
916
917        let stream_name = format!("KV_{}", bucket.as_ref());
918        self.delete_stream(stream_name)
919            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
920            .await
921    }
922
923    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
924    //     let config = config.borrow();
925    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
926    //         return Err(Box::new(std::io::Error::other(
927    //             "invalid bucket name",
928    //         )));
929    //     }
930
931    //     let stream_name = format!("KV_{}", config.bucket);
932    //     self.update_stream()
933    //         .await
934    //         .and_then(|info| Ok(()))
935    // }
936
937    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
938    ///
939    /// It has one less interaction with the server when binding to only one
940    /// [crate::jetstream::consumer::Consumer].
941    ///
942    /// # Examples:
943    ///
944    /// ```no_run
945    /// # #[tokio::main]
946    /// # async fn main() -> Result<(), async_nats::Error> {
947    /// use async_nats::jetstream::consumer::PullConsumer;
948    ///
949    /// let client = async_nats::connect("localhost:4222").await?;
950    /// let jetstream = async_nats::jetstream::new(client);
951    ///
952    /// let consumer: PullConsumer = jetstream
953    ///     .get_consumer_from_stream("consumer", "stream")
954    ///     .await?;
955    ///
956    /// # Ok(())
957    /// # }
958    /// ```
959    pub async fn get_consumer_from_stream<T, C, S>(
960        &self,
961        consumer: C,
962        stream: S,
963    ) -> Result<Consumer<T>, ConsumerError>
964    where
965        T: FromConsumer + IntoConsumerConfig,
966        S: AsRef<str>,
967        C: AsRef<str>,
968    {
969        if !is_valid_name(stream.as_ref()) {
970            return Err(ConsumerError::with_source(
971                ConsumerErrorKind::InvalidName,
972                "invalid stream",
973            ));
974        }
975
976        if !is_valid_name(consumer.as_ref()) {
977            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
978        }
979
980        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
981
982        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
983            Response::Ok(info) => info,
984            Response::Err { error } => return Err(error.into()),
985        };
986
987        Ok(Consumer::new(
988            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
989                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
990            })?,
991            info,
992            self.clone(),
993        ))
994    }
995
996    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
997    ///
998    /// It has one less interaction with the server when binding to only one
999    /// [crate::jetstream::consumer::Consumer].
1000    ///
1001    /// # Examples:
1002    ///
1003    /// ```no_run
1004    /// # #[tokio::main]
1005    /// # async fn main() -> Result<(), async_nats::Error> {
1006    /// use async_nats::jetstream::consumer::PullConsumer;
1007    ///
1008    /// let client = async_nats::connect("localhost:4222").await?;
1009    /// let jetstream = async_nats::jetstream::new(client);
1010    ///
1011    /// jetstream
1012    ///     .delete_consumer_from_stream("consumer", "stream")
1013    ///     .await?;
1014    ///
1015    /// # Ok(())
1016    /// # }
1017    /// ```
1018    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
1019        &self,
1020        consumer: C,
1021        stream: S,
1022    ) -> Result<DeleteStatus, ConsumerError> {
1023        if !is_valid_name(consumer.as_ref()) {
1024            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
1025        }
1026
1027        if !is_valid_name(stream.as_ref()) {
1028            return Err(ConsumerError::with_source(
1029                ConsumerErrorKind::Other,
1030                "invalid stream name",
1031            ));
1032        }
1033
1034        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
1035
1036        match self.request(subject, &json!({})).await? {
1037            Response::Ok(delete_status) => Ok(delete_status),
1038            Response::Err { error } => Err(error.into()),
1039        }
1040    }
1041
1042    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
1043    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1044    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
1045    ///
1046    /// # Examples
1047    ///
1048    /// ```no_run
1049    /// # #[tokio::main]
1050    /// # async fn main() -> Result<(), async_nats::Error> {
1051    /// use async_nats::jetstream::consumer;
1052    /// let client = async_nats::connect("localhost:4222").await?;
1053    /// let jetstream = async_nats::jetstream::new(client);
1054    ///
1055    /// let consumer: consumer::PullConsumer = jetstream
1056    ///     .create_consumer_on_stream(
1057    ///         consumer::pull::Config {
1058    ///             durable_name: Some("pull".to_string()),
1059    ///             ..Default::default()
1060    ///         },
1061    ///         "stream",
1062    ///     )
1063    ///     .await?;
1064    /// # Ok(())
1065    /// # }
1066    /// ```
1067    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1068        &self,
1069        config: C,
1070        stream: S,
1071    ) -> Result<Consumer<C>, ConsumerError> {
1072        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1073            .await
1074    }
1075
1076    /// Update an existing consumer.
1077    /// This call will fail if the consumer does not exist.
1078    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```no_run
1083    /// # #[tokio::main]
1084    /// # async fn main() -> Result<(), async_nats::Error> {
1085    /// use async_nats::jetstream::consumer;
1086    /// let client = async_nats::connect("localhost:4222").await?;
1087    /// let jetstream = async_nats::jetstream::new(client);
1088    ///
1089    /// let consumer: consumer::PullConsumer = jetstream
1090    ///     .update_consumer_on_stream(
1091    ///         consumer::pull::Config {
1092    ///             durable_name: Some("pull".to_string()),
1093    ///             description: Some("updated pull consumer".to_string()),
1094    ///             ..Default::default()
1095    ///         },
1096    ///         "stream",
1097    ///     )
1098    ///     .await?;
1099    /// # Ok(())
1100    /// # }
1101    /// ```
1102    #[cfg(feature = "server_2_10")]
1103    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1104        &self,
1105        config: C,
1106        stream: S,
1107    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1108        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1109            .await
1110            .map_err(|err| err.into())
1111    }
1112
1113    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1114    /// the same.
1115    /// This method will fail if consumer is already present with different config.
1116    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1117    ///
1118    /// # Examples
1119    ///
1120    /// ```no_run
1121    /// # #[tokio::main]
1122    /// # async fn main() -> Result<(), async_nats::Error> {
1123    /// use async_nats::jetstream::consumer;
1124    /// let client = async_nats::connect("localhost:4222").await?;
1125    /// let jetstream = async_nats::jetstream::new(client);
1126    ///
1127    /// let consumer: consumer::PullConsumer = jetstream
1128    ///     .create_consumer_strict_on_stream(
1129    ///         consumer::pull::Config {
1130    ///             durable_name: Some("pull".to_string()),
1131    ///             ..Default::default()
1132    ///         },
1133    ///         "stream",
1134    ///     )
1135    ///     .await?;
1136    /// # Ok(())
1137    /// # }
1138    /// ```
1139    #[cfg(feature = "server_2_10")]
1140    pub async fn create_consumer_strict_on_stream<
1141        C: IntoConsumerConfig + FromConsumer,
1142        S: AsRef<str>,
1143    >(
1144        &self,
1145        config: C,
1146        stream: S,
1147    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1148        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1149            .await
1150            .map_err(|err| err.into())
1151    }
1152
1153    async fn create_consumer_on_stream_action<
1154        C: IntoConsumerConfig + FromConsumer,
1155        S: AsRef<str>,
1156    >(
1157        &self,
1158        config: C,
1159        stream: S,
1160        action: ConsumerAction,
1161    ) -> Result<Consumer<C>, ConsumerError> {
1162        let config = config.into_consumer_config();
1163
1164        let subject = {
1165            let filter = if config.filter_subject.is_empty() {
1166                "".to_string()
1167            } else {
1168                format!(".{}", config.filter_subject)
1169            };
1170            config
1171                .name
1172                .as_ref()
1173                .or(config.durable_name.as_ref())
1174                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1175                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1176        };
1177
1178        match self
1179            .request(
1180                subject,
1181                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1182            )
1183            .await?
1184        {
1185            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1186            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1187                FromConsumer::try_from_consumer_config(info.clone().config)
1188                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1189                info,
1190                self.clone(),
1191            )),
1192        }
1193    }
1194
1195    /// Send a request to the jetstream JSON API.
1196    ///
1197    /// This is a low level API used mostly internally, that should be used only in
1198    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1199    ///
1200    /// # Examples
1201    ///
1202    /// ```no_run
1203    /// # use async_nats::jetstream::stream::Info;
1204    /// # use async_nats::jetstream::response::Response;
1205    /// # #[tokio::main]
1206    /// # async fn main() -> Result<(), async_nats::Error> {
1207    /// let client = async_nats::connect("localhost:4222").await?;
1208    /// let jetstream = async_nats::jetstream::new(client);
1209    ///
1210    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1211    /// # Ok(())
1212    /// # }
1213    /// ```
1214    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1215    where
1216        S: ToSubject,
1217        T: ?Sized + Serialize,
1218        V: DeserializeOwned,
1219    {
1220        let subject = subject.to_subject();
1221        let request = serde_json::to_vec(&payload)
1222            .map(Bytes::from)
1223            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1224
1225        debug!("JetStream request sent: {:?}", request);
1226
1227        let message = self
1228            .client
1229            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1230            .await;
1231        let message = message?;
1232        debug!(
1233            "JetStream request response: {:?}",
1234            from_utf8(&message.payload)
1235        );
1236        let response = serde_json::from_slice(message.payload.as_ref())
1237            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1238
1239        Ok(response)
1240    }
1241
1242    /// Creates a new object store bucket.
1243    ///
1244    /// # Examples
1245    ///
1246    /// ```no_run
1247    /// # #[tokio::main]
1248    /// # async fn main() -> Result<(), async_nats::Error> {
1249    /// let client = async_nats::connect("demo.nats.io").await?;
1250    /// let jetstream = async_nats::jetstream::new(client);
1251    /// let bucket = jetstream
1252    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1253    ///         bucket: "bucket".to_string(),
1254    ///         ..Default::default()
1255    ///     })
1256    ///     .await?;
1257    /// # Ok(())
1258    /// # }
1259    /// ```
1260    pub async fn create_object_store(
1261        &self,
1262        config: super::object_store::Config,
1263    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1264        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1265            return Err(CreateObjectStoreError::new(
1266                CreateKeyValueErrorKind::InvalidStoreName,
1267            ));
1268        }
1269
1270        let bucket_name = config.bucket.clone();
1271        let stream_name = format!("OBJ_{bucket_name}");
1272        let chunk_subject = format!("$O.{bucket_name}.C.>");
1273        let meta_subject = format!("$O.{bucket_name}.M.>");
1274
1275        let stream = self
1276            .create_stream(super::stream::Config {
1277                name: stream_name,
1278                description: config.description.clone(),
1279                subjects: vec![chunk_subject, meta_subject],
1280                max_age: config.max_age,
1281                max_bytes: config.max_bytes,
1282                storage: config.storage,
1283                num_replicas: config.num_replicas,
1284                discard: DiscardPolicy::New,
1285                allow_rollup: true,
1286                allow_direct: true,
1287                #[cfg(feature = "server_2_10")]
1288                compression: if config.compression {
1289                    Some(Compression::S2)
1290                } else {
1291                    None
1292                },
1293                placement: config.placement,
1294                ..Default::default()
1295            })
1296            .await
1297            .map_err(|err| {
1298                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1299            })?;
1300
1301        Ok(ObjectStore {
1302            name: bucket_name,
1303            stream,
1304        })
1305    }
1306
1307    /// Get an existing object store bucket.
1308    ///
1309    /// # Examples
1310    ///
1311    /// ```no_run
1312    /// # #[tokio::main]
1313    /// # async fn main() -> Result<(), async_nats::Error> {
1314    /// let client = async_nats::connect("demo.nats.io").await?;
1315    /// let jetstream = async_nats::jetstream::new(client);
1316    /// let bucket = jetstream.get_object_store("bucket").await?;
1317    /// # Ok(())
1318    /// # }
1319    /// ```
1320    pub async fn get_object_store<T: AsRef<str>>(
1321        &self,
1322        bucket_name: T,
1323    ) -> Result<ObjectStore, ObjectStoreError> {
1324        let bucket_name = bucket_name.as_ref();
1325        if !is_valid_bucket_name(bucket_name) {
1326            return Err(ObjectStoreError::new(
1327                ObjectStoreErrorKind::InvalidBucketName,
1328            ));
1329        }
1330        let stream_name = format!("OBJ_{bucket_name}");
1331        let stream = self
1332            .get_stream(stream_name)
1333            .await
1334            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1335
1336        Ok(ObjectStore {
1337            name: bucket_name.to_string(),
1338            stream,
1339        })
1340    }
1341
1342    /// Delete a object store bucket.
1343    ///
1344    /// # Examples
1345    ///
1346    /// ```no_run
1347    /// # #[tokio::main]
1348    /// # async fn main() -> Result<(), async_nats::Error> {
1349    /// let client = async_nats::connect("demo.nats.io").await?;
1350    /// let jetstream = async_nats::jetstream::new(client);
1351    /// let bucket = jetstream.delete_object_store("bucket").await?;
1352    /// # Ok(())
1353    /// # }
1354    /// ```
1355    pub async fn delete_object_store<T: AsRef<str>>(
1356        &self,
1357        bucket_name: T,
1358    ) -> Result<(), DeleteObjectStore> {
1359        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1360        self.delete_stream(stream_name)
1361            .await
1362            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1363        Ok(())
1364    }
1365}
1366
/// Classifies the ways a JetStream publish can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the subject the message was published to.
    StreamNotFound,
    /// The server reported a mismatch with the expected last message id.
    WrongLastMessageId,
    /// The server reported a mismatch with the expected last sequence.
    WrongLastSequence,
    /// No acknowledgement was received within the configured timeout.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1389
/// Error produced when a publish acknowledgement cannot be obtained; its kind is a
/// [PublishErrorKind].
pub type PublishError = Error<PublishErrorKind>;
1391
1392#[derive(Debug)]
1393pub struct PublishAckFuture {
1394    timeout: Duration,
1395    subscription: oneshot::Receiver<Message>,
1396}
1397
1398impl PublishAckFuture {
1399    async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1400        let next = tokio::time::timeout(self.timeout, self.subscription)
1401            .await
1402            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1403        next.map_or_else(
1404            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1405            |m| {
1406                if m.status == Some(StatusCode::NO_RESPONDERS) {
1407                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1408                }
1409                let response = serde_json::from_slice(m.payload.as_ref())
1410                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1411                match response {
1412                    Response::Err { error } => match error.error_code() {
1413                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1414                            PublishErrorKind::WrongLastMessageId,
1415                            error,
1416                        )),
1417                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1418                            PublishErrorKind::WrongLastSequence,
1419                            error,
1420                        )),
1421                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1422                    },
1423                    Response::Ok(publish_ack) => Ok(publish_ack),
1424                }
1425            },
1426        )
1427    }
1428}
1429impl IntoFuture for PublishAckFuture {
1430    type Output = Result<PublishAck, PublishError>;
1431
1432    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1433
1434    fn into_future(self) -> Self::IntoFuture {
1435        Box::pin(std::future::IntoFuture::into_future(
1436            self.next_with_timeout(),
1437        ))
1438    }
1439}
1440
/// One page of the `STREAM.NAMES` response, as consumed by [StreamNames].
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; compared with the running offset to detect the last page.
    total: usize,
    /// Stream names contained in this page; `None` ends the iteration.
    streams: Option<Vec<String>>,
}
1446
/// One page of the `STREAM.LIST` response, as consumed by [Streams].
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server; compared with the running offset to detect the last page.
    total: usize,
    /// Stream infos contained in this page; `None` ends the iteration.
    streams: Option<Vec<super::stream::Info>>,
}
1452
1453type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1454
1455pub struct StreamNames {
1456    context: Context,
1457    offset: usize,
1458    page_request: Option<PageRequest>,
1459    subject: Option<String>,
1460    streams: Vec<String>,
1461    done: bool,
1462}
1463
1464impl futures::Stream for StreamNames {
1465    type Item = Result<String, StreamsError>;
1466
1467    fn poll_next(
1468        mut self: Pin<&mut Self>,
1469        cx: &mut std::task::Context<'_>,
1470    ) -> std::task::Poll<Option<Self::Item>> {
1471        match self.page_request.as_mut() {
1472            Some(page) => match page.try_poll_unpin(cx) {
1473                std::task::Poll::Ready(page) => {
1474                    self.page_request = None;
1475                    let page = page
1476                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1477                    if let Some(streams) = page.streams {
1478                        self.offset += streams.len();
1479                        self.streams = streams;
1480                        if self.offset >= page.total {
1481                            self.done = true;
1482                        }
1483                        match self.streams.pop() {
1484                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1485                            None => Poll::Ready(None),
1486                        }
1487                    } else {
1488                        Poll::Ready(None)
1489                    }
1490                }
1491                std::task::Poll::Pending => std::task::Poll::Pending,
1492            },
1493            None => {
1494                if let Some(stream) = self.streams.pop() {
1495                    Poll::Ready(Some(Ok(stream)))
1496                } else {
1497                    if self.done {
1498                        return Poll::Ready(None);
1499                    }
1500                    let context = self.context.clone();
1501                    let offset = self.offset;
1502                    let subject = self.subject.clone();
1503                    self.page_request = Some(Box::pin(async move {
1504                        match context
1505                            .request(
1506                                "STREAM.NAMES",
1507                                &json!({
1508                                    "offset": offset,
1509                                    "subject": subject
1510                                }),
1511                            )
1512                            .await?
1513                        {
1514                            Response::Err { error } => {
1515                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1516                            }
1517                            Response::Ok(page) => Ok(page),
1518                        }
1519                    }));
1520                    self.poll_next(cx)
1521                }
1522            }
1523        }
1524    }
1525}
1526
1527type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1528
1529pub type StreamsErrorKind = RequestErrorKind;
1530pub type StreamsError = RequestError;
1531
1532pub struct Streams {
1533    context: Context,
1534    offset: usize,
1535    page_request: Option<PageInfoRequest>,
1536    streams: Vec<super::stream::Info>,
1537    done: bool,
1538}
1539
1540impl futures::Stream for Streams {
1541    type Item = Result<super::stream::Info, StreamsError>;
1542
1543    fn poll_next(
1544        mut self: Pin<&mut Self>,
1545        cx: &mut std::task::Context<'_>,
1546    ) -> std::task::Poll<Option<Self::Item>> {
1547        match self.page_request.as_mut() {
1548            Some(page) => match page.try_poll_unpin(cx) {
1549                std::task::Poll::Ready(page) => {
1550                    self.page_request = None;
1551                    let page = page
1552                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1553                    if let Some(streams) = page.streams {
1554                        self.offset += streams.len();
1555                        self.streams = streams;
1556                        if self.offset >= page.total {
1557                            self.done = true;
1558                        }
1559                        match self.streams.pop() {
1560                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1561                            None => Poll::Ready(None),
1562                        }
1563                    } else {
1564                        Poll::Ready(None)
1565                    }
1566                }
1567                std::task::Poll::Pending => std::task::Poll::Pending,
1568            },
1569            None => {
1570                if let Some(stream) = self.streams.pop() {
1571                    Poll::Ready(Some(Ok(stream)))
1572                } else {
1573                    if self.done {
1574                        return Poll::Ready(None);
1575                    }
1576                    let context = self.context.clone();
1577                    let offset = self.offset;
1578                    self.page_request = Some(Box::pin(async move {
1579                        match context
1580                            .request(
1581                                "STREAM.LIST",
1582                                &json!({
1583                                    "offset": offset,
1584                                }),
1585                            )
1586                            .await?
1587                        {
1588                            Response::Err { error } => {
1589                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1590                            }
1591                            Response::Ok(page) => Ok(page),
1592                        }
1593                    }));
1594                    self.poll_next(cx)
1595                }
1596            }
1597        }
1598    }
1599}
1600/// Used for building customized `publish` message.
1601#[derive(Default, Clone, Debug)]
1602pub struct Publish {
1603    payload: Bytes,
1604    headers: Option<header::HeaderMap>,
1605}
1606impl Publish {
1607    /// Creates a new custom Publish struct to be used with.
1608    pub fn build() -> Self {
1609        Default::default()
1610    }
1611
1612    /// Sets the payload for the message.
1613    pub fn payload(mut self, payload: Bytes) -> Self {
1614        self.payload = payload;
1615        self
1616    }
1617    /// Adds headers to the message.
1618    pub fn headers(mut self, headers: HeaderMap) -> Self {
1619        self.headers = Some(headers);
1620        self
1621    }
1622    /// A shorthand to add a single header.
1623    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1624        self.headers
1625            .get_or_insert(header::HeaderMap::new())
1626            .insert(name, value);
1627        self
1628    }
1629    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
1630    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1631        self.header(header::NATS_MESSAGE_ID, id.as_ref())
1632    }
1633    /// Sets expected last message ID.
1634    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
1635    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1636        self.header(
1637            header::NATS_EXPECTED_LAST_MESSAGE_ID,
1638            last_message_id.as_ref(),
1639        )
1640    }
1641    /// Sets the last expected stream sequence.
1642    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
1643    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1644        self.header(
1645            header::NATS_EXPECTED_LAST_SEQUENCE,
1646            HeaderValue::from(last_sequence),
1647        )
1648    }
1649    /// Sets the last expected stream sequence for a subject this message will be published to.
1650    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
1651    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1652        self.header(
1653            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1654            HeaderValue::from(subject_sequence),
1655        )
1656    }
1657    /// Sets the expected stream name.
1658    /// It sets the `Nats-Expected-Stream` header with provided value.
1659    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1660        self.header(
1661            header::NATS_EXPECTED_STREAM,
1662            HeaderValue::from(stream.as_ref()),
1663        )
1664    }
1665
1666    #[cfg(feature = "server_2_11")]
1667    /// Sets TTL for a single message.
1668    /// It sets the `Nats-TTL` header with provided value.
1669    pub fn ttl(self, ttl: Duration) -> Self {
1670        self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
1671    }
1672}
1673
/// Classifies the ways a JetStream API request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered the request; the requested JetStream resource does not exist.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1690
/// Error type whose kind is a [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
1692
1693impl From<crate::RequestError> for RequestError {
1694    fn from(error: crate::RequestError) -> Self {
1695        match error.kind() {
1696            crate::RequestErrorKind::TimedOut => {
1697                RequestError::with_source(RequestErrorKind::TimedOut, error)
1698            }
1699            crate::RequestErrorKind::NoResponders => {
1700                RequestError::new(RequestErrorKind::NoResponders)
1701            }
1702            crate::RequestErrorKind::Other => {
1703                RequestError::with_source(RequestErrorKind::Other, error)
1704            }
1705        }
1706    }
1707}
1708
1709impl From<super::errors::Error> for RequestError {
1710    fn from(err: super::errors::Error) -> Self {
1711        RequestError::with_source(RequestErrorKind::Other, err)
1712    }
1713}
1714
/// Error type whose kind is a [`ConsumerInfoErrorKind`].
pub type ConsumerInfoError = Error<ConsumerInfoErrorKind>;
1716
/// Reasons a consumer info request can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerInfoErrorKind {
    /// The consumer name is invalid.
    InvalidName,
    /// The server answered with the `CONSUMER_OFFLINE` error code.
    Offline,
    /// The server answered with the `CONSUMER_NOT_FOUND` error code.
    NotFound,
    /// The server answered with the `STREAM_NOT_FOUND` error code.
    StreamNotFound,
    /// The underlying request failed with [`RequestErrorKind::Other`].
    Request,
    /// Any other JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The underlying request timed out.
    TimedOut,
    /// The underlying request had no responders.
    NoResponders,
}
1728
1729impl Display for ConsumerInfoErrorKind {
1730    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1731        match self {
1732            Self::InvalidName => write!(f, "invalid consumer name"),
1733            Self::Offline => write!(f, "consumer is offline"),
1734            Self::NotFound => write!(f, "consumer not found"),
1735            Self::StreamNotFound => write!(f, "stream not found"),
1736            Self::Request => write!(f, "request error"),
1737            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1738            Self::TimedOut => write!(f, "timed out"),
1739            Self::NoResponders => write!(f, "no responders"),
1740        }
1741    }
1742}
1743
1744impl From<super::errors::Error> for ConsumerInfoError {
1745    fn from(error: super::errors::Error) -> Self {
1746        match error.error_code() {
1747            ErrorCode::CONSUMER_NOT_FOUND => {
1748                ConsumerInfoError::new(ConsumerInfoErrorKind::NotFound)
1749            }
1750            ErrorCode::STREAM_NOT_FOUND => {
1751                ConsumerInfoError::new(ConsumerInfoErrorKind::StreamNotFound)
1752            }
1753            ErrorCode::CONSUMER_OFFLINE => ConsumerInfoError::new(ConsumerInfoErrorKind::Offline),
1754            _ => ConsumerInfoError::new(ConsumerInfoErrorKind::JetStream(error)),
1755        }
1756    }
1757}
1758
1759impl From<RequestError> for ConsumerInfoError {
1760    fn from(error: RequestError) -> Self {
1761        match error.kind() {
1762            RequestErrorKind::TimedOut => ConsumerInfoError::new(ConsumerInfoErrorKind::TimedOut),
1763            RequestErrorKind::Other => {
1764                ConsumerInfoError::with_source(ConsumerInfoErrorKind::Request, error)
1765            }
1766            RequestErrorKind::NoResponders => {
1767                ConsumerInfoError::new(ConsumerInfoErrorKind::NoResponders)
1768            }
1769        }
1770    }
1771}
1772
/// Reasons creating (or updating) a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum CreateStreamErrorKind {
    /// The stream name was empty.
    EmptyStreamName,
    /// The stream name contains characters that are not allowed.
    InvalidStreamName,
    /// Both a domain and an external configuration were set.
    DomainAndExternalSet,
    /// The request had no responders, which is reported as JetStream being unavailable.
    JetStreamUnavailable,
    /// Any other JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The request timed out.
    TimedOut,
    /// The underlying request failed with [`RequestErrorKind::Other`].
    Response,
    /// The server answered with the `STREAM_NOT_FOUND` error code.
    NotFound,
    /// The server response could not be parsed.
    ResponseParse,
}
1785
1786impl Display for CreateStreamErrorKind {
1787    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1788        match self {
1789            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1790            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1791            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1792            Self::NotFound => write!(f, "stream not found"),
1793            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1794            Self::TimedOut => write!(f, "jetstream request timed out"),
1795            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1796            Self::ResponseParse => write!(f, "failed to parse server response"),
1797            Self::Response => write!(f, "response error"),
1798        }
1799    }
1800}
1801
/// Error type whose kind is a [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
1803
1804impl From<super::errors::Error> for CreateStreamError {
1805    fn from(error: super::errors::Error) -> Self {
1806        match error.kind() {
1807            super::errors::ErrorCode::STREAM_NOT_FOUND => {
1808                CreateStreamError::new(CreateStreamErrorKind::NotFound)
1809            }
1810            _ => CreateStreamError::new(CreateStreamErrorKind::JetStream(error)),
1811        }
1812    }
1813}
1814
1815impl From<RequestError> for CreateStreamError {
1816    fn from(error: RequestError) -> Self {
1817        match error.kind() {
1818            RequestErrorKind::NoResponders => {
1819                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1820            }
1821            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1822            RequestErrorKind::Other => {
1823                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1824            }
1825        }
1826    }
1827}
1828
/// Reasons fetching (or deleting) a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamErrorKind {
    /// The provided name was empty.
    EmptyName,
    /// The request failed.
    Request,
    /// The stream name is invalid.
    InvalidStreamName,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
}
1836
1837impl Display for GetStreamErrorKind {
1838    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1839        match self {
1840            Self::EmptyName => write!(f, "empty name cannot be empty"),
1841            Self::Request => write!(f, "request error"),
1842            Self::InvalidStreamName => write!(f, "invalid stream name"),
1843            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1844        }
1845    }
1846}
1847
/// Reasons looking up a stream name can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
    /// The request failed.
    Request,
    /// No stream was found.
    NotFound,
    /// The provided subject is invalid.
    InvalidSubject,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
}
1855
1856impl Display for GetStreamByNameErrorKind {
1857    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1858        match self {
1859            Self::Request => write!(f, "request error"),
1860            Self::NotFound => write!(f, "stream not found"),
1861            Self::InvalidSubject => write!(f, "invalid subject"),
1862            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1863        }
1864    }
1865}
1866
/// Error type whose kind is a [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error type whose kind is a [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Stream updates reuse [`CreateStreamError`].
pub type UpdateStreamError = CreateStreamError;
/// Stream updates reuse [`CreateStreamErrorKind`].
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Stream deletion reuses [`GetStreamError`].
pub type DeleteStreamError = GetStreamError;
/// Stream deletion reuses [`GetStreamErrorKind`].
pub type DeleteStreamErrorKind = GetStreamErrorKind;
1874
/// Reasons obtaining a Key Value store can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The bucket could not be fetched.
    GetBucket,
    /// A JetStream error occurred.
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(message)
    }
}
1891
/// Error type whose kind is a [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
1893
/// Reasons creating a Key Value store can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// The bucket could not be created.
    BucketCreate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
            Self::LimitMarkersNotSupported => "limit markers not supported",
        };
        f.write_str(message)
    }
}
1918
/// Reasons updating a Key Value store can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateKeyValueErrorKind {
    /// The store name is invalid.
    InvalidStoreName,
    /// The requested history is too long.
    TooLongHistory,
    /// A JetStream error occurred.
    JetStream,
    /// The bucket could not be updated.
    BucketUpdate,
    /// The operation timed out.
    TimedOut,
    /// Limit markers were requested but are not supported.
    LimitMarkersNotSupported,
    /// The bucket does not exist.
    NotFound,
}

impl Display for UpdateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidStoreName => write!(f, "invalid Key Value Store name"),
            Self::TooLongHistory => write!(f, "too long history"),
            Self::JetStream => write!(f, "JetStream error"),
            // This is the update path, so the message must not talk about creation.
            Self::BucketUpdate => write!(f, "bucket update failed"),
            Self::TimedOut => write!(f, "timed out"),
            Self::LimitMarkersNotSupported => {
                write!(f, "limit markers not supported")
            }
            Self::NotFound => write!(f, "bucket does not exist"),
        }
    }
}
/// Error type whose kind is a [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
/// Error type whose kind is an [`UpdateKeyValueErrorKind`].
pub type UpdateKeyValueError = Error<UpdateKeyValueErrorKind>;

/// Object store creation reuses [`CreateKeyValueError`].
pub type CreateObjectStoreError = CreateKeyValueError;
/// Object store creation reuses [`CreateKeyValueErrorKind`].
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1950
/// Reasons obtaining an Object Store can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// The bucket name is invalid.
    InvalidBucketName,
    /// The store could not be fetched.
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(message)
    }
}
1965
/// Error type whose kind is an [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

/// Object store deletion reuses [`ObjectStoreError`].
pub type DeleteObjectStore = ObjectStoreError;
/// Object store deletion reuses [`ObjectStoreErrorKind`].
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1970
/// Reasons an account request can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum AccountErrorKind {
    /// The request timed out.
    TimedOut,
    /// A JetStream API error, carried verbatim.
    JetStream(super::errors::Error),
    /// The request had no responders, which is reported as JetStream being unavailable.
    JetStreamUnavailable,
    /// Any other failure.
    Other,
}
1978
1979impl Display for AccountErrorKind {
1980    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1981        match self {
1982            Self::TimedOut => write!(f, "timed out"),
1983            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1984            Self::Other => write!(f, "error"),
1985            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1986        }
1987    }
1988}
1989
/// Error type whose kind is an [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
1991
1992impl From<RequestError> for AccountError {
1993    fn from(err: RequestError) -> Self {
1994        match err.kind {
1995            RequestErrorKind::NoResponders => {
1996                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1997            }
1998            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
1999            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
2000        }
2001    }
2002}
2003
/// Action value serialized for consumer requests.
///
/// NOTE(review): presumably sent with consumer create/update API calls; the
/// call sites are outside this part of the file.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as an empty string.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`; only compiled with the `server_2_10` feature.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}
2015
2016// Maps a Stream config to KV Store.
2017fn map_to_kv(stream: super::stream::Stream, prefix: String, bucket: String) -> Store {
2018    let mut store = Store {
2019        prefix: format!("$KV.{}.", bucket.as_str()),
2020        name: bucket,
2021        stream: stream.clone(),
2022        stream_name: stream.info.config.name.clone(),
2023        put_prefix: None,
2024        use_jetstream_prefix: prefix != "$JS.API",
2025    };
2026    if let Some(ref mirror) = stream.info.config.mirror {
2027        let bucket = mirror.name.trim_start_matches("KV_");
2028        if let Some(ref external) = mirror.external {
2029            if !external.api_prefix.is_empty() {
2030                store.use_jetstream_prefix = false;
2031                store.prefix = format!("$KV.{bucket}.");
2032                store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
2033            } else {
2034                store.put_prefix = Some(format!("$KV.{bucket}."));
2035            }
2036        }
2037    };
2038    store
2039}
2040
/// Failures of `kv_to_stream_config`, converted into the public create/update
/// Key Value errors by the `From` impls in this file.
enum KvToStreamConfigError {
    /// The requested history exceeds `MAX_HISTORY`.
    TooLongHistory,
    // Only constructed when the `server_2_11` feature is enabled, so it is
    // unused without that feature.
    #[allow(dead_code)]
    /// Limit markers were requested but `account.requests.level` is below 1.
    LimitMarkersNotSupported,
}
2046
2047impl From<KvToStreamConfigError> for CreateKeyValueError {
2048    fn from(err: KvToStreamConfigError) -> Self {
2049        match err {
2050            KvToStreamConfigError::TooLongHistory => {
2051                CreateKeyValueError::new(CreateKeyValueErrorKind::TooLongHistory)
2052            }
2053            KvToStreamConfigError::LimitMarkersNotSupported => {
2054                CreateKeyValueError::new(CreateKeyValueErrorKind::LimitMarkersNotSupported)
2055            }
2056        }
2057    }
2058}
2059
2060impl From<KvToStreamConfigError> for UpdateKeyValueError {
2061    fn from(err: KvToStreamConfigError) -> Self {
2062        match err {
2063            KvToStreamConfigError::TooLongHistory => {
2064                UpdateKeyValueError::new(UpdateKeyValueErrorKind::TooLongHistory)
2065            }
2066            KvToStreamConfigError::LimitMarkersNotSupported => {
2067                UpdateKeyValueError::new(UpdateKeyValueErrorKind::LimitMarkersNotSupported)
2068            }
2069        }
2070    }
2071}
2072
2073// Maps the KV config to Stream config.
2074fn kv_to_stream_config(
2075    config: kv::Config,
2076    _account: Account,
2077) -> Result<super::stream::Config, KvToStreamConfigError> {
2078    let history = if config.history > 0 {
2079        if config.history > MAX_HISTORY {
2080            return Err(KvToStreamConfigError::TooLongHistory);
2081        }
2082        config.history
2083    } else {
2084        1
2085    };
2086
2087    let num_replicas = if config.num_replicas == 0 {
2088        1
2089    } else {
2090        config.num_replicas
2091    };
2092
2093    #[cfg(feature = "server_2_11")]
2094    let (mut allow_message_ttl, mut subject_delete_marker_ttl) = (false, None);
2095
2096    #[cfg(feature = "server_2_11")]
2097    if let Some(duration) = config.limit_markers {
2098        if _account.requests.level < 1 {
2099            return Err(KvToStreamConfigError::LimitMarkersNotSupported);
2100        }
2101        allow_message_ttl = true;
2102        subject_delete_marker_ttl = Some(duration);
2103    }
2104
2105    let mut mirror = config.mirror.clone();
2106    let mut sources = config.sources.clone();
2107    let mut mirror_direct = config.mirror_direct;
2108
2109    let mut subjects = Vec::new();
2110    if let Some(ref mut mirror) = mirror {
2111        if !mirror.name.starts_with("KV_") {
2112            mirror.name = format!("KV_{}", mirror.name);
2113        }
2114        mirror_direct = true;
2115    } else if let Some(ref mut sources) = sources {
2116        for source in sources {
2117            if !source.name.starts_with("KV_") {
2118                source.name = format!("KV_{}", source.name);
2119            }
2120        }
2121    } else {
2122        subjects = vec![format!("$KV.{}.>", config.bucket)];
2123    }
2124
2125    Ok(stream::Config {
2126        name: format!("KV_{}", config.bucket),
2127        description: Some(config.description),
2128        subjects,
2129        max_messages_per_subject: history,
2130        max_bytes: config.max_bytes,
2131        max_age: config.max_age,
2132        max_message_size: config.max_value_size,
2133        storage: config.storage,
2134        republish: config.republish,
2135        allow_rollup: true,
2136        deny_delete: true,
2137        deny_purge: false,
2138        allow_direct: true,
2139        sources,
2140        mirror,
2141        num_replicas,
2142        discard: stream::DiscardPolicy::New,
2143        mirror_direct,
2144        #[cfg(feature = "server_2_10")]
2145        compression: if config.compression {
2146            Some(stream::Compression::S2)
2147        } else {
2148            None
2149        },
2150        placement: config.placement,
2151        #[cfg(feature = "server_2_11")]
2152        allow_message_ttl,
2153        #[cfg(feature = "server_2_11")]
2154        subject_delete_marker_ttl,
2155        ..Default::default()
2156    })
2157}