async_nats/jetstream/kv/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19    fmt::{self, Display},
20    str::FromStr,
21    task::Poll,
22    time::Duration,
23};
24
25use crate::HeaderValue;
26use bytes::Bytes;
27use futures_util::StreamExt;
28use once_cell::sync::Lazy;
29use regex::Regex;
30use time::OffsetDateTime;
31use tracing::debug;
32
33use crate::error::Error;
34use crate::header;
35
36use self::bucket::Status;
37
38use super::{
39    consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
40    context::{PublishError, PublishErrorKind},
41    message::StreamMessage,
42    stream::{
43        self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
44        Source, StorageType, Stream,
45    },
46};
47
48fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
49    if let Some(op) = message.headers.get(KV_OPERATION) {
50        Operation::from_str(op.as_str())
51            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
52    } else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
53        match reason.as_str() {
54            "MaxAge" | "Purge" => Ok(Operation::Purge),
55            "Remove" => Ok(Operation::Delete),
56            _ => Err(EntryError::with_source(
57                EntryErrorKind::Other,
58                "invalid marker reason",
59            )),
60        }
61    } else {
62        Err(EntryError::with_source(
63            EntryErrorKind::Other,
64            "missing operation",
65        ))
66    }
67}
68fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
69    let headers = match message.headers.as_ref() {
70        Some(headers) => headers,
71        None => return Ok(Operation::Put),
72    };
73    if let Some(op) = headers.get(KV_OPERATION) {
74        Operation::from_str(op.as_str())
75            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
76    } else if let Some(reason) = headers.get(header::NATS_MARKER_REASON) {
77        match reason.as_str() {
78            "MaxAge" | "Purge" => Ok(Operation::Purge),
79            "Remove" => Ok(Operation::Delete),
80            _ => Err(EntryError::with_source(
81                EntryErrorKind::Other,
82                "invalid marker reason",
83            )),
84        }
85    } else {
86        Ok(Operation::Put)
87    }
88}
89
// Bucket names: one or more ASCII letters, digits, `_` or `-`.
static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Keys: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`.
// Leading/trailing dots are rejected separately in `is_valid_key`.
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the upper bound accepted for `Config::history`; the use
// site is outside this section — confirm.
pub(crate) const MAX_HISTORY: i64 = 64;
// Key passed by `watch_all` / `watch_all_from_revision` to cover every key in the bucket.
const ALL_KEYS: &str = ">";

// Header carrying the operation of an entry, and the values `Operation::from_str` accepts.
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// NOTE(review): rollup header name and value; presumably used by purge, which is not
// visible in this section — confirm.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
103
104pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
105    VALID_BUCKET_RE.is_match(bucket_name)
106}
107
108pub(crate) fn is_valid_key(key: &str) -> bool {
109    if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
110        return false;
111    }
112
113    VALID_KEY_RE.is_match(key)
114}
115
/// Configuration values for key value stores.
///
/// All fields implement [`Default`], so a configuration is typically built with
/// struct-update syntax: `Config { bucket: "kv".into(), ..Default::default() }`.
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// Name of the bucket
    pub bucket: String,
    /// Human readable description.
    pub description: String,
    /// Maximum size of a single value.
    pub max_value_size: i32,
    /// Maximum historical entries.
    pub history: i64,
    /// Maximum age of any entry in the bucket.
    pub max_age: std::time::Duration,
    /// How large the bucket may become in total bytes before the configured discard policy kicks in
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each entry in a cluster.
    pub num_replicas: usize,
    /// Republish is for republishing messages once persistent in the Key Value Bucket.
    pub republish: Option<Republish>,
    /// Bucket mirror configuration.
    pub mirror: Option<Source>,
    /// Bucket sources configuration.
    pub sources: Option<Vec<Source>>,
    /// Allow mirrors using direct API.
    pub mirror_direct: bool,
    /// Whether compression is enabled for the bucket.
    #[cfg(feature = "server_2_10")]
    pub compression: bool,
    /// Cluster and tag placement for the bucket.
    pub placement: Option<stream::Placement>,
    /// Enables per-message TTL and delete marker TTL for a bucket.
    #[cfg(feature = "server_2_11")]
    pub limit_markers: Option<Duration>,
}
152
/// Describes the kind of operation an entry represents.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket
    Put,
    /// A value was deleted from a bucket
    Delete,
    /// A value was purged from a bucket
    Purge,
}
163
164impl FromStr for Operation {
165    type Err = ParseOperationError;
166
167    fn from_str(s: &str) -> Result<Self, Self::Err> {
168        match s {
169            KV_OPERATION_DELETE => Ok(Operation::Delete),
170            KV_OPERATION_PURGE => Ok(Operation::Purge),
171            KV_OPERATION_PUT => Ok(Operation::Put),
172            _ => Err(ParseOperationError),
173        }
174    }
175}
176
177#[derive(Debug, Clone)]
178pub struct ParseOperationError;
179
180impl fmt::Display for ParseOperationError {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
183    }
184}
185
186impl std::error::Error for ParseOperationError {}
187
/// A struct used as a handle for the bucket.
///
/// The handle is cheap to pass around (`Clone`) and exposes the operations on a
/// bucket: creating, putting, reading and watching entries.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. It is prepended to a key to form the
    /// subject used for lookups and watches.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs. When set, `put`
    /// uses it instead of `prefix`.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When `true`, `put` prepends
    /// the JetStream context prefix followed by `.` to the subject.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
204
205impl Store {
206    /// Queries the server and returns status from the server.
207    ///
208    /// # Examples
209    ///
210    /// ```no_run
211    /// # #[tokio::main]
212    /// # async fn main() -> Result<(), async_nats::Error> {
213    /// let client = async_nats::connect("demo.nats.io:4222").await?;
214    /// let jetstream = async_nats::jetstream::new(client);
215    /// let kv = jetstream
216    ///     .create_key_value(async_nats::jetstream::kv::Config {
217    ///         bucket: "kv".to_string(),
218    ///         history: 10,
219    ///         ..Default::default()
220    ///     })
221    ///     .await?;
222    /// let status = kv.status().await?;
223    /// println!("status: {:?}", status);
224    /// # Ok(())
225    /// # }
226    /// ```
227    pub async fn status(&self) -> Result<Status, StatusError> {
228        // TODO: should we poll for fresh info here? probably yes.
229        let info = self.stream.info.clone();
230
231        Ok(Status {
232            info,
233            bucket: self.name.to_string(),
234        })
235    }
236
237    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
238    ///
239    /// # Examples
240    ///
241    /// ```no_run
242    /// # #[tokio::main]
243    /// # async fn main() -> Result<(), async_nats::Error> {
244    /// let client = async_nats::connect("demo.nats.io:4222").await?;
245    /// let jetstream = async_nats::jetstream::new(client);
246    /// let kv = jetstream
247    ///     .create_key_value(async_nats::jetstream::kv::Config {
248    ///         bucket: "kv".to_string(),
249    ///         history: 10,
250    ///         ..Default::default()
251    ///     })
252    ///     .await?;
253    ///
254    /// let status = kv.create("key", "value".into()).await;
255    /// assert!(status.is_ok());
256    ///
257    /// let status = kv.create("key", "value".into()).await;
258    /// assert!(status.is_err());
259    ///
260    /// # Ok(())
261    /// # }
262    /// ```
263    pub async fn create<T: AsRef<str>>(
264        &self,
265        key: T,
266        value: bytes::Bytes,
267    ) -> Result<u64, CreateError> {
268        self.create_maybe_ttl(key, value, None).await
269    }
270
271    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
272    /// It will set a TTL specific for that key.
273    ///
274    /// # Examples
275    ///
276    /// ```no_run
277    /// # #[tokio::main]
278    /// # async fn main() -> Result<(), async_nats::Error> {
279    /// use std::time::Duration;
280    /// let client = async_nats::connect("demo.nats.io:4222").await?;
281    /// let jetstream = async_nats::jetstream::new(client);
282    /// let kv = jetstream
283    ///     .create_key_value(async_nats::jetstream::kv::Config {
284    ///         bucket: "kv".to_string(),
285    ///         history: 10,
286    ///         ..Default::default()
287    ///     })
288    ///     .await?;
289    ///
290    /// let status = kv
291    ///     .create_with_ttl("key", "value".into(), Duration::from_secs(10))
292    ///     .await;
293    /// assert!(status.is_ok());
294    ///
295    /// # Ok(())
296    /// # }
297    /// ```
298    pub async fn create_with_ttl<T: AsRef<str>>(
299        &self,
300        key: T,
301        value: bytes::Bytes,
302        ttl: Duration,
303    ) -> Result<u64, CreateError> {
304        self.create_maybe_ttl(key, value, Some(ttl)).await
305    }
306
307    async fn create_maybe_ttl<T: AsRef<str>>(
308        &self,
309        key: T,
310        value: bytes::Bytes,
311        ttl: Option<Duration>,
312    ) -> Result<u64, CreateError> {
313        let update_err = match self
314            .update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
315            .await
316        {
317            Ok(revision) => return Ok(revision),
318            Err(err) => err,
319        };
320
321        match self.entry(key.as_ref()).await? {
322            // Deleted or Purged key, we can create it again.
323            Some(Entry {
324                operation: Operation::Delete | Operation::Purge,
325                revision,
326                ..
327            }) => {
328                let revision = self.update(key, value, revision).await?;
329                Ok(revision)
330            }
331
332            // key already exists.
333            Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
334
335            // Something went wrong with the initial update, return that error
336            None => Err(update_err.into()),
337        }
338    }
339
340    /// Puts new key value pair into the bucket.
341    /// If key didn't exist, it is created. If it did exist, a new value with a new version is
342    /// added.
343    ///
344    /// # Examples
345    ///
346    /// ```no_run
347    /// # #[tokio::main]
348    /// # async fn main() -> Result<(), async_nats::Error> {
349    /// let client = async_nats::connect("demo.nats.io:4222").await?;
350    /// let jetstream = async_nats::jetstream::new(client);
351    /// let kv = jetstream
352    ///     .create_key_value(async_nats::jetstream::kv::Config {
353    ///         bucket: "kv".to_string(),
354    ///         history: 10,
355    ///         ..Default::default()
356    ///     })
357    ///     .await?;
358    /// let status = kv.put("key", "value".into()).await?;
359    /// # Ok(())
360    /// # }
361    /// ```
362    pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
363        if !is_valid_key(key.as_ref()) {
364            return Err(PutError::new(PutErrorKind::InvalidKey));
365        }
366        let mut subject = String::new();
367        if self.use_jetstream_prefix {
368            subject.push_str(&self.stream.context.prefix);
369            subject.push('.');
370        }
371        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
372        subject.push_str(key.as_ref());
373
374        let publish_ack = self
375            .stream
376            .context
377            .publish(subject, value)
378            .await
379            .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
380        let ack = publish_ack
381            .await
382            .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
383
384        Ok(ack.sequence)
385    }
386
387    async fn entry_maybe_revision<T: Into<String>>(
388        &self,
389        key: T,
390        revision: Option<u64>,
391    ) -> Result<Option<Entry>, EntryError> {
392        let key: String = key.into();
393        if !is_valid_key(key.as_ref()) {
394            return Err(EntryError::new(EntryErrorKind::InvalidKey));
395        }
396
397        let subject = format!("{}{}", self.prefix.as_str(), &key);
398
399        let result: Option<(StreamMessage, Operation)> = {
400            if self.stream.info.config.allow_direct {
401                let message = match revision {
402                    Some(revision) => {
403                        let message = self.stream.direct_get(revision).await;
404                        if let Ok(message) = message.as_ref() {
405                            if message.subject.as_str() != subject {
406                                println!("subject mismatch {}", message.subject);
407                                return Ok(None);
408                            }
409                        }
410                        message
411                    }
412                    None => {
413                        self.stream
414                            .direct_get_last_for_subject(subject.as_str())
415                            .await
416                    }
417                };
418
419                match message {
420                    Ok(message) => {
421                        let operation =
422                            kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
423
424                        Some((message, operation))
425                    }
426                    Err(err) => {
427                        if err.kind() == DirectGetErrorKind::NotFound {
428                            None
429                        } else {
430                            return Err(err.into());
431                        }
432                    }
433                }
434            } else {
435                let raw_message = match revision {
436                    Some(revision) => {
437                        let message = self.stream.get_raw_message(revision).await;
438                        if let Ok(message) = message.as_ref() {
439                            if message.subject.as_str() != subject {
440                                return Ok(None);
441                            }
442                        }
443                        message
444                    }
445                    None => {
446                        self.stream
447                            .get_last_raw_message_by_subject(subject.as_str())
448                            .await
449                    }
450                };
451                match raw_message {
452                    Ok(raw_message) => {
453                        let operation = kv_operation_from_stream_message(&raw_message)
454                            .unwrap_or(Operation::Put);
455                        // TODO: unnecessary expensive, cloning whole Message.
456                        Some((raw_message, operation))
457                    }
458                    Err(err) => match err.kind() {
459                        crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
460                        crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
461                            return Err(EntryError::new(EntryErrorKind::InvalidKey))
462                        }
463                        crate::jetstream::stream::LastRawMessageErrorKind::Other => {
464                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
465                        }
466                        crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
467                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
468                        }
469                    },
470                }
471            }
472        };
473
474        match result {
475            Some((message, operation)) => {
476                let entry = Entry {
477                    bucket: self.name.clone(),
478                    key,
479                    value: message.payload,
480                    revision: message.sequence,
481                    created: message.time,
482                    operation,
483                    delta: 0,
484                    seen_current: false,
485                };
486                Ok(Some(entry))
487            }
488            // TODO: remember to touch this when Errors are in place.
489            None => Ok(None),
490        }
491    }
492
493    /// Retrieves the last [Entry] for a given key from a bucket.
494    ///
495    /// # Examples
496    ///
497    /// ```no_run
498    /// # #[tokio::main]
499    /// # async fn main() -> Result<(), async_nats::Error> {
500    /// let client = async_nats::connect("demo.nats.io:4222").await?;
501    /// let jetstream = async_nats::jetstream::new(client);
502    /// let kv = jetstream
503    ///     .create_key_value(async_nats::jetstream::kv::Config {
504    ///         bucket: "kv".to_string(),
505    ///         history: 10,
506    ///         ..Default::default()
507    ///     })
508    ///     .await?;
509    /// let status = kv.put("key", "value".into()).await?;
510    /// let entry = kv.entry("key").await?;
511    /// println!("entry: {:?}", entry);
512    /// # Ok(())
513    /// # }
514    /// ```
515    pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
516        self.entry_maybe_revision(key, None).await
517    }
518
519    /// Retrieves the [Entry] for a given key revision from a bucket.
520    ///
521    /// # Examples
522    ///
523    /// ```no_run
524    /// # #[tokio::main]
525    /// # async fn main() -> Result<(), async_nats::Error> {
526    /// let client = async_nats::connect("demo.nats.io:4222").await?;
527    /// let jetstream = async_nats::jetstream::new(client);
528    /// let kv = jetstream
529    ///     .create_key_value(async_nats::jetstream::kv::Config {
530    ///         bucket: "kv".to_string(),
531    ///         history: 10,
532    ///         ..Default::default()
533    ///     })
534    ///     .await?;
535    /// let status = kv.put("key", "value".into()).await?;
536    /// let status = kv.put("key", "value2".into()).await?;
537    /// let entry = kv.entry_for_revision("key", 2).await?;
538    /// println!("entry: {:?}", entry);
539    /// # Ok(())
540    /// # }
541    /// ```
542    pub async fn entry_for_revision<T: Into<String>>(
543        &self,
544        key: T,
545        revision: u64,
546    ) -> Result<Option<Entry>, EntryError> {
547        self.entry_maybe_revision(key, Some(revision)).await
548    }
549
550    /// Creates a [futures_util::Stream] over [Entries][Entry]  a given key in the bucket, which yields
551    /// values whenever there are changes for that key.
552    ///
553    /// # Examples
554    ///
555    /// ```no_run
556    /// # #[tokio::main]
557    /// # async fn main() -> Result<(), async_nats::Error> {
558    /// use futures_util::StreamExt;
559    /// let client = async_nats::connect("demo.nats.io:4222").await?;
560    /// let jetstream = async_nats::jetstream::new(client);
561    /// let kv = jetstream
562    ///     .create_key_value(async_nats::jetstream::kv::Config {
563    ///         bucket: "kv".to_string(),
564    ///         history: 10,
565    ///         ..Default::default()
566    ///     })
567    ///     .await?;
568    /// let mut entries = kv.watch("kv").await?;
569    /// while let Some(entry) = entries.next().await {
570    ///     println!("entry: {:?}", entry);
571    /// }
572    /// # Ok(())
573    /// # }
574    /// ```
575    pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
576        self.watch_with_deliver_policy(key, DeliverPolicy::New)
577            .await
578    }
579
580    /// Creates a [futures_util::Stream] over [Entries][Entry] in the bucket, which yields
581    /// values whenever there are changes for given keys.
582    ///
583    /// # Examples
584    ///
585    /// ```no_run
586    /// # #[tokio::main]
587    /// # async fn main() -> Result<(), async_nats::Error> {
588    /// use futures_util::StreamExt;
589    /// let client = async_nats::connect("demo.nats.io:4222").await?;
590    /// let jetstream = async_nats::jetstream::new(client);
591    /// let kv = jetstream
592    ///     .create_key_value(async_nats::jetstream::kv::Config {
593    ///         bucket: "kv".to_string(),
594    ///         history: 10,
595    ///         ..Default::default()
596    ///     })
597    ///     .await?;
598    /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
599    /// while let Some(entry) = entries.next().await {
600    ///     println!("entry: {:?}", entry);
601    /// }
602    /// # Ok(())
603    /// # }
604    /// ```
605    #[cfg(feature = "server_2_10")]
606    pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
607    where
608        T: AsRef<str>,
609        K: IntoIterator<Item = T>,
610    {
611        self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
612            .await
613    }
614
615    /// Creates a [futures_util::Stream] over [Entries][Entry] for a given key in the bucket, starting from
616    /// provided revision. This is useful to resume watching over big KV buckets without a need to
617    /// replay all the history.
618    ///
619    /// # Examples
620    ///
621    /// ```no_run
622    /// # #[tokio::main]
623    /// # async fn main() -> Result<(), async_nats::Error> {
624    /// use futures_util::StreamExt;
625    /// let client = async_nats::connect("demo.nats.io:4222").await?;
626    /// let jetstream = async_nats::jetstream::new(client);
627    /// let kv = jetstream
628    ///     .create_key_value(async_nats::jetstream::kv::Config {
629    ///         bucket: "kv".to_string(),
630    ///         history: 10,
631    ///         ..Default::default()
632    ///     })
633    ///     .await?;
634    /// let mut entries = kv.watch_from_revision("kv", 5).await?;
635    /// while let Some(entry) = entries.next().await {
636    ///     println!("entry: {:?}", entry);
637    /// }
638    /// # Ok(())
639    /// # }
640    /// ```
641    pub async fn watch_from_revision<T: AsRef<str>>(
642        &self,
643        key: T,
644        revision: u64,
645    ) -> Result<Watch, WatchError> {
646        self.watch_with_deliver_policy(
647            key,
648            DeliverPolicy::ByStartSequence {
649                start_sequence: revision,
650            },
651        )
652        .await
653    }
654
655    /// Creates a [futures_util::Stream] over [Entries][Entry]  a given key in the bucket, which yields
656    /// values whenever there are changes for that key with as well as last value.
657    ///
658    /// # Examples
659    ///
660    /// ```no_run
661    /// # #[tokio::main]
662    /// # async fn main() -> Result<(), async_nats::Error> {
663    /// use futures_util::StreamExt;
664    /// let client = async_nats::connect("demo.nats.io:4222").await?;
665    /// let jetstream = async_nats::jetstream::new(client);
666    /// let kv = jetstream
667    ///     .create_key_value(async_nats::jetstream::kv::Config {
668    ///         bucket: "kv".to_string(),
669    ///         history: 10,
670    ///         ..Default::default()
671    ///     })
672    ///     .await?;
673    /// let mut entries = kv.watch_with_history("kv").await?;
674    /// while let Some(entry) = entries.next().await {
675    ///     println!("entry: {:?}", entry);
676    /// }
677    /// # Ok(())
678    /// # }
679    /// ```
680    pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
681        self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
682            .await
683    }
684
685    /// Creates a [futures_util::Stream] over [Entries][Entry]  a given keys in the bucket, which yields
686    /// values whenever there are changes for those keys with as well as last value.
687    /// This requires server version > 2.10 as it uses consumers with multiple subject filters.
688    ///
689    /// # Examples
690    ///
691    /// ```no_run
692    /// # #[tokio::main]
693    /// # async fn main() -> Result<(), async_nats::Error> {
694    /// use futures_util::StreamExt;
695    /// let client = async_nats::connect("demo.nats.io:4222").await?;
696    /// let jetstream = async_nats::jetstream::new(client);
697    /// let kv = jetstream
698    ///     .create_key_value(async_nats::jetstream::kv::Config {
699    ///         bucket: "kv".to_string(),
700    ///         history: 10,
701    ///         ..Default::default()
702    ///     })
703    ///     .await?;
704    /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
705    /// while let Some(entry) = entries.next().await {
706    ///     println!("entry: {:?}", entry);
707    /// }
708    /// # Ok(())
709    /// # }
710    /// ```
711    #[cfg(feature = "server_2_10")]
712    pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
713        &self,
714        keys: K,
715    ) -> Result<Watch, WatchError> {
716        self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
717            .await
718    }
719
720    #[cfg(feature = "server_2_10")]
721    async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
722        &self,
723        keys: K,
724        deliver_policy: DeliverPolicy,
725    ) -> Result<Watch, WatchError> {
726        let subjects = keys
727            .into_iter()
728            .map(|key| {
729                let key = key.as_ref();
730                format!("{}{}", self.prefix.as_str(), key)
731            })
732            .collect::<Vec<_>>();
733
734        debug!("initial consumer creation");
735        let consumer = self
736            .stream
737            .create_consumer(super::consumer::push::OrderedConfig {
738                deliver_subject: self.stream.context.client.new_inbox(),
739                description: Some("kv watch consumer".to_string()),
740                filter_subjects: subjects,
741                replay_policy: super::consumer::ReplayPolicy::Instant,
742                deliver_policy,
743                ..Default::default()
744            })
745            .await
746            .map_err(|err| match err.kind() {
747                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
748                    WatchError::new(WatchErrorKind::TimedOut)
749                }
750                _ => WatchError::with_source(WatchErrorKind::Other, err),
751            })?;
752
753        let seen_current = consumer.cached_info().num_pending == 0;
754
755        Ok(Watch {
756            subscription: consumer.messages().await.map_err(|err| match err.kind() {
757                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
758                    WatchError::new(WatchErrorKind::TimedOut)
759                }
760                crate::jetstream::consumer::StreamErrorKind::Other => {
761                    WatchError::with_source(WatchErrorKind::Other, err)
762                }
763            })?,
764            prefix: self.prefix.clone(),
765            bucket: self.name.clone(),
766            seen_current,
767        })
768    }
769
770    async fn watch_with_deliver_policy<T: AsRef<str>>(
771        &self,
772        key: T,
773        deliver_policy: DeliverPolicy,
774    ) -> Result<Watch, WatchError> {
775        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
776
777        debug!("initial consumer creation");
778        let consumer = self
779            .stream
780            .create_consumer(super::consumer::push::OrderedConfig {
781                deliver_subject: self.stream.context.client.new_inbox(),
782                description: Some("kv watch consumer".to_string()),
783                filter_subject: subject,
784                replay_policy: super::consumer::ReplayPolicy::Instant,
785                deliver_policy,
786                ..Default::default()
787            })
788            .await
789            .map_err(|err| match err.kind() {
790                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
791                    WatchError::new(WatchErrorKind::TimedOut)
792                }
793                _ => WatchError::with_source(WatchErrorKind::Other, err),
794            })?;
795
796        let seen_current = consumer.cached_info().num_pending == 0;
797
798        Ok(Watch {
799            subscription: consumer.messages().await.map_err(|err| match err.kind() {
800                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
801                    WatchError::new(WatchErrorKind::TimedOut)
802                }
803                crate::jetstream::consumer::StreamErrorKind::Other => {
804                    WatchError::with_source(WatchErrorKind::Other, err)
805                }
806            })?,
807            prefix: self.prefix.clone(),
808            bucket: self.name.clone(),
809            seen_current,
810        })
811    }
812
813    /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys, which yields
814    /// values whenever there are changes in the bucket.
815    ///
816    /// # Examples
817    ///
818    /// ```no_run
819    /// # #[tokio::main]
820    /// # async fn main() -> Result<(), async_nats::Error> {
821    /// use futures_util::StreamExt;
822    /// let client = async_nats::connect("demo.nats.io:4222").await?;
823    /// let jetstream = async_nats::jetstream::new(client);
824    /// let kv = jetstream
825    ///     .create_key_value(async_nats::jetstream::kv::Config {
826    ///         bucket: "kv".to_string(),
827    ///         history: 10,
828    ///         ..Default::default()
829    ///     })
830    ///     .await?;
831    /// let mut entries = kv.watch_all().await?;
832    /// while let Some(entry) = entries.next().await {
833    ///     println!("entry: {:?}", entry);
834    /// }
835    /// # Ok(())
836    /// # }
837    /// ```
    pub async fn watch_all(&self) -> Result<Watch, WatchError> {
        // Same as `watch`, with the `ALL_KEYS` constant passed as the key filter.
        self.watch(ALL_KEYS).await
    }
