Skip to main content

async_nats/jetstream/kv/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19    fmt::{self, Display},
20    str::FromStr,
21    task::Poll,
22    time::Duration,
23};
24
25use crate::HeaderValue;
26use bytes::Bytes;
27use futures_util::StreamExt;
28use regex::Regex;
29use std::sync::LazyLock;
30use time::OffsetDateTime;
31use tracing::debug;
32
33use crate::error::Error;
34use crate::header;
35
36use self::bucket::Status;
37
38use super::{
39    consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
40    context::{PublishError, PublishErrorKind},
41    message::StreamMessage,
42    stream::{
43        self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
44        Source, StorageType, Stream,
45    },
46};
47
48fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
49    if let Some(op) = message.headers.get(KV_OPERATION) {
50        Operation::from_str(op.as_str())
51            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
52    } else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
53        match reason.as_str() {
54            "MaxAge" | "Purge" => Ok(Operation::Purge),
55            "Remove" => Ok(Operation::Delete),
56            _ => Err(EntryError::with_source(
57                EntryErrorKind::Other,
58                "invalid marker reason",
59            )),
60        }
61    } else {
62        Err(EntryError::with_source(
63            EntryErrorKind::Other,
64            "missing operation",
65        ))
66    }
67}
68fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
69    let headers = match message.headers.as_ref() {
70        Some(headers) => headers,
71        None => return Ok(Operation::Put),
72    };
73    if let Some(op) = headers.get(KV_OPERATION) {
74        Operation::from_str(op.as_str())
75            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
76    } else if let Some(reason) = headers.get(header::NATS_MARKER_REASON) {
77        match reason.as_str() {
78            "MaxAge" | "Purge" => Ok(Operation::Purge),
79            "Remove" => Ok(Operation::Delete),
80            _ => Err(EntryError::with_source(
81                EntryErrorKind::Other,
82                "invalid marker reason",
83            )),
84        }
85    } else {
86        Ok(Operation::Put)
87    }
88}
89
// Bucket names: one or more ASCII letters, digits, `_` or `-`.
static VALID_BUCKET_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Keys: one or more ASCII letters, digits, or any of `-`, `/`, `_`, `=`, `.`.
static VALID_KEY_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the upper bound for `Config::history`; the check itself is
// not visible in this part of the file.
pub(crate) const MAX_HISTORY: i64 = 64;
// Key passed by `watch_all` / `watch_all_from_revision` to cover every key in the bucket.
const ALL_KEYS: &str = ">";

// Name of the header carrying the KV operation, and the values `Operation::from_str` accepts.
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// NOTE(review): rollup header name and value; their use is outside the visible part of the file.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
105
106pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
107    VALID_BUCKET_RE.is_match(bucket_name)
108}
109
110pub(crate) fn is_valid_key(key: &str) -> bool {
111    if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
112        return false;
113    }
114
115    VALID_KEY_RE.is_match(key)
116}
117
/// Configuration values for key value stores.
///
/// Implements [`Default`], so only the fields of interest need to be spelled out
/// (`Config { bucket: "kv".to_string(), ..Default::default() }`).
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Human readable description.
    pub description: String,
    /// Maximum size of a single value.
    pub max_value_size: i32,
    /// Maximum historical entries.
    pub history: i64,
    /// Maximum age of any entry in the bucket.
    pub max_age: std::time::Duration,
    /// How large the bucket may become in total bytes before the configured discard policy kicks in.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`.
    pub storage: StorageType,
    /// How many replicas to keep for each entry in a cluster.
    pub num_replicas: usize,
    /// Republish is for republishing messages once persistent in the Key Value Bucket.
    pub republish: Option<Republish>,
    /// Bucket mirror configuration.
    pub mirror: Option<Source>,
    /// Bucket sources configuration.
    pub sources: Option<Vec<Source>>,
    /// Allow mirrors using direct API.
    pub mirror_direct: bool,
    /// Whether compression is enabled for the bucket.
    #[cfg(feature = "server_2_10")]
    pub compression: bool,
    /// Cluster and tag placement for the bucket.
    pub placement: Option<stream::Placement>,
    /// Enables per-message TTL and delete marker TTL for a bucket.
    #[cfg(feature = "server_2_11")]
    pub limit_markers: Option<Duration>,
}
154
/// Describes what kind of operation an entry represents.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket
    Put,
    /// A value was deleted from a bucket
    Delete,
    /// A value was purged from a bucket
    Purge,
}
165
166impl FromStr for Operation {
167    type Err = ParseOperationError;
168
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        match s {
171            KV_OPERATION_DELETE => Ok(Operation::Delete),
172            KV_OPERATION_PURGE => Ok(Operation::Purge),
173            KV_OPERATION_PUT => Ok(Operation::Put),
174            _ => Err(ParseOperationError),
175        }
176    }
177}
178
179#[derive(Debug, Clone)]
180pub struct ParseOperationError;
181
182impl fmt::Display for ParseOperationError {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
185    }
186}
187
188impl std::error::Error for ParseOperationError {}
189
/// A struct used as a handle for the bucket.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. It is prepended to a key to form the
    /// subject used for lookups and watches.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs.
    /// When `None`, `put` falls back to `prefix`.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When `true`, `put` prepends
    /// the context prefix followed by `.` to the subject.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
