async_nats/jetstream/kv/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19    fmt::{self, Display},
20    str::FromStr,
21    task::Poll,
22    time::Duration,
23};
24
25use crate::HeaderValue;
26use bytes::Bytes;
27use futures::StreamExt;
28use once_cell::sync::Lazy;
29use regex::Regex;
30use time::OffsetDateTime;
31use tracing::debug;
32
33use crate::error::Error;
34use crate::header;
35
36use self::bucket::Status;
37
38use super::{
39    consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
40    context::{PublishError, PublishErrorKind},
41    message::StreamMessage,
42    stream::{
43        self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
44        Source, StorageType, Stream,
45    },
46};
47
48fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
49    if let Some(op) = message.headers.get(KV_OPERATION) {
50        Operation::from_str(op.as_str())
51            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
52    } else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
53        match reason.as_str() {
54            "MaxAge" | "Purge" => Ok(Operation::Purge),
55            "Remove" => Ok(Operation::Delete),
56            _ => Err(EntryError::with_source(
57                EntryErrorKind::Other,
58                "invalid marker reason",
59            )),
60        }
61    } else {
62        Err(EntryError::with_source(
63            EntryErrorKind::Other,
64            "missing operation",
65        ))
66    }
67}
68fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
69    let headers = match message.headers.as_ref() {
70        Some(headers) => headers,
71        None => return Ok(Operation::Put),
72    };
73    if let Some(op) = headers.get(KV_OPERATION) {
74        Operation::from_str(op.as_str())
75            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
76    } else {
77        Ok(Operation::Put)
78    }
79}
80
// Bucket names: one or more ASCII letters, digits, `_` or `-`, anchored to the whole string.
static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Keys: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`, anchored to the whole
// string. `is_valid_key` additionally rejects a leading or trailing `.`.
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the largest value accepted for `Config::history`; the check lives
// outside this part of the file — confirm.
pub(crate) const MAX_HISTORY: i64 = 64;
// Key handed to the watch methods by `watch_all` / `watch_all_from_revision`.
const ALL_KEYS: &str = ">";

// Name of the header carrying the entry operation, and the values `Operation::from_str` accepts.
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// NOTE(review): rollup header name and value; not referenced in the visible part of this
// file — presumably used when purging a key, confirm against the purge implementation.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
94
/// Reports whether `bucket_name` is an acceptable bucket name, i.e. whether it matches
/// `VALID_BUCKET_RE` (non-empty, ASCII letters, digits, `_` and `-` only).
pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
    VALID_BUCKET_RE.is_match(bucket_name)
}
98
99pub(crate) fn is_valid_key(key: &str) -> bool {
100    if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
101        return false;
102    }
103
104    VALID_KEY_RE.is_match(key)
105}
106
/// Configuration values for key value stores.
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Human readable description.
    pub description: String,
    /// Maximum size of a single value.
    pub max_value_size: i32,
    /// Maximum historical entries.
    pub history: i64,
    /// Maximum age of any entry in the bucket.
    pub max_age: std::time::Duration,
    /// How large the bucket may become in total bytes before the configured discard policy kicks in.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`.
    pub storage: StorageType,
    /// How many replicas to keep for each entry in a cluster.
    pub num_replicas: usize,
    /// Republish is for republishing messages once persistent in the Key Value Bucket.
    pub republish: Option<Republish>,
    /// Bucket mirror configuration.
    pub mirror: Option<Source>,
    /// Bucket sources configuration.
    pub sources: Option<Vec<Source>>,
    /// Allow mirrors using direct API.
    pub mirror_direct: bool,
    /// Whether compression is enabled for the bucket.
    /// Only available with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    pub compression: bool,
    /// Cluster and tag placement for the bucket.
    pub placement: Option<stream::Placement>,
    /// Enables per-message TTL and delete marker TTL for a bucket.
    /// Only available with the `server_2_11` feature.
    #[cfg(feature = "server_2_11")]
    pub limit_markers: Option<Duration>,
}
143
/// Describes what kind of operation an entry represents.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket
    Put,
    /// A value was deleted from a bucket
    Delete,
    /// A value was purged from a bucket
    Purge,
}
154
155impl FromStr for Operation {
156    type Err = ParseOperationError;
157
158    fn from_str(s: &str) -> Result<Self, Self::Err> {
159        match s {
160            KV_OPERATION_DELETE => Ok(Operation::Delete),
161            KV_OPERATION_PURGE => Ok(Operation::Purge),
162            KV_OPERATION_PUT => Ok(Operation::Put),
163            _ => Err(ParseOperationError),
164        }
165    }
166}
167
168#[derive(Debug, Clone)]
169pub struct ParseOperationError;
170
171impl fmt::Display for ParseOperationError {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
174    }
175}
176
177impl std::error::Error for ParseOperationError {}
178
/// A struct used as a handle for the bucket.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. It is prepended to a key to form the subject used
    /// for lookups and watches.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs. When set, `put` uses it
    /// instead of `prefix`.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When `true`, `put` prepends the
    /// context prefix followed by `.` to the subject.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