841
    /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys starting
    /// from a provided revision. This can be useful when resuming watching over a big bucket
    /// without the need to replay all the history.
845    ///
846    /// # Examples
847    ///
848    /// ```no_run
849    /// # #[tokio::main]
850    /// # async fn main() -> Result<(), async_nats::Error> {
851    /// use futures_util::StreamExt;
852    /// let client = async_nats::connect("demo.nats.io:4222").await?;
853    /// let jetstream = async_nats::jetstream::new(client);
854    /// let kv = jetstream
855    ///     .create_key_value(async_nats::jetstream::kv::Config {
856    ///         bucket: "kv".to_string(),
857    ///         history: 10,
858    ///         ..Default::default()
859    ///     })
860    ///     .await?;
861    /// let mut entries = kv.watch_all_from_revision(40).await?;
862    /// while let Some(entry) = entries.next().await {
863    ///     println!("entry: {:?}", entry);
864    /// }
865    /// # Ok(())
866    /// # }
867    /// ```
    pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
        // Same as `watch_from_revision`, with the `ALL_KEYS` constant passed as the key filter.
        self.watch_from_revision(ALL_KEYS, revision).await
    }
871
872    /// Retrieves the [Entry] for a given key from a bucket.
873    ///
874    /// # Examples
875    ///
876    /// ```no_run
877    /// # #[tokio::main]
878    /// # async fn main() -> Result<(), async_nats::Error> {
879    /// let client = async_nats::connect("demo.nats.io:4222").await?;
880    /// let jetstream = async_nats::jetstream::new(client);
881    /// let kv = jetstream
882    ///     .create_key_value(async_nats::jetstream::kv::Config {
883    ///         bucket: "kv".to_string(),
884    ///         history: 10,
885    ///         ..Default::default()
886    ///     })
887    ///     .await?;
888    /// let value = kv.get("key").await?;
889    /// match value {
890    ///     Some(bytes) => {
891    ///         let value_str = std::str::from_utf8(&bytes)?;
892    ///         println!("Value: {}", value_str);
893    ///     }
894    ///     None => {
895    ///         println!("Key not found or value not set");
896    ///     }
897    /// }
898    /// # Ok(())
899    /// # }
900    /// ```
901    pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
902        match self.entry(key).await {
903            Ok(Some(entry)) => match entry.operation {
904                Operation::Put => Ok(Some(entry.value)),
905                _ => Ok(None),
906            },
907            Ok(None) => Ok(None),
908            Err(err) => Err(err),
909        }
910    }
911
912    /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
913    /// the bucket.
914    ///
915    /// # Examples
916    ///
917    /// ```no_run
918    /// # #[tokio::main]
919    /// # async fn main() -> Result<(), async_nats::Error> {
920    /// use futures_util::StreamExt;
921    /// let client = async_nats::connect("demo.nats.io:4222").await?;
922    /// let jetstream = async_nats::jetstream::new(client);
923    /// let kv = jetstream
924    ///     .create_key_value(async_nats::jetstream::kv::Config {
925    ///         bucket: "kv".to_string(),
926    ///         history: 10,
927    ///         ..Default::default()
928    ///     })
929    ///     .await?;
930    /// let revision = kv.put("key", "value".into()).await?;
931    /// kv.update("key", "updated".into(), revision).await?;
932    /// # Ok(())
933    /// # }
934    /// ```
    pub async fn update<T: AsRef<str>>(
        &self,
        key: T,
        value: Bytes,
        revision: u64,
    ) -> Result<u64, UpdateError> {
        // `None`: a plain update does not attach a per-message TTL header.
        self.update_maybe_ttl(key, value, revision, None).await
    }