206
207impl Store {
208    /// Queries the server and returns status from the server.
209    ///
210    /// # Examples
211    ///
212    /// ```no_run
213    /// # #[tokio::main]
214    /// # async fn main() -> Result<(), async_nats::Error> {
215    /// let client = async_nats::connect("demo.nats.io:4222").await?;
216    /// let jetstream = async_nats::jetstream::new(client);
217    /// let kv = jetstream
218    ///     .create_key_value(async_nats::jetstream::kv::Config {
219    ///         bucket: "kv".to_string(),
220    ///         history: 10,
221    ///         ..Default::default()
222    ///     })
223    ///     .await?;
224    /// let status = kv.status().await?;
225    /// println!("status: {:?}", status);
226    /// # Ok(())
227    /// # }
228    /// ```
229    pub async fn status(&self) -> Result<Status, StatusError> {
230        // TODO: should we poll for fresh info here? probably yes.
231        let info = self.stream.info.clone();
232
233        Ok(Status {
234            info,
235            bucket: self.name.to_string(),
236        })
237    }
238
239    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
240    ///
241    /// # Examples
242    ///
243    /// ```no_run
244    /// # #[tokio::main]
245    /// # async fn main() -> Result<(), async_nats::Error> {
246    /// let client = async_nats::connect("demo.nats.io:4222").await?;
247    /// let jetstream = async_nats::jetstream::new(client);
248    /// let kv = jetstream
249    ///     .create_key_value(async_nats::jetstream::kv::Config {
250    ///         bucket: "kv".to_string(),
251    ///         history: 10,
252    ///         ..Default::default()
253    ///     })
254    ///     .await?;
255    ///
256    /// let status = kv.create("key", "value".into()).await;
257    /// assert!(status.is_ok());
258    ///
259    /// let status = kv.create("key", "value".into()).await;
260    /// assert!(status.is_err());
261    ///
262    /// # Ok(())
263    /// # }
264    /// ```
265    pub async fn create<T: AsRef<str>>(
266        &self,
267        key: T,
268        value: bytes::Bytes,
269    ) -> Result<u64, CreateError> {
270        self.create_maybe_ttl(key, value, None).await
271    }
272
273    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
274    /// It will set a TTL specific for that key.
275    ///
276    /// # Examples
277    ///
278    /// ```no_run
279    /// # #[tokio::main]
280    /// # async fn main() -> Result<(), async_nats::Error> {
281    /// use std::time::Duration;
282    /// let client = async_nats::connect("demo.nats.io:4222").await?;
283    /// let jetstream = async_nats::jetstream::new(client);
284    /// let kv = jetstream
285    ///     .create_key_value(async_nats::jetstream::kv::Config {
286    ///         bucket: "kv".to_string(),
287    ///         history: 10,
288    ///         ..Default::default()
289    ///     })
290    ///     .await?;
291    ///
292    /// let status = kv
293    ///     .create_with_ttl("key", "value".into(), Duration::from_secs(10))
294    ///     .await;
295    /// assert!(status.is_ok());
296    ///
297    /// # Ok(())
298    /// # }
299    /// ```
300    pub async fn create_with_ttl<T: AsRef<str>>(
301        &self,
302        key: T,
303        value: bytes::Bytes,
304        ttl: Duration,
305    ) -> Result<u64, CreateError> {
306        self.create_maybe_ttl(key, value, Some(ttl)).await
307    }
308
309    async fn create_maybe_ttl<T: AsRef<str>>(
310        &self,
311        key: T,
312        value: bytes::Bytes,
313        ttl: Option<Duration>,
314    ) -> Result<u64, CreateError> {
315        let update_err = match self
316            .update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
317            .await
318        {
319            Ok(revision) => return Ok(revision),
320            Err(err) => err,
321        };
322
323        match self.entry(key.as_ref()).await? {
324            // Deleted or Purged key, we can create it again.
325            Some(Entry {
326                operation: Operation::Delete | Operation::Purge,
327                revision,
328                ..
329            }) => {
330                let revision = self.update(key, value, revision).await?;
331                Ok(revision)
332            }
333
334            // key already exists.
335            Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
336
337            // Something went wrong with the initial update, return that error
338            None => Err(update_err.into()),
339        }
340    }
341
342    /// Puts new key value pair into the bucket.
343    /// If key didn't exist, it is created. If it did exist, a new value with a new version is
344    /// added.
345    ///
346    /// # Examples
347    ///
348    /// ```no_run
349    /// # #[tokio::main]
350    /// # async fn main() -> Result<(), async_nats::Error> {
351    /// let client = async_nats::connect("demo.nats.io:4222").await?;
352    /// let jetstream = async_nats::jetstream::new(client);
353    /// let kv = jetstream
354    ///     .create_key_value(async_nats::jetstream::kv::Config {
355    ///         bucket: "kv".to_string(),
356    ///         history: 10,
357    ///         ..Default::default()
358    ///     })
359    ///     .await?;
360    /// let status = kv.put("key", "value".into()).await?;
361    /// # Ok(())
362    /// # }
363    /// ```
364    pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
365        if !is_valid_key(key.as_ref()) {
366            return Err(PutError::new(PutErrorKind::InvalidKey));
367        }
368        let mut subject = String::new();
369        if self.use_jetstream_prefix {
370            subject.push_str(&self.stream.context.prefix);
371            subject.push('.');
372        }
373        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
374        subject.push_str(key.as_ref());
375
376        let publish_ack = self
377            .stream
378            .context
379            .publish(subject, value)
380            .await
381            .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
382        let ack = publish_ack
383            .await
384            .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
385
386        Ok(ack.sequence)
387    }
388
389    async fn entry_maybe_revision<T: Into<String>>(
390        &self,
391        key: T,
392        revision: Option<u64>,
393    ) -> Result<Option<Entry>, EntryError> {
394        let key: String = key.into();
395        if !is_valid_key(key.as_ref()) {
396            return Err(EntryError::new(EntryErrorKind::InvalidKey));
397        }
398
399        let subject = format!("{}{}", self.prefix.as_str(), &key);
400
401        let result: Option<(StreamMessage, Operation)> = {
402            if self.stream.info.config.allow_direct {
403                let message = match revision {
404                    Some(revision) => {
405                        let message = self.stream.direct_get(revision).await;
406                        if let Ok(message) = message.as_ref() {
407                            if message.subject.as_str() != subject {
408                                println!("subject mismatch {}", message.subject);
409                                return Ok(None);
410                            }
411                        }
412                        message
413                    }
414                    None => {
415                        self.stream
416                            .direct_get_last_for_subject(subject.as_str())
417                            .await
418                    }
419                };
420
421                match message {
422                    Ok(message) => {
423                        let operation =
424                            kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
425
426                        Some((message, operation))
427                    }
428                    Err(err) => {
429                        if err.kind() == DirectGetErrorKind::NotFound {
430                            None
431                        } else {
432                            return Err(err.into());
433                        }
434                    }
435                }
436            } else {
437                let raw_message = match revision {
438                    Some(revision) => {
439                        let message = self.stream.get_raw_message(revision).await;
440                        if let Ok(message) = message.as_ref() {
441                            if message.subject.as_str() != subject {
442                                return Ok(None);
443                            }
444                        }
445                        message
446                    }
447                    None => {
448                        self.stream
449                            .get_last_raw_message_by_subject(subject.as_str())
450                            .await
451                    }
452                };
453                match raw_message {
454                    Ok(raw_message) => {
455                        let operation = kv_operation_from_stream_message(&raw_message)
456                            .unwrap_or(Operation::Put);
457                        // TODO: unnecessary expensive, cloning whole Message.
458                        Some((raw_message, operation))
459                    }
460                    Err(err) => match err.kind() {
461                        crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
462                        crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
463                            return Err(EntryError::new(EntryErrorKind::InvalidKey))
464                        }
465                        crate::jetstream::stream::LastRawMessageErrorKind::Other => {
466                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
467                        }
468                        crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
469                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
470                        }
471                    },
472                }
473            }
474        };
475
476        match result {
477            Some((message, operation)) => {
478                let entry = Entry {
479                    bucket: self.name.clone(),
480                    key,
481                    value: message.payload,
482                    revision: message.sequence,
483                    created: message.time,
484                    operation,
485                    delta: 0,
486                    seen_current: false,
487                };
488                Ok(Some(entry))
489            }
490            // TODO: remember to touch this when Errors are in place.
491            None => Ok(None),
492        }
493    }
494
495    /// Retrieves the last [Entry] for a given key from a bucket.
496    ///
497    /// # Examples
498    ///
499    /// ```no_run
500    /// # #[tokio::main]
501    /// # async fn main() -> Result<(), async_nats::Error> {
502    /// let client = async_nats::connect("demo.nats.io:4222").await?;
503    /// let jetstream = async_nats::jetstream::new(client);
504    /// let kv = jetstream
505    ///     .create_key_value(async_nats::jetstream::kv::Config {
506    ///         bucket: "kv".to_string(),
507    ///         history: 10,
508    ///         ..Default::default()
509    ///     })
510    ///     .await?;
511    /// let status = kv.put("key", "value".into()).await?;
512    /// let entry = kv.entry("key").await?;
513    /// println!("entry: {:?}", entry);
514    /// # Ok(())
515    /// # }
516    /// ```
517    pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
518        self.entry_maybe_revision(key, None).await
519    }
520
521    /// Retrieves the [Entry] for a given key revision from a bucket.
522    ///
523    /// # Examples
524    ///
525    /// ```no_run
526    /// # #[tokio::main]
527    /// # async fn main() -> Result<(), async_nats::Error> {
528    /// let client = async_nats::connect("demo.nats.io:4222").await?;
529    /// let jetstream = async_nats::jetstream::new(client);
530    /// let kv = jetstream
531    ///     .create_key_value(async_nats::jetstream::kv::Config {
532    ///         bucket: "kv".to_string(),
533    ///         history: 10,
534    ///         ..Default::default()
535    ///     })
536    ///     .await?;
537    /// let status = kv.put("key", "value".into()).await?;
538    /// let status = kv.put("key", "value2".into()).await?;
539    /// let entry = kv.entry_for_revision("key", 2).await?;
540    /// println!("entry: {:?}", entry);
541    /// # Ok(())
542    /// # }
543    /// ```
544    pub async fn entry_for_revision<T: Into<String>>(
545        &self,
546        key: T,
547        revision: u64,
548    ) -> Result<Option<Entry>, EntryError> {
549        self.entry_maybe_revision(key, Some(revision)).await
550    }
551
    /// Creates a [futures_util::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// values whenever there are changes for that key.