195
196impl Store {
197    /// Queries the server and returns status from the server.
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// # #[tokio::main]
203    /// # async fn main() -> Result<(), async_nats::Error> {
204    /// let client = async_nats::connect("demo.nats.io:4222").await?;
205    /// let jetstream = async_nats::jetstream::new(client);
206    /// let kv = jetstream
207    ///     .create_key_value(async_nats::jetstream::kv::Config {
208    ///         bucket: "kv".to_string(),
209    ///         history: 10,
210    ///         ..Default::default()
211    ///     })
212    ///     .await?;
213    /// let status = kv.status().await?;
214    /// println!("status: {:?}", status);
215    /// # Ok(())
216    /// # }
217    /// ```
218    pub async fn status(&self) -> Result<Status, StatusError> {
219        // TODO: should we poll for fresh info here? probably yes.
220        let info = self.stream.info.clone();
221
222        Ok(Status {
223            info,
224            bucket: self.name.to_string(),
225        })
226    }
227
228    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
229    ///
230    /// # Examples
231    ///
232    /// ```no_run
233    /// # #[tokio::main]
234    /// # async fn main() -> Result<(), async_nats::Error> {
235    /// let client = async_nats::connect("demo.nats.io:4222").await?;
236    /// let jetstream = async_nats::jetstream::new(client);
237    /// let kv = jetstream
238    ///     .create_key_value(async_nats::jetstream::kv::Config {
239    ///         bucket: "kv".to_string(),
240    ///         history: 10,
241    ///         ..Default::default()
242    ///     })
243    ///     .await?;
244    ///
245    /// let status = kv.create("key", "value".into()).await;
246    /// assert!(status.is_ok());
247    ///
248    /// let status = kv.create("key", "value".into()).await;
249    /// assert!(status.is_err());
250    ///
251    /// # Ok(())
252    /// # }
253    /// ```
    pub async fn create<T: AsRef<str>>(
        &self,
        key: T,
        value: bytes::Bytes,
    ) -> Result<u64, CreateError> {
        // No per-key TTL: delegate to the shared implementation with `None`.
        self.create_maybe_ttl(key, value, None).await
    }
261
262    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
263    /// It will set a TTL specific for that key.
264    ///
265    /// # Examples
266    ///
267    /// ```no_run
268    /// # #[tokio::main]
269    /// # async fn main() -> Result<(), async_nats::Error> {
270    /// use std::time::Duration;
271    /// let client = async_nats::connect("demo.nats.io:4222").await?;
272    /// let jetstream = async_nats::jetstream::new(client);
273    /// let kv = jetstream
274    ///     .create_key_value(async_nats::jetstream::kv::Config {
275    ///         bucket: "kv".to_string(),
276    ///         history: 10,
277    ///         ..Default::default()
278    ///     })
279    ///     .await?;
280    ///
281    /// let status = kv
282    ///     .create_with_ttl("key", "value".into(), Duration::from_secs(10))
283    ///     .await;
284    /// assert!(status.is_ok());
285    ///
286    /// # Ok(())
287    /// # }
288    /// ```
    pub async fn create_with_ttl<T: AsRef<str>>(
        &self,
        key: T,
        value: bytes::Bytes,
        ttl: Duration,
    ) -> Result<u64, CreateError> {
        // Same path as `create`, with the caller's TTL forwarded.
        self.create_maybe_ttl(key, value, Some(ttl)).await
    }
297
298    async fn create_maybe_ttl<T: AsRef<str>>(
299        &self,
300        key: T,
301        value: bytes::Bytes,
302        ttl: Option<Duration>,
303    ) -> Result<u64, CreateError> {
304        let update_err = match self
305            .update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
306            .await
307        {
308            Ok(revision) => return Ok(revision),
309            Err(err) => err,
310        };
311
312        match self.entry(key.as_ref()).await? {
313            // Deleted or Purged key, we can create it again.
314            Some(Entry {
315                operation: Operation::Delete | Operation::Purge,
316                revision,
317                ..
318            }) => {
319                let revision = self.update(key, value, revision).await?;
320                Ok(revision)
321            }
322
323            // key already exists.
324            Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
325
326            // Something went wrong with the initial update, return that error
327            None => Err(update_err.into()),
328        }
329    }
330
331    /// Puts new key value pair into the bucket.
332    /// If key didn't exist, it is created. If it did exist, a new value with a new version is
333    /// added.
334    ///
335    /// # Examples
336    ///
337    /// ```no_run
338    /// # #[tokio::main]
339    /// # async fn main() -> Result<(), async_nats::Error> {
340    /// let client = async_nats::connect("demo.nats.io:4222").await?;
341    /// let jetstream = async_nats::jetstream::new(client);
342    /// let kv = jetstream
343    ///     .create_key_value(async_nats::jetstream::kv::Config {
344    ///         bucket: "kv".to_string(),
345    ///         history: 10,
346    ///         ..Default::default()
347    ///     })
348    ///     .await?;
349    /// let status = kv.put("key", "value".into()).await?;
350    /// # Ok(())
351    /// # }
352    /// ```
353    pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
354        if !is_valid_key(key.as_ref()) {
355            return Err(PutError::new(PutErrorKind::InvalidKey));
356        }
357        let mut subject = String::new();
358        if self.use_jetstream_prefix {
359            subject.push_str(&self.stream.context.prefix);
360            subject.push('.');
361        }
362        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
363        subject.push_str(key.as_ref());
364
365        let publish_ack = self
366            .stream
367            .context
368            .publish(subject, value)
369            .await
370            .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
371        let ack = publish_ack
372            .await
373            .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
374
375        Ok(ack.sequence)
376    }
377
378    async fn entry_maybe_revision<T: Into<String>>(
379        &self,
380        key: T,
381        revision: Option<u64>,
382    ) -> Result<Option<Entry>, EntryError> {
383        let key: String = key.into();
384        if !is_valid_key(key.as_ref()) {
385            return Err(EntryError::new(EntryErrorKind::InvalidKey));
386        }
387
388        let subject = format!("{}{}", self.prefix.as_str(), &key);
389
390        let result: Option<(StreamMessage, Operation)> = {
391            if self.stream.info.config.allow_direct {
392                let message = match revision {
393                    Some(revision) => {
394                        let message = self.stream.direct_get(revision).await;
395                        if let Ok(message) = message.as_ref() {
396                            if message.subject.as_str() != subject {
397                                println!("subject mismatch {}", message.subject);
398                                return Ok(None);
399                            }
400                        }
401                        message
402                    }
403                    None => {
404                        self.stream
405                            .direct_get_last_for_subject(subject.as_str())
406                            .await
407                    }
408                };
409
410                match message {
411                    Ok(message) => {
412                        let operation =
413                            kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
414
415                        Some((message, operation))
416                    }
417                    Err(err) => {
418                        if err.kind() == DirectGetErrorKind::NotFound {
419                            None
420                        } else {
421                            return Err(err.into());
422                        }
423                    }
424                }
425            } else {
426                let raw_message = match revision {
427                    Some(revision) => {
428                        let message = self.stream.get_raw_message(revision).await;
429                        if let Ok(message) = message.as_ref() {
430                            if message.subject.as_str() != subject {
431                                return Ok(None);
432                            }
433                        }
434                        message
435                    }
436                    None => {
437                        self.stream
438                            .get_last_raw_message_by_subject(subject.as_str())
439                            .await
440                    }
441                };
442                match raw_message {
443                    Ok(raw_message) => {
444                        let operation = kv_operation_from_stream_message(&raw_message)
445                            .unwrap_or(Operation::Put);
446                        // TODO: unnecessary expensive, cloning whole Message.
447                        Some((raw_message, operation))
448                    }
449                    Err(err) => match err.kind() {
450                        crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
451                        crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
452                            return Err(EntryError::new(EntryErrorKind::InvalidKey))
453                        }
454                        crate::jetstream::stream::LastRawMessageErrorKind::Other => {
455                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
456                        }
457                        crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
458                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
459                        }
460                    },
461                }
462            }
463        };
464
465        match result {
466            Some((message, operation)) => {
467                let entry = Entry {
468                    bucket: self.name.clone(),
469                    key,
470                    value: message.payload,
471                    revision: message.sequence,
472                    created: message.time,
473                    operation,
474                    delta: 0,
475                    seen_current: false,
476                };
477                Ok(Some(entry))
478            }
479            // TODO: remember to touch this when Errors are in place.
480            None => Ok(None),
481        }
482    }
483
484    /// Retrieves the last [Entry] for a given key from a bucket.
485    ///
486    /// # Examples
487    ///
488    /// ```no_run
489    /// # #[tokio::main]
490    /// # async fn main() -> Result<(), async_nats::Error> {
491    /// let client = async_nats::connect("demo.nats.io:4222").await?;
492    /// let jetstream = async_nats::jetstream::new(client);
493    /// let kv = jetstream
494    ///     .create_key_value(async_nats::jetstream::kv::Config {
495    ///         bucket: "kv".to_string(),
496    ///         history: 10,
497    ///         ..Default::default()
498    ///     })
499    ///     .await?;
500    /// let status = kv.put("key", "value".into()).await?;
501    /// let entry = kv.entry("key").await?;
502    /// println!("entry: {:?}", entry);
503    /// # Ok(())
504    /// # }
505    /// ```
    pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
        // `None` revision selects the most recent message stored for the key.
        self.entry_maybe_revision(key, None).await
    }