943
944    async fn update_maybe_ttl<T: AsRef<str>>(
945        &self,
946        key: T,
947        value: Bytes,
948        revision: u64,
949        ttl: Option<Duration>,
950    ) -> Result<u64, UpdateError> {
951        if !is_valid_key(key.as_ref()) {
952            return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
953        }
954        let mut subject = String::new();
955        if self.use_jetstream_prefix {
956            subject.push_str(&self.stream.context.prefix);
957            subject.push('.');
958        }
959        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
960        subject.push_str(key.as_ref());
961
962        let mut headers = crate::HeaderMap::default();
963        headers.insert(
964            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
965            HeaderValue::from(revision),
966        );
967
968        if let Some(ttl) = ttl {
969            headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
970        }
971
972        self.stream
973            .context
974            .publish_with_headers(subject, headers, value)
975            .await?
976            .await
977            .map_err(|err| err.into())
978            .map(|publish_ack| publish_ack.sequence)
979    }
980
981    /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
982    ///
983    /// # Examples
984    ///
985    /// ```no_run
986    /// # #[tokio::main]
987    /// # async fn main() -> Result<(), async_nats::Error> {
988    /// use futures_util::StreamExt;
989    /// let client = async_nats::connect("demo.nats.io:4222").await?;
990    /// let jetstream = async_nats::jetstream::new(client);
991    /// let kv = jetstream
992    ///     .create_key_value(async_nats::jetstream::kv::Config {
993    ///         bucket: "kv".to_string(),
994    ///         history: 10,
995    ///         ..Default::default()
996    ///     })
997    ///     .await?;
998    /// kv.put("key", "value".into()).await?;
999    /// kv.delete("key").await?;
1000    /// # Ok(())
1001    /// # }
1002    /// ```
    pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
        // `None` means no expected-revision header is sent, so the delete is unconditional.
        self.delete_expect_revision(key, None).await
    }