554    ///
555    /// # Examples
556    ///
557    /// ```no_run
558    /// # #[tokio::main]
559    /// # async fn main() -> Result<(), async_nats::Error> {
560    /// use futures_util::StreamExt;
561    /// let client = async_nats::connect("demo.nats.io:4222").await?;
562    /// let jetstream = async_nats::jetstream::new(client);
563    /// let kv = jetstream
564    ///     .create_key_value(async_nats::jetstream::kv::Config {
565    ///         bucket: "kv".to_string(),
566    ///         history: 10,
567    ///         ..Default::default()
568    ///     })
569    ///     .await?;
570    /// let mut entries = kv.watch("kv").await?;
571    /// while let Some(entry) = entries.next().await {
572    ///     println!("entry: {:?}", entry);
573    /// }
574    /// # Ok(())
575    /// # }
576    /// ```
577    pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
578        self.watch_with_deliver_policy(key, DeliverPolicy::New)
579            .await
580    }
581
582    /// Creates a [futures_util::Stream] over [Entries][Entry] in the bucket, which yields
583    /// values whenever there are changes for given keys.
584    ///
585    /// # Examples
586    ///
587    /// ```no_run
588    /// # #[tokio::main]
589    /// # async fn main() -> Result<(), async_nats::Error> {
590    /// use futures_util::StreamExt;
591    /// let client = async_nats::connect("demo.nats.io:4222").await?;
592    /// let jetstream = async_nats::jetstream::new(client);
593    /// let kv = jetstream
594    ///     .create_key_value(async_nats::jetstream::kv::Config {
595    ///         bucket: "kv".to_string(),
596    ///         history: 10,
597    ///         ..Default::default()
598    ///     })
599    ///     .await?;
600    /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
601    /// while let Some(entry) = entries.next().await {
602    ///     println!("entry: {:?}", entry);
603    /// }
604    /// # Ok(())
605    /// # }
606    /// ```
607    #[cfg(feature = "server_2_10")]
608    pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
609    where
610        T: AsRef<str>,
611        K: IntoIterator<Item = T>,
612    {
613        self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
614            .await
615    }
616
617    /// Creates a [futures_util::Stream] over [Entries][Entry] for a given key in the bucket, starting from
618    /// provided revision. This is useful to resume watching over big KV buckets without a need to
619    /// replay all the history.
620    ///
621    /// # Examples
622    ///
623    /// ```no_run
624    /// # #[tokio::main]
625    /// # async fn main() -> Result<(), async_nats::Error> {
626    /// use futures_util::StreamExt;
627    /// let client = async_nats::connect("demo.nats.io:4222").await?;
628    /// let jetstream = async_nats::jetstream::new(client);
629    /// let kv = jetstream
630    ///     .create_key_value(async_nats::jetstream::kv::Config {
631    ///         bucket: "kv".to_string(),
632    ///         history: 10,
633    ///         ..Default::default()
634    ///     })
635    ///     .await?;
636    /// let mut entries = kv.watch_from_revision("kv", 5).await?;
637    /// while let Some(entry) = entries.next().await {
638    ///     println!("entry: {:?}", entry);
639    /// }
640    /// # Ok(())
641    /// # }
642    /// ```
643    pub async fn watch_from_revision<T: AsRef<str>>(
644        &self,
645        key: T,
646        revision: u64,
647    ) -> Result<Watch, WatchError> {
648        self.watch_with_deliver_policy(
649            key,
650            DeliverPolicy::ByStartSequence {
651                start_sequence: revision,
652            },
653        )
654        .await
655    }
656
    /// Creates a [futures_util::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// the last value for that key as well as values whenever there are changes for it.
659    ///
660    /// # Examples
661    ///
662    /// ```no_run
663    /// # #[tokio::main]
664    /// # async fn main() -> Result<(), async_nats::Error> {
665    /// use futures_util::StreamExt;
666    /// let client = async_nats::connect("demo.nats.io:4222").await?;
667    /// let jetstream = async_nats::jetstream::new(client);
668    /// let kv = jetstream
669    ///     .create_key_value(async_nats::jetstream::kv::Config {
670    ///         bucket: "kv".to_string(),
671    ///         history: 10,
672    ///         ..Default::default()
673    ///     })
674    ///     .await?;
675    /// let mut entries = kv.watch_with_history("kv").await?;
676    /// while let Some(entry) = entries.next().await {
677    ///     println!("entry: {:?}", entry);
678    /// }
679    /// # Ok(())
680    /// # }
681    /// ```
682    pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
683        self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
684            .await
685    }
686
    /// Creates a [futures_util::Stream] over [Entries][Entry] for the given keys in the bucket, which yields
    /// the last value for each of those keys as well as values whenever there are changes for them.
    /// This requires server version 2.10 or newer as it uses consumers with multiple subject filters.