509
510    /// Retrieves the [Entry] for a given key revision from a bucket.
511    ///
512    /// # Examples
513    ///
514    /// ```no_run
515    /// # #[tokio::main]
516    /// # async fn main() -> Result<(), async_nats::Error> {
517    /// let client = async_nats::connect("demo.nats.io:4222").await?;
518    /// let jetstream = async_nats::jetstream::new(client);
519    /// let kv = jetstream
520    ///     .create_key_value(async_nats::jetstream::kv::Config {
521    ///         bucket: "kv".to_string(),
522    ///         history: 10,
523    ///         ..Default::default()
524    ///     })
525    ///     .await?;
526    /// let status = kv.put("key", "value".into()).await?;
527    /// let status = kv.put("key", "value2".into()).await?;
528    /// let entry = kv.entry_for_revision("key", 2).await?;
529    /// println!("entry: {:?}", entry);
530    /// # Ok(())
531    /// # }
532    /// ```
    pub async fn entry_for_revision<T: Into<String>>(
        &self,
        key: T,
        revision: u64,
    ) -> Result<Option<Entry>, EntryError> {
        // Yields `Ok(None)` when the message at `revision` belongs to a different key.
        self.entry_maybe_revision(key, Some(revision)).await
    }
540
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
542    /// values whenever there are changes for that key.
543    ///
544    /// # Examples
545    ///
546    /// ```no_run
547    /// # #[tokio::main]
548    /// # async fn main() -> Result<(), async_nats::Error> {
549    /// use futures::StreamExt;
550    /// let client = async_nats::connect("demo.nats.io:4222").await?;
551    /// let jetstream = async_nats::jetstream::new(client);
552    /// let kv = jetstream
553    ///     .create_key_value(async_nats::jetstream::kv::Config {
554    ///         bucket: "kv".to_string(),
555    ///         history: 10,
556    ///         ..Default::default()
557    ///     })
558    ///     .await?;
559    /// let mut entries = kv.watch("kv").await?;
560    /// while let Some(entry) = entries.next().await {
561    ///     println!("entry: {:?}", entry);
562    /// }
563    /// # Ok(())
564    /// # }
565    /// ```
    pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
        // NOTE(review): `DeliverPolicy::New` presumably limits delivery to updates published
        // after the consumer is created — confirm against the consumer documentation.
        self.watch_with_deliver_policy(key, DeliverPolicy::New)
            .await
    }
570
571    /// Creates a [futures::Stream] over [Entries][Entry] in the bucket, which yields
572    /// values whenever there are changes for given keys.
573    ///
574    /// # Examples
575    ///
576    /// ```no_run
577    /// # #[tokio::main]
578    /// # async fn main() -> Result<(), async_nats::Error> {
579    /// use futures::StreamExt;
580    /// let client = async_nats::connect("demo.nats.io:4222").await?;
581    /// let jetstream = async_nats::jetstream::new(client);
582    /// let kv = jetstream
583    ///     .create_key_value(async_nats::jetstream::kv::Config {
584    ///         bucket: "kv".to_string(),
585    ///         history: 10,
586    ///         ..Default::default()
587    ///     })
588    ///     .await?;
589    /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
590    /// while let Some(entry) = entries.next().await {
591    ///     println!("entry: {:?}", entry);
592    /// }
593    /// # Ok(())
594    /// # }
595    /// ```
    #[cfg(feature = "server_2_10")]
    pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
    where
        T: AsRef<str>,
        K: IntoIterator<Item = T>,
    {
        // Multi-key counterpart of `watch`: same deliver policy, one filter subject per key.
        self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
            .await
    }
605
606    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, starting from
607    /// provided revision. This is useful to resume watching over big KV buckets without a need to
608    /// replay all the history.
609    ///
610    /// # Examples
611    ///
612    /// ```no_run
613    /// # #[tokio::main]
614    /// # async fn main() -> Result<(), async_nats::Error> {
615    /// use futures::StreamExt;
616    /// let client = async_nats::connect("demo.nats.io:4222").await?;
617    /// let jetstream = async_nats::jetstream::new(client);
618    /// let kv = jetstream
619    ///     .create_key_value(async_nats::jetstream::kv::Config {
620    ///         bucket: "kv".to_string(),
621    ///         history: 10,
622    ///         ..Default::default()
623    ///     })
624    ///     .await?;
625    /// let mut entries = kv.watch_from_revision("kv", 5).await?;
626    /// while let Some(entry) = entries.next().await {
627    ///     println!("entry: {:?}", entry);
628    /// }
629    /// # Ok(())
630    /// # }
631    /// ```
    pub async fn watch_from_revision<T: AsRef<str>>(
        &self,
        key: T,
        revision: u64,
    ) -> Result<Watch, WatchError> {
        // The revision is passed to the consumer as its start sequence.
        self.watch_with_deliver_policy(
            key,
            DeliverPolicy::ByStartSequence {
                start_sequence: revision,
            },
        )
        .await
    }
645
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// the last value for that key as well as any subsequent changes.
648    ///
649    /// # Examples
650    ///
651    /// ```no_run
652    /// # #[tokio::main]
653    /// # async fn main() -> Result<(), async_nats::Error> {
654    /// use futures::StreamExt;
655    /// let client = async_nats::connect("demo.nats.io:4222").await?;
656    /// let jetstream = async_nats::jetstream::new(client);
657    /// let kv = jetstream
658    ///     .create_key_value(async_nats::jetstream::kv::Config {
659    ///         bucket: "kv".to_string(),
660    ///         history: 10,
661    ///         ..Default::default()
662    ///     })
663    ///     .await?;
664    /// let mut entries = kv.watch_with_history("kv").await?;
665    /// while let Some(entry) = entries.next().await {
666    ///     println!("entry: {:?}", entry);
667    /// }
668    /// # Ok(())
669    /// # }
670    /// ```
    pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
        // `LastPerSubject` makes the consumer start from the last message of each matching
        // subject instead of only new ones.
        self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
            .await
    }