1006
1007    /// Deletes a given key if the revision matches. This is a non-destructive operation, which
1008    /// sets a `DELETE` marker.
1009    ///
1010    /// # Examples
1011    ///
1012    /// ```no_run
1013    /// # #[tokio::main]
1014    /// # async fn main() -> Result<(), async_nats::Error> {
1015    /// use futures_util::StreamExt;
1016    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1017    /// let jetstream = async_nats::jetstream::new(client);
1018    /// let kv = jetstream
1019    ///     .create_key_value(async_nats::jetstream::kv::Config {
1020    ///         bucket: "kv".to_string(),
1021    ///         history: 10,
1022    ///         ..Default::default()
1023    ///     })
1024    ///     .await?;
1025    /// let revision = kv.put("key", "value".into()).await?;
1026    /// kv.delete_expect_revision("key", Some(revision)).await?;
1027    /// # Ok(())
1028    /// # }
1029    /// ```
1030    pub async fn delete_expect_revision<T: AsRef<str>>(
1031        &self,
1032        key: T,
1033        revison: Option<u64>,
1034    ) -> Result<(), DeleteError> {
1035        if !is_valid_key(key.as_ref()) {
1036            return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
1037        }
1038        let mut subject = String::new();
1039        if self.use_jetstream_prefix {
1040            subject.push_str(&self.stream.context.prefix);
1041            subject.push('.');
1042        }
1043        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1044        subject.push_str(key.as_ref());
1045
1046        let mut headers = crate::HeaderMap::default();
1047        // TODO: figure out which headers k/v should be where.
1048        headers.insert(
1049            KV_OPERATION,
1050            KV_OPERATION_DELETE
1051                .parse::<HeaderValue>()
1052                .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
1053        );
1054
1055        if let Some(revision) = revison {
1056            headers.insert(
1057                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1058                HeaderValue::from(revision),
1059            );
1060        }
1061
1062        self.stream
1063            .context
1064            .publish_with_headers(subject, headers, "".into())
1065            .await?
1066            .await?;
1067        Ok(())
1068    }
1069
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
1071    ///
1072    /// # Examples
1073    ///
1074    /// ```no_run
1075    /// # #[tokio::main]
1076    /// # async fn main() -> Result<(), async_nats::Error> {
1077    /// use futures_util::StreamExt;
1078    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1079    /// let jetstream = async_nats::jetstream::new(client);
1080    /// let kv = jetstream
1081    ///     .create_key_value(async_nats::jetstream::kv::Config {
1082    ///         bucket: "kv".to_string(),
1083    ///         history: 10,
1084    ///         ..Default::default()
1085    ///     })
1086    ///     .await?;
1087    /// kv.put("key", "value".into()).await?;
1088    /// kv.put("key", "another".into()).await?;
1089    /// kv.purge("key").await?;
1090    /// # Ok(())
1091    /// # }
1092    /// ```
    pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
        // `None` means no expected-revision header is sent, so the purge is unconditional.
        self.purge_expect_revision(key, None).await
    }