690    ///
691    /// # Examples
692    ///
693    /// ```no_run
694    /// # #[tokio::main]
695    /// # async fn main() -> Result<(), async_nats::Error> {
696    /// use futures_util::StreamExt;
697    /// let client = async_nats::connect("demo.nats.io:4222").await?;
698    /// let jetstream = async_nats::jetstream::new(client);
699    /// let kv = jetstream
700    ///     .create_key_value(async_nats::jetstream::kv::Config {
701    ///         bucket: "kv".to_string(),
702    ///         history: 10,
703    ///         ..Default::default()
704    ///     })
705    ///     .await?;
706    /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
707    /// while let Some(entry) = entries.next().await {
708    ///     println!("entry: {:?}", entry);
709    /// }
710    /// # Ok(())
711    /// # }
712    /// ```
713    #[cfg(feature = "server_2_10")]
714    pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
715        &self,
716        keys: K,
717    ) -> Result<Watch, WatchError> {
718        self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
719            .await
720    }
721
722    #[cfg(feature = "server_2_10")]
723    async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
724        &self,
725        keys: K,
726        deliver_policy: DeliverPolicy,
727    ) -> Result<Watch, WatchError> {
728        let subjects = keys
729            .into_iter()
730            .map(|key| {
731                let key = key.as_ref();
732                format!("{}{}", self.prefix.as_str(), key)
733            })
734            .collect::<Vec<_>>();
735
736        debug!("initial consumer creation");
737        let consumer = self
738            .stream
739            .create_consumer(super::consumer::push::OrderedConfig {
740                deliver_subject: self.stream.context.client.new_inbox(),
741                description: Some("kv watch consumer".to_string()),
742                filter_subjects: subjects,
743                replay_policy: super::consumer::ReplayPolicy::Instant,
744                deliver_policy,
745                ..Default::default()
746            })
747            .await
748            .map_err(|err| match err.kind() {
749                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
750                    WatchError::new(WatchErrorKind::TimedOut)
751                }
752                _ => WatchError::with_source(WatchErrorKind::Other, err),
753            })?;
754
755        let seen_current = consumer.cached_info().num_pending == 0;
756
757        Ok(Watch {
758            subscription: consumer.messages().await.map_err(|err| match err.kind() {
759                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
760                    WatchError::new(WatchErrorKind::TimedOut)
761                }
762                crate::jetstream::consumer::StreamErrorKind::Other => {
763                    WatchError::with_source(WatchErrorKind::Other, err)
764                }
765            })?,
766            prefix: self.prefix.clone(),
767            bucket: self.name.clone(),
768            seen_current,
769        })
770    }
771
772    async fn watch_with_deliver_policy<T: AsRef<str>>(
773        &self,
774        key: T,
775        deliver_policy: DeliverPolicy,
776    ) -> Result<Watch, WatchError> {
777        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
778
779        debug!("initial consumer creation");
780        let consumer = self
781            .stream
782            .create_consumer(super::consumer::push::OrderedConfig {
783                deliver_subject: self.stream.context.client.new_inbox(),
784                description: Some("kv watch consumer".to_string()),
785                filter_subject: subject,
786                replay_policy: super::consumer::ReplayPolicy::Instant,
787                deliver_policy,
788                ..Default::default()
789            })
790            .await
791            .map_err(|err| match err.kind() {
792                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
793                    WatchError::new(WatchErrorKind::TimedOut)
794                }
795                _ => WatchError::with_source(WatchErrorKind::Other, err),
796            })?;
797
798        let seen_current = consumer.cached_info().num_pending == 0;
799
800        Ok(Watch {
801            subscription: consumer.messages().await.map_err(|err| match err.kind() {
802                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
803                    WatchError::new(WatchErrorKind::TimedOut)
804                }
805                crate::jetstream::consumer::StreamErrorKind::Other => {
806                    WatchError::with_source(WatchErrorKind::Other, err)
807                }
808            })?,
809            prefix: self.prefix.clone(),
810            bucket: self.name.clone(),
811            seen_current,
812        })
813    }
814
815    /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys, which yields
816    /// values whenever there are changes in the bucket.
817    ///
818    /// # Examples
819    ///
820    /// ```no_run
821    /// # #[tokio::main]
822    /// # async fn main() -> Result<(), async_nats::Error> {
823    /// use futures_util::StreamExt;
824    /// let client = async_nats::connect("demo.nats.io:4222").await?;
825    /// let jetstream = async_nats::jetstream::new(client);
826    /// let kv = jetstream
827    ///     .create_key_value(async_nats::jetstream::kv::Config {
828    ///         bucket: "kv".to_string(),
829    ///         history: 10,
830    ///         ..Default::default()
831    ///     })
832    ///     .await?;
833    /// let mut entries = kv.watch_all().await?;
834    /// while let Some(entry) = entries.next().await {
835    ///     println!("entry: {:?}", entry);
836    /// }
837    /// # Ok(())
838    /// # }
839    /// ```
840    pub async fn watch_all(&self) -> Result<Watch, WatchError> {
841        self.watch(ALL_KEYS).await
842    }
843
844    /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys starting
    /// from a provided revision. This can be useful when resuming watching over a big bucket