675
    /// Creates a [futures::Stream] over [Entries][Entry] for the given keys in the bucket, which
    /// yields the last value for each of those keys as well as any subsequent changes.
    /// This requires server version 2.10 or newer as it uses consumers with multiple subject filters.
679    ///
680    /// # Examples
681    ///
682    /// ```no_run
683    /// # #[tokio::main]
684    /// # async fn main() -> Result<(), async_nats::Error> {
685    /// use futures::StreamExt;
686    /// let client = async_nats::connect("demo.nats.io:4222").await?;
687    /// let jetstream = async_nats::jetstream::new(client);
688    /// let kv = jetstream
689    ///     .create_key_value(async_nats::jetstream::kv::Config {
690    ///         bucket: "kv".to_string(),
691    ///         history: 10,
692    ///         ..Default::default()
693    ///     })
694    ///     .await?;
695    /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
696    /// while let Some(entry) = entries.next().await {
697    ///     println!("entry: {:?}", entry);
698    /// }
699    /// # Ok(())
700    /// # }
701    /// ```
    #[cfg(feature = "server_2_10")]
    pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
        &self,
        keys: K,
    ) -> Result<Watch, WatchError> {
        // Multi-key counterpart of `watch_with_history`.
        self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
            .await
    }
710
711    #[cfg(feature = "server_2_10")]
712    async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
713        &self,
714        keys: K,
715        deliver_policy: DeliverPolicy,
716    ) -> Result<Watch, WatchError> {
717        let subjects = keys
718            .into_iter()
719            .map(|key| {
720                let key = key.as_ref();
721                format!("{}{}", self.prefix.as_str(), key)
722            })
723            .collect::<Vec<_>>();
724
725        debug!("initial consumer creation");
726        let consumer = self
727            .stream
728            .create_consumer(super::consumer::push::OrderedConfig {
729                deliver_subject: self.stream.context.client.new_inbox(),
730                description: Some("kv watch consumer".to_string()),
731                filter_subjects: subjects,
732                replay_policy: super::consumer::ReplayPolicy::Instant,
733                deliver_policy,
734                ..Default::default()
735            })
736            .await
737            .map_err(|err| match err.kind() {
738                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
739                    WatchError::new(WatchErrorKind::TimedOut)
740                }
741                _ => WatchError::with_source(WatchErrorKind::Other, err),
742            })?;
743
744        let seen_current = consumer.cached_info().num_pending == 0;
745
746        Ok(Watch {
747            subscription: consumer.messages().await.map_err(|err| match err.kind() {
748                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
749                    WatchError::new(WatchErrorKind::TimedOut)
750                }
751                crate::jetstream::consumer::StreamErrorKind::Other => {
752                    WatchError::with_source(WatchErrorKind::Other, err)
753                }
754            })?,
755            prefix: self.prefix.clone(),
756            bucket: self.name.clone(),
757            seen_current,
758        })
759    }
760
761    async fn watch_with_deliver_policy<T: AsRef<str>>(
762        &self,
763        key: T,
764        deliver_policy: DeliverPolicy,
765    ) -> Result<Watch, WatchError> {
766        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
767
768        debug!("initial consumer creation");
769        let consumer = self
770            .stream
771            .create_consumer(super::consumer::push::OrderedConfig {
772                deliver_subject: self.stream.context.client.new_inbox(),
773                description: Some("kv watch consumer".to_string()),
774                filter_subject: subject,
775                replay_policy: super::consumer::ReplayPolicy::Instant,
776                deliver_policy,
777                ..Default::default()
778            })
779            .await
780            .map_err(|err| match err.kind() {
781                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
782                    WatchError::new(WatchErrorKind::TimedOut)
783                }
784                _ => WatchError::with_source(WatchErrorKind::Other, err),
785            })?;
786
787        let seen_current = consumer.cached_info().num_pending == 0;
788
789        Ok(Watch {
790            subscription: consumer.messages().await.map_err(|err| match err.kind() {
791                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
792                    WatchError::new(WatchErrorKind::TimedOut)
793                }
794                crate::jetstream::consumer::StreamErrorKind::Other => {
795                    WatchError::with_source(WatchErrorKind::Other, err)
796                }
797            })?,
798            prefix: self.prefix.clone(),
799            bucket: self.name.clone(),
800            seen_current,
801        })
802    }
803
804    /// Creates a [futures::Stream] over [Entries][Entry] for all keys, which yields
805    /// values whenever there are changes in the bucket.
806    ///
807    /// # Examples
808    ///
809    /// ```no_run
810    /// # #[tokio::main]
811    /// # async fn main() -> Result<(), async_nats::Error> {
812    /// use futures::StreamExt;
813    /// let client = async_nats::connect("demo.nats.io:4222").await?;
814    /// let jetstream = async_nats::jetstream::new(client);
815    /// let kv = jetstream
816    ///     .create_key_value(async_nats::jetstream::kv::Config {
817    ///         bucket: "kv".to_string(),
818    ///         history: 10,
819    ///         ..Default::default()
820    ///     })
821    ///     .await?;
822    /// let mut entries = kv.watch_all().await?;
823    /// while let Some(entry) = entries.next().await {
824    ///     println!("entry: {:?}", entry);
825    /// }
826    /// # Ok(())
827    /// # }
828    /// ```
    pub async fn watch_all(&self) -> Result<Watch, WatchError> {
        // `ALL_KEYS` (`>`) is used as the key so the filter subject covers the whole bucket.
        self.watch(ALL_KEYS).await
    }
832
    /// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
    /// from a provided revision. This can be useful when resuming watching over a big bucket