1096
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
1098    /// The purge entry will remain valid for the given `ttl`.
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```no_run
1103    /// # #[tokio::main]
1104    /// # async fn main() -> Result<(), async_nats::Error> {
1105    /// use futures_util::StreamExt;
1106    /// use std::time::Duration;
1107    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1108    /// let jetstream = async_nats::jetstream::new(client);
1109    /// let kv = jetstream
1110    ///     .create_key_value(async_nats::jetstream::kv::Config {
1111    ///         bucket: "kv".to_string(),
1112    ///         history: 10,
1113    ///         ..Default::default()
1114    ///     })
1115    ///     .await?;
1116    /// kv.put("key", "value".into()).await?;
1117    /// kv.put("key", "another".into()).await?;
1118    /// kv.purge_with_ttl("key", Duration::from_secs(10)).await?;
1119    /// # Ok(())
1120    /// # }
1121    /// ```
    pub async fn purge_with_ttl<T: AsRef<str>>(
        &self,
        key: T,
        ttl: Duration,
    ) -> Result<(), PurgeError> {
        // No revision check (`None`); the TTL is attached to the purge message.
        self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
            .await
    }
1130
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
1132    /// purge entry in-place.
1133    ///
1134    /// # Examples
1135    ///
1136    /// ```no_run
1137    /// # #[tokio::main]
1138    /// # async fn main() -> Result<(), async_nats::Error> {
1139    /// use futures_util::StreamExt;
1140    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1141    /// let jetstream = async_nats::jetstream::new(client);
1142    /// let kv = jetstream
1143    ///     .create_key_value(async_nats::jetstream::kv::Config {
1144    ///         bucket: "kv".to_string(),
1145    ///         history: 10,
1146    ///         ..Default::default()
1147    ///     })
1148    ///     .await?;
1149    /// kv.put("key", "value".into()).await?;
1150    /// let revision = kv.put("key", "another".into()).await?;
1151    /// kv.purge_expect_revision("key", Some(revision)).await?;
1152    /// # Ok(())
1153    /// # }
1154    /// ```
    pub async fn purge_expect_revision<T: AsRef<str>>(
        &self,
        key: T,
        revision: Option<u64>,
    ) -> Result<(), PurgeError> {
        // The revision check is applied only when `revision` is `Some`; no TTL is attached.
        self.purge_expect_revision_maybe_ttl(key, revision, None)
            .await
    }