846    /// without the need to replay all the history.
847    ///
848    /// # Examples
849    ///
850    /// ```no_run
851    /// # #[tokio::main]
852    /// # async fn main() -> Result<(), async_nats::Error> {
853    /// use futures_util::StreamExt;
854    /// let client = async_nats::connect("demo.nats.io:4222").await?;
855    /// let jetstream = async_nats::jetstream::new(client);
856    /// let kv = jetstream
857    ///     .create_key_value(async_nats::jetstream::kv::Config {
858    ///         bucket: "kv".to_string(),
859    ///         history: 10,
860    ///         ..Default::default()
861    ///     })
862    ///     .await?;
863    /// let mut entries = kv.watch_all_from_revision(40).await?;
864    /// while let Some(entry) = entries.next().await {
865    ///     println!("entry: {:?}", entry);
866    /// }
867    /// # Ok(())
868    /// # }
869    /// ```
870    pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
871        self.watch_from_revision(ALL_KEYS, revision).await
872    }
873
874    /// Retrieves the [Entry] for a given key from a bucket.
875    ///
876    /// # Examples
877    ///
878    /// ```no_run
879    /// # #[tokio::main]
880    /// # async fn main() -> Result<(), async_nats::Error> {
881    /// let client = async_nats::connect("demo.nats.io:4222").await?;
882    /// let jetstream = async_nats::jetstream::new(client);
883    /// let kv = jetstream
884    ///     .create_key_value(async_nats::jetstream::kv::Config {
885    ///         bucket: "kv".to_string(),
886    ///         history: 10,
887    ///         ..Default::default()
888    ///     })
889    ///     .await?;
890    /// let value = kv.get("key").await?;
891    /// match value {
892    ///     Some(bytes) => {
893    ///         let value_str = std::str::from_utf8(&bytes)?;
894    ///         println!("Value: {}", value_str);
895    ///     }
896    ///     None => {
897    ///         println!("Key not found or value not set");
898    ///     }
899    /// }
900    /// # Ok(())
901    /// # }
902    /// ```
903    pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
904        match self.entry(key).await {
905            Ok(Some(entry)) => match entry.operation {
906                Operation::Put => Ok(Some(entry.value)),
907                _ => Ok(None),
908            },
909            Ok(None) => Ok(None),
910            Err(err) => Err(err),
911        }
912    }
913
914    /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
915    /// the bucket.
916    ///
917    /// # Examples
918    ///
919    /// ```no_run
920    /// # #[tokio::main]
921    /// # async fn main() -> Result<(), async_nats::Error> {
922    /// use futures_util::StreamExt;
923    /// let client = async_nats::connect("demo.nats.io:4222").await?;
924    /// let jetstream = async_nats::jetstream::new(client);
925    /// let kv = jetstream
926    ///     .create_key_value(async_nats::jetstream::kv::Config {
927    ///         bucket: "kv".to_string(),
928    ///         history: 10,
929    ///         ..Default::default()
930    ///     })
931    ///     .await?;
932    /// let revision = kv.put("key", "value".into()).await?;
933    /// kv.update("key", "updated".into(), revision).await?;
934    /// # Ok(())
935    /// # }
936    /// ```
937    pub async fn update<T: AsRef<str>>(
938        &self,
939        key: T,
940        value: Bytes,
941        revision: u64,
942    ) -> Result<u64, UpdateError> {
943        self.update_maybe_ttl(key, value, revision, None).await
944    }
945
946    async fn update_maybe_ttl<T: AsRef<str>>(
947        &self,
948        key: T,
949        value: Bytes,
950        revision: u64,
951        ttl: Option<Duration>,
952    ) -> Result<u64, UpdateError> {
953        if !is_valid_key(key.as_ref()) {
954            return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
955        }
956        let mut subject = String::new();
957        if self.use_jetstream_prefix {
958            subject.push_str(&self.stream.context.prefix);
959            subject.push('.');
960        }
961        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
962        subject.push_str(key.as_ref());
963
964        let mut headers = crate::HeaderMap::default();
965        headers.insert(
966            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
967            HeaderValue::from(revision),
968        );
969
970        if let Some(ttl) = ttl {
971            headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
972        }
973
974        self.stream
975            .context
976            .publish_with_headers(subject, headers, value)
977            .await?
978            .await
979            .map_err(|err| err.into())
980            .map(|publish_ack| publish_ack.sequence)
981    }
982
983    /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
984    ///
985    /// # Examples
986    ///
987    /// ```no_run
988    /// # #[tokio::main]
989    /// # async fn main() -> Result<(), async_nats::Error> {
990    /// use futures_util::StreamExt;
991    /// let client = async_nats::connect("demo.nats.io:4222").await?;
992    /// let jetstream = async_nats::jetstream::new(client);
993    /// let kv = jetstream
994    ///     .create_key_value(async_nats::jetstream::kv::Config {
995    ///         bucket: "kv".to_string(),
996    ///         history: 10,
997    ///         ..Default::default()
998    ///     })
999    ///     .await?;
1000    /// kv.put("key", "value".into()).await?;
1001    /// kv.delete("key").await?;
1002    /// # Ok(())
1003    /// # }
1004    /// ```
1005    pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
1006        self.delete_expect_revision(key, None).await
1007    }
1008
1009    /// Deletes a given key if the revision matches. This is a non-destructive operation, which
1010    /// sets a `DELETE` marker.
1011    ///
1012    /// # Examples
1013    ///
1014    /// ```no_run
1015    /// # #[tokio::main]
1016    /// # async fn main() -> Result<(), async_nats::Error> {
1017    /// use futures_util::StreamExt;
1018    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1019    /// let jetstream = async_nats::jetstream::new(client);
1020    /// let kv = jetstream
1021    ///     .create_key_value(async_nats::jetstream::kv::Config {
1022    ///         bucket: "kv".to_string(),
1023    ///         history: 10,
1024    ///         ..Default::default()
1025    ///     })
1026    ///     .await?;
1027    /// let revision = kv.put("key", "value".into()).await?;
1028    /// kv.delete_expect_revision("key", Some(revision)).await?;
1029    /// # Ok(())
1030    /// # }
1031    /// ```
1032    pub async fn delete_expect_revision<T: AsRef<str>>(
1033        &self,
1034        key: T,
1035        revison: Option<u64>,
1036    ) -> Result<(), DeleteError> {
1037        if !is_valid_key(key.as_ref()) {
1038            return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
1039        }
1040        let mut subject = String::new();
1041        if self.use_jetstream_prefix {
1042            subject.push_str(&self.stream.context.prefix);
1043            subject.push('.');
1044        }
1045        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1046        subject.push_str(key.as_ref());
1047
1048        let mut headers = crate::HeaderMap::default();
1049        // TODO: figure out which headers k/v should be where.
1050        headers.insert(
1051            KV_OPERATION,
1052            KV_OPERATION_DELETE
1053                .parse::<HeaderValue>()
1054                .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
1055        );
1056
1057        if let Some(revision) = revison {
1058            headers.insert(
1059                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1060                HeaderValue::from(revision),
1061            );
1062        }
1063
1064        self.stream
1065            .context
1066            .publish_with_headers(subject, headers, "".into())
1067            .await?
1068            .await?;
1069        Ok(())
1070    }
1071
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
1073    ///
1074    /// # Examples
1075    ///
1076    /// ```no_run
1077    /// # #[tokio::main]
1078    /// # async fn main() -> Result<(), async_nats::Error> {
1079    /// use futures_util::StreamExt;
1080    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1081    /// let jetstream = async_nats::jetstream::new(client);
1082    /// let kv = jetstream
1083    ///     .create_key_value(async_nats::jetstream::kv::Config {
1084    ///         bucket: "kv".to_string(),
1085    ///         history: 10,
1086    ///         ..Default::default()
1087    ///     })
1088    ///     .await?;
1089    /// kv.put("key", "value".into()).await?;
1090    /// kv.put("key", "another".into()).await?;
1091    /// kv.purge("key").await?;
1092    /// # Ok(())
1093    /// # }
1094    /// ```
1095    pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
1096        self.purge_expect_revision(key, None).await
1097    }
1098
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
1100    /// The purge entry will remain valid for the given `ttl`.
1101    ///
1102    /// # Examples
1103    ///
1104    /// ```no_run
1105    /// # #[tokio::main]
1106    /// # async fn main() -> Result<(), async_nats::Error> {
1107    /// use futures_util::StreamExt;
1108    /// use std::time::Duration;
1109    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1110    /// let jetstream = async_nats::jetstream::new(client);
1111    /// let kv = jetstream
1112    ///     .create_key_value(async_nats::jetstream::kv::Config {
1113    ///         bucket: "kv".to_string(),
1114    ///         history: 10,
1115    ///         ..Default::default()
1116    ///     })
1117    ///     .await?;
1118    /// kv.put("key", "value".into()).await?;
1119    /// kv.put("key", "another".into()).await?;
1120    /// kv.purge_with_ttl("key", Duration::from_secs(10)).await?;
1121    /// # Ok(())
1122    /// # }
1123    /// ```
1124    pub async fn purge_with_ttl<T: AsRef<str>>(
1125        &self,
1126        key: T,
1127        ttl: Duration,
1128    ) -> Result<(), PurgeError> {
1129        self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
1130            .await
1131    }
1132
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
1134    /// purge entry in-place.
1135    ///
1136    /// # Examples
1137    ///
1138    /// ```no_run
1139    /// # #[tokio::main]
1140    /// # async fn main() -> Result<(), async_nats::Error> {
1141    /// use futures_util::StreamExt;
1142    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1143    /// let jetstream = async_nats::jetstream::new(client);
1144    /// let kv = jetstream
1145    ///     .create_key_value(async_nats::jetstream::kv::Config {
1146    ///         bucket: "kv".to_string(),
1147    ///         history: 10,
1148    ///         ..Default::default()
1149    ///     })
1150    ///     .await?;
1151    /// kv.put("key", "value".into()).await?;
1152    /// let revision = kv.put("key", "another".into()).await?;
1153    /// kv.purge_expect_revision("key", Some(revision)).await?;
1154    /// # Ok(())
1155    /// # }
1156    /// ```
1157    pub async fn purge_expect_revision<T: AsRef<str>>(
1158        &self,
1159        key: T,
1160        revision: Option<u64>,
1161    ) -> Result<(), PurgeError> {
1162        self.purge_expect_revision_maybe_ttl(key, revision, None)
1163            .await
1164    }
1165
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
1167    /// purge entry in-place. The purge entry will remain valid for the given `ttl`.
1168    ///
1169    /// # Examples
1170    ///
1171    /// ```no_run
1172    /// # #[tokio::main]
1173    /// # async fn main() -> Result<(), async_nats::Error> {
1174    /// use futures_util::StreamExt;
1175    /// use std::time::Duration;
1176    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1177    /// let jetstream = async_nats::jetstream::new(client);
1178    /// let kv = jetstream
1179    ///     .create_key_value(async_nats::jetstream::kv::Config {
1180    ///         bucket: "kv".to_string(),
1181    ///         history: 10,
1182    ///         ..Default::default()
1183    ///     })
1184    ///     .await?;
1185    /// kv.put("key", "value".into()).await?;
1186    /// let revision = kv.put("key", "another".into()).await?;
1187    /// kv.purge_expect_revision_with_ttl("key", revision, Duration::from_secs(10))
1188    ///     .await?;
1189    /// # Ok(())
1190    /// # }
1191    /// ```
1192    pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
1193        &self,
1194        key: T,
1195        revision: u64,
1196        ttl: Duration,
1197    ) -> Result<(), PurgeError> {
1198        self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
1199            .await
1200    }
1201
1202    async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
1203        &self,
1204        key: T,
1205        revision: Option<u64>,
1206        ttl: Option<Duration>,
1207    ) -> Result<(), PurgeError> {
1208        if !is_valid_key(key.as_ref()) {
1209            return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1210        }
1211
1212        let mut subject = String::new();
1213        if self.use_jetstream_prefix {
1214            subject.push_str(&self.stream.context.prefix);
1215            subject.push('.');
1216        }
1217        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1218        subject.push_str(key.as_ref());
1219
1220        let mut headers = crate::HeaderMap::default();
1221        headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1222        headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1223        if let Some(ttl) = ttl {
1224            headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
1225        }
1226
1227        if let Some(revision) = revision {
1228            headers.insert(
1229                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1230                HeaderValue::from(revision),
1231            );
1232        }
1233
1234        self.stream
1235            .context
1236            .publish_with_headers(subject, headers, "".into())
1237            .await?
1238            .await?;
1239        Ok(())
1240    }
1241
1242    /// Returns a [futures_util::Stream] that allows iterating over all [Operations][Operation] that
1243    /// happen for given key.
1244    ///
1245    /// # Examples
1246    ///
1247    /// ```no_run
1248    /// # #[tokio::main]
1249    /// # async fn main() -> Result<(), async_nats::Error> {
1250    /// use futures_util::StreamExt;
1251    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1252    /// let jetstream = async_nats::jetstream::new(client);
1253    /// let kv = jetstream
1254    ///     .create_key_value(async_nats::jetstream::kv::Config {
1255    ///         bucket: "kv".to_string(),
1256    ///         history: 10,
1257    ///         ..Default::default()
1258    ///     })
1259    ///     .await?;
1260    /// let mut entries = kv.history("kv").await?;
1261    /// while let Some(entry) = entries.next().await {
1262    ///     println!("entry: {:?}", entry);
1263    /// }
1264    /// # Ok(())
1265    /// # }
1266    /// ```
1267    pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1268        if !is_valid_key(key.as_ref()) {
1269            return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1270        }
1271        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1272
1273        let consumer = self
1274            .stream
1275            .create_consumer(super::consumer::push::OrderedConfig {
1276                deliver_subject: self.stream.context.client.new_inbox(),
1277                description: Some("kv history consumer".to_string()),
1278                filter_subject: subject,
1279                replay_policy: super::consumer::ReplayPolicy::Instant,
1280                ..Default::default()
1281            })
1282            .await?;
1283
1284        Ok(History {
1285            subscription: consumer.messages().await?,
1286            done: false,
1287            prefix: self.prefix.clone(),
1288            bucket: self.name.clone(),
1289        })
1290    }
1291
1292    /// Returns a [futures_util::Stream] that allows iterating over all keys in the bucket.
1293    ///
1294    /// # Examples
1295    ///
    /// Iterating over each key individually