835    /// without the need to replay all the history.
836    ///
837    /// # Examples
838    ///
839    /// ```no_run
840    /// # #[tokio::main]
841    /// # async fn main() -> Result<(), async_nats::Error> {
842    /// use futures::StreamExt;
843    /// let client = async_nats::connect("demo.nats.io:4222").await?;
844    /// let jetstream = async_nats::jetstream::new(client);
845    /// let kv = jetstream
846    ///     .create_key_value(async_nats::jetstream::kv::Config {
847    ///         bucket: "kv".to_string(),
848    ///         history: 10,
849    ///         ..Default::default()
850    ///     })
851    ///     .await?;
852    /// let mut entries = kv.watch_all_from_revision(40).await?;
853    /// while let Some(entry) = entries.next().await {
854    ///     println!("entry: {:?}", entry);
855    /// }
856    /// # Ok(())
857    /// # }
858    /// ```
859    pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
860        self.watch_from_revision(ALL_KEYS, revision).await
861    }
862
863    /// Retrieves the [Entry] for a given key from a bucket.
864    ///
865    /// # Examples
866    ///
867    /// ```no_run
868    /// # #[tokio::main]
869    /// # async fn main() -> Result<(), async_nats::Error> {
870    /// let client = async_nats::connect("demo.nats.io:4222").await?;
871    /// let jetstream = async_nats::jetstream::new(client);
872    /// let kv = jetstream
873    ///     .create_key_value(async_nats::jetstream::kv::Config {
874    ///         bucket: "kv".to_string(),
875    ///         history: 10,
876    ///         ..Default::default()
877    ///     })
878    ///     .await?;
879    /// let value = kv.get("key").await?;
880    /// match value {
881    ///     Some(bytes) => {
882    ///         let value_str = std::str::from_utf8(&bytes)?;
883    ///         println!("Value: {}", value_str);
884    ///     }
885    ///     None => {
886    ///         println!("Key not found or value not set");
887    ///     }
888    /// }
889    /// # Ok(())
890    /// # }
891    /// ```
892    pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
893        match self.entry(key).await {
894            Ok(Some(entry)) => match entry.operation {
895                Operation::Put => Ok(Some(entry.value)),
896                _ => Ok(None),
897            },
898            Ok(None) => Ok(None),
899            Err(err) => Err(err),
900        }
901    }
902
903    /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
904    /// the bucket.
905    ///
906    /// # Examples
907    ///
908    /// ```no_run
909    /// # #[tokio::main]
910    /// # async fn main() -> Result<(), async_nats::Error> {
911    /// use futures::StreamExt;
912    /// let client = async_nats::connect("demo.nats.io:4222").await?;
913    /// let jetstream = async_nats::jetstream::new(client);
914    /// let kv = jetstream
915    ///     .create_key_value(async_nats::jetstream::kv::Config {
916    ///         bucket: "kv".to_string(),
917    ///         history: 10,
918    ///         ..Default::default()
919    ///     })
920    ///     .await?;
921    /// let revision = kv.put("key", "value".into()).await?;
922    /// kv.update("key", "updated".into(), revision).await?;
923    /// # Ok(())
924    /// # }
925    /// ```
926    pub async fn update<T: AsRef<str>>(
927        &self,
928        key: T,
929        value: Bytes,
930        revision: u64,
931    ) -> Result<u64, UpdateError> {
932        self.update_maybe_ttl(key, value, revision, None).await
933    }
934
935    async fn update_maybe_ttl<T: AsRef<str>>(
936        &self,
937        key: T,
938        value: Bytes,
939        revision: u64,
940        ttl: Option<Duration>,
941    ) -> Result<u64, UpdateError> {
942        if !is_valid_key(key.as_ref()) {
943            return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
944        }
945        let mut subject = String::new();
946        if self.use_jetstream_prefix {
947            subject.push_str(&self.stream.context.prefix);
948            subject.push('.');
949        }
950        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
951        subject.push_str(key.as_ref());
952
953        let mut headers = crate::HeaderMap::default();
954        headers.insert(
955            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
956            HeaderValue::from(revision),
957        );
958
959        if let Some(ttl) = ttl {
960            headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
961        }
962
963        self.stream
964            .context
965            .publish_with_headers(subject, headers, value)
966            .await?
967            .await
968            .map_err(|err| err.into())
969            .map(|publish_ack| publish_ack.sequence)
970    }
971
972    /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
973    ///
974    /// # Examples
975    ///
976    /// ```no_run
977    /// # #[tokio::main]
978    /// # async fn main() -> Result<(), async_nats::Error> {
979    /// use futures::StreamExt;
980    /// let client = async_nats::connect("demo.nats.io:4222").await?;
981    /// let jetstream = async_nats::jetstream::new(client);
982    /// let kv = jetstream
983    ///     .create_key_value(async_nats::jetstream::kv::Config {
984    ///         bucket: "kv".to_string(),
985    ///         history: 10,
986    ///         ..Default::default()
987    ///     })
988    ///     .await?;
989    /// kv.put("key", "value".into()).await?;
990    /// kv.delete("key").await?;
991    /// # Ok(())
992    /// # }
993    /// ```
994    pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
995        self.delete_expect_revision(key, None).await
996    }
997
998    /// Deletes a given key if the revision matches. This is a non-destructive operation, which
999    /// sets a `DELETE` marker.
1000    ///
1001    /// # Examples
1002    ///
1003    /// ```no_run
1004    /// # #[tokio::main]
1005    /// # async fn main() -> Result<(), async_nats::Error> {
1006    /// use futures::StreamExt;
1007    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1008    /// let jetstream = async_nats::jetstream::new(client);
1009    /// let kv = jetstream
1010    ///     .create_key_value(async_nats::jetstream::kv::Config {
1011    ///         bucket: "kv".to_string(),
1012    ///         history: 10,
1013    ///         ..Default::default()
1014    ///     })
1015    ///     .await?;
1016    /// let revision = kv.put("key", "value".into()).await?;
1017    /// kv.delete_expect_revision("key", Some(revision)).await?;
1018    /// # Ok(())
1019    /// # }
1020    /// ```
1021    pub async fn delete_expect_revision<T: AsRef<str>>(
1022        &self,
1023        key: T,
1024        revison: Option<u64>,
1025    ) -> Result<(), DeleteError> {
1026        if !is_valid_key(key.as_ref()) {
1027            return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
1028        }
1029        let mut subject = String::new();
1030        if self.use_jetstream_prefix {
1031            subject.push_str(&self.stream.context.prefix);
1032            subject.push('.');
1033        }
1034        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1035        subject.push_str(key.as_ref());
1036
1037        let mut headers = crate::HeaderMap::default();
1038        // TODO: figure out which headers k/v should be where.
1039        headers.insert(
1040            KV_OPERATION,
1041            KV_OPERATION_DELETE
1042                .parse::<HeaderValue>()
1043                .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
1044        );
1045
1046        if let Some(revision) = revison {
1047            headers.insert(
1048                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1049                HeaderValue::from(revision),
1050            );
1051        }
1052
1053        self.stream
1054            .context
1055            .publish_with_headers(subject, headers, "".into())
1056            .await?
1057            .await?;
1058        Ok(())
1059    }
1060
1061    /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
1062    ///
1063    /// # Examples
1064    ///
1065    /// ```no_run
1066    /// # #[tokio::main]
1067    /// # async fn main() -> Result<(), async_nats::Error> {
1068    /// use futures::StreamExt;
1069    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1070    /// let jetstream = async_nats::jetstream::new(client);
1071    /// let kv = jetstream
1072    ///     .create_key_value(async_nats::jetstream::kv::Config {
1073    ///         bucket: "kv".to_string(),
1074    ///         history: 10,
1075    ///         ..Default::default()
1076    ///     })
1077    ///     .await?;
1078    /// kv.put("key", "value".into()).await?;
1079    /// kv.put("key", "another".into()).await?;
1080    /// kv.purge("key").await?;
1081    /// # Ok(())
1082    /// # }
1083    /// ```
1084    pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
1085        self.purge_expect_revision(key, None).await
1086    }
1087
1088    /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
1089    /// The purge entry will remain valid for the given `ttl`.
1090    ///
1091    /// # Examples
1092    ///
1093    /// ```no_run
1094    /// # #[tokio::main]
1095    /// # async fn main() -> Result<(), async_nats::Error> {
1096    /// use futures::StreamExt;
1097    /// use std::time::Duration;
1098    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1099    /// let jetstream = async_nats::jetstream::new(client);
1100    /// let kv = jetstream
1101    ///     .create_key_value(async_nats::jetstream::kv::Config {
1102    ///         bucket: "kv".to_string(),
1103    ///         history: 10,
1104    ///         ..Default::default()
1105    ///     })
1106    ///     .await?;
1107    /// kv.put("key", "value".into()).await?;
1108    /// kv.put("key", "another".into()).await?;
1109    /// kv.purge_with_ttl("key", Duration::from_secs(10)).await?;
1110    /// # Ok(())
1111    /// # }
1112    /// ```
1113    pub async fn purge_with_ttl<T: AsRef<str>>(
1114        &self,
1115        key: T,
1116        ttl: Duration,
1117    ) -> Result<(), PurgeError> {
1118        self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
1119            .await
1120    }
1121
1122    /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1123    /// purge entry in-place.
1124    ///
1125    /// # Examples
1126    ///
1127    /// ```no_run
1128    /// # #[tokio::main]
1129    /// # async fn main() -> Result<(), async_nats::Error> {
1130    /// use futures::StreamExt;
1131    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1132    /// let jetstream = async_nats::jetstream::new(client);
1133    /// let kv = jetstream
1134    ///     .create_key_value(async_nats::jetstream::kv::Config {
1135    ///         bucket: "kv".to_string(),
1136    ///         history: 10,
1137    ///         ..Default::default()
1138    ///     })
1139    ///     .await?;
1140    /// kv.put("key", "value".into()).await?;
1141    /// let revision = kv.put("key", "another".into()).await?;
1142    /// kv.purge_expect_revision("key", Some(revision)).await?;
1143    /// # Ok(())
1144    /// # }
1145    /// ```
1146    pub async fn purge_expect_revision<T: AsRef<str>>(
1147        &self,
1148        key: T,
1149        revision: Option<u64>,
1150    ) -> Result<(), PurgeError> {
1151        self.purge_expect_revision_maybe_ttl(key, revision, None)
1152            .await
1153    }
1154
1155    /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1156    /// purge entry in-place. The purge entry will remain valid for the given `ttl`.
1157    ///
1158    /// # Examples
1159    ///
1160    /// ```no_run
1161    /// # #[tokio::main]
1162    /// # async fn main() -> Result<(), async_nats::Error> {
1163    /// use futures::StreamExt;
1164    /// use std::time::Duration;
1165    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1166    /// let jetstream = async_nats::jetstream::new(client);
1167    /// let kv = jetstream
1168    ///     .create_key_value(async_nats::jetstream::kv::Config {
1169    ///         bucket: "kv".to_string(),
1170    ///         history: 10,
1171    ///         ..Default::default()
1172    ///     })
1173    ///     .await?;
1174    /// kv.put("key", "value".into()).await?;
1175    /// let revision = kv.put("key", "another".into()).await?;
1176    /// kv.purge_expect_revision_with_ttl("key", revision, Duration::from_secs(10))
1177    ///     .await?;
1178    /// # Ok(())
1179    /// # }
1180    /// ```
1181    pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
1182        &self,
1183        key: T,
1184        revision: u64,
1185        ttl: Duration,
1186    ) -> Result<(), PurgeError> {
1187        self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
1188            .await
1189    }
1190
1191    async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
1192        &self,
1193        key: T,
1194        revision: Option<u64>,
1195        ttl: Option<Duration>,
1196    ) -> Result<(), PurgeError> {
1197        if !is_valid_key(key.as_ref()) {
1198            return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1199        }
1200
1201        let mut subject = String::new();
1202        if self.use_jetstream_prefix {
1203            subject.push_str(&self.stream.context.prefix);
1204            subject.push('.');
1205        }
1206        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1207        subject.push_str(key.as_ref());
1208
1209        let mut headers = crate::HeaderMap::default();
1210        headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1211        headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1212        if let Some(ttl) = ttl {
1213            headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
1214        }
1215
1216        if let Some(revision) = revision {
1217            headers.insert(
1218                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1219                HeaderValue::from(revision),
1220            );
1221        }
1222
1223        self.stream
1224            .context
1225            .publish_with_headers(subject, headers, "".into())
1226            .await?
1227            .await?;
1228        Ok(())
1229    }
1230
1231    /// Returns a [futures::Stream] that allows iterating over all [Operations][Operation] that
1232    /// happen for given key.
1233    ///
1234    /// # Examples
1235    ///
1236    /// ```no_run
1237    /// # #[tokio::main]
1238    /// # async fn main() -> Result<(), async_nats::Error> {
1239    /// use futures::StreamExt;
1240    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1241    /// let jetstream = async_nats::jetstream::new(client);
1242    /// let kv = jetstream
1243    ///     .create_key_value(async_nats::jetstream::kv::Config {
1244    ///         bucket: "kv".to_string(),
1245    ///         history: 10,
1246    ///         ..Default::default()
1247    ///     })
1248    ///     .await?;
1249    /// let mut entries = kv.history("kv").await?;
1250    /// while let Some(entry) = entries.next().await {
1251    ///     println!("entry: {:?}", entry);
1252    /// }
1253    /// # Ok(())
1254    /// # }
1255    /// ```
1256    pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1257        if !is_valid_key(key.as_ref()) {
1258            return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1259        }
1260        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1261
1262        let consumer = self
1263            .stream
1264            .create_consumer(super::consumer::push::OrderedConfig {
1265                deliver_subject: self.stream.context.client.new_inbox(),
1266                description: Some("kv history consumer".to_string()),
1267                filter_subject: subject,
1268                replay_policy: super::consumer::ReplayPolicy::Instant,
1269                ..Default::default()
1270            })
1271            .await?;
1272
1273        Ok(History {
1274            subscription: consumer.messages().await?,
1275            done: false,
1276            prefix: self.prefix.clone(),
1277            bucket: self.name.clone(),
1278        })
1279    }
1280
1281    /// Returns a [futures::Stream] that allows iterating over all keys in the bucket.
1282    ///
1283    /// # Examples
1284    ///
1285    /// Iterating over each each key individually
1286    ///
1287    /// ```no_run
1288    /// # #[tokio::main]
1289    /// # async fn main() -> Result<(), async_nats::Error> {
1290    /// use futures::{StreamExt, TryStreamExt};
1291    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1292    /// let jetstream = async_nats::jetstream::new(client);
1293    /// let kv = jetstream
1294    ///     .create_key_value(async_nats::jetstream::kv::Config {
1295    ///         bucket: "kv".to_string(),
1296    ///         history: 10,
1297    ///         ..Default::default()
1298    ///     })
1299    ///     .await?;
1300    /// let mut keys = kv.keys().await?.boxed();
1301    /// while let Some(key) = keys.try_next().await? {
1302    ///     println!("key: {:?}", key);
1303    /// }
1304    /// # Ok(())
1305    /// # }
1306    /// ```
1307    ///
1308    /// Collecting it into a vector of keys
1309    ///
1310    /// ```no_run
1311    /// # #[tokio::main]
1312    /// # async fn main() -> Result<(), async_nats::Error> {
1313    /// use futures::TryStreamExt;
1314    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1315    /// let jetstream = async_nats::jetstream::new(client);
1316    /// let kv = jetstream
1317    ///     .create_key_value(async_nats::jetstream::kv::Config {
1318    ///         bucket: "kv".to_string(),
1319    ///         history: 10,
1320    ///         ..Default::default()
1321    ///     })
1322    ///     .await?;
1323    /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1324    /// println!("Keys: {:?}", keys);
1325    /// # Ok(())
1326    /// # }
1327    /// ```
1328    pub async fn keys(&self) -> Result<Keys, HistoryError> {
1329        let subject = format!("{}>", self.prefix.as_str());
1330
1331        let consumer = self
1332            .stream
1333            .create_consumer(super::consumer::push::OrderedConfig {
1334                deliver_subject: self.stream.context.client.new_inbox(),
1335                description: Some("kv history consumer".to_string()),
1336                filter_subject: subject,
1337                headers_only: true,
1338                replay_policy: super::consumer::ReplayPolicy::Instant,
1339                // We only need to know the latest state for each key, not the whole history
1340                deliver_policy: DeliverPolicy::LastPerSubject,
1341                ..Default::default()
1342            })
1343            .await?;
1344
1345        let entries = History {
1346            done: consumer.info.num_pending == 0,
1347            subscription: consumer.messages().await?,
1348            prefix: self.prefix.clone(),
1349            bucket: self.name.clone(),
1350        };
1351
1352        Ok(Keys { inner: entries })
1353    }
1354}
1355
1356/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
1357pub struct Watch {
1358    seen_current: bool,
1359    subscription: super::consumer::push::Ordered,
1360    prefix: String,
1361    bucket: String,
1362}
1363
1364impl futures::Stream for Watch {
1365    type Item = Result<Entry, WatcherError>;
1366
1367    fn poll_next(
1368        mut self: std::pin::Pin<&mut Self>,
1369        cx: &mut std::task::Context<'_>,
1370    ) -> std::task::Poll<Option<Self::Item>> {
1371        match self.subscription.poll_next_unpin(cx) {
1372            Poll::Ready(message) => match message {
1373                None => Poll::Ready(None),
1374                Some(message) => {
1375                    let message = message?;
1376                    let info = message.info().map_err(|err| {
1377                        WatcherError::with_source(
1378                            WatcherErrorKind::Other,
1379                            format!("failed to parse message metadata: {}", err),
1380                        )
1381                    })?;
1382
1383                    let operation =
1384                        kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1385
1386                    let key = message
1387                        .subject
1388                        .strip_prefix(&self.prefix)
1389                        .map(|s| s.to_string())
1390                        .unwrap();
1391
1392                    if !self.seen_current && info.pending == 0 {
1393                        self.seen_current = true;
1394                    }
1395
1396                    Poll::Ready(Some(Ok(Entry {
1397                        bucket: self.bucket.clone(),
1398                        key,
1399                        value: message.payload.clone(),
1400                        revision: info.stream_sequence,
1401                        created: info.published,
1402                        delta: info.pending,
1403                        operation,
1404                        seen_current: self.seen_current,
1405                    })))
1406                }
1407            },
1408            std::task::Poll::Pending => Poll::Pending,
1409        }
1410    }
1411
1412    fn size_hint(&self) -> (usize, Option<usize>) {
1413        (0, None)
1414    }
1415}
1416
1417/// A structure representing the history of a key-value bucket, yielding past values.
1418pub struct History {
1419    subscription: super::consumer::push::Ordered,
1420    done: bool,
1421    prefix: String,
1422    bucket: String,
1423}
1424
1425impl futures::Stream for History {
1426    type Item = Result<Entry, WatcherError>;
1427
1428    fn poll_next(
1429        mut self: std::pin::Pin<&mut Self>,
1430        cx: &mut std::task::Context<'_>,
1431    ) -> std::task::Poll<Option<Self::Item>> {
1432        if self.done {
1433            return Poll::Ready(None);
1434        }
1435        match self.subscription.poll_next_unpin(cx) {
1436            Poll::Ready(message) => match message {
1437                None => Poll::Ready(None),
1438                Some(message) => {
1439                    let message = message?;
1440                    let info = message.info().map_err(|err| {
1441                        WatcherError::with_source(
1442                            WatcherErrorKind::Other,
1443                            format!("failed to parse message metadata: {}", err),
1444                        )
1445                    })?;
1446                    if info.pending == 0 {
1447                        self.done = true;
1448                    }
1449
1450                    let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1451
1452                    let key = message
1453                        .subject
1454                        .strip_prefix(&self.prefix)
1455                        .map(|s| s.to_string())
1456                        .unwrap();
1457
1458                    Poll::Ready(Some(Ok(Entry {
1459                        bucket: self.bucket.clone(),
1460                        key,
1461                        value: message.payload.clone(),
1462                        revision: info.stream_sequence,
1463                        created: info.published,
1464                        delta: info.pending,
1465                        operation,
1466                        seen_current: self.done,
1467                    })))
1468                }
1469            },
1470            std::task::Poll::Pending => Poll::Pending,
1471        }
1472    }
1473
1474    fn size_hint(&self) -> (usize, Option<usize>) {
1475        (0, None)
1476    }
1477}
1478
1479pub struct Keys {
1480    inner: History,
1481}
1482
1483impl futures::Stream for Keys {
1484    type Item = Result<String, WatcherError>;
1485
1486    fn poll_next(
1487        mut self: std::pin::Pin<&mut Self>,
1488        cx: &mut std::task::Context<'_>,
1489    ) -> std::task::Poll<Option<Self::Item>> {
1490        loop {
1491            match self.inner.poll_next_unpin(cx) {
1492                Poll::Ready(None) => return Poll::Ready(None),
1493                Poll::Ready(Some(res)) => match res {
1494                    Ok(entry) => {
1495                        // Skip purged and deleted keys
1496                        if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1497                            // Try to poll again if we skip this one
1498                            continue;
1499                        } else {
1500                            return Poll::Ready(Some(Ok(entry.key)));
1501                        }
1502                    }
1503                    Err(e) => return Poll::Ready(Some(Err(e))),
1504                },
1505                Poll::Pending => return Poll::Pending,
1506            }
1507        }
1508    }
1509}
1510
/// An entry in a key-value bucket.
///
/// Entries are yielded by the [Watch] and [History] streams, which build them from
/// the received message and its parsed metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved.
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value. The watch and history streams take it from
    /// the message's stream sequence.
    pub revision: u64,
    /// Distance from the latest value. The watch and history streams take it from the
    /// number of messages still pending after this one.
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
    /// Set to true after all historical messages have been received, and
    /// now all Entries are the new ones.
    pub seen_current: bool,
}
1532
1533#[derive(Clone, Debug, PartialEq)]
1534pub enum StatusErrorKind {
1535    JetStream(crate::jetstream::Error),
1536    TimedOut,
1537}
1538
1539impl Display for StatusErrorKind {
1540    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1541        match self {
1542            Self::JetStream(err) => write!(f, "jetstream request failed: {}", err),
1543            Self::TimedOut => write!(f, "timed out"),
1544        }
1545    }
1546}
1547
/// Error type whose kind is a [StatusErrorKind].
pub type StatusError = Error<StatusErrorKind>;
1549
/// The kinds of failure reported by [CreateError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
    /// The key already exists.
    AlreadyExists,
    /// The key is not valid (displayed as "key cannot be empty or start/end with `.`").
    InvalidKey,
    /// Creating the key in the store failed.
    Publish,
    /// An acknowledgement error occurred.
    Ack,
    /// Any other failure.
    Other,
}
1558
1559impl From<UpdateError> for CreateError {
1560    fn from(error: UpdateError) -> Self {
1561        match error.kind() {
1562            UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1563            UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1564            UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
1565            UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1566        }
1567    }
1568}
1569
1570impl From<PutError> for CreateError {
1571    fn from(error: PutError) -> Self {
1572        match error.kind() {
1573            PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1574            PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1575            PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1576        }
1577    }
1578}
1579
1580impl From<EntryError> for CreateError {
1581    fn from(error: EntryError) -> Self {
1582        match error.kind() {
1583            EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1584            EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1585            EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1586        }
1587    }
1588}
1589
1590impl Display for CreateErrorKind {
1591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1592        match self {
1593            Self::AlreadyExists => write!(f, "key already exists"),
1594            Self::Publish => write!(f, "failed to create key in store"),
1595            Self::Ack => write!(f, "ack error"),
1596            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1597            Self::Other => write!(f, "other error"),
1598        }
1599    }
1600}
1601
/// Error type whose kind is a [CreateErrorKind].
pub type CreateError = Error<CreateErrorKind>;
1603
/// The kinds of failure reported by [PutError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The key is not valid.
    InvalidKey,
    /// Putting the key into the store failed.
    Publish,
    /// An acknowledgement error occurred.
    Ack,
}