1163
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
1165    /// purge entry in-place. The purge entry will remain valid for the given `ttl`.
1166    ///
1167    /// # Examples
1168    ///
1169    /// ```no_run
1170    /// # #[tokio::main]
1171    /// # async fn main() -> Result<(), async_nats::Error> {
1172    /// use futures_util::StreamExt;
1173    /// use std::time::Duration;
1174    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1175    /// let jetstream = async_nats::jetstream::new(client);
1176    /// let kv = jetstream
1177    ///     .create_key_value(async_nats::jetstream::kv::Config {
1178    ///         bucket: "kv".to_string(),
1179    ///         history: 10,
1180    ///         ..Default::default()
1181    ///     })
1182    ///     .await?;
1183    /// kv.put("key", "value".into()).await?;
1184    /// let revision = kv.put("key", "another".into()).await?;
1185    /// kv.purge_expect_revision_with_ttl("key", revision, Duration::from_secs(10))
1186    ///     .await?;
1187    /// # Ok(())
1188    /// # }
1189    /// ```
    pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
        &self,
        key: T,
        revision: u64,
        ttl: Duration,
    ) -> Result<(), PurgeError> {
        // Both the expected revision and the TTL are always sent by this variant.
        self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
            .await
    }
1199
1200    async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
1201        &self,
1202        key: T,
1203        revision: Option<u64>,
1204        ttl: Option<Duration>,
1205    ) -> Result<(), PurgeError> {
1206        if !is_valid_key(key.as_ref()) {
1207            return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1208        }
1209
1210        let mut subject = String::new();
1211        if self.use_jetstream_prefix {
1212            subject.push_str(&self.stream.context.prefix);
1213            subject.push('.');
1214        }
1215        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1216        subject.push_str(key.as_ref());
1217
1218        let mut headers = crate::HeaderMap::default();
1219        headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1220        headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1221        if let Some(ttl) = ttl {
1222            headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
1223        }
1224
1225        if let Some(revision) = revision {
1226            headers.insert(
1227                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1228                HeaderValue::from(revision),
1229            );
1230        }
1231
1232        self.stream
1233            .context
1234            .publish_with_headers(subject, headers, "".into())
1235            .await?
1236            .await?;
1237        Ok(())
1238    }
1239
1240    /// Returns a [futures_util::Stream] that allows iterating over all [Operations][Operation] that
1241    /// happen for given key.
1242    ///
1243    /// # Examples
1244    ///
1245    /// ```no_run
1246    /// # #[tokio::main]
1247    /// # async fn main() -> Result<(), async_nats::Error> {
1248    /// use futures_util::StreamExt;
1249    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1250    /// let jetstream = async_nats::jetstream::new(client);
1251    /// let kv = jetstream
1252    ///     .create_key_value(async_nats::jetstream::kv::Config {
1253    ///         bucket: "kv".to_string(),
1254    ///         history: 10,
1255    ///         ..Default::default()
1256    ///     })
1257    ///     .await?;
1258    /// let mut entries = kv.history("kv").await?;
1259    /// while let Some(entry) = entries.next().await {
1260    ///     println!("entry: {:?}", entry);
1261    /// }
1262    /// # Ok(())
1263    /// # }
1264    /// ```
1265    pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1266        if !is_valid_key(key.as_ref()) {
1267            return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1268        }
1269        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1270
1271        let consumer = self
1272            .stream
1273            .create_consumer(super::consumer::push::OrderedConfig {
1274                deliver_subject: self.stream.context.client.new_inbox(),
1275                description: Some("kv history consumer".to_string()),
1276                filter_subject: subject,
1277                replay_policy: super::consumer::ReplayPolicy::Instant,
1278                ..Default::default()
1279            })
1280            .await?;
1281
1282        Ok(History {
1283            subscription: consumer.messages().await?,
1284            done: false,
1285            prefix: self.prefix.clone(),
1286            bucket: self.name.clone(),
1287        })
1288    }
1289
1290    /// Returns a [futures_util::Stream] that allows iterating over all keys in the bucket.
1291    ///
1292    /// # Examples
1293    ///
    /// Iterating over each key individually