1297    ///
1298    /// ```no_run
1299    /// # #[tokio::main]
1300    /// # async fn main() -> Result<(), async_nats::Error> {
1301    /// use futures_util::{StreamExt, TryStreamExt};
1302    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1303    /// let jetstream = async_nats::jetstream::new(client);
1304    /// let kv = jetstream
1305    ///     .create_key_value(async_nats::jetstream::kv::Config {
1306    ///         bucket: "kv".to_string(),
1307    ///         history: 10,
1308    ///         ..Default::default()
1309    ///     })
1310    ///     .await?;
1311    /// let mut keys = kv.keys().await?.boxed();
1312    /// while let Some(key) = keys.try_next().await? {
1313    ///     println!("key: {:?}", key);
1314    /// }
1315    /// # Ok(())
1316    /// # }
1317    /// ```
1318    ///
1319    /// Collecting it into a vector of keys
1320    ///
1321    /// ```no_run
1322    /// # #[tokio::main]
1323    /// # async fn main() -> Result<(), async_nats::Error> {
1324    /// use futures_util::TryStreamExt;
1325    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1326    /// let jetstream = async_nats::jetstream::new(client);
1327    /// let kv = jetstream
1328    ///     .create_key_value(async_nats::jetstream::kv::Config {
1329    ///         bucket: "kv".to_string(),
1330    ///         history: 10,
1331    ///         ..Default::default()
1332    ///     })
1333    ///     .await?;
1334    /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1335    /// println!("Keys: {:?}", keys);
1336    /// # Ok(())
1337    /// # }
1338    /// ```
1339    pub async fn keys(&self) -> Result<Keys, HistoryError> {
1340        let subject = format!("{}>", self.prefix.as_str());
1341
1342        let consumer = self
1343            .stream
1344            .create_consumer(super::consumer::push::OrderedConfig {
1345                deliver_subject: self.stream.context.client.new_inbox(),
1346                description: Some("kv history consumer".to_string()),
1347                filter_subject: subject,
1348                headers_only: true,
1349                replay_policy: super::consumer::ReplayPolicy::Instant,
1350                // We only need to know the latest state for each key, not the whole history
1351                deliver_policy: DeliverPolicy::LastPerSubject,
1352                ..Default::default()
1353            })
1354            .await?;
1355
1356        let entries = History {
1357            done: consumer.info.num_pending == 0,
1358            subscription: consumer.messages().await?,
1359            prefix: self.prefix.clone(),
1360            bucket: self.name.clone(),
1361        };
1362
1363        Ok(Keys { inner: entries })
1364    }
1365}
1366
/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch {
    /// Becomes `true` once a message reporting zero pending messages has been seen
    /// (or immediately, if the consumer had nothing pending at creation); copied
    /// into every yielded [`Entry`].
    seen_current: bool,
    /// Message stream of the ordered push consumer backing this watch.
    subscription: super::consumer::push::Ordered,
    /// Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    /// Bucket name copied into every yielded [`Entry`].
    bucket: String,
}
1374
1375impl futures_util::Stream for Watch {
1376    type Item = Result<Entry, WatcherError>;
1377
1378    fn poll_next(
1379        mut self: std::pin::Pin<&mut Self>,
1380        cx: &mut std::task::Context<'_>,
1381    ) -> std::task::Poll<Option<Self::Item>> {
1382        match self.subscription.poll_next_unpin(cx) {
1383            Poll::Ready(message) => match message {
1384                None => Poll::Ready(None),
1385                Some(message) => {
1386                    let message = message?;
1387                    let info = message.info().map_err(|err| {
1388                        WatcherError::with_source(
1389                            WatcherErrorKind::Other,
1390                            format!("failed to parse message metadata: {err}"),
1391                        )
1392                    })?;
1393
1394                    let operation =
1395                        kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1396
1397                    let key = message
1398                        .subject
1399                        .strip_prefix(&self.prefix)
1400                        .map(|s| s.to_string())
1401                        .unwrap();
1402
1403                    if !self.seen_current && info.pending == 0 {
1404                        self.seen_current = true;
1405                    }
1406
1407                    Poll::Ready(Some(Ok(Entry {
1408                        bucket: self.bucket.clone(),
1409                        key,
1410                        value: message.payload.clone(),
1411                        revision: info.stream_sequence,
1412                        created: info.published,
1413                        delta: info.pending,
1414                        operation,
1415                        seen_current: self.seen_current,
1416                    })))
1417                }
1418            },
1419            std::task::Poll::Pending => Poll::Pending,
1420        }
1421    }
1422
1423    fn size_hint(&self) -> (usize, Option<usize>) {
1424        (0, None)
1425    }
1426}
1427
/// A structure representing the history of a key-value bucket, yielding past values.
pub struct History {
    /// Message stream of the ordered push consumer backing this history.
    subscription: super::consumer::push::Ordered,
    /// Set once a message reporting zero pending messages has been yielded; the
    /// stream then ends on the next poll.
    done: bool,
    /// Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    /// Bucket name copied into every yielded [`Entry`].
    bucket: String,
}
1435
1436impl futures_util::Stream for History {
1437    type Item = Result<Entry, WatcherError>;
1438
1439    fn poll_next(
1440        mut self: std::pin::Pin<&mut Self>,
1441        cx: &mut std::task::Context<'_>,
1442    ) -> std::task::Poll<Option<Self::Item>> {
1443        if self.done {
1444            return Poll::Ready(None);
1445        }
1446        match self.subscription.poll_next_unpin(cx) {
1447            Poll::Ready(message) => match message {
1448                None => Poll::Ready(None),
1449                Some(message) => {
1450                    let message = message?;
1451                    let info = message.info().map_err(|err| {
1452                        WatcherError::with_source(
1453                            WatcherErrorKind::Other,
1454                            format!("failed to parse message metadata: {err}"),
1455                        )
1456                    })?;
1457                    if info.pending == 0 {
1458                        self.done = true;
1459                    }
1460
1461                    let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1462
1463                    let key = message
1464                        .subject
1465                        .strip_prefix(&self.prefix)
1466                        .map(|s| s.to_string())
1467                        .unwrap();
1468
1469                    Poll::Ready(Some(Ok(Entry {
1470                        bucket: self.bucket.clone(),
1471                        key,
1472                        value: message.payload.clone(),
1473                        revision: info.stream_sequence,
1474                        created: info.published,
1475                        delta: info.pending,
1476                        operation,
1477                        seen_current: self.done,
1478                    })))
1479                }
1480            },
1481            std::task::Poll::Pending => Poll::Pending,
1482        }
1483    }
1484
1485    fn size_hint(&self) -> (usize, Option<usize>) {
1486        (0, None)
1487    }
1488}
1489
/// A stream over the keys of a bucket. It wraps a [`History`] and yields the key of each
/// entry, skipping entries whose operation is a purge or a delete.
pub struct Keys {
    /// Underlying entry stream the keys are extracted from.
    inner: History,
}
1493
1494impl futures_util::Stream for Keys {
1495    type Item = Result<String, WatcherError>;
1496
1497    fn poll_next(
1498        mut self: std::pin::Pin<&mut Self>,
1499        cx: &mut std::task::Context<'_>,
1500    ) -> std::task::Poll<Option<Self::Item>> {
1501        loop {
1502            match self.inner.poll_next_unpin(cx) {
1503                Poll::Ready(None) => return Poll::Ready(None),
1504                Poll::Ready(Some(res)) => match res {
1505                    Ok(entry) => {
1506                        // Skip purged and deleted keys
1507                        if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1508                            // Try to poll again if we skip this one
1509                            continue;
1510                        } else {
1511                            return Poll::Ready(Some(Ok(entry.key)));
1512                        }
1513                    }
1514                    Err(e) => return Poll::Ready(Some(Err(e))),
1515                },
1516                Poll::Pending => return Poll::Pending,
1517            }
1518        }
1519    }
1520}
1521
/// An entry in a key-value bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved.
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value. The `Watch` and `History` streams fill it
    /// from the stream sequence of the underlying message.
    pub revision: u64,
    /// Distance from the latest value. The `Watch` and `History` streams fill it
    /// from the number of messages still pending after this one.
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
    /// Set to true after all historical messages have been received, and
    /// now all Entries are the new ones.
    pub seen_current: bool,
}
1543
1544#[derive(Clone, Debug, PartialEq)]
1545pub enum StatusErrorKind {
1546    JetStream(crate::jetstream::Error),
1547    TimedOut,
1548}
1549
1550impl Display for StatusErrorKind {
1551    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1552        match self {
1553            Self::JetStream(err) => write!(f, "jetstream request failed: {err}"),
1554            Self::TimedOut => write!(f, "timed out"),
1555        }
1556    }
1557}
1558
/// Error type whose kind is a [`StatusErrorKind`].
pub type StatusError = Error<StatusErrorKind>;
1560
/// Kinds of failure carried by a [`CreateError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
    /// The key already exists (an update failed with a wrong last revision).
    AlreadyExists,
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// Creating the key in the store failed; timeouts are mapped here as well.
    Publish,
    /// Acknowledgement error.
    Ack,
    /// Any other failure.
    Other,
}
1569
1570impl From<UpdateError> for CreateError {
1571    fn from(error: UpdateError) -> Self {
1572        match error.kind() {
1573            UpdateErrorKind::InvalidKey => {
1574                CreateError::with_source(CreateErrorKind::InvalidKey, error)
1575            }
1576            UpdateErrorKind::TimedOut => CreateError::with_source(CreateErrorKind::Publish, error),
1577            UpdateErrorKind::WrongLastRevision => {
1578                CreateError::with_source(CreateErrorKind::AlreadyExists, error)
1579            }
1580            UpdateErrorKind::Other => CreateError::with_source(CreateErrorKind::Other, error),
1581        }
1582    }
1583}
1584
1585impl From<PutError> for CreateError {
1586    fn from(error: PutError) -> Self {
1587        match error.kind() {
1588            PutErrorKind::InvalidKey => {
1589                CreateError::with_source(CreateErrorKind::InvalidKey, error)
1590            }
1591            PutErrorKind::Publish => CreateError::with_source(CreateErrorKind::Publish, error),
1592            PutErrorKind::Ack => CreateError::with_source(CreateErrorKind::Ack, error),
1593        }
1594    }
1595}
1596
1597impl From<EntryError> for CreateError {
1598    fn from(error: EntryError) -> Self {
1599        match error.kind() {
1600            EntryErrorKind::InvalidKey => {
1601                CreateError::with_source(CreateErrorKind::InvalidKey, error)
1602            }
1603            EntryErrorKind::TimedOut => CreateError::with_source(CreateErrorKind::Publish, error),
1604            EntryErrorKind::Other => CreateError::with_source(CreateErrorKind::Other, error),
1605        }
1606    }
1607}
1608
1609impl Display for CreateErrorKind {
1610    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1611        match self {
1612            Self::AlreadyExists => write!(f, "key already exists"),
1613            Self::Publish => write!(f, "failed to create key in store"),
1614            Self::Ack => write!(f, "ack error"),
1615            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1616            Self::Other => write!(f, "other error"),
1617        }
1618    }
1619}
1620
/// Error type whose kind is a [`CreateErrorKind`].
pub type CreateError = Error<CreateErrorKind>;
1622
/// Kinds of failure carried by a [`PutError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// Putting the key into the store failed.
    Publish,
    /// Acknowledgement error.
    Ack,
}