impl Display for PutErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::Publish => "failed to put key into store",
            Self::Ack => "ack error",
        };
        f.write_str(text)
    }
}
1620
/// Error type whose kind is a [PutErrorKind].
pub type PutError = Error<PutErrorKind>;
1622
/// The kinds of failure reported by [EntryError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EntryErrorKind {
    /// The key is not valid.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Getting the entry failed for any other reason.
    Other,
}

impl Display for EntryErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::Other => "failed getting entry",
        };
        f.write_str(text)
    }
}
1639
/// Error type whose kind is an [EntryErrorKind].
pub type EntryError = Error<EntryErrorKind>;

// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate; presumably it
// generates the `From<DirectGetError> for EntryError` conversion with the timed-out
// kind mapped across — confirm against the macro definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1648
/// The kinds of failure reported by [WatchError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The key is not valid.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// The watch failed for any other reason.
    Other,
}

impl Display for WatchErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(text)
    }
}
1667
/// Error type whose kind is a [WatchErrorKind].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate; presumably
// these generate the `From<ConsumerError>` and `From<StreamError>` conversions for
// `WatchError` with the timed-out kind mapped across — confirm against the macro
// definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1672
/// The kinds of failure reported by [UpdateError].
///
/// The same kinds are reused for delete and purge operations through the
/// `DeleteErrorKind` and `PurgeErrorKind` aliases.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateErrorKind {
    /// The key is not valid.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// The expected revision did not match the last revision.
    WrongLastRevision,
    /// The update failed for any other reason.
    Other,
}

