async_nats_wrpc/jetstream/kv/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19    fmt::{self, Display},
20    str::FromStr,
21    task::Poll,
22};
23
24use crate::{HeaderValue, StatusCode};
25use bytes::Bytes;
26use futures::StreamExt;
27use once_cell::sync::Lazy;
28use regex::Regex;
29use time::{format_description::well_known::Rfc3339, OffsetDateTime};
30use tracing::debug;
31
32use crate::error::Error;
33use crate::{header, Message};
34
35use self::bucket::Status;
36
37use super::{
38    consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
39    context::{PublishError, PublishErrorKind},
40    stream::{
41        self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, RawMessage,
42        Republish, Source, StorageType, Stream,
43    },
44};
45
46fn kv_operation_from_stream_message(message: &RawMessage) -> Operation {
47    match message.headers.as_deref() {
48        Some(headers) => headers.parse().unwrap_or(Operation::Put),
49        None => Operation::Put,
50    }
51}
52
53fn kv_operation_from_message(message: &Message) -> Result<Operation, EntryError> {
54    let headers = message
55        .headers
56        .as_ref()
57        .ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?;
58
59    if let Some(op) = headers.get(KV_OPERATION) {
60        Operation::from_str(op.as_str())
61            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
62    } else {
63        Err(EntryError::with_source(
64            EntryErrorKind::Other,
65            "missing operation",
66        ))
67    }
68}
69
// Bucket names: one or more ASCII letters, digits, `_` or `-`.
// The pattern is a constant, so a failure to compile it is a programming error (hence `unwrap`).
static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Keys: one or more ASCII letters, digits, or any of `-`, `/`, `_`, `=`, `.`.
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the largest accepted `Config::history`; the check itself is not in this part of the file.
pub(crate) const MAX_HISTORY: i64 = 64;
// Key passed by the `watch_all*` methods to watch every key in the bucket.
const ALL_KEYS: &str = ">";

// Name of the header carrying the KV operation, followed by the values it may take.
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// Rollup header name and value; not referenced by the code visible in this part of the file.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
83
84pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
85    VALID_BUCKET_RE.is_match(bucket_name)
86}
87
88pub(crate) fn is_valid_key(key: &str) -> bool {
89    if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
90        return false;
91    }
92
93    VALID_KEY_RE.is_match(key)
94}
95
/// Configuration values for key value stores.
///
/// The type derives [`Default`], so callers can set only the fields they care
/// about and fill in the rest with `..Default::default()`.
#[derive(Debug, Default)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Human readable description.
    pub description: String,
    /// Maximum size of a single value.
    pub max_value_size: i32,
    /// Maximum historical entries.
    pub history: i64,
    /// Maximum age of any entry in the bucket.
    pub max_age: std::time::Duration,
    /// How large the bucket may become in total bytes before the configured discard policy kicks in.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`.
    pub storage: StorageType,
    /// How many replicas to keep for each entry in a cluster.
    pub num_replicas: usize,
    /// Republish is for republishing messages once persistent in the Key Value Bucket.
    pub republish: Option<Republish>,
    /// Bucket mirror configuration.
    pub mirror: Option<Source>,
    /// Bucket sources configuration.
    pub sources: Option<Vec<Source>>,
    /// Allow mirrors using direct API.
    pub mirror_direct: bool,
    /// Compression flag for the bucket. Only present when the `server_2_10` feature is enabled.
    #[cfg(feature = "server_2_10")]
    pub compression: bool,
    /// Cluster and tag placement for the bucket.
    pub placement: Option<stream::Placement>,
}
129
/// Describes what kind of operation an entry represents.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket.
    Put,
    /// A value was deleted from a bucket.
    Delete,
    /// A value was purged from a bucket.
    Purge,
}
140
141impl FromStr for Operation {
142    type Err = ParseOperationError;
143
144    fn from_str(s: &str) -> Result<Self, Self::Err> {
145        match s {
146            KV_OPERATION_DELETE => Ok(Operation::Delete),
147            KV_OPERATION_PURGE => Ok(Operation::Purge),
148            KV_OPERATION_PUT => Ok(Operation::Put),
149            _ => Err(ParseOperationError),
150        }
151    }
152}
153
154#[derive(Debug, Clone)]
155pub struct ParseOperationError;
156
157impl fmt::Display for ParseOperationError {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
160    }
161}
162
163impl std::error::Error for ParseOperationError {}
164
/// A struct used as a handle for the bucket.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store. Reported as the bucket name in entries, watches and status.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. It is prepended to a key to form the subject
    /// used for reads and watches, and for writes when `put_prefix` is `None`.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs. When set, it replaces
    /// `prefix` in the subjects built by `put`, `update` and `delete_expect_revision`.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When `true`, the write subjects are
    /// additionally prefixed with the context prefix followed by `.`.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
181
182impl Store {
183    /// Queries the server and returns status from the server.
184    ///
185    /// # Examples
186    ///
187    /// ```no_run
188    /// # #[tokio::main]
189    /// # async fn main() -> Result<(), async_nats::Error> {
190    /// let client = async_nats::connect("demo.nats.io:4222").await?;
191    /// let jetstream = async_nats::jetstream::new(client);
192    /// let kv = jetstream
193    ///     .create_key_value(async_nats::jetstream::kv::Config {
194    ///         bucket: "kv".to_string(),
195    ///         history: 10,
196    ///         ..Default::default()
197    ///     })
198    ///     .await?;
199    /// let status = kv.status().await?;
200    /// println!("status: {:?}", status);
201    /// # Ok(())
202    /// # }
203    /// ```
204    pub async fn status(&self) -> Result<Status, StatusError> {
205        // TODO: should we poll for fresh info here? probably yes.
206        let info = self.stream.info.clone();
207
208        Ok(Status {
209            info,
210            bucket: self.name.to_string(),
211        })
212    }
213
214    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
215    ///
216    /// # Examples
217    ///
218    /// ```no_run
219    /// # #[tokio::main]
220    /// # async fn main() -> Result<(), async_nats::Error> {
221    /// let client = async_nats::connect("demo.nats.io:4222").await?;
222    /// let jetstream = async_nats::jetstream::new(client);
223    /// let kv = jetstream
224    ///     .create_key_value(async_nats::jetstream::kv::Config {
225    ///         bucket: "kv".to_string(),
226    ///         history: 10,
227    ///         ..Default::default()
228    ///     })
229    ///     .await?;
230    ///
231    /// let status = kv.create("key", "value".into()).await;
232    /// assert!(status.is_ok());
233    ///
234    /// let status = kv.create("key", "value".into()).await;
235    /// assert!(status.is_err());
236    ///
237    /// # Ok(())
238    /// # }
239    /// ```
240    pub async fn create<T: AsRef<str>>(
241        &self,
242        key: T,
243        value: bytes::Bytes,
244    ) -> Result<u64, CreateError> {
245        let update_err = match self.update(key.as_ref(), value.clone(), 0).await {
246            Ok(revision) => return Ok(revision),
247            Err(err) => err,
248        };
249
250        match self.entry(key.as_ref()).await? {
251            // Deleted or Purged key, we can create it again.
252            Some(Entry {
253                operation: Operation::Delete | Operation::Purge,
254                ..
255            }) => {
256                let revision = self.put(key, value).await?;
257                Ok(revision)
258            }
259
260            // key already exists.
261            Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
262
263            // Something went wrong with the initial update, return that error
264            None => Err(update_err.into()),
265        }
266    }
267
268    /// Puts new key value pair into the bucket.
269    /// If key didn't exist, it is created. If it did exist, a new value with a new version is
270    /// added.
271    ///
272    /// # Examples
273    ///
274    /// ```no_run
275    /// # #[tokio::main]
276    /// # async fn main() -> Result<(), async_nats::Error> {
277    /// let client = async_nats::connect("demo.nats.io:4222").await?;
278    /// let jetstream = async_nats::jetstream::new(client);
279    /// let kv = jetstream
280    ///     .create_key_value(async_nats::jetstream::kv::Config {
281    ///         bucket: "kv".to_string(),
282    ///         history: 10,
283    ///         ..Default::default()
284    ///     })
285    ///     .await?;
286    /// let status = kv.put("key", "value".into()).await?;
287    /// # Ok(())
288    /// # }
289    /// ```
290    pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
291        if !is_valid_key(key.as_ref()) {
292            return Err(PutError::new(PutErrorKind::InvalidKey));
293        }
294        let mut subject = String::new();
295        if self.use_jetstream_prefix {
296            subject.push_str(&self.stream.context.prefix);
297            subject.push('.');
298        }
299        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
300        subject.push_str(key.as_ref());
301
302        let publish_ack = self
303            .stream
304            .context
305            .publish(subject, value)
306            .await
307            .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
308        let ack = publish_ack
309            .await
310            .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
311
312        Ok(ack.sequence)
313    }
314
315    /// Retrieves the last [Entry] for a given key from a bucket.
316    ///
317    /// # Examples
318    ///
319    /// ```no_run
320    /// # #[tokio::main]
321    /// # async fn main() -> Result<(), async_nats::Error> {
322    /// let client = async_nats::connect("demo.nats.io:4222").await?;
323    /// let jetstream = async_nats::jetstream::new(client);
324    /// let kv = jetstream
325    ///     .create_key_value(async_nats::jetstream::kv::Config {
326    ///         bucket: "kv".to_string(),
327    ///         history: 10,
328    ///         ..Default::default()
329    ///     })
330    ///     .await?;
331    /// let status = kv.put("key", "value".into()).await?;
332    /// let entry = kv.entry("key").await?;
333    /// println!("entry: {:?}", entry);
334    /// # Ok(())
335    /// # }
336    /// ```
337    pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
338        let key: String = key.into();
339        if !is_valid_key(key.as_ref()) {
340            return Err(EntryError::new(EntryErrorKind::InvalidKey));
341        }
342
343        let subject = format!("{}{}", self.prefix.as_str(), &key);
344
345        let result: Option<(Message, Operation, u64, OffsetDateTime)> = {
346            if self.stream.info.config.allow_direct {
347                let message = self
348                    .stream
349                    .direct_get_last_for_subject(subject.as_str())
350                    .await;
351
352                match message {
353                    Ok(message) => {
354                        let headers = message.headers.as_ref().ok_or_else(|| {
355                            EntryError::with_source(EntryErrorKind::Other, "missing headers")
356                        })?;
357
358                        let operation =
359                            kv_operation_from_message(&message).unwrap_or(Operation::Put);
360
361                        let sequence = headers
362                            .get_last(header::NATS_SEQUENCE)
363                            .ok_or_else(|| {
364                                EntryError::with_source(
365                                    EntryErrorKind::Other,
366                                    "missing sequence headers",
367                                )
368                            })?
369                            .as_str()
370                            .parse()
371                            .map_err(|err| {
372                                EntryError::with_source(
373                                    EntryErrorKind::Other,
374                                    format!("failed to parse headers sequence value: {}", err),
375                                )
376                            })?;
377
378                        let created = headers
379                            .get_last(header::NATS_TIME_STAMP)
380                            .ok_or_else(|| {
381                                EntryError::with_source(
382                                    EntryErrorKind::Other,
383                                    "did not found timestamp header",
384                                )
385                            })
386                            .and_then(|created| {
387                                OffsetDateTime::parse(created.as_str(), &Rfc3339).map_err(|err| {
388                                    EntryError::with_source(
389                                        EntryErrorKind::Other,
390                                        format!(
391                                            "failed to parse headers timestampt value: {}",
392                                            err
393                                        ),
394                                    )
395                                })
396                            })?;
397
398                        Some((message.message, operation, sequence, created))
399                    }
400                    Err(err) => {
401                        if err.kind() == DirectGetErrorKind::NotFound {
402                            None
403                        } else {
404                            return Err(err.into());
405                        }
406                    }
407                }
408            } else {
409                let raw_message = self
410                    .stream
411                    .get_last_raw_message_by_subject(subject.as_str())
412                    .await;
413                match raw_message {
414                    Ok(raw_message) => {
415                        let operation = kv_operation_from_stream_message(&raw_message);
416                        // TODO: unnecessary expensive, cloning whole Message.
417                        let nats_message = Message::try_from(raw_message.clone())
418                            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))?;
419                        Some((
420                            nats_message,
421                            operation,
422                            raw_message.sequence,
423                            raw_message.time,
424                        ))
425                    }
426                    Err(err) => match err.kind() {
427                        crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
428                        crate::jetstream::stream::LastRawMessageErrorKind::Other => {
429                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
430                        }
431                        crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
432                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
433                        }
434                    },
435                }
436            }
437        };
438
439        match result {
440            Some((message, operation, revision, created)) => {
441                if message.status == Some(StatusCode::NO_RESPONDERS) {
442                    return Ok(None);
443                }
444
445                let entry = Entry {
446                    bucket: self.name.clone(),
447                    key,
448                    value: message.payload,
449                    revision,
450                    created,
451                    operation,
452                    delta: 0,
453                };
454                Ok(Some(entry))
455            }
456            // TODO: remember to touch this when Errors are in place.
457            None => Ok(None),
458        }
459    }
460
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// values whenever there are changes for that key.
463    ///
464    /// # Examples
465    ///
466    /// ```no_run
467    /// # #[tokio::main]
468    /// # async fn main() -> Result<(), async_nats::Error> {
469    /// use futures::StreamExt;
470    /// let client = async_nats::connect("demo.nats.io:4222").await?;
471    /// let jetstream = async_nats::jetstream::new(client);
472    /// let kv = jetstream
473    ///     .create_key_value(async_nats::jetstream::kv::Config {
474    ///         bucket: "kv".to_string(),
475    ///         history: 10,
476    ///         ..Default::default()
477    ///     })
478    ///     .await?;
479    /// let mut entries = kv.watch("kv").await?;
480    /// while let Some(entry) = entries.next().await {
481    ///     println!("entry: {:?}", entry);
482    /// }
483    /// # Ok(())
484    /// # }
485    /// ```
486    pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
487        self.watch_with_deliver_policy(key, DeliverPolicy::New)
488            .await
489    }
490
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, starting from
    /// the provided revision. This is useful to resume watching over big KV buckets without the need to
    /// replay all the history.
494    ///
495    /// # Examples
496    ///
497    /// ```no_run
498    /// # #[tokio::main]
499    /// # async fn main() -> Result<(), async_nats::Error> {
500    /// use futures::StreamExt;
501    /// let client = async_nats::connect("demo.nats.io:4222").await?;
502    /// let jetstream = async_nats::jetstream::new(client);
503    /// let kv = jetstream
504    ///     .create_key_value(async_nats::jetstream::kv::Config {
505    ///         bucket: "kv".to_string(),
506    ///         history: 10,
507    ///         ..Default::default()
508    ///     })
509    ///     .await?;
510    /// let mut entries = kv.watch_from_revision("kv", 5).await?;
511    /// while let Some(entry) = entries.next().await {
512    ///     println!("entry: {:?}", entry);
513    /// }
514    /// # Ok(())
515    /// # }
516    /// ```
517    pub async fn watch_from_revision<T: AsRef<str>>(
518        &self,
519        key: T,
520        revision: u64,
521    ) -> Result<Watch, WatchError> {
522        self.watch_with_deliver_policy(
523            key,
524            DeliverPolicy::ByStartSequence {
525                start_sequence: revision,
526            },
527        )
528        .await
529    }
530
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// values whenever there are changes for that key, as well as the last value.
533    ///
534    /// # Examples
535    ///
536    /// ```no_run
537    /// # #[tokio::main]
538    /// # async fn main() -> Result<(), async_nats::Error> {
539    /// use futures::StreamExt;
540    /// let client = async_nats::connect("demo.nats.io:4222").await?;
541    /// let jetstream = async_nats::jetstream::new(client);
542    /// let kv = jetstream
543    ///     .create_key_value(async_nats::jetstream::kv::Config {
544    ///         bucket: "kv".to_string(),
545    ///         history: 10,
546    ///         ..Default::default()
547    ///     })
548    ///     .await?;
549    /// let mut entries = kv.watch_with_history("kv").await?;
550    /// while let Some(entry) = entries.next().await {
551    ///     println!("entry: {:?}", entry);
552    /// }
553    /// # Ok(())
554    /// # }
555    /// ```
556    pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
557        self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
558            .await
559    }
560
561    async fn watch_with_deliver_policy<T: AsRef<str>>(
562        &self,
563        key: T,
564        deliver_policy: DeliverPolicy,
565    ) -> Result<Watch, WatchError> {
566        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
567
568        debug!("initial consumer creation");
569        let consumer = self
570            .stream
571            .create_consumer(super::consumer::push::OrderedConfig {
572                deliver_subject: self.stream.context.client.new_inbox(),
573                description: Some("kv watch consumer".to_string()),
574                filter_subject: subject,
575                replay_policy: super::consumer::ReplayPolicy::Instant,
576                deliver_policy,
577                ..Default::default()
578            })
579            .await
580            .map_err(|err| match err.kind() {
581                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
582                    WatchError::new(WatchErrorKind::TimedOut)
583                }
584                _ => WatchError::with_source(WatchErrorKind::Other, err),
585            })?;
586
587        Ok(Watch {
588            subscription: consumer.messages().await.map_err(|err| match err.kind() {
589                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
590                    WatchError::new(WatchErrorKind::TimedOut)
591                }
592                crate::jetstream::consumer::StreamErrorKind::Other => {
593                    WatchError::with_source(WatchErrorKind::Other, err)
594                }
595            })?,
596            prefix: self.prefix.clone(),
597            bucket: self.name.clone(),
598        })
599    }
600
601    /// Creates a [futures::Stream] over [Entries][Entry] for all keys, which yields
602    /// values whenever there are changes in the bucket.
603    ///
604    /// # Examples
605    ///
606    /// ```no_run
607    /// # #[tokio::main]
608    /// # async fn main() -> Result<(), async_nats::Error> {
609    /// use futures::StreamExt;
610    /// let client = async_nats::connect("demo.nats.io:4222").await?;
611    /// let jetstream = async_nats::jetstream::new(client);
612    /// let kv = jetstream
613    ///     .create_key_value(async_nats::jetstream::kv::Config {
614    ///         bucket: "kv".to_string(),
615    ///         history: 10,
616    ///         ..Default::default()
617    ///     })
618    ///     .await?;
619    /// let mut entries = kv.watch_all().await?;
620    /// while let Some(entry) = entries.next().await {
621    ///     println!("entry: {:?}", entry);
622    /// }
623    /// # Ok(())
624    /// # }
625    /// ```
626    pub async fn watch_all(&self) -> Result<Watch, WatchError> {
627        self.watch(ALL_KEYS).await
628    }
629
    /// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
    /// from a provided revision. This can be useful when resuming watching over a big bucket
    /// without the need to replay all the history.
633    ///
634    /// # Examples
635    ///
636    /// ```no_run
637    /// # #[tokio::main]
638    /// # async fn main() -> Result<(), async_nats::Error> {
639    /// use futures::StreamExt;
640    /// let client = async_nats::connect("demo.nats.io:4222").await?;
641    /// let jetstream = async_nats::jetstream::new(client);
642    /// let kv = jetstream
643    ///     .create_key_value(async_nats::jetstream::kv::Config {
644    ///         bucket: "kv".to_string(),
645    ///         history: 10,
646    ///         ..Default::default()
647    ///     })
648    ///     .await?;
649    /// let mut entries = kv.watch_all_from_revision(40).await?;
650    /// while let Some(entry) = entries.next().await {
651    ///     println!("entry: {:?}", entry);
652    /// }
653    /// # Ok(())
654    /// # }
655    /// ```
656    pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
657        self.watch_from_revision(ALL_KEYS, revision).await
658    }
659
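    /// Retrieves the value of the latest [Entry] for a given key from a bucket. Returns `None` if
    /// the key has no entry or if its latest entry is a delete or purge marker.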
661    ///
662    /// # Examples
663    ///
664    /// ```no_run
665    /// # #[tokio::main]
666    /// # async fn main() -> Result<(), async_nats::Error> {
667    /// let client = async_nats::connect("demo.nats.io:4222").await?;
668    /// let jetstream = async_nats::jetstream::new(client);
669    /// let kv = jetstream
670    ///     .create_key_value(async_nats::jetstream::kv::Config {
671    ///         bucket: "kv".to_string(),
672    ///         history: 10,
673    ///         ..Default::default()
674    ///     })
675    ///     .await?;
676    /// let value = kv.get("key").await?;
677    /// match value {
678    ///     Some(bytes) => {
679    ///         let value_str = std::str::from_utf8(&bytes)?;
680    ///         println!("Value: {}", value_str);
681    ///     }
682    ///     None => {
683    ///         println!("Key not found or value not set");
684    ///     }
685    /// }
686    /// # Ok(())
687    /// # }
688    /// ```
689    pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
690        match self.entry(key).await {
691            Ok(Some(entry)) => match entry.operation {
692                Operation::Put => Ok(Some(entry.value)),
693                _ => Ok(None),
694            },
695            Ok(None) => Ok(None),
696            Err(err) => Err(err),
697        }
698    }
699
700    /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
701    /// the bucket.
702    ///
703    /// # Examples
704    ///
705    /// ```no_run
706    /// # #[tokio::main]
707    /// # async fn main() -> Result<(), async_nats::Error> {
708    /// use futures::StreamExt;
709    /// let client = async_nats::connect("demo.nats.io:4222").await?;
710    /// let jetstream = async_nats::jetstream::new(client);
711    /// let kv = jetstream
712    ///     .create_key_value(async_nats::jetstream::kv::Config {
713    ///         bucket: "kv".to_string(),
714    ///         history: 10,
715    ///         ..Default::default()
716    ///     })
717    ///     .await?;
718    /// let revision = kv.put("key", "value".into()).await?;
719    /// kv.update("key", "updated".into(), revision).await?;
720    /// # Ok(())
721    /// # }
722    /// ```
723    pub async fn update<T: AsRef<str>>(
724        &self,
725        key: T,
726        value: Bytes,
727        revision: u64,
728    ) -> Result<u64, UpdateError> {
729        if !is_valid_key(key.as_ref()) {
730            return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
731        }
732        let mut subject = String::new();
733        if self.use_jetstream_prefix {
734            subject.push_str(&self.stream.context.prefix);
735            subject.push('.');
736        }
737        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
738        subject.push_str(key.as_ref());
739
740        let mut headers = crate::HeaderMap::default();
741        headers.insert(
742            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
743            HeaderValue::from(revision),
744        );
745
746        self.stream
747            .context
748            .publish_with_headers(subject, headers, value)
749            .await?
750            .await
751            .map_err(|err| err.into())
752            .map(|publish_ack| publish_ack.sequence)
753    }
754
755    /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
756    ///
757    /// # Examples
758    ///
759    /// ```no_run
760    /// # #[tokio::main]
761    /// # async fn main() -> Result<(), async_nats::Error> {
762    /// use futures::StreamExt;
763    /// let client = async_nats::connect("demo.nats.io:4222").await?;
764    /// let jetstream = async_nats::jetstream::new(client);
765    /// let kv = jetstream
766    ///     .create_key_value(async_nats::jetstream::kv::Config {
767    ///         bucket: "kv".to_string(),
768    ///         history: 10,
769    ///         ..Default::default()
770    ///     })
771    ///     .await?;
772    /// kv.put("key", "value".into()).await?;
773    /// kv.delete("key").await?;
774    /// # Ok(())
775    /// # }
776    /// ```
777    pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
778        self.delete_expect_revision(key, None).await
779    }
780
781    /// Deletes a given key if the revision matches. This is a non-destructive operation, which
782    /// sets a `DELETE` marker.
783    ///
784    /// # Examples
785    ///
786    /// ```no_run
787    /// # #[tokio::main]
788    /// # async fn main() -> Result<(), async_nats::Error> {
789    /// use futures::StreamExt;
790    /// let client = async_nats::connect("demo.nats.io:4222").await?;
791    /// let jetstream = async_nats::jetstream::new(client);
792    /// let kv = jetstream
793    ///     .create_key_value(async_nats::jetstream::kv::Config {
794    ///         bucket: "kv".to_string(),
795    ///         history: 10,
796    ///         ..Default::default()
797    ///     })
798    ///     .await?;
799    /// let revision = kv.put("key", "value".into()).await?;
800    /// kv.delete_expect_revision("key", Some(revision)).await?;
801    /// # Ok(())
802    /// # }
803    /// ```
804    pub async fn delete_expect_revision<T: AsRef<str>>(
805        &self,
806        key: T,
807        revison: Option<u64>,
808    ) -> Result<(), DeleteError> {
809        if !is_valid_key(key.as_ref()) {
810            return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
811        }
812        let mut subject = String::new();
813        if self.use_jetstream_prefix {
814            subject.push_str(&self.stream.context.prefix);
815            subject.push('.');
816        }
817        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
818        subject.push_str(key.as_ref());
819
820        let mut headers = crate::HeaderMap::default();
821        // TODO: figure out which headers k/v should be where.
822        headers.insert(
823            KV_OPERATION,
824            KV_OPERATION_DELETE
825                .parse::<HeaderValue>()
826                .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
827        );
828
829        if let Some(revision) = revison {
830            headers.insert(
831                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
832                HeaderValue::from(revision),
833            );
834        }
835
836        self.stream
837            .context
838            .publish_with_headers(subject, headers, "".into())
839            .await?
840            .await?;
841        Ok(())
842    }
843
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
845    ///
846    /// # Examples
847    ///
848    /// ```no_run
849    /// # #[tokio::main]
850    /// # async fn main() -> Result<(), async_nats::Error> {
851    /// use futures::StreamExt;
852    /// let client = async_nats::connect("demo.nats.io:4222").await?;
853    /// let jetstream = async_nats::jetstream::new(client);
854    /// let kv = jetstream
855    ///     .create_key_value(async_nats::jetstream::kv::Config {
856    ///         bucket: "kv".to_string(),
857    ///         history: 10,
858    ///         ..Default::default()
859    ///     })
860    ///     .await?;
861    /// kv.put("key", "value".into()).await?;
862    /// kv.put("key", "another".into()).await?;
863    /// kv.purge("key").await?;
864    /// # Ok(())
865    /// # }
866    /// ```
867    pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
868        self.purge_expect_revision(key, None).await
869    }
870
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
872    /// purge entry in-place.
873    ///
874    /// # Examples
875    ///
876    /// ```no_run
877    /// # #[tokio::main]
878    /// # async fn main() -> Result<(), async_nats::Error> {
879    /// use futures::StreamExt;
880    /// let client = async_nats::connect("demo.nats.io:4222").await?;
881    /// let jetstream = async_nats::jetstream::new(client);
882    /// let kv = jetstream
883    ///     .create_key_value(async_nats::jetstream::kv::Config {
884    ///         bucket: "kv".to_string(),
885    ///         history: 10,
886    ///         ..Default::default()
887    ///     })
888    ///     .await?;
889    /// kv.put("key", "value".into()).await?;
890    /// let revision = kv.put("key", "another".into()).await?;
891    /// kv.purge_expect_revision("key", Some(revision)).await?;
892    /// # Ok(())
893    /// # }
894    /// ```
895    pub async fn purge_expect_revision<T: AsRef<str>>(
896        &self,
897        key: T,
898        revison: Option<u64>,
899    ) -> Result<(), PurgeError> {
900        if !is_valid_key(key.as_ref()) {
901            return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
902        }
903
904        let mut subject = String::new();
905        if self.use_jetstream_prefix {
906            subject.push_str(&self.stream.context.prefix);
907            subject.push('.');
908        }
909        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
910        subject.push_str(key.as_ref());
911
912        let mut headers = crate::HeaderMap::default();
913        headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
914        headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
915
916        if let Some(revision) = revison {
917            headers.insert(
918                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
919                HeaderValue::from(revision),
920            );
921        }
922
923        self.stream
924            .context
925            .publish_with_headers(subject, headers, "".into())
926            .await?
927            .await?;
928        Ok(())
929    }
930
931    /// Returns a [futures::Stream] that allows iterating over all [Operations][Operation] that
932    /// happen for given key.
933    ///
934    /// # Examples
935    ///
936    /// ```no_run
937    /// # #[tokio::main]
938    /// # async fn main() -> Result<(), async_nats::Error> {
939    /// use futures::StreamExt;
940    /// let client = async_nats::connect("demo.nats.io:4222").await?;
941    /// let jetstream = async_nats::jetstream::new(client);
942    /// let kv = jetstream
943    ///     .create_key_value(async_nats::jetstream::kv::Config {
944    ///         bucket: "kv".to_string(),
945    ///         history: 10,
946    ///         ..Default::default()
947    ///     })
948    ///     .await?;
949    /// let mut entries = kv.history("kv").await?;
950    /// while let Some(entry) = entries.next().await {
951    ///     println!("entry: {:?}", entry);
952    /// }
953    /// # Ok(())
954    /// # }
955    /// ```
956    pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
957        if !is_valid_key(key.as_ref()) {
958            return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
959        }
960        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
961
962        let consumer = self
963            .stream
964            .create_consumer(super::consumer::push::OrderedConfig {
965                deliver_subject: self.stream.context.client.new_inbox(),
966                description: Some("kv history consumer".to_string()),
967                filter_subject: subject,
968                replay_policy: super::consumer::ReplayPolicy::Instant,
969                ..Default::default()
970            })
971            .await?;
972
973        Ok(History {
974            subscription: consumer.messages().await?,
975            done: false,
976            prefix: self.prefix.clone(),
977            bucket: self.name.clone(),
978        })
979    }
980
981    /// Returns a [futures::Stream] that allows iterating over all keys in the bucket.
982    ///
983    /// # Examples
984    ///
    /// Iterating over each key individually
986    ///
987    /// ```no_run
988    /// # #[tokio::main]
989    /// # async fn main() -> Result<(), async_nats::Error> {
990    /// use futures::{StreamExt, TryStreamExt};
991    /// let client = async_nats::connect("demo.nats.io:4222").await?;
992    /// let jetstream = async_nats::jetstream::new(client);
993    /// let kv = jetstream
994    ///     .create_key_value(async_nats::jetstream::kv::Config {
995    ///         bucket: "kv".to_string(),
996    ///         history: 10,
997    ///         ..Default::default()
998    ///     })
999    ///     .await?;
1000    /// let mut keys = kv.keys().await?.boxed();
1001    /// while let Some(key) = keys.try_next().await? {
1002    ///     println!("key: {:?}", key);
1003    /// }
1004    /// # Ok(())
1005    /// # }
1006    /// ```
1007    ///
1008    /// Collecting it into a vector of keys
1009    ///
1010    /// ```no_run
1011    /// # #[tokio::main]
1012    /// # async fn main() -> Result<(), async_nats::Error> {
1013    /// use futures::TryStreamExt;
1014    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1015    /// let jetstream = async_nats::jetstream::new(client);
1016    /// let kv = jetstream
1017    ///     .create_key_value(async_nats::jetstream::kv::Config {
1018    ///         bucket: "kv".to_string(),
1019    ///         history: 10,
1020    ///         ..Default::default()
1021    ///     })
1022    ///     .await?;
1023    /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1024    /// println!("Keys: {:?}", keys);
1025    /// # Ok(())
1026    /// # }
1027    /// ```
1028    pub async fn keys(&self) -> Result<Keys, HistoryError> {
1029        let subject = format!("{}>", self.prefix.as_str());
1030
1031        let consumer = self
1032            .stream
1033            .create_consumer(super::consumer::push::OrderedConfig {
1034                deliver_subject: self.stream.context.client.new_inbox(),
1035                description: Some("kv history consumer".to_string()),
1036                filter_subject: subject,
1037                headers_only: true,
1038                replay_policy: super::consumer::ReplayPolicy::Instant,
1039                // We only need to know the latest state for each key, not the whole history
1040                deliver_policy: DeliverPolicy::LastPerSubject,
1041                ..Default::default()
1042            })
1043            .await?;
1044
1045        let entries = History {
1046            done: consumer.info.num_pending == 0,
1047            subscription: consumer.messages().await?,
1048            prefix: self.prefix.clone(),
1049            bucket: self.name.clone(),
1050        };
1051
1052        Ok(Keys { inner: entries })
1053    }
1054}
1055
/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
///
/// Implements [futures::Stream]; each item is an [Entry] built from one consumer message.
pub struct Watch {
    /// Ordered push consumer stream that is polled for new messages.
    subscription: super::consumer::push::Ordered,
    /// Subject prefix stripped from every message subject to obtain the entry key.
    prefix: String,
    /// Bucket name copied into every yielded [Entry].
    bucket: String,
}
1062
1063impl futures::Stream for Watch {
1064    type Item = Result<Entry, WatcherError>;
1065
1066    fn poll_next(
1067        mut self: std::pin::Pin<&mut Self>,
1068        cx: &mut std::task::Context<'_>,
1069    ) -> std::task::Poll<Option<Self::Item>> {
1070        match self.subscription.poll_next_unpin(cx) {
1071            Poll::Ready(message) => match message {
1072                None => Poll::Ready(None),
1073                Some(message) => {
1074                    let message = message?;
1075                    let info = message.info().map_err(|err| {
1076                        WatcherError::with_source(
1077                            WatcherErrorKind::Other,
1078                            format!("failed to parse message metadata: {}", err),
1079                        )
1080                    })?;
1081
1082                    let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1083
1084                    let key = message
1085                        .subject
1086                        .strip_prefix(&self.prefix)
1087                        .map(|s| s.to_string())
1088                        .unwrap();
1089
1090                    Poll::Ready(Some(Ok(Entry {
1091                        bucket: self.bucket.clone(),
1092                        key,
1093                        value: message.payload.clone(),
1094                        revision: info.stream_sequence,
1095                        created: info.published,
1096                        delta: info.pending,
1097                        operation,
1098                    })))
1099                }
1100            },
1101            std::task::Poll::Pending => Poll::Pending,
1102        }
1103    }
1104
1105    fn size_hint(&self) -> (usize, Option<usize>) {
1106        (0, None)
1107    }
1108}
1109
/// A structure representing the history of a key-value bucket, yielding past values.
///
/// Implements [futures::Stream]; unlike [Watch] it terminates once `done` is set.
pub struct History {
    /// Message stream obtained from the ordered push consumer.
    subscription: super::consumer::push::Ordered,
    /// When `true`, the next poll ends the stream. Set after a message reporting zero
    /// pending messages has been yielded; `keys()` also sets it up front when the consumer
    /// has nothing pending.
    done: bool,
    /// Subject prefix stripped from every message subject to obtain the entry key.
    prefix: String,
    /// Bucket name copied into every yielded [Entry].
    bucket: String,
}
1117
1118impl futures::Stream for History {
1119    type Item = Result<Entry, WatcherError>;
1120
1121    fn poll_next(
1122        mut self: std::pin::Pin<&mut Self>,
1123        cx: &mut std::task::Context<'_>,
1124    ) -> std::task::Poll<Option<Self::Item>> {
1125        if self.done {
1126            return Poll::Ready(None);
1127        }
1128        match self.subscription.poll_next_unpin(cx) {
1129            Poll::Ready(message) => match message {
1130                None => Poll::Ready(None),
1131                Some(message) => {
1132                    let message = message?;
1133                    let info = message.info().map_err(|err| {
1134                        WatcherError::with_source(
1135                            WatcherErrorKind::Other,
1136                            format!("failed to parse message metadata: {}", err),
1137                        )
1138                    })?;
1139                    if info.pending == 0 {
1140                        self.done = true;
1141                    }
1142
1143                    let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1144
1145                    let key = message
1146                        .subject
1147                        .strip_prefix(&self.prefix)
1148                        .map(|s| s.to_string())
1149                        .unwrap();
1150
1151                    Poll::Ready(Some(Ok(Entry {
1152                        bucket: self.bucket.clone(),
1153                        key,
1154                        value: message.payload.clone(),
1155                        revision: info.stream_sequence,
1156                        created: info.published,
1157                        delta: info.pending,
1158                        operation,
1159                    })))
1160                }
1161            },
1162            std::task::Poll::Pending => Poll::Pending,
1163        }
1164    }
1165
1166    fn size_hint(&self) -> (usize, Option<usize>) {
1167        (0, None)
1168    }
1169}
1170
/// A stream of the keys in a key-value bucket.
///
/// Wraps a [History] and yields only the key of each entry, skipping entries whose
/// operation is a delete or a purge.
pub struct Keys {
    /// Entry stream the keys are taken from.
    inner: History,
}
1174
1175impl futures::Stream for Keys {
1176    type Item = Result<String, WatcherError>;
1177
1178    fn poll_next(
1179        mut self: std::pin::Pin<&mut Self>,
1180        cx: &mut std::task::Context<'_>,
1181    ) -> std::task::Poll<Option<Self::Item>> {
1182        loop {
1183            match self.inner.poll_next_unpin(cx) {
1184                Poll::Ready(None) => return Poll::Ready(None),
1185                Poll::Ready(Some(res)) => match res {
1186                    Ok(entry) => {
1187                        // Skip purged and deleted keys
1188                        if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1189                            // Try to poll again if we skip this one
1190                            continue;
1191                        } else {
1192                            return Poll::Ready(Some(Ok(entry.key)));
1193                        }
1194                    }
1195                    Err(e) => return Poll::Ready(Some(Err(e))),
1196                },
1197                Poll::Pending => return Poll::Pending,
1198            }
1199        }
1200    }
1201}
1202
/// An entry in a key-value bucket.
///
/// The per-field notes about [Watch] and [History] describe how those two streams fill the
/// entry in; other producers of `Entry` are not covered here.
#[derive(Debug, Clone)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved. [Watch] and [History] derive it from the message subject
    /// by stripping the bucket prefix.
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value. [Watch] and [History] use the stream sequence of
    /// the message.
    pub revision: u64,
    /// Distance from the latest value. [Watch] and [History] use the number of messages
    /// still pending after this one.
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
}
1221
1222#[derive(Clone, Debug, PartialEq)]
1223pub enum StatusErrorKind {
1224    JetStream(crate::jetstream::Error),
1225    TimedOut,
1226}
1227
1228impl Display for StatusErrorKind {
1229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1230        match self {
1231            Self::JetStream(err) => write!(f, "jetstream request failed: {}", err),
1232            Self::TimedOut => write!(f, "timed out"),
1233        }
1234    }
1235}
1236
/// Error whose kind is a [StatusErrorKind].
pub type StatusError = Error<StatusErrorKind>;
1238
/// Kinds of failure reported through [CreateError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
    /// The key already exists.
    AlreadyExists,
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The key could not be created in the store (also used for converted timeouts).
    Publish,
    /// The acknowledgement failed.
    Ack,
    /// Any other failure.
    Other,
}
1247
1248impl From<UpdateError> for CreateError {
1249    fn from(error: UpdateError) -> Self {
1250        match error.kind() {
1251            UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1252            UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1253            UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1254        }
1255    }
1256}
1257
1258impl From<PutError> for CreateError {
1259    fn from(error: PutError) -> Self {
1260        match error.kind() {
1261            PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1262            PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1263            PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1264        }
1265    }
1266}
1267
1268impl From<EntryError> for CreateError {
1269    fn from(error: EntryError) -> Self {
1270        match error.kind() {
1271            EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1272            EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1273            EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1274        }
1275    }
1276}
1277
1278impl Display for CreateErrorKind {
1279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1280        match self {
1281            Self::AlreadyExists => write!(f, "key already exists"),
1282            Self::Publish => write!(f, "failed to create key in store"),
1283            Self::Ack => write!(f, "ack error"),
1284            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1285            Self::Other => write!(f, "other error"),
1286        }
1287    }
1288}
1289
/// Error whose kind is a [CreateErrorKind]; convertible from [UpdateError], [PutError] and
/// [EntryError].
pub type CreateError = Error<CreateErrorKind>;
1291
/// Kinds of failure reported through `PutError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The key could not be put into the store.
    Publish,
    /// The acknowledgement failed.
    Ack,
}

impl Display for PutErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::Publish => "failed to put key into store",
            Self::Ack => "ack error",
        };
        f.write_str(description)
    }
}
1308
/// Error whose kind is a [PutErrorKind].
pub type PutError = Error<PutErrorKind>;
1310
/// Kinds of failure reported through `EntryError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EntryErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Any other failure while getting the entry.
    Other,
}

impl Display for EntryErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::Other => "failed getting entry",
        };
        f.write_str(description)
    }
}
1327
/// Error whose kind is an [EntryErrorKind].
pub type EntryError = Error<EntryErrorKind>;

// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate. Presumably it
// generates `From<DirectGetError> for EntryError`, mapping the timeout kind to `TimedOut`
// and everything else to `Other` — confirm against the macro definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1336
/// Kinds of failure reported through `WatchError`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// The watch consumer could not be created.
    ConsumerCreate,
    /// Any other watch failure.
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(description)
    }
}
1355
/// Error whose kind is a [WatchErrorKind].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate. Presumably these
// generate `From<ConsumerError>` and `From<StreamError>` for `WatchError` (the `?`
// conversions in `history`/`keys` appear to rely on them) — confirm against the macro
// definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1360
/// Kinds of failure reported through `UpdateError` (and its `DeleteError` / `PurgeError`
/// aliases).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Any other failure while updating the entry.
    Other,
}

impl Display for UpdateErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
            Self::TimedOut => write!(f, "timed out"),
            // This kind is produced by update/delete/purge, not by reads, so the message
            // must not talk about "getting" an entry.
            Self::Other => write!(f, "failed updating entry"),
        }
    }
}
1377
/// Error whose kind is an [UpdateErrorKind].
pub type UpdateError = Error<UpdateErrorKind>;

// NOTE(review): `from_with_timeout!` is defined elsewhere in the crate. Presumably it
// generates `From<PublishError> for UpdateError` (the `?` on publish results in the
// delete/purge methods appears to rely on it) — confirm against the macro definition.
crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind);
1381
/// Kinds of failure yielded as stream items by the watch and history streams.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    Consumer,
    /// Any other watcher failure.
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the human-readable description of the error kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Consumer => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(description)
    }
}
1396
/// Error whose kind is a [WatcherErrorKind]; the error item type of [Watch], [History] and
/// [Keys].
pub type WatcherError = Error<WatcherErrorKind>;
1398
1399impl From<OrderedError> for WatcherError {
1400    fn from(err: OrderedError) -> Self {
1401        WatcherError::with_source(WatcherErrorKind::Consumer, err)
1402    }
1403}
1404
/// Error returned by the delete methods; an alias of [UpdateError].
pub type DeleteError = UpdateError;
/// Error kind for [DeleteError]; an alias of [UpdateErrorKind].
pub type DeleteErrorKind = UpdateErrorKind;

/// Error returned by the purge methods; an alias of [UpdateError].
pub type PurgeError = UpdateError;
/// Error kind for [PurgeError]; an alias of [UpdateErrorKind].
pub type PurgeErrorKind = UpdateErrorKind;

/// Error returned by `history` and `keys`; an alias of [WatchError].
pub type HistoryError = WatchError;
/// Error kind for [HistoryError]; an alias of [WatchErrorKind].
pub type HistoryErrorKind = WatchErrorKind;