impl Display for PutErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::Publish => "failed to put key into store",
            Self::Ack => "ack error",
        };
        f.write_str(description)
    }
}
1639
/// Error type whose kind is a [`PutErrorKind`].
pub type PutError = Error<PutErrorKind>;
1641
/// Kinds of failure carried by an [`EntryError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EntryErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The request timed out.
    TimedOut,
    /// Any other failure while getting the entry.
    Other,
}

impl Display for EntryErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Other => "failed getting entry",
            Self::TimedOut => "timed out",
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
        };
        f.write_str(description)
    }
}
1658
/// Error type whose kind is an [`EntryErrorKind`].
pub type EntryError = Error<EntryErrorKind>;

// NOTE(review): presumably generates the `DirectGetError` -> `EntryError` conversion,
// mapping timeouts onto `TimedOut` — confirm against the macro definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1667
/// Kinds of failure carried by a [`WatchError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The request timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// Any other watch failure.
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(description)
    }
}
1686
/// Error type whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): presumably generate the `ConsumerError` / `StreamError` -> `WatchError`
// conversions, mapping timeouts onto `TimedOut` — confirm against the macro definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1691
/// Kinds of failure carried by an [`UpdateError`] (also used, through type aliases,
/// for delete and purge errors).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The request timed out.
    TimedOut,
    /// The expected revision did not match the last revision of the key.
    WrongLastRevision,
    /// Any other failure while updating the entry.
    Other,
}