impl Display for UpdateErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
            Self::TimedOut => write!(f, "timed out"),
            Self::WrongLastRevision => write!(f, "wrong last revision"),
            // This kind describes a failed write, not a failed read.
            Self::Other => write!(f, "failed updating entry"),
        }
    }
}
1691
1692pub type UpdateError = Error<UpdateErrorKind>;
1693
1694impl From<PublishError> for UpdateError {
1695    fn from(err: PublishError) -> Self {
1696        match err.kind() {
1697            PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1698            PublishErrorKind::WrongLastSequence => {
1699                Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1700            }
1701            _ => Self::with_source(UpdateErrorKind::Other, err),
1702        }
1703    }
1704}
1705
/// The kinds of failure yielded by the watch, history and keys streams.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    Consumer,
    /// Any other watcher failure.
    Other,
}

impl Display for WatcherErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Consumer => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(text)
    }
}
1720
1721pub type WatcherError = Error<WatcherErrorKind>;
1722
1723impl From<OrderedError> for WatcherError {
1724    fn from(err: OrderedError) -> Self {
1725        WatcherError::with_source(WatcherErrorKind::Consumer, err)
1726    }
1727}
1728
/// Error returned by delete operations; an alias of [UpdateError].
pub type DeleteError = UpdateError;
/// Kinds of delete failure; an alias of [UpdateErrorKind].
pub type DeleteErrorKind = UpdateErrorKind;

/// Error returned by purge operations; an alias of [UpdateError].
pub type PurgeError = UpdateError;
/// Kinds of purge failure; an alias of [UpdateErrorKind].
pub type PurgeErrorKind = UpdateErrorKind;

/// Error returned when creating history and keys streams; an alias of [WatchError].
pub type HistoryError = WatchError;
/// Kinds of history failure; an alias of [WatchErrorKind].
pub type HistoryErrorKind = WatchErrorKind;