async_nats/jetstream/kv/mod.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19 fmt::{self, Display},
20 str::FromStr,
21 task::Poll,
22 time::Duration,
23};
24
25use crate::HeaderValue;
26use bytes::Bytes;
27use futures::StreamExt;
28use once_cell::sync::Lazy;
29use regex::Regex;
30use time::OffsetDateTime;
31use tracing::debug;
32
33use crate::error::Error;
34use crate::header;
35
36use self::bucket::Status;
37
38use super::{
39 consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
40 context::{PublishError, PublishErrorKind},
41 message::StreamMessage,
42 stream::{
43 self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
44 Source, StorageType, Stream,
45 },
46};
47
48fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
49 if let Some(op) = message.headers.get(KV_OPERATION) {
50 Operation::from_str(op.as_str())
51 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
52 } else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
53 match reason.as_str() {
54 "MaxAge" | "Purge" => Ok(Operation::Purge),
55 "Remove" => Ok(Operation::Delete),
56 _ => Err(EntryError::with_source(
57 EntryErrorKind::Other,
58 "invalid marker reason",
59 )),
60 }
61 } else {
62 Err(EntryError::with_source(
63 EntryErrorKind::Other,
64 "missing operation",
65 ))
66 }
67}
68fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
69 let headers = match message.headers.as_ref() {
70 Some(headers) => headers,
71 None => return Ok(Operation::Put),
72 };
73 if let Some(op) = headers.get(KV_OPERATION) {
74 Operation::from_str(op.as_str())
75 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
76 } else {
77 Ok(Operation::Put)
78 }
79}
80
// Bucket names: one or more ASCII letters, digits, `_` or `-`, anchored to the whole input.
static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Keys: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`, anchored to the whole input.
// Leading/trailing dots are rejected separately in `is_valid_key`.
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the upper bound for `Config::history`; it is not enforced in this part of the file.
pub(crate) const MAX_HISTORY: i64 = 64;
// Wildcard passed as the "key" by `watch_all` / `watch_all_from_revision`.
const ALL_KEYS: &str = ">";

// Name of the header carrying the KV operation, and the values it may take.
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// NOTE(review): rollup header name and value; not referenced in this part of the file,
// presumably used by the purge implementation — confirm.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
94
95pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
96 VALID_BUCKET_RE.is_match(bucket_name)
97}
98
99pub(crate) fn is_valid_key(key: &str) -> bool {
100 if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
101 return false;
102 }
103
104 VALID_KEY_RE.is_match(key)
105}
106
/// Configuration values for key value stores.
///
/// The struct derives [`Default`], so callers can set only the fields they need
/// and fill in the rest with `..Default::default()`.
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Human readable description.
    pub description: String,
    /// Maximum size of a single value.
    pub max_value_size: i32,
    /// Maximum historical entries.
    pub history: i64,
    /// Maximum age of any entry in the bucket, given as a [`std::time::Duration`].
    pub max_age: std::time::Duration,
    /// How large the bucket may become in total bytes before the configured discard policy kicks in.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`.
    pub storage: StorageType,
    /// How many replicas to keep for each entry in a cluster.
    pub num_replicas: usize,
    /// Republish is for republishing messages once persistent in the Key Value Bucket.
    pub republish: Option<Republish>,
    /// Bucket mirror configuration.
    pub mirror: Option<Source>,
    /// Bucket sources configuration.
    pub sources: Option<Vec<Source>>,
    /// Allow mirrors using direct API.
    pub mirror_direct: bool,
    /// Compression toggle. Only available with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    pub compression: bool,
    /// Cluster and tag placement for the bucket.
    pub placement: Option<stream::Placement>,
    /// Enables per-message TTL and delete marker TTL for a bucket.
    /// Only available with the `server_2_11` feature.
    #[cfg(feature = "server_2_11")]
    pub limit_markers: Option<Duration>,
}
143
/// Describes what kind of operation an entry represents.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket
    Put,
    /// A value was deleted from a bucket
    Delete,
    /// A value was purged from a bucket
    Purge,
}
154
155impl FromStr for Operation {
156 type Err = ParseOperationError;
157
158 fn from_str(s: &str) -> Result<Self, Self::Err> {
159 match s {
160 KV_OPERATION_DELETE => Ok(Operation::Delete),
161 KV_OPERATION_PURGE => Ok(Operation::Purge),
162 KV_OPERATION_PUT => Ok(Operation::Put),
163 _ => Err(ParseOperationError),
164 }
165 }
166}
167
168#[derive(Debug, Clone)]
169pub struct ParseOperationError;
170
171impl fmt::Display for ParseOperationError {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
174 }
175}
176
177impl std::error::Error for ParseOperationError {}
178
/// A struct used as a handle for the bucket.
///
/// The handle is [`Clone`]; all operations on the bucket go through the
/// [`Stream`] stored in `stream`.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. Subjects used for reads and watches are
    /// built as `prefix + key`.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs. When unset,
    /// writes fall back to `prefix`.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When set, write subjects
    /// are additionally prefixed with the context prefix and a `.`.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
195
196impl Store {
/// Returns the status of the bucket, built from the stream info cached on this handle.
198 ///
199 /// # Examples
200 ///
201 /// ```no_run
202 /// # #[tokio::main]
203 /// # async fn main() -> Result<(), async_nats::Error> {
204 /// let client = async_nats::connect("demo.nats.io:4222").await?;
205 /// let jetstream = async_nats::jetstream::new(client);
206 /// let kv = jetstream
207 /// .create_key_value(async_nats::jetstream::kv::Config {
208 /// bucket: "kv".to_string(),
209 /// history: 10,
210 /// ..Default::default()
211 /// })
212 /// .await?;
213 /// let status = kv.status().await?;
214 /// println!("status: {:?}", status);
215 /// # Ok(())
216 /// # }
217 /// ```
218 pub async fn status(&self) -> Result<Status, StatusError> {
219 // TODO: should we poll for fresh info here? probably yes.
220 let info = self.stream.info.clone();
221
222 Ok(Status {
223 info,
224 bucket: self.name.to_string(),
225 })
226 }
227
228 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
229 ///
230 /// # Examples
231 ///
232 /// ```no_run
233 /// # #[tokio::main]
234 /// # async fn main() -> Result<(), async_nats::Error> {
235 /// let client = async_nats::connect("demo.nats.io:4222").await?;
236 /// let jetstream = async_nats::jetstream::new(client);
237 /// let kv = jetstream
238 /// .create_key_value(async_nats::jetstream::kv::Config {
239 /// bucket: "kv".to_string(),
240 /// history: 10,
241 /// ..Default::default()
242 /// })
243 /// .await?;
244 ///
245 /// let status = kv.create("key", "value".into()).await;
246 /// assert!(status.is_ok());
247 ///
248 /// let status = kv.create("key", "value".into()).await;
249 /// assert!(status.is_err());
250 ///
251 /// # Ok(())
252 /// # }
253 /// ```
254 pub async fn create<T: AsRef<str>>(
255 &self,
256 key: T,
257 value: bytes::Bytes,
258 ) -> Result<u64, CreateError> {
259 self.create_maybe_ttl(key, value, None).await
260 }
261
262 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
263 /// It will set a TTL specific for that key.
264 ///
265 /// # Examples
266 ///
267 /// ```no_run
268 /// # #[tokio::main]
269 /// # async fn main() -> Result<(), async_nats::Error> {
270 /// use std::time::Duration;
271 /// let client = async_nats::connect("demo.nats.io:4222").await?;
272 /// let jetstream = async_nats::jetstream::new(client);
273 /// let kv = jetstream
274 /// .create_key_value(async_nats::jetstream::kv::Config {
275 /// bucket: "kv".to_string(),
276 /// history: 10,
277 /// ..Default::default()
278 /// })
279 /// .await?;
280 ///
281 /// let status = kv
282 /// .create_with_ttl("key", "value".into(), Duration::from_secs(10))
283 /// .await;
284 /// assert!(status.is_ok());
285 ///
286 /// # Ok(())
287 /// # }
288 /// ```
289 pub async fn create_with_ttl<T: AsRef<str>>(
290 &self,
291 key: T,
292 value: bytes::Bytes,
293 ttl: Duration,
294 ) -> Result<u64, CreateError> {
295 self.create_maybe_ttl(key, value, Some(ttl)).await
296 }
297
298 async fn create_maybe_ttl<T: AsRef<str>>(
299 &self,
300 key: T,
301 value: bytes::Bytes,
302 ttl: Option<Duration>,
303 ) -> Result<u64, CreateError> {
304 let update_err = match self
305 .update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
306 .await
307 {
308 Ok(revision) => return Ok(revision),
309 Err(err) => err,
310 };
311
312 match self.entry(key.as_ref()).await? {
313 // Deleted or Purged key, we can create it again.
314 Some(Entry {
315 operation: Operation::Delete | Operation::Purge,
316 revision,
317 ..
318 }) => {
319 let revision = self.update(key, value, revision).await?;
320 Ok(revision)
321 }
322
323 // key already exists.
324 Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
325
326 // Something went wrong with the initial update, return that error
327 None => Err(update_err.into()),
328 }
329 }
330
331 /// Puts new key value pair into the bucket.
332 /// If key didn't exist, it is created. If it did exist, a new value with a new version is
333 /// added.
334 ///
335 /// # Examples
336 ///
337 /// ```no_run
338 /// # #[tokio::main]
339 /// # async fn main() -> Result<(), async_nats::Error> {
340 /// let client = async_nats::connect("demo.nats.io:4222").await?;
341 /// let jetstream = async_nats::jetstream::new(client);
342 /// let kv = jetstream
343 /// .create_key_value(async_nats::jetstream::kv::Config {
344 /// bucket: "kv".to_string(),
345 /// history: 10,
346 /// ..Default::default()
347 /// })
348 /// .await?;
349 /// let status = kv.put("key", "value".into()).await?;
350 /// # Ok(())
351 /// # }
352 /// ```
353 pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
354 if !is_valid_key(key.as_ref()) {
355 return Err(PutError::new(PutErrorKind::InvalidKey));
356 }
357 let mut subject = String::new();
358 if self.use_jetstream_prefix {
359 subject.push_str(&self.stream.context.prefix);
360 subject.push('.');
361 }
362 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
363 subject.push_str(key.as_ref());
364
365 let publish_ack = self
366 .stream
367 .context
368 .publish(subject, value)
369 .await
370 .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
371 let ack = publish_ack
372 .await
373 .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
374
375 Ok(ack.sequence)
376 }
377
378 async fn entry_maybe_revision<T: Into<String>>(
379 &self,
380 key: T,
381 revision: Option<u64>,
382 ) -> Result<Option<Entry>, EntryError> {
383 let key: String = key.into();
384 if !is_valid_key(key.as_ref()) {
385 return Err(EntryError::new(EntryErrorKind::InvalidKey));
386 }
387
388 let subject = format!("{}{}", self.prefix.as_str(), &key);
389
390 let result: Option<(StreamMessage, Operation)> = {
391 if self.stream.info.config.allow_direct {
392 let message = match revision {
393 Some(revision) => {
394 let message = self.stream.direct_get(revision).await;
395 if let Ok(message) = message.as_ref() {
396 if message.subject.as_str() != subject {
397 println!("subject mismatch {}", message.subject);
398 return Ok(None);
399 }
400 }
401 message
402 }
403 None => {
404 self.stream
405 .direct_get_last_for_subject(subject.as_str())
406 .await
407 }
408 };
409
410 match message {
411 Ok(message) => {
412 let operation =
413 kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
414
415 Some((message, operation))
416 }
417 Err(err) => {
418 if err.kind() == DirectGetErrorKind::NotFound {
419 None
420 } else {
421 return Err(err.into());
422 }
423 }
424 }
425 } else {
426 let raw_message = match revision {
427 Some(revision) => {
428 let message = self.stream.get_raw_message(revision).await;
429 if let Ok(message) = message.as_ref() {
430 if message.subject.as_str() != subject {
431 return Ok(None);
432 }
433 }
434 message
435 }
436 None => {
437 self.stream
438 .get_last_raw_message_by_subject(subject.as_str())
439 .await
440 }
441 };
442 match raw_message {
443 Ok(raw_message) => {
444 let operation = kv_operation_from_stream_message(&raw_message)
445 .unwrap_or(Operation::Put);
446 // TODO: unnecessary expensive, cloning whole Message.
447 Some((raw_message, operation))
448 }
449 Err(err) => match err.kind() {
450 crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
451 crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
452 return Err(EntryError::new(EntryErrorKind::InvalidKey))
453 }
454 crate::jetstream::stream::LastRawMessageErrorKind::Other => {
455 return Err(EntryError::with_source(EntryErrorKind::Other, err))
456 }
457 crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
458 return Err(EntryError::with_source(EntryErrorKind::Other, err))
459 }
460 },
461 }
462 }
463 };
464
465 match result {
466 Some((message, operation)) => {
467 let entry = Entry {
468 bucket: self.name.clone(),
469 key,
470 value: message.payload,
471 revision: message.sequence,
472 created: message.time,
473 operation,
474 delta: 0,
475 seen_current: false,
476 };
477 Ok(Some(entry))
478 }
479 // TODO: remember to touch this when Errors are in place.
480 None => Ok(None),
481 }
482 }
483
484 /// Retrieves the last [Entry] for a given key from a bucket.
485 ///
486 /// # Examples
487 ///
488 /// ```no_run
489 /// # #[tokio::main]
490 /// # async fn main() -> Result<(), async_nats::Error> {
491 /// let client = async_nats::connect("demo.nats.io:4222").await?;
492 /// let jetstream = async_nats::jetstream::new(client);
493 /// let kv = jetstream
494 /// .create_key_value(async_nats::jetstream::kv::Config {
495 /// bucket: "kv".to_string(),
496 /// history: 10,
497 /// ..Default::default()
498 /// })
499 /// .await?;
500 /// let status = kv.put("key", "value".into()).await?;
501 /// let entry = kv.entry("key").await?;
502 /// println!("entry: {:?}", entry);
503 /// # Ok(())
504 /// # }
505 /// ```
506 pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
507 self.entry_maybe_revision(key, None).await
508 }
509
510 /// Retrieves the [Entry] for a given key revision from a bucket.
511 ///
512 /// # Examples
513 ///
514 /// ```no_run
515 /// # #[tokio::main]
516 /// # async fn main() -> Result<(), async_nats::Error> {
517 /// let client = async_nats::connect("demo.nats.io:4222").await?;
518 /// let jetstream = async_nats::jetstream::new(client);
519 /// let kv = jetstream
520 /// .create_key_value(async_nats::jetstream::kv::Config {
521 /// bucket: "kv".to_string(),
522 /// history: 10,
523 /// ..Default::default()
524 /// })
525 /// .await?;
526 /// let status = kv.put("key", "value".into()).await?;
527 /// let status = kv.put("key", "value2".into()).await?;
528 /// let entry = kv.entry_for_revision("key", 2).await?;
529 /// println!("entry: {:?}", entry);
530 /// # Ok(())
531 /// # }
532 /// ```
533 pub async fn entry_for_revision<T: Into<String>>(
534 &self,
535 key: T,
536 revision: u64,
537 ) -> Result<Option<Entry>, EntryError> {
538 self.entry_maybe_revision(key, Some(revision)).await
539 }
540
/// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
/// values whenever there are changes for that key.
543 ///
544 /// # Examples
545 ///
546 /// ```no_run
547 /// # #[tokio::main]
548 /// # async fn main() -> Result<(), async_nats::Error> {
549 /// use futures::StreamExt;
550 /// let client = async_nats::connect("demo.nats.io:4222").await?;
551 /// let jetstream = async_nats::jetstream::new(client);
552 /// let kv = jetstream
553 /// .create_key_value(async_nats::jetstream::kv::Config {
554 /// bucket: "kv".to_string(),
555 /// history: 10,
556 /// ..Default::default()
557 /// })
558 /// .await?;
559 /// let mut entries = kv.watch("kv").await?;
560 /// while let Some(entry) = entries.next().await {
561 /// println!("entry: {:?}", entry);
562 /// }
563 /// # Ok(())
564 /// # }
565 /// ```
566 pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
567 self.watch_with_deliver_policy(key, DeliverPolicy::New)
568 .await
569 }
570
571 /// Creates a [futures::Stream] over [Entries][Entry] in the bucket, which yields
572 /// values whenever there are changes for given keys.
573 ///
574 /// # Examples
575 ///
576 /// ```no_run
577 /// # #[tokio::main]
578 /// # async fn main() -> Result<(), async_nats::Error> {
579 /// use futures::StreamExt;
580 /// let client = async_nats::connect("demo.nats.io:4222").await?;
581 /// let jetstream = async_nats::jetstream::new(client);
582 /// let kv = jetstream
583 /// .create_key_value(async_nats::jetstream::kv::Config {
584 /// bucket: "kv".to_string(),
585 /// history: 10,
586 /// ..Default::default()
587 /// })
588 /// .await?;
589 /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
590 /// while let Some(entry) = entries.next().await {
591 /// println!("entry: {:?}", entry);
592 /// }
593 /// # Ok(())
594 /// # }
595 /// ```
#[cfg(feature = "server_2_10")]
pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
where
    T: AsRef<str>,
    K: IntoIterator<Item = T>,
{
    // Only changes made after the watch is created are delivered.
    let policy = DeliverPolicy::New;
    self.watch_many_with_deliver_policy(keys, policy).await
}
605
606 /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, starting from
607 /// provided revision. This is useful to resume watching over big KV buckets without a need to
608 /// replay all the history.
609 ///
610 /// # Examples
611 ///
612 /// ```no_run
613 /// # #[tokio::main]
614 /// # async fn main() -> Result<(), async_nats::Error> {
615 /// use futures::StreamExt;
616 /// let client = async_nats::connect("demo.nats.io:4222").await?;
617 /// let jetstream = async_nats::jetstream::new(client);
618 /// let kv = jetstream
619 /// .create_key_value(async_nats::jetstream::kv::Config {
620 /// bucket: "kv".to_string(),
621 /// history: 10,
622 /// ..Default::default()
623 /// })
624 /// .await?;
625 /// let mut entries = kv.watch_from_revision("kv", 5).await?;
626 /// while let Some(entry) = entries.next().await {
627 /// println!("entry: {:?}", entry);
628 /// }
629 /// # Ok(())
630 /// # }
631 /// ```
632 pub async fn watch_from_revision<T: AsRef<str>>(
633 &self,
634 key: T,
635 revision: u64,
636 ) -> Result<Watch, WatchError> {
637 self.watch_with_deliver_policy(
638 key,
639 DeliverPolicy::ByStartSequence {
640 start_sequence: revision,
641 },
642 )
643 .await
644 }
645
/// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
/// the last value for that key as well as every subsequent change.
648 ///
649 /// # Examples
650 ///
651 /// ```no_run
652 /// # #[tokio::main]
653 /// # async fn main() -> Result<(), async_nats::Error> {
654 /// use futures::StreamExt;
655 /// let client = async_nats::connect("demo.nats.io:4222").await?;
656 /// let jetstream = async_nats::jetstream::new(client);
657 /// let kv = jetstream
658 /// .create_key_value(async_nats::jetstream::kv::Config {
659 /// bucket: "kv".to_string(),
660 /// history: 10,
661 /// ..Default::default()
662 /// })
663 /// .await?;
664 /// let mut entries = kv.watch_with_history("kv").await?;
665 /// while let Some(entry) = entries.next().await {
666 /// println!("entry: {:?}", entry);
667 /// }
668 /// # Ok(())
669 /// # }
670 /// ```
671 pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
672 self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
673 .await
674 }
675
/// Creates a [futures::Stream] over [Entries][Entry] for the given keys in the bucket, which yields
/// the last value for each of those keys as well as every subsequent change.
/// This requires server version 2.10 or newer, as it uses consumers with multiple subject filters.
679 ///
680 /// # Examples
681 ///
682 /// ```no_run
683 /// # #[tokio::main]
684 /// # async fn main() -> Result<(), async_nats::Error> {
685 /// use futures::StreamExt;
686 /// let client = async_nats::connect("demo.nats.io:4222").await?;
687 /// let jetstream = async_nats::jetstream::new(client);
688 /// let kv = jetstream
689 /// .create_key_value(async_nats::jetstream::kv::Config {
690 /// bucket: "kv".to_string(),
691 /// history: 10,
692 /// ..Default::default()
693 /// })
694 /// .await?;
695 /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
696 /// while let Some(entry) = entries.next().await {
697 /// println!("entry: {:?}", entry);
698 /// }
699 /// # Ok(())
700 /// # }
701 /// ```
#[cfg(feature = "server_2_10")]
pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
    &self,
    keys: K,
) -> Result<Watch, WatchError> {
    // Start from the last message per subject, then follow changes.
    let policy = DeliverPolicy::LastPerSubject;
    self.watch_many_with_deliver_policy(keys, policy).await
}
710
/// Creates a [`Watch`] over several keys using the given deliver policy.
///
/// Each key is turned into a subject by prepending the store prefix, and a
/// push ordered consumer filtered on those subjects is created on the stream.
///
/// # Errors
///
/// Timeouts while creating the consumer or its message stream map to
/// [`WatchErrorKind::TimedOut`]; every other failure maps to
/// [`WatchErrorKind::Other`].
#[cfg(feature = "server_2_10")]
async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
    &self,
    keys: K,
    deliver_policy: DeliverPolicy,
) -> Result<Watch, WatchError> {
    let filter_subjects: Vec<String> = keys
        .into_iter()
        .map(|key| format!("{}{}", self.prefix, key.as_ref()))
        .collect();

    debug!("initial consumer creation");
    let config = super::consumer::push::OrderedConfig {
        deliver_subject: self.stream.context.client.new_inbox(),
        description: Some("kv watch consumer".to_string()),
        filter_subjects,
        replay_policy: super::consumer::ReplayPolicy::Instant,
        deliver_policy,
        ..Default::default()
    };
    let consumer = self
        .stream
        .create_consumer(config)
        .await
        .map_err(|err| match err.kind() {
            ConsumerErrorKind::TimedOut => WatchError::new(WatchErrorKind::TimedOut),
            _ => WatchError::with_source(WatchErrorKind::Other, err),
        })?;

    // Nothing pending at creation time means the watcher starts out caught up.
    let seen_current = consumer.cached_info().num_pending == 0;

    let subscription = consumer.messages().await.map_err(|err| match err.kind() {
        StreamErrorKind::TimedOut => WatchError::new(WatchErrorKind::TimedOut),
        StreamErrorKind::Other => WatchError::with_source(WatchErrorKind::Other, err),
    })?;

    Ok(Watch {
        subscription,
        prefix: self.prefix.clone(),
        bucket: self.name.clone(),
        seen_current,
    })
}
760
761 async fn watch_with_deliver_policy<T: AsRef<str>>(
762 &self,
763 key: T,
764 deliver_policy: DeliverPolicy,
765 ) -> Result<Watch, WatchError> {
766 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
767
768 debug!("initial consumer creation");
769 let consumer = self
770 .stream
771 .create_consumer(super::consumer::push::OrderedConfig {
772 deliver_subject: self.stream.context.client.new_inbox(),
773 description: Some("kv watch consumer".to_string()),
774 filter_subject: subject,
775 replay_policy: super::consumer::ReplayPolicy::Instant,
776 deliver_policy,
777 ..Default::default()
778 })
779 .await
780 .map_err(|err| match err.kind() {
781 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
782 WatchError::new(WatchErrorKind::TimedOut)
783 }
784 _ => WatchError::with_source(WatchErrorKind::Other, err),
785 })?;
786
787 let seen_current = consumer.cached_info().num_pending == 0;
788
789 Ok(Watch {
790 subscription: consumer.messages().await.map_err(|err| match err.kind() {
791 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
792 WatchError::new(WatchErrorKind::TimedOut)
793 }
794 crate::jetstream::consumer::StreamErrorKind::Other => {
795 WatchError::with_source(WatchErrorKind::Other, err)
796 }
797 })?,
798 prefix: self.prefix.clone(),
799 bucket: self.name.clone(),
800 seen_current,
801 })
802 }
803
804 /// Creates a [futures::Stream] over [Entries][Entry] for all keys, which yields
805 /// values whenever there are changes in the bucket.
806 ///
807 /// # Examples
808 ///
809 /// ```no_run
810 /// # #[tokio::main]
811 /// # async fn main() -> Result<(), async_nats::Error> {
812 /// use futures::StreamExt;
813 /// let client = async_nats::connect("demo.nats.io:4222").await?;
814 /// let jetstream = async_nats::jetstream::new(client);
815 /// let kv = jetstream
816 /// .create_key_value(async_nats::jetstream::kv::Config {
817 /// bucket: "kv".to_string(),
818 /// history: 10,
819 /// ..Default::default()
820 /// })
821 /// .await?;
822 /// let mut entries = kv.watch_all().await?;
823 /// while let Some(entry) = entries.next().await {
824 /// println!("entry: {:?}", entry);
825 /// }
826 /// # Ok(())
827 /// # }
828 /// ```
829 pub async fn watch_all(&self) -> Result<Watch, WatchError> {
830 self.watch(ALL_KEYS).await
831 }
832
/// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
/// from a provided revision. This can be useful when resuming watching over a big bucket
/// without the need to replay all the history.
836 ///
837 /// # Examples
838 ///
839 /// ```no_run
840 /// # #[tokio::main]
841 /// # async fn main() -> Result<(), async_nats::Error> {
842 /// use futures::StreamExt;
843 /// let client = async_nats::connect("demo.nats.io:4222").await?;
844 /// let jetstream = async_nats::jetstream::new(client);
845 /// let kv = jetstream
846 /// .create_key_value(async_nats::jetstream::kv::Config {
847 /// bucket: "kv".to_string(),
848 /// history: 10,
849 /// ..Default::default()
850 /// })
851 /// .await?;
852 /// let mut entries = kv.watch_all_from_revision(40).await?;
853 /// while let Some(entry) = entries.next().await {
854 /// println!("entry: {:?}", entry);
855 /// }
856 /// # Ok(())
857 /// # }
858 /// ```
859 pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
860 self.watch_from_revision(ALL_KEYS, revision).await
861 }
862
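/// Retrieves the value of the latest [Entry] for a given key from a bucket.
/// Returns `None` if the key is not found or its latest entry is a delete or purge marker.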
864 ///
865 /// # Examples
866 ///
867 /// ```no_run
868 /// # #[tokio::main]
869 /// # async fn main() -> Result<(), async_nats::Error> {
870 /// let client = async_nats::connect("demo.nats.io:4222").await?;
871 /// let jetstream = async_nats::jetstream::new(client);
872 /// let kv = jetstream
873 /// .create_key_value(async_nats::jetstream::kv::Config {
874 /// bucket: "kv".to_string(),
875 /// history: 10,
876 /// ..Default::default()
877 /// })
878 /// .await?;
879 /// let value = kv.get("key").await?;
880 /// match value {
881 /// Some(bytes) => {
882 /// let value_str = std::str::from_utf8(&bytes)?;
883 /// println!("Value: {}", value_str);
884 /// }
885 /// None => {
886 /// println!("Key not found or value not set");
887 /// }
888 /// }
889 /// # Ok(())
890 /// # }
891 /// ```
892 pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
893 match self.entry(key).await {
894 Ok(Some(entry)) => match entry.operation {
895 Operation::Put => Ok(Some(entry.value)),
896 _ => Ok(None),
897 },
898 Ok(None) => Ok(None),
899 Err(err) => Err(err),
900 }
901 }
902
903 /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
904 /// the bucket.
905 ///
906 /// # Examples
907 ///
908 /// ```no_run
909 /// # #[tokio::main]
910 /// # async fn main() -> Result<(), async_nats::Error> {
911 /// use futures::StreamExt;
912 /// let client = async_nats::connect("demo.nats.io:4222").await?;
913 /// let jetstream = async_nats::jetstream::new(client);
914 /// let kv = jetstream
915 /// .create_key_value(async_nats::jetstream::kv::Config {
916 /// bucket: "kv".to_string(),
917 /// history: 10,
918 /// ..Default::default()
919 /// })
920 /// .await?;
921 /// let revision = kv.put("key", "value".into()).await?;
922 /// kv.update("key", "updated".into(), revision).await?;
923 /// # Ok(())
924 /// # }
925 /// ```
926 pub async fn update<T: AsRef<str>>(
927 &self,
928 key: T,
929 value: Bytes,
930 revision: u64,
931 ) -> Result<u64, UpdateError> {
932 self.update_maybe_ttl(key, value, revision, None).await
933 }
934
935 async fn update_maybe_ttl<T: AsRef<str>>(
936 &self,
937 key: T,
938 value: Bytes,
939 revision: u64,
940 ttl: Option<Duration>,
941 ) -> Result<u64, UpdateError> {
942 if !is_valid_key(key.as_ref()) {
943 return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
944 }
945 let mut subject = String::new();
946 if self.use_jetstream_prefix {
947 subject.push_str(&self.stream.context.prefix);
948 subject.push('.');
949 }
950 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
951 subject.push_str(key.as_ref());
952
953 let mut headers = crate::HeaderMap::default();
954 headers.insert(
955 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
956 HeaderValue::from(revision),
957 );
958
959 if let Some(ttl) = ttl {
960 headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
961 }
962
963 self.stream
964 .context
965 .publish_with_headers(subject, headers, value)
966 .await?
967 .await
968 .map_err(|err| err.into())
969 .map(|publish_ack| publish_ack.sequence)
970 }
971
972 /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
973 ///
974 /// # Examples
975 ///
976 /// ```no_run
977 /// # #[tokio::main]
978 /// # async fn main() -> Result<(), async_nats::Error> {
979 /// use futures::StreamExt;
980 /// let client = async_nats::connect("demo.nats.io:4222").await?;
981 /// let jetstream = async_nats::jetstream::new(client);
982 /// let kv = jetstream
983 /// .create_key_value(async_nats::jetstream::kv::Config {
984 /// bucket: "kv".to_string(),
985 /// history: 10,
986 /// ..Default::default()
987 /// })
988 /// .await?;
989 /// kv.put("key", "value".into()).await?;
990 /// kv.delete("key").await?;
991 /// # Ok(())
992 /// # }
993 /// ```
994 pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
995 self.delete_expect_revision(key, None).await
996 }
997
998 /// Deletes a given key if the revision matches. This is a non-destructive operation, which
999 /// sets a `DELETE` marker.
1000 ///
1001 /// # Examples
1002 ///
1003 /// ```no_run
1004 /// # #[tokio::main]
1005 /// # async fn main() -> Result<(), async_nats::Error> {
1006 /// use futures::StreamExt;
1007 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1008 /// let jetstream = async_nats::jetstream::new(client);
1009 /// let kv = jetstream
1010 /// .create_key_value(async_nats::jetstream::kv::Config {
1011 /// bucket: "kv".to_string(),
1012 /// history: 10,
1013 /// ..Default::default()
1014 /// })
1015 /// .await?;
1016 /// let revision = kv.put("key", "value".into()).await?;
1017 /// kv.delete_expect_revision("key", Some(revision)).await?;
1018 /// # Ok(())
1019 /// # }
1020 /// ```
1021 pub async fn delete_expect_revision<T: AsRef<str>>(
1022 &self,
1023 key: T,
1024 revison: Option<u64>,
1025 ) -> Result<(), DeleteError> {
1026 if !is_valid_key(key.as_ref()) {
1027 return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
1028 }
1029 let mut subject = String::new();
1030 if self.use_jetstream_prefix {
1031 subject.push_str(&self.stream.context.prefix);
1032 subject.push('.');
1033 }
1034 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1035 subject.push_str(key.as_ref());
1036
1037 let mut headers = crate::HeaderMap::default();
1038 // TODO: figure out which headers k/v should be where.
1039 headers.insert(
1040 KV_OPERATION,
1041 KV_OPERATION_DELETE
1042 .parse::<HeaderValue>()
1043 .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
1044 );
1045
1046 if let Some(revision) = revison {
1047 headers.insert(
1048 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1049 HeaderValue::from(revision),
1050 );
1051 }
1052
1053 self.stream
1054 .context
1055 .publish_with_headers(subject, headers, "".into())
1056 .await?
1057 .await?;
1058 Ok(())
1059 }
1060
1061 /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```no_run
1066 /// # #[tokio::main]
1067 /// # async fn main() -> Result<(), async_nats::Error> {
1068 /// use futures::StreamExt;
1069 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1070 /// let jetstream = async_nats::jetstream::new(client);
1071 /// let kv = jetstream
1072 /// .create_key_value(async_nats::jetstream::kv::Config {
1073 /// bucket: "kv".to_string(),
1074 /// history: 10,
1075 /// ..Default::default()
1076 /// })
1077 /// .await?;
1078 /// kv.put("key", "value".into()).await?;
1079 /// kv.put("key", "another".into()).await?;
1080 /// kv.purge("key").await?;
1081 /// # Ok(())
1082 /// # }
1083 /// ```
1084 pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
1085 self.purge_expect_revision(key, None).await
1086 }
1087
1088 /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
1089 /// The purge entry will remain valid for the given `ttl`.
1090 ///
1091 /// # Examples
1092 ///
1093 /// ```no_run
1094 /// # #[tokio::main]
1095 /// # async fn main() -> Result<(), async_nats::Error> {
1096 /// use futures::StreamExt;
1097 /// use std::time::Duration;
1098 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1099 /// let jetstream = async_nats::jetstream::new(client);
1100 /// let kv = jetstream
1101 /// .create_key_value(async_nats::jetstream::kv::Config {
1102 /// bucket: "kv".to_string(),
1103 /// history: 10,
1104 /// ..Default::default()
1105 /// })
1106 /// .await?;
1107 /// kv.put("key", "value".into()).await?;
1108 /// kv.put("key", "another".into()).await?;
1109 /// kv.purge_with_ttl("key", Duration::from_secs(10)).await?;
1110 /// # Ok(())
1111 /// # }
1112 /// ```
1113 pub async fn purge_with_ttl<T: AsRef<str>>(
1114 &self,
1115 key: T,
1116 ttl: Duration,
1117 ) -> Result<(), PurgeError> {
1118 self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
1119 .await
1120 }
1121
1122 /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1123 /// purge entry in-place.
1124 ///
1125 /// # Examples
1126 ///
1127 /// ```no_run
1128 /// # #[tokio::main]
1129 /// # async fn main() -> Result<(), async_nats::Error> {
1130 /// use futures::StreamExt;
1131 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1132 /// let jetstream = async_nats::jetstream::new(client);
1133 /// let kv = jetstream
1134 /// .create_key_value(async_nats::jetstream::kv::Config {
1135 /// bucket: "kv".to_string(),
1136 /// history: 10,
1137 /// ..Default::default()
1138 /// })
1139 /// .await?;
1140 /// kv.put("key", "value".into()).await?;
1141 /// let revision = kv.put("key", "another".into()).await?;
1142 /// kv.purge_expect_revision("key", Some(revision)).await?;
1143 /// # Ok(())
1144 /// # }
1145 /// ```
1146 pub async fn purge_expect_revision<T: AsRef<str>>(
1147 &self,
1148 key: T,
1149 revision: Option<u64>,
1150 ) -> Result<(), PurgeError> {
1151 self.purge_expect_revision_maybe_ttl(key, revision, None)
1152 .await
1153 }
1154
1155 /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1156 /// purge entry in-place. The purge entry will remain valid for the given `ttl`.
1157 ///
1158 /// # Examples
1159 ///
1160 /// ```no_run
1161 /// # #[tokio::main]
1162 /// # async fn main() -> Result<(), async_nats::Error> {
1163 /// use futures::StreamExt;
1164 /// use std::time::Duration;
1165 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1166 /// let jetstream = async_nats::jetstream::new(client);
1167 /// let kv = jetstream
1168 /// .create_key_value(async_nats::jetstream::kv::Config {
1169 /// bucket: "kv".to_string(),
1170 /// history: 10,
1171 /// ..Default::default()
1172 /// })
1173 /// .await?;
1174 /// kv.put("key", "value".into()).await?;
1175 /// let revision = kv.put("key", "another".into()).await?;
1176 /// kv.purge_expect_revision_with_ttl("key", revision, Duration::from_secs(10))
1177 /// .await?;
1178 /// # Ok(())
1179 /// # }
1180 /// ```
1181 pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
1182 &self,
1183 key: T,
1184 revision: u64,
1185 ttl: Duration,
1186 ) -> Result<(), PurgeError> {
1187 self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
1188 .await
1189 }
1190
1191 async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
1192 &self,
1193 key: T,
1194 revision: Option<u64>,
1195 ttl: Option<Duration>,
1196 ) -> Result<(), PurgeError> {
1197 if !is_valid_key(key.as_ref()) {
1198 return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1199 }
1200
1201 let mut subject = String::new();
1202 if self.use_jetstream_prefix {
1203 subject.push_str(&self.stream.context.prefix);
1204 subject.push('.');
1205 }
1206 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1207 subject.push_str(key.as_ref());
1208
1209 let mut headers = crate::HeaderMap::default();
1210 headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1211 headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1212 if let Some(ttl) = ttl {
1213 headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
1214 }
1215
1216 if let Some(revision) = revision {
1217 headers.insert(
1218 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1219 HeaderValue::from(revision),
1220 );
1221 }
1222
1223 self.stream
1224 .context
1225 .publish_with_headers(subject, headers, "".into())
1226 .await?
1227 .await?;
1228 Ok(())
1229 }
1230
1231 /// Returns a [futures::Stream] that allows iterating over all [Operations][Operation] that
1232 /// happen for given key.
1233 ///
1234 /// # Examples
1235 ///
1236 /// ```no_run
1237 /// # #[tokio::main]
1238 /// # async fn main() -> Result<(), async_nats::Error> {
1239 /// use futures::StreamExt;
1240 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1241 /// let jetstream = async_nats::jetstream::new(client);
1242 /// let kv = jetstream
1243 /// .create_key_value(async_nats::jetstream::kv::Config {
1244 /// bucket: "kv".to_string(),
1245 /// history: 10,
1246 /// ..Default::default()
1247 /// })
1248 /// .await?;
1249 /// let mut entries = kv.history("kv").await?;
1250 /// while let Some(entry) = entries.next().await {
1251 /// println!("entry: {:?}", entry);
1252 /// }
1253 /// # Ok(())
1254 /// # }
1255 /// ```
1256 pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1257 if !is_valid_key(key.as_ref()) {
1258 return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1259 }
1260 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1261
1262 let consumer = self
1263 .stream
1264 .create_consumer(super::consumer::push::OrderedConfig {
1265 deliver_subject: self.stream.context.client.new_inbox(),
1266 description: Some("kv history consumer".to_string()),
1267 filter_subject: subject,
1268 replay_policy: super::consumer::ReplayPolicy::Instant,
1269 ..Default::default()
1270 })
1271 .await?;
1272
1273 Ok(History {
1274 subscription: consumer.messages().await?,
1275 done: false,
1276 prefix: self.prefix.clone(),
1277 bucket: self.name.clone(),
1278 })
1279 }
1280
1281 /// Returns a [futures::Stream] that allows iterating over all keys in the bucket.
1282 ///
1283 /// # Examples
1284 ///
1285 /// Iterating over each each key individually
1286 ///
1287 /// ```no_run
1288 /// # #[tokio::main]
1289 /// # async fn main() -> Result<(), async_nats::Error> {
1290 /// use futures::{StreamExt, TryStreamExt};
1291 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1292 /// let jetstream = async_nats::jetstream::new(client);
1293 /// let kv = jetstream
1294 /// .create_key_value(async_nats::jetstream::kv::Config {
1295 /// bucket: "kv".to_string(),
1296 /// history: 10,
1297 /// ..Default::default()
1298 /// })
1299 /// .await?;
1300 /// let mut keys = kv.keys().await?.boxed();
1301 /// while let Some(key) = keys.try_next().await? {
1302 /// println!("key: {:?}", key);
1303 /// }
1304 /// # Ok(())
1305 /// # }
1306 /// ```
1307 ///
1308 /// Collecting it into a vector of keys
1309 ///
1310 /// ```no_run
1311 /// # #[tokio::main]
1312 /// # async fn main() -> Result<(), async_nats::Error> {
1313 /// use futures::TryStreamExt;
1314 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1315 /// let jetstream = async_nats::jetstream::new(client);
1316 /// let kv = jetstream
1317 /// .create_key_value(async_nats::jetstream::kv::Config {
1318 /// bucket: "kv".to_string(),
1319 /// history: 10,
1320 /// ..Default::default()
1321 /// })
1322 /// .await?;
1323 /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1324 /// println!("Keys: {:?}", keys);
1325 /// # Ok(())
1326 /// # }
1327 /// ```
1328 pub async fn keys(&self) -> Result<Keys, HistoryError> {
1329 let subject = format!("{}>", self.prefix.as_str());
1330
1331 let consumer = self
1332 .stream
1333 .create_consumer(super::consumer::push::OrderedConfig {
1334 deliver_subject: self.stream.context.client.new_inbox(),
1335 description: Some("kv history consumer".to_string()),
1336 filter_subject: subject,
1337 headers_only: true,
1338 replay_policy: super::consumer::ReplayPolicy::Instant,
1339 // We only need to know the latest state for each key, not the whole history
1340 deliver_policy: DeliverPolicy::LastPerSubject,
1341 ..Default::default()
1342 })
1343 .await?;
1344
1345 let entries = History {
1346 done: consumer.info.num_pending == 0,
1347 subscription: consumer.messages().await?,
1348 prefix: self.prefix.clone(),
1349 bucket: self.name.clone(),
1350 };
1351
1352 Ok(Keys { inner: entries })
1353 }
1354}
1355
/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
///
/// Implements [`futures::Stream`] with [`Entry`] items built from the messages of an ordered
/// push consumer.
pub struct Watch {
    // Latched to `true` once a message reporting zero pending messages has been received;
    // copied into every yielded `Entry`.
    seen_current: bool,
    // Message stream of the ordered push consumer backing this watch.
    subscription: super::consumer::push::Ordered,
    // Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    // Bucket name copied into every yielded `Entry`.
    bucket: String,
}
1363
1364impl futures::Stream for Watch {
1365 type Item = Result<Entry, WatcherError>;
1366
1367 fn poll_next(
1368 mut self: std::pin::Pin<&mut Self>,
1369 cx: &mut std::task::Context<'_>,
1370 ) -> std::task::Poll<Option<Self::Item>> {
1371 match self.subscription.poll_next_unpin(cx) {
1372 Poll::Ready(message) => match message {
1373 None => Poll::Ready(None),
1374 Some(message) => {
1375 let message = message?;
1376 let info = message.info().map_err(|err| {
1377 WatcherError::with_source(
1378 WatcherErrorKind::Other,
1379 format!("failed to parse message metadata: {}", err),
1380 )
1381 })?;
1382
1383 let operation =
1384 kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1385
1386 let key = message
1387 .subject
1388 .strip_prefix(&self.prefix)
1389 .map(|s| s.to_string())
1390 .unwrap();
1391
1392 if !self.seen_current && info.pending == 0 {
1393 self.seen_current = true;
1394 }
1395
1396 Poll::Ready(Some(Ok(Entry {
1397 bucket: self.bucket.clone(),
1398 key,
1399 value: message.payload.clone(),
1400 revision: info.stream_sequence,
1401 created: info.published,
1402 delta: info.pending,
1403 operation,
1404 seen_current: self.seen_current,
1405 })))
1406 }
1407 },
1408 std::task::Poll::Pending => Poll::Pending,
1409 }
1410 }
1411
1412 fn size_hint(&self) -> (usize, Option<usize>) {
1413 (0, None)
1414 }
1415}
1416
/// A structure representing the history of a key-value bucket, yielding past values.
///
/// Implements [`futures::Stream`] with [`Entry`] items and terminates once `done` is set.
pub struct History {
    // Message stream of the ordered push consumer backing this history.
    subscription: super::consumer::push::Ordered,
    // When `true`, the stream yields `None` on the next poll. Set after a message reporting
    // zero pending messages has been received.
    done: bool,
    // Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    // Bucket name copied into every yielded `Entry`.
    bucket: String,
}
1424
1425impl futures::Stream for History {
1426 type Item = Result<Entry, WatcherError>;
1427
1428 fn poll_next(
1429 mut self: std::pin::Pin<&mut Self>,
1430 cx: &mut std::task::Context<'_>,
1431 ) -> std::task::Poll<Option<Self::Item>> {
1432 if self.done {
1433 return Poll::Ready(None);
1434 }
1435 match self.subscription.poll_next_unpin(cx) {
1436 Poll::Ready(message) => match message {
1437 None => Poll::Ready(None),
1438 Some(message) => {
1439 let message = message?;
1440 let info = message.info().map_err(|err| {
1441 WatcherError::with_source(
1442 WatcherErrorKind::Other,
1443 format!("failed to parse message metadata: {}", err),
1444 )
1445 })?;
1446 if info.pending == 0 {
1447 self.done = true;
1448 }
1449
1450 let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1451
1452 let key = message
1453 .subject
1454 .strip_prefix(&self.prefix)
1455 .map(|s| s.to_string())
1456 .unwrap();
1457
1458 Poll::Ready(Some(Ok(Entry {
1459 bucket: self.bucket.clone(),
1460 key,
1461 value: message.payload.clone(),
1462 revision: info.stream_sequence,
1463 created: info.published,
1464 delta: info.pending,
1465 operation,
1466 seen_current: self.done,
1467 })))
1468 }
1469 },
1470 std::task::Poll::Pending => Poll::Pending,
1471 }
1472 }
1473
1474 fn size_hint(&self) -> (usize, Option<usize>) {
1475 (0, None)
1476 }
1477}
1478
/// A stream of the keys present in a key-value bucket.
///
/// Wraps a [`History`] stream and yields only the key of each entry, skipping entries whose
/// operation is a delete or a purge.
pub struct Keys {
    // Entry stream the keys are extracted from.
    inner: History,
}
1482
1483impl futures::Stream for Keys {
1484 type Item = Result<String, WatcherError>;
1485
1486 fn poll_next(
1487 mut self: std::pin::Pin<&mut Self>,
1488 cx: &mut std::task::Context<'_>,
1489 ) -> std::task::Poll<Option<Self::Item>> {
1490 loop {
1491 match self.inner.poll_next_unpin(cx) {
1492 Poll::Ready(None) => return Poll::Ready(None),
1493 Poll::Ready(Some(res)) => match res {
1494 Ok(entry) => {
1495 // Skip purged and deleted keys
1496 if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1497 // Try to poll again if we skip this one
1498 continue;
1499 } else {
1500 return Poll::Ready(Some(Ok(entry.key)));
1501 }
1502 }
1503 Err(e) => return Poll::Ready(Some(Err(e))),
1504 },
1505 Poll::Pending => return Poll::Pending,
1506 }
1507 }
1508 }
1509}
1510
/// An entry in a key-value bucket.
///
/// Yielded by the [`Watch`] and [`History`] streams, which build it from a received message
/// and its metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved.
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value.
    pub revision: u64,
    /// Distance from the latest value.
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
    /// Set to true after all historical messages have been received, and
    /// now all Entries are the new ones.
    pub seen_current: bool,
}
1532
1533#[derive(Clone, Debug, PartialEq)]
1534pub enum StatusErrorKind {
1535 JetStream(crate::jetstream::Error),
1536 TimedOut,
1537}
1538
1539impl Display for StatusErrorKind {
1540 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1541 match self {
1542 Self::JetStream(err) => write!(f, "jetstream request failed: {}", err),
1543 Self::TimedOut => write!(f, "timed out"),
1544 }
1545 }
1546}
1547
/// Error type whose kind is a [`StatusErrorKind`].
pub type StatusError = Error<StatusErrorKind>;
1549
/// Kinds of failure carried by a [`CreateError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
    /// The key already exists (also used when an update reports a wrong last revision).
    AlreadyExists,
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// Creating the key in the store failed (also used for timeouts of the underlying
    /// update or entry lookup).
    Publish,
    /// The publish acknowledgement failed.
    Ack,
    /// Any other failure.
    Other,
}
1558
1559impl From<UpdateError> for CreateError {
1560 fn from(error: UpdateError) -> Self {
1561 match error.kind() {
1562 UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1563 UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1564 UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
1565 UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1566 }
1567 }
1568}
1569
1570impl From<PutError> for CreateError {
1571 fn from(error: PutError) -> Self {
1572 match error.kind() {
1573 PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1574 PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1575 PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1576 }
1577 }
1578}
1579
1580impl From<EntryError> for CreateError {
1581 fn from(error: EntryError) -> Self {
1582 match error.kind() {
1583 EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1584 EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1585 EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1586 }
1587 }
1588}
1589
1590impl Display for CreateErrorKind {
1591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1592 match self {
1593 Self::AlreadyExists => write!(f, "key already exists"),
1594 Self::Publish => write!(f, "failed to create key in store"),
1595 Self::Ack => write!(f, "ack error"),
1596 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1597 Self::Other => write!(f, "other error"),
1598 }
1599 }
1600}
1601
/// Error type whose kind is a [`CreateErrorKind`].
pub type CreateError = Error<CreateErrorKind>;
1603
/// Kinds of failure carried by a [`PutError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// Putting the key into the store failed.
    Publish,
    /// The publish acknowledgement failed.
    Ack,
}

/// Human-readable description of each put error kind.
impl Display for PutErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Publish => "failed to put key into store",
            Self::Ack => "ack error",
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
        };
        f.write_str(description)
    }
}
1620
/// Error type whose kind is a [`PutErrorKind`].
pub type PutError = Error<PutErrorKind>;
1622
/// Kinds of failure carried by an [`EntryError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EntryErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Getting the entry failed for any other reason.
    Other,
}

/// Human-readable description of each entry error kind.
impl Display for EntryErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::Other => "failed getting entry",
        };
        f.write_str(description)
    }
}
1639
/// Error type whose kind is an [`EntryErrorKind`].
pub type EntryError = Error<EntryErrorKind>;

// Generates the conversion from `DirectGetError` into `EntryError`.
// NOTE(review): presumably the timeout kind maps to `TimedOut` and everything else to
// `Other` — confirm against the `from_with_timeout!` definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1648
/// Kinds of failure carried by a [`WatchError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// The watch failed for any other reason.
    Other,
}

/// Human-readable description of each watch error kind.
impl Display for WatchErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
            Self::TimedOut => "timed out",
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
        };
        f.write_str(description)
    }
}
1667
/// Error type whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;

// Generate the conversions from consumer and stream errors into `WatchError`.
// NOTE(review): presumably the timeout kind maps to `TimedOut` and everything else to
// `Other` — confirm against the `from_with_timeout!` definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1672
/// Kinds of failure carried by an [`UpdateError`] (and its `Delete`/`Purge` aliases).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// The expected last revision did not match.
    WrongLastRevision,
    /// Any other failure.
    Other,
}

/// Human-readable description of each update error kind.
impl Display for UpdateErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::WrongLastRevision => "wrong last revision",
            // NOTE(review): same wording as `EntryErrorKind::Other`; kept verbatim.
            Self::Other => "failed getting entry",
        };
        f.write_str(description)
    }
}
1691
/// Error type whose kind is an [`UpdateErrorKind`].
pub type UpdateError = Error<UpdateErrorKind>;
1693
1694impl From<PublishError> for UpdateError {
1695 fn from(err: PublishError) -> Self {
1696 match err.kind() {
1697 PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1698 PublishErrorKind::WrongLastSequence => {
1699 Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1700 }
1701 _ => Self::with_source(UpdateErrorKind::Other, err),
1702 }
1703 }
1704}
1705
/// Kinds of failure carried by a [`WatcherError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    Consumer,
    /// Any other failure while producing entries.
    Other,
}

/// Human-readable description of each watcher error kind.
impl Display for WatcherErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Consumer => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(description)
    }
}
1720
/// Error type whose kind is a [`WatcherErrorKind`]; the item error of the [`Watch`],
/// [`History`] and [`Keys`] streams.
pub type WatcherError = Error<WatcherErrorKind>;
1722
1723impl From<OrderedError> for WatcherError {
1724 fn from(err: OrderedError) -> Self {
1725 WatcherError::with_source(WatcherErrorKind::Consumer, err)
1726 }
1727}
1728
/// Error returned by the `delete` family of methods; an alias of [`UpdateError`].
pub type DeleteError = UpdateError;
/// Kind of a [`DeleteError`]; an alias of [`UpdateErrorKind`].
pub type DeleteErrorKind = UpdateErrorKind;

/// Error returned by the `purge` family of methods; an alias of [`UpdateError`].
pub type PurgeError = UpdateError;
/// Kind of a [`PurgeError`]; an alias of [`UpdateErrorKind`].
pub type PurgeErrorKind = UpdateErrorKind;

/// Error returned by `history` and `keys`; an alias of [`WatchError`].
pub type HistoryError = WatchError;
/// Kind of a [`HistoryError`]; an alias of [`WatchErrorKind`].
pub type HistoryErrorKind = WatchErrorKind;