impl Display for UpdateErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
            Self::TimedOut => write!(f, "timed out"),
            Self::WrongLastRevision => write!(f, "wrong last revision"),
            // This kind describes a failed write; the previous text ("failed getting
            // entry") was copied from `EntryErrorKind` and described the wrong operation.
            Self::Other => write!(f, "failed updating entry"),
        }
    }
}
1710
/// Error type whose kind is an [`UpdateErrorKind`].
pub type UpdateError = Error<UpdateErrorKind>;
1712
1713impl From<PublishError> for UpdateError {
1714    fn from(err: PublishError) -> Self {
1715        match err.kind() {
1716            PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1717            PublishErrorKind::WrongLastSequence => {
1718                Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1719            }
1720            _ => Self::with_source(UpdateErrorKind::Other, err),
1721        }
1722    }
1723}
1724
/// Kinds of failure yielded by the watch, history and keys streams.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    Consumer,
    /// Any other watcher failure.
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Other => "watcher error",
            Self::Consumer => "watcher consumer error",
        };
        f.write_str(description)
    }
}
1739
/// Error type whose kind is a [`WatcherErrorKind`].
pub type WatcherError = Error<WatcherErrorKind>;
1741
1742impl From<OrderedError> for WatcherError {
1743    fn from(err: OrderedError) -> Self {
1744        WatcherError::with_source(WatcherErrorKind::Consumer, err)
1745    }
1746}
1747
/// Delete operations report the same errors as updates.
pub type DeleteError = UpdateError;
/// Delete operations report the same error kinds as updates.
pub type DeleteErrorKind = UpdateErrorKind;

/// Purge operations report the same errors as updates.
pub type PurgeError = UpdateError;
/// Purge operations report the same error kinds as updates.
pub type PurgeErrorKind = UpdateErrorKind;

/// History (and keys) streams report the same creation errors as watches.
pub type HistoryError = WatchError;
/// History (and keys) streams report the same creation error kinds as watches.
pub type HistoryErrorKind = WatchErrorKind;