1295    ///
1296    /// ```no_run
1297    /// # #[tokio::main]
1298    /// # async fn main() -> Result<(), async_nats::Error> {
1299    /// use futures_util::{StreamExt, TryStreamExt};
1300    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1301    /// let jetstream = async_nats::jetstream::new(client);
1302    /// let kv = jetstream
1303    ///     .create_key_value(async_nats::jetstream::kv::Config {
1304    ///         bucket: "kv".to_string(),
1305    ///         history: 10,
1306    ///         ..Default::default()
1307    ///     })
1308    ///     .await?;
1309    /// let mut keys = kv.keys().await?.boxed();
1310    /// while let Some(key) = keys.try_next().await? {
1311    ///     println!("key: {:?}", key);
1312    /// }
1313    /// # Ok(())
1314    /// # }
1315    /// ```
1316    ///
1317    /// Collecting it into a vector of keys
1318    ///
1319    /// ```no_run
1320    /// # #[tokio::main]
1321    /// # async fn main() -> Result<(), async_nats::Error> {
1322    /// use futures_util::TryStreamExt;
1323    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1324    /// let jetstream = async_nats::jetstream::new(client);
1325    /// let kv = jetstream
1326    ///     .create_key_value(async_nats::jetstream::kv::Config {
1327    ///         bucket: "kv".to_string(),
1328    ///         history: 10,
1329    ///         ..Default::default()
1330    ///     })
1331    ///     .await?;
1332    /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1333    /// println!("Keys: {:?}", keys);
1334    /// # Ok(())
1335    /// # }
1336    /// ```
1337    pub async fn keys(&self) -> Result<Keys, HistoryError> {
1338        let subject = format!("{}>", self.prefix.as_str());
1339
1340        let consumer = self
1341            .stream
1342            .create_consumer(super::consumer::push::OrderedConfig {
1343                deliver_subject: self.stream.context.client.new_inbox(),
1344                description: Some("kv history consumer".to_string()),
1345                filter_subject: subject,
1346                headers_only: true,
1347                replay_policy: super::consumer::ReplayPolicy::Instant,
1348                // We only need to know the latest state for each key, not the whole history
1349                deliver_policy: DeliverPolicy::LastPerSubject,
1350                ..Default::default()
1351            })
1352            .await?;
1353
1354        let entries = History {
1355            done: consumer.info.num_pending == 0,
1356            subscription: consumer.messages().await?,
1357            prefix: self.prefix.clone(),
1358            bucket: self.name.clone(),
1359        };
1360
1361        Ok(Keys { inner: entries })
1362    }
1363}
1364
1365/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch {
    // Copied into every yielded entry; set to `true` once a message with a pending
    // count of 0 has been polled, and never reset afterwards.
    seen_current: bool,
    // Ordered push consumer subscription that delivers the underlying messages.
    subscription: super::consumer::push::Ordered,
    // Subject prefix stripped from each message subject to recover the key.
    prefix: String,
    // Bucket name copied into every yielded entry.
    bucket: String,
}
1372
1373impl futures_util::Stream for Watch {
1374    type Item = Result<Entry, WatcherError>;
1375
1376    fn poll_next(
1377        mut self: std::pin::Pin<&mut Self>,
1378        cx: &mut std::task::Context<'_>,
1379    ) -> std::task::Poll<Option<Self::Item>> {
1380        match self.subscription.poll_next_unpin(cx) {
1381            Poll::Ready(message) => match message {
1382                None => Poll::Ready(None),
1383                Some(message) => {
1384                    let message = message?;
1385                    let info = message.info().map_err(|err| {
1386                        WatcherError::with_source(
1387                            WatcherErrorKind::Other,
1388                            format!("failed to parse message metadata: {err}"),
1389                        )
1390                    })?;
1391
1392                    let operation =
1393                        kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1394
1395                    let key = message
1396                        .subject
1397                        .strip_prefix(&self.prefix)
1398                        .map(|s| s.to_string())
1399                        .unwrap();
1400
1401                    if !self.seen_current && info.pending == 0 {
1402                        self.seen_current = true;
1403                    }
1404
1405                    Poll::Ready(Some(Ok(Entry {
1406                        bucket: self.bucket.clone(),
1407                        key,
1408                        value: message.payload.clone(),
1409                        revision: info.stream_sequence,
1410                        created: info.published,
1411                        delta: info.pending,
1412                        operation,
1413                        seen_current: self.seen_current,
1414                    })))
1415                }
1416            },
1417            std::task::Poll::Pending => Poll::Pending,
1418        }
1419    }
1420
1421    fn size_hint(&self) -> (usize, Option<usize>) {
1422        (0, None)
1423    }
1424}
1425
1426/// A structure representing the history of a key-value bucket, yielding past values.
pub struct History {
    // Ordered push consumer subscription that delivers the underlying messages.
    subscription: super::consumer::push::Ordered,
    // Once `true`, `poll_next` returns `None` without polling the subscription again.
    // It is set after a message with a pending count of 0 has been polled.
    done: bool,
    // Subject prefix stripped from each message subject to recover the key.
    prefix: String,
    // Bucket name copied into every yielded entry.
    bucket: String,
}
1433
1434impl futures_util::Stream for History {
1435    type Item = Result<Entry, WatcherError>;
1436
1437    fn poll_next(
1438        mut self: std::pin::Pin<&mut Self>,
1439        cx: &mut std::task::Context<'_>,
1440    ) -> std::task::Poll<Option<Self::Item>> {
1441        if self.done {
1442            return Poll::Ready(None);
1443        }
1444        match self.subscription.poll_next_unpin(cx) {
1445            Poll::Ready(message) => match message {
1446                None => Poll::Ready(None),
1447                Some(message) => {
1448                    let message = message?;
1449                    let info = message.info().map_err(|err| {
1450                        WatcherError::with_source(
1451                            WatcherErrorKind::Other,
1452                            format!("failed to parse message metadata: {err}"),
1453                        )
1454                    })?;
1455                    if info.pending == 0 {
1456                        self.done = true;
1457                    }
1458
1459                    let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1460
1461                    let key = message
1462                        .subject
1463                        .strip_prefix(&self.prefix)
1464                        .map(|s| s.to_string())
1465                        .unwrap();
1466
1467                    Poll::Ready(Some(Ok(Entry {
1468                        bucket: self.bucket.clone(),
1469                        key,
1470                        value: message.payload.clone(),
1471                        revision: info.stream_sequence,
1472                        created: info.published,
1473                        delta: info.pending,
1474                        operation,
1475                        seen_current: self.done,
1476                    })))
1477                }
1478            },
1479            std::task::Poll::Pending => Poll::Pending,
1480        }
1481    }
1482
1483    fn size_hint(&self) -> (usize, Option<usize>) {
1484        (0, None)
1485    }
1486}
1487
/// A stream of the keys in a key-value bucket, as returned by `keys`.
///
/// Wraps a [History] stream and yields only the key of each entry, skipping entries
/// whose operation is a delete or a purge.
pub struct Keys {
    // Entry stream the keys are extracted from.
    inner: History,
}
1491
1492impl futures_util::Stream for Keys {
1493    type Item = Result<String, WatcherError>;
1494
1495    fn poll_next(
1496        mut self: std::pin::Pin<&mut Self>,
1497        cx: &mut std::task::Context<'_>,
1498    ) -> std::task::Poll<Option<Self::Item>> {
1499        loop {
1500            match self.inner.poll_next_unpin(cx) {
1501                Poll::Ready(None) => return Poll::Ready(None),
1502                Poll::Ready(Some(res)) => match res {
1503                    Ok(entry) => {
1504                        // Skip purged and deleted keys
1505                        if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1506                            // Try to poll again if we skip this one
1507                            continue;
1508                        } else {
1509                            return Poll::Ready(Some(Ok(entry.key)));
1510                        }
1511                    }
1512                    Err(e) => return Poll::Ready(Some(Err(e))),
1513                },
1514                Poll::Pending => return Poll::Pending,
1515            }
1516        }
1517    }
1518}
1519
/// An entry in a key-value bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved.
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value. For entries yielded by [Watch] and [History]
    /// this is the stream sequence of the underlying message.
    pub revision: u64,
    /// Distance from the latest value. For entries yielded by [Watch] and [History]
    /// this is the number of messages still pending after this one.
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
    /// Set to true after all historical messages have been received, and
    /// now all Entries are the new ones.
    pub seen_current: bool,
}
1541
1542#[derive(Clone, Debug, PartialEq)]
1543pub enum StatusErrorKind {
1544    JetStream(crate::jetstream::Error),
1545    TimedOut,
1546}
1547
1548impl Display for StatusErrorKind {
1549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1550        match self {
1551            Self::JetStream(err) => write!(f, "jetstream request failed: {err}"),
1552            Self::TimedOut => write!(f, "timed out"),
1553        }
1554    }
1555}
1556
1557pub type StatusError = Error<StatusErrorKind>;
1558
/// Kinds of failure carried by [CreateError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
    /// The key already exists. An update rejected with a wrong last revision is
    /// converted into this kind.
    AlreadyExists,
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The key could not be created in the store. Timeouts of the underlying update or
    /// entry lookup are also converted into this kind.
    Publish,
    /// The publish was not acknowledged successfully.
    Ack,
    /// Any other failure.
    Other,
}
1567
1568impl From<UpdateError> for CreateError {
1569    fn from(error: UpdateError) -> Self {
1570        match error.kind() {
1571            UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1572            UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1573            UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
1574            UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1575        }
1576    }
1577}
1578
1579impl From<PutError> for CreateError {
1580    fn from(error: PutError) -> Self {
1581        match error.kind() {
1582            PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1583            PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1584            PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1585        }
1586    }
1587}
1588
1589impl From<EntryError> for CreateError {
1590    fn from(error: EntryError) -> Self {
1591        match error.kind() {
1592            EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1593            EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1594            EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1595        }
1596    }
1597}
1598
1599impl Display for CreateErrorKind {
1600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1601        match self {
1602            Self::AlreadyExists => write!(f, "key already exists"),
1603            Self::Publish => write!(f, "failed to create key in store"),
1604            Self::Ack => write!(f, "ack error"),
1605            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1606            Self::Other => write!(f, "other error"),
1607        }
1608    }
1609}
1610
1611pub type CreateError = Error<CreateErrorKind>;
1612
1613#[derive(Clone, Copy, Debug, PartialEq)]
1614pub enum PutErrorKind {
1615    InvalidKey,
1616    Publish,
1617    Ack,
1618}
1619
1620impl Display for PutErrorKind {
1621    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1622        match self {
1623            Self::Publish => write!(f, "failed to put key into store"),
1624            Self::Ack => write!(f, "ack error"),
1625            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1626        }
1627    }
1628}
1629
1630pub type PutError = Error<PutErrorKind>;
1631
1632#[derive(Clone, Copy, Debug, PartialEq)]
1633pub enum EntryErrorKind {
1634    InvalidKey,
1635    TimedOut,
1636    Other,
1637}
1638
1639impl Display for EntryErrorKind {
1640    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1641        match self {
1642            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1643            Self::TimedOut => write!(f, "timed out"),
1644            Self::Other => write!(f, "failed getting entry"),
1645        }
1646    }
1647}
1648
1649pub type EntryError = Error<EntryErrorKind>;
1650
// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate. Judging by its name
// and arguments it presumably generates `From<DirectGetError> for EntryError`, mapping the
// timeout kind to `EntryErrorKind::TimedOut` — confirm against the macro definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1657
1658#[derive(Clone, Copy, Debug, PartialEq)]
1659pub enum WatchErrorKind {
1660    InvalidKey,
1661    TimedOut,
1662    ConsumerCreate,
1663    Other,
1664}
1665
1666impl Display for WatchErrorKind {
1667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1668        match self {
1669            Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1670            Self::Other => write!(f, "watch failed"),
1671            Self::TimedOut => write!(f, "timed out"),
1672            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1673        }
1674    }
1675}
1676
1677pub type WatchError = Error<WatchErrorKind>;
1678
// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate. These invocations
// presumably generate `From<ConsumerError>` and `From<StreamError>` for `WatchError` (the
// conversions `?` relies on in `history` and `keys`), mapping the timeout kind to
// `WatchErrorKind::TimedOut` — confirm against the macro definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1681
1682#[derive(Clone, Copy, Debug, PartialEq)]
1683pub enum UpdateErrorKind {
1684    InvalidKey,
1685    TimedOut,
1686    WrongLastRevision,
1687    Other,
1688}
1689
1690impl Display for UpdateErrorKind {
1691    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1692        match self {
1693            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1694            Self::TimedOut => write!(f, "timed out"),
1695            Self::WrongLastRevision => write!(f, "wrong last revision"),
1696            Self::Other => write!(f, "failed getting entry"),
1697        }
1698    }
1699}
1700
1701pub type UpdateError = Error<UpdateErrorKind>;
1702
1703impl From<PublishError> for UpdateError {
1704    fn from(err: PublishError) -> Self {
1705        match err.kind() {
1706            PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1707            PublishErrorKind::WrongLastSequence => {
1708                Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1709            }
1710            _ => Self::with_source(UpdateErrorKind::Other, err),
1711        }
1712    }
1713}
1714
1715#[derive(Clone, Copy, Debug, PartialEq)]
1716pub enum WatcherErrorKind {
1717    Consumer,
1718    Other,
1719}
1720
1721impl Display for WatcherErrorKind {
1722    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1723        match self {
1724            Self::Consumer => write!(f, "watcher consumer error"),
1725            Self::Other => write!(f, "watcher error"),
1726        }
1727    }
1728}
1729
1730pub type WatcherError = Error<WatcherErrorKind>;
1731
1732impl From<OrderedError> for WatcherError {
1733    fn from(err: OrderedError) -> Self {
1734        WatcherError::with_source(WatcherErrorKind::Consumer, err)
1735    }
1736}
1737
/// Error returned by delete operations; an alias of [UpdateError].
pub type DeleteError = UpdateError;
/// Kind of a [DeleteError]; an alias of [UpdateErrorKind].
pub type DeleteErrorKind = UpdateErrorKind;

/// Error returned by purge operations; an alias of [UpdateError].
pub type PurgeError = UpdateError;
/// Kind of a [PurgeError]; an alias of [UpdateErrorKind].
pub type PurgeErrorKind = UpdateErrorKind;

/// Error returned when creating a history or keys stream; an alias of [WatchError].
pub type HistoryError = WatchError;
/// Kind of a [HistoryError]; an alias of [WatchErrorKind].
pub type HistoryErrorKind = WatchErrorKind;