async_nats/jetstream/kv/mod.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19 fmt::{self, Display},
20 str::FromStr,
21 task::Poll,
22 time::Duration,
23};
24
25use crate::HeaderValue;
26use bytes::Bytes;
27use futures_util::StreamExt;
28use once_cell::sync::Lazy;
29use regex::Regex;
30use time::OffsetDateTime;
31use tracing::debug;
32
33use crate::error::Error;
34use crate::header;
35
36use self::bucket::Status;
37
38use super::{
39 consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
40 context::{PublishError, PublishErrorKind},
41 message::StreamMessage,
42 stream::{
43 self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
44 Source, StorageType, Stream,
45 },
46};
47
48fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
49 if let Some(op) = message.headers.get(KV_OPERATION) {
50 Operation::from_str(op.as_str())
51 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
52 } else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
53 match reason.as_str() {
54 "MaxAge" | "Purge" => Ok(Operation::Purge),
55 "Remove" => Ok(Operation::Delete),
56 _ => Err(EntryError::with_source(
57 EntryErrorKind::Other,
58 "invalid marker reason",
59 )),
60 }
61 } else {
62 Err(EntryError::with_source(
63 EntryErrorKind::Other,
64 "missing operation",
65 ))
66 }
67}
68fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
69 let headers = match message.headers.as_ref() {
70 Some(headers) => headers,
71 None => return Ok(Operation::Put),
72 };
73 if let Some(op) = headers.get(KV_OPERATION) {
74 Operation::from_str(op.as_str())
75 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
76 } else if let Some(reason) = headers.get(header::NATS_MARKER_REASON) {
77 match reason.as_str() {
78 "MaxAge" | "Purge" => Ok(Operation::Purge),
79 "Remove" => Ok(Operation::Delete),
80 _ => Err(EntryError::with_source(
81 EntryErrorKind::Other,
82 "invalid marker reason",
83 )),
84 }
85 } else {
86 Ok(Operation::Put)
87 }
88}
89
90static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
91static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
92
93pub(crate) const MAX_HISTORY: i64 = 64;
94const ALL_KEYS: &str = ">";
95
96const KV_OPERATION: &str = "KV-Operation";
97const KV_OPERATION_DELETE: &str = "DEL";
98const KV_OPERATION_PURGE: &str = "PURGE";
99const KV_OPERATION_PUT: &str = "PUT";
100
101const NATS_ROLLUP: &str = "Nats-Rollup";
102const ROLLUP_SUBJECT: &str = "sub";
103
104pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
105 VALID_BUCKET_RE.is_match(bucket_name)
106}
107
108pub(crate) fn is_valid_key(key: &str) -> bool {
109 if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
110 return false;
111 }
112
113 VALID_KEY_RE.is_match(key)
114}
115
116/// Configuration values for key value stores.
117#[derive(Debug, Clone, Default)]
118pub struct Config {
119 /// Name of the bucket
120 pub bucket: String,
121 /// Human readable description.
122 pub description: String,
123 /// Maximum size of a single value.
124 pub max_value_size: i32,
125 /// Maximum historical entries.
126 pub history: i64,
127 /// Maximum age of any entry in the bucket, expressed in nanoseconds
128 pub max_age: std::time::Duration,
129 /// How large the bucket may become in total bytes before the configured discard policy kicks in
130 pub max_bytes: i64,
131 /// The type of storage backend, `File` (default) and `Memory`
132 pub storage: StorageType,
133 /// How many replicas to keep for each entry in a cluster.
134 pub num_replicas: usize,
135 /// Republish is for republishing messages once persistent in the Key Value Bucket.
136 pub republish: Option<Republish>,
137 /// Bucket mirror configuration.
138 pub mirror: Option<Source>,
139 /// Bucket sources configuration.
140 pub sources: Option<Vec<Source>>,
141 /// Allow mirrors using direct API.
142 pub mirror_direct: bool,
143 /// Compression
144 #[cfg(feature = "server_2_10")]
145 pub compression: bool,
146 /// Cluster and tag placement for the bucket.
147 pub placement: Option<stream::Placement>,
148 /// Enables per-message TTL and delete marker TTL for a bucket.
149 #[cfg(feature = "server_2_11")]
150 pub limit_markers: Option<Duration>,
151}
152
153/// Describes what kind of operation and entry represents
154#[derive(Debug, Clone, Copy, Eq, PartialEq)]
155pub enum Operation {
156 /// A value was put into the bucket
157 Put,
158 /// A value was deleted from a bucket
159 Delete,
160 /// A value was purged from a bucket
161 Purge,
162}
163
164impl FromStr for Operation {
165 type Err = ParseOperationError;
166
167 fn from_str(s: &str) -> Result<Self, Self::Err> {
168 match s {
169 KV_OPERATION_DELETE => Ok(Operation::Delete),
170 KV_OPERATION_PURGE => Ok(Operation::Purge),
171 KV_OPERATION_PUT => Ok(Operation::Put),
172 _ => Err(ParseOperationError),
173 }
174 }
175}
176
177#[derive(Debug, Clone)]
178pub struct ParseOperationError;
179
180impl fmt::Display for ParseOperationError {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
183 }
184}
185
186impl std::error::Error for ParseOperationError {}
187
188/// A struct used as a handle for the bucket.
189#[derive(Debug, Clone)]
190pub struct Store {
191 /// The name of the Store.
192 pub name: String,
193 /// The name of the stream associated with the Store.
194 pub stream_name: String,
195 /// The prefix for keys in the Store.
196 pub prefix: String,
197 /// The optional prefix to use when putting new key-value pairs.
198 pub put_prefix: Option<String>,
199 /// Indicates whether to use the JetStream prefix.
200 pub use_jetstream_prefix: bool,
201 /// The stream associated with the Store.
202 pub stream: Stream,
203}
204
205impl Store {
206 /// Queries the server and returns status from the server.
207 ///
208 /// # Examples
209 ///
210 /// ```no_run
211 /// # #[tokio::main]
212 /// # async fn main() -> Result<(), async_nats::Error> {
213 /// let client = async_nats::connect("demo.nats.io:4222").await?;
214 /// let jetstream = async_nats::jetstream::new(client);
215 /// let kv = jetstream
216 /// .create_key_value(async_nats::jetstream::kv::Config {
217 /// bucket: "kv".to_string(),
218 /// history: 10,
219 /// ..Default::default()
220 /// })
221 /// .await?;
222 /// let status = kv.status().await?;
223 /// println!("status: {:?}", status);
224 /// # Ok(())
225 /// # }
226 /// ```
227 pub async fn status(&self) -> Result<Status, StatusError> {
228 // TODO: should we poll for fresh info here? probably yes.
229 let info = self.stream.info.clone();
230
231 Ok(Status {
232 info,
233 bucket: self.name.to_string(),
234 })
235 }
236
237 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
238 ///
239 /// # Examples
240 ///
241 /// ```no_run
242 /// # #[tokio::main]
243 /// # async fn main() -> Result<(), async_nats::Error> {
244 /// let client = async_nats::connect("demo.nats.io:4222").await?;
245 /// let jetstream = async_nats::jetstream::new(client);
246 /// let kv = jetstream
247 /// .create_key_value(async_nats::jetstream::kv::Config {
248 /// bucket: "kv".to_string(),
249 /// history: 10,
250 /// ..Default::default()
251 /// })
252 /// .await?;
253 ///
254 /// let status = kv.create("key", "value".into()).await;
255 /// assert!(status.is_ok());
256 ///
257 /// let status = kv.create("key", "value".into()).await;
258 /// assert!(status.is_err());
259 ///
260 /// # Ok(())
261 /// # }
262 /// ```
263 pub async fn create<T: AsRef<str>>(
264 &self,
265 key: T,
266 value: bytes::Bytes,
267 ) -> Result<u64, CreateError> {
268 self.create_maybe_ttl(key, value, None).await
269 }
270
271 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
272 /// It will set a TTL specific for that key.
273 ///
274 /// # Examples
275 ///
276 /// ```no_run
277 /// # #[tokio::main]
278 /// # async fn main() -> Result<(), async_nats::Error> {
279 /// use std::time::Duration;
280 /// let client = async_nats::connect("demo.nats.io:4222").await?;
281 /// let jetstream = async_nats::jetstream::new(client);
282 /// let kv = jetstream
283 /// .create_key_value(async_nats::jetstream::kv::Config {
284 /// bucket: "kv".to_string(),
285 /// history: 10,
286 /// ..Default::default()
287 /// })
288 /// .await?;
289 ///
290 /// let status = kv
291 /// .create_with_ttl("key", "value".into(), Duration::from_secs(10))
292 /// .await;
293 /// assert!(status.is_ok());
294 ///
295 /// # Ok(())
296 /// # }
297 /// ```
298 pub async fn create_with_ttl<T: AsRef<str>>(
299 &self,
300 key: T,
301 value: bytes::Bytes,
302 ttl: Duration,
303 ) -> Result<u64, CreateError> {
304 self.create_maybe_ttl(key, value, Some(ttl)).await
305 }
306
307 async fn create_maybe_ttl<T: AsRef<str>>(
308 &self,
309 key: T,
310 value: bytes::Bytes,
311 ttl: Option<Duration>,
312 ) -> Result<u64, CreateError> {
313 let update_err = match self
314 .update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
315 .await
316 {
317 Ok(revision) => return Ok(revision),
318 Err(err) => err,
319 };
320
321 match self.entry(key.as_ref()).await? {
322 // Deleted or Purged key, we can create it again.
323 Some(Entry {
324 operation: Operation::Delete | Operation::Purge,
325 revision,
326 ..
327 }) => {
328 let revision = self.update(key, value, revision).await?;
329 Ok(revision)
330 }
331
332 // key already exists.
333 Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
334
335 // Something went wrong with the initial update, return that error
336 None => Err(update_err.into()),
337 }
338 }
339
340 /// Puts new key value pair into the bucket.
341 /// If key didn't exist, it is created. If it did exist, a new value with a new version is
342 /// added.
343 ///
344 /// # Examples
345 ///
346 /// ```no_run
347 /// # #[tokio::main]
348 /// # async fn main() -> Result<(), async_nats::Error> {
349 /// let client = async_nats::connect("demo.nats.io:4222").await?;
350 /// let jetstream = async_nats::jetstream::new(client);
351 /// let kv = jetstream
352 /// .create_key_value(async_nats::jetstream::kv::Config {
353 /// bucket: "kv".to_string(),
354 /// history: 10,
355 /// ..Default::default()
356 /// })
357 /// .await?;
358 /// let status = kv.put("key", "value".into()).await?;
359 /// # Ok(())
360 /// # }
361 /// ```
362 pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
363 if !is_valid_key(key.as_ref()) {
364 return Err(PutError::new(PutErrorKind::InvalidKey));
365 }
366 let mut subject = String::new();
367 if self.use_jetstream_prefix {
368 subject.push_str(&self.stream.context.prefix);
369 subject.push('.');
370 }
371 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
372 subject.push_str(key.as_ref());
373
374 let publish_ack = self
375 .stream
376 .context
377 .publish(subject, value)
378 .await
379 .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
380 let ack = publish_ack
381 .await
382 .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
383
384 Ok(ack.sequence)
385 }
386
387 async fn entry_maybe_revision<T: Into<String>>(
388 &self,
389 key: T,
390 revision: Option<u64>,
391 ) -> Result<Option<Entry>, EntryError> {
392 let key: String = key.into();
393 if !is_valid_key(key.as_ref()) {
394 return Err(EntryError::new(EntryErrorKind::InvalidKey));
395 }
396
397 let subject = format!("{}{}", self.prefix.as_str(), &key);
398
399 let result: Option<(StreamMessage, Operation)> = {
400 if self.stream.info.config.allow_direct {
401 let message = match revision {
402 Some(revision) => {
403 let message = self.stream.direct_get(revision).await;
404 if let Ok(message) = message.as_ref() {
405 if message.subject.as_str() != subject {
406 println!("subject mismatch {}", message.subject);
407 return Ok(None);
408 }
409 }
410 message
411 }
412 None => {
413 self.stream
414 .direct_get_last_for_subject(subject.as_str())
415 .await
416 }
417 };
418
419 match message {
420 Ok(message) => {
421 let operation =
422 kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
423
424 Some((message, operation))
425 }
426 Err(err) => {
427 if err.kind() == DirectGetErrorKind::NotFound {
428 None
429 } else {
430 return Err(err.into());
431 }
432 }
433 }
434 } else {
435 let raw_message = match revision {
436 Some(revision) => {
437 let message = self.stream.get_raw_message(revision).await;
438 if let Ok(message) = message.as_ref() {
439 if message.subject.as_str() != subject {
440 return Ok(None);
441 }
442 }
443 message
444 }
445 None => {
446 self.stream
447 .get_last_raw_message_by_subject(subject.as_str())
448 .await
449 }
450 };
451 match raw_message {
452 Ok(raw_message) => {
453 let operation = kv_operation_from_stream_message(&raw_message)
454 .unwrap_or(Operation::Put);
455 // TODO: unnecessary expensive, cloning whole Message.
456 Some((raw_message, operation))
457 }
458 Err(err) => match err.kind() {
459 crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
460 crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
461 return Err(EntryError::new(EntryErrorKind::InvalidKey))
462 }
463 crate::jetstream::stream::LastRawMessageErrorKind::Other => {
464 return Err(EntryError::with_source(EntryErrorKind::Other, err))
465 }
466 crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
467 return Err(EntryError::with_source(EntryErrorKind::Other, err))
468 }
469 },
470 }
471 }
472 };
473
474 match result {
475 Some((message, operation)) => {
476 let entry = Entry {
477 bucket: self.name.clone(),
478 key,
479 value: message.payload,
480 revision: message.sequence,
481 created: message.time,
482 operation,
483 delta: 0,
484 seen_current: false,
485 };
486 Ok(Some(entry))
487 }
488 // TODO: remember to touch this when Errors are in place.
489 None => Ok(None),
490 }
491 }
492
493 /// Retrieves the last [Entry] for a given key from a bucket.
494 ///
495 /// # Examples
496 ///
497 /// ```no_run
498 /// # #[tokio::main]
499 /// # async fn main() -> Result<(), async_nats::Error> {
500 /// let client = async_nats::connect("demo.nats.io:4222").await?;
501 /// let jetstream = async_nats::jetstream::new(client);
502 /// let kv = jetstream
503 /// .create_key_value(async_nats::jetstream::kv::Config {
504 /// bucket: "kv".to_string(),
505 /// history: 10,
506 /// ..Default::default()
507 /// })
508 /// .await?;
509 /// let status = kv.put("key", "value".into()).await?;
510 /// let entry = kv.entry("key").await?;
511 /// println!("entry: {:?}", entry);
512 /// # Ok(())
513 /// # }
514 /// ```
515 pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
516 self.entry_maybe_revision(key, None).await
517 }
518
519 /// Retrieves the [Entry] for a given key revision from a bucket.
520 ///
521 /// # Examples
522 ///
523 /// ```no_run
524 /// # #[tokio::main]
525 /// # async fn main() -> Result<(), async_nats::Error> {
526 /// let client = async_nats::connect("demo.nats.io:4222").await?;
527 /// let jetstream = async_nats::jetstream::new(client);
528 /// let kv = jetstream
529 /// .create_key_value(async_nats::jetstream::kv::Config {
530 /// bucket: "kv".to_string(),
531 /// history: 10,
532 /// ..Default::default()
533 /// })
534 /// .await?;
535 /// let status = kv.put("key", "value".into()).await?;
536 /// let status = kv.put("key", "value2".into()).await?;
537 /// let entry = kv.entry_for_revision("key", 2).await?;
538 /// println!("entry: {:?}", entry);
539 /// # Ok(())
540 /// # }
541 /// ```
542 pub async fn entry_for_revision<T: Into<String>>(
543 &self,
544 key: T,
545 revision: u64,
546 ) -> Result<Option<Entry>, EntryError> {
547 self.entry_maybe_revision(key, Some(revision)).await
548 }
549
550 /// Creates a [futures_util::Stream] over [Entries][Entry] a given key in the bucket, which yields
551 /// values whenever there are changes for that key.
552 ///
553 /// # Examples
554 ///
555 /// ```no_run
556 /// # #[tokio::main]
557 /// # async fn main() -> Result<(), async_nats::Error> {
558 /// use futures_util::StreamExt;
559 /// let client = async_nats::connect("demo.nats.io:4222").await?;
560 /// let jetstream = async_nats::jetstream::new(client);
561 /// let kv = jetstream
562 /// .create_key_value(async_nats::jetstream::kv::Config {
563 /// bucket: "kv".to_string(),
564 /// history: 10,
565 /// ..Default::default()
566 /// })
567 /// .await?;
568 /// let mut entries = kv.watch("kv").await?;
569 /// while let Some(entry) = entries.next().await {
570 /// println!("entry: {:?}", entry);
571 /// }
572 /// # Ok(())
573 /// # }
574 /// ```
575 pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
576 self.watch_with_deliver_policy(key, DeliverPolicy::New)
577 .await
578 }
579
580 /// Creates a [futures_util::Stream] over [Entries][Entry] in the bucket, which yields
581 /// values whenever there are changes for given keys.
582 ///
583 /// # Examples
584 ///
585 /// ```no_run
586 /// # #[tokio::main]
587 /// # async fn main() -> Result<(), async_nats::Error> {
588 /// use futures_util::StreamExt;
589 /// let client = async_nats::connect("demo.nats.io:4222").await?;
590 /// let jetstream = async_nats::jetstream::new(client);
591 /// let kv = jetstream
592 /// .create_key_value(async_nats::jetstream::kv::Config {
593 /// bucket: "kv".to_string(),
594 /// history: 10,
595 /// ..Default::default()
596 /// })
597 /// .await?;
598 /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
599 /// while let Some(entry) = entries.next().await {
600 /// println!("entry: {:?}", entry);
601 /// }
602 /// # Ok(())
603 /// # }
604 /// ```
605 #[cfg(feature = "server_2_10")]
606 pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
607 where
608 T: AsRef<str>,
609 K: IntoIterator<Item = T>,
610 {
611 self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
612 .await
613 }
614
615 /// Creates a [futures_util::Stream] over [Entries][Entry] for a given key in the bucket, starting from
616 /// provided revision. This is useful to resume watching over big KV buckets without a need to
617 /// replay all the history.
618 ///
619 /// # Examples
620 ///
621 /// ```no_run
622 /// # #[tokio::main]
623 /// # async fn main() -> Result<(), async_nats::Error> {
624 /// use futures_util::StreamExt;
625 /// let client = async_nats::connect("demo.nats.io:4222").await?;
626 /// let jetstream = async_nats::jetstream::new(client);
627 /// let kv = jetstream
628 /// .create_key_value(async_nats::jetstream::kv::Config {
629 /// bucket: "kv".to_string(),
630 /// history: 10,
631 /// ..Default::default()
632 /// })
633 /// .await?;
634 /// let mut entries = kv.watch_from_revision("kv", 5).await?;
635 /// while let Some(entry) = entries.next().await {
636 /// println!("entry: {:?}", entry);
637 /// }
638 /// # Ok(())
639 /// # }
640 /// ```
641 pub async fn watch_from_revision<T: AsRef<str>>(
642 &self,
643 key: T,
644 revision: u64,
645 ) -> Result<Watch, WatchError> {
646 self.watch_with_deliver_policy(
647 key,
648 DeliverPolicy::ByStartSequence {
649 start_sequence: revision,
650 },
651 )
652 .await
653 }
654
655 /// Creates a [futures_util::Stream] over [Entries][Entry] a given key in the bucket, which yields
656 /// values whenever there are changes for that key with as well as last value.
657 ///
658 /// # Examples
659 ///
660 /// ```no_run
661 /// # #[tokio::main]
662 /// # async fn main() -> Result<(), async_nats::Error> {
663 /// use futures_util::StreamExt;
664 /// let client = async_nats::connect("demo.nats.io:4222").await?;
665 /// let jetstream = async_nats::jetstream::new(client);
666 /// let kv = jetstream
667 /// .create_key_value(async_nats::jetstream::kv::Config {
668 /// bucket: "kv".to_string(),
669 /// history: 10,
670 /// ..Default::default()
671 /// })
672 /// .await?;
673 /// let mut entries = kv.watch_with_history("kv").await?;
674 /// while let Some(entry) = entries.next().await {
675 /// println!("entry: {:?}", entry);
676 /// }
677 /// # Ok(())
678 /// # }
679 /// ```
680 pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
681 self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
682 .await
683 }
684
685 /// Creates a [futures_util::Stream] over [Entries][Entry] a given keys in the bucket, which yields
686 /// values whenever there are changes for those keys with as well as last value.
687 /// This requires server version > 2.10 as it uses consumers with multiple subject filters.
688 ///
689 /// # Examples
690 ///
691 /// ```no_run
692 /// # #[tokio::main]
693 /// # async fn main() -> Result<(), async_nats::Error> {
694 /// use futures_util::StreamExt;
695 /// let client = async_nats::connect("demo.nats.io:4222").await?;
696 /// let jetstream = async_nats::jetstream::new(client);
697 /// let kv = jetstream
698 /// .create_key_value(async_nats::jetstream::kv::Config {
699 /// bucket: "kv".to_string(),
700 /// history: 10,
701 /// ..Default::default()
702 /// })
703 /// .await?;
704 /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
705 /// while let Some(entry) = entries.next().await {
706 /// println!("entry: {:?}", entry);
707 /// }
708 /// # Ok(())
709 /// # }
710 /// ```
711 #[cfg(feature = "server_2_10")]
712 pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
713 &self,
714 keys: K,
715 ) -> Result<Watch, WatchError> {
716 self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
717 .await
718 }
719
720 #[cfg(feature = "server_2_10")]
721 async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
722 &self,
723 keys: K,
724 deliver_policy: DeliverPolicy,
725 ) -> Result<Watch, WatchError> {
726 let subjects = keys
727 .into_iter()
728 .map(|key| {
729 let key = key.as_ref();
730 format!("{}{}", self.prefix.as_str(), key)
731 })
732 .collect::<Vec<_>>();
733
734 debug!("initial consumer creation");
735 let consumer = self
736 .stream
737 .create_consumer(super::consumer::push::OrderedConfig {
738 deliver_subject: self.stream.context.client.new_inbox(),
739 description: Some("kv watch consumer".to_string()),
740 filter_subjects: subjects,
741 replay_policy: super::consumer::ReplayPolicy::Instant,
742 deliver_policy,
743 ..Default::default()
744 })
745 .await
746 .map_err(|err| match err.kind() {
747 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
748 WatchError::new(WatchErrorKind::TimedOut)
749 }
750 _ => WatchError::with_source(WatchErrorKind::Other, err),
751 })?;
752
753 let seen_current = consumer.cached_info().num_pending == 0;
754
755 Ok(Watch {
756 subscription: consumer.messages().await.map_err(|err| match err.kind() {
757 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
758 WatchError::new(WatchErrorKind::TimedOut)
759 }
760 crate::jetstream::consumer::StreamErrorKind::Other => {
761 WatchError::with_source(WatchErrorKind::Other, err)
762 }
763 })?,
764 prefix: self.prefix.clone(),
765 bucket: self.name.clone(),
766 seen_current,
767 })
768 }
769
770 async fn watch_with_deliver_policy<T: AsRef<str>>(
771 &self,
772 key: T,
773 deliver_policy: DeliverPolicy,
774 ) -> Result<Watch, WatchError> {
775 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
776
777 debug!("initial consumer creation");
778 let consumer = self
779 .stream
780 .create_consumer(super::consumer::push::OrderedConfig {
781 deliver_subject: self.stream.context.client.new_inbox(),
782 description: Some("kv watch consumer".to_string()),
783 filter_subject: subject,
784 replay_policy: super::consumer::ReplayPolicy::Instant,
785 deliver_policy,
786 ..Default::default()
787 })
788 .await
789 .map_err(|err| match err.kind() {
790 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
791 WatchError::new(WatchErrorKind::TimedOut)
792 }
793 _ => WatchError::with_source(WatchErrorKind::Other, err),
794 })?;
795
796 let seen_current = consumer.cached_info().num_pending == 0;
797
798 Ok(Watch {
799 subscription: consumer.messages().await.map_err(|err| match err.kind() {
800 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
801 WatchError::new(WatchErrorKind::TimedOut)
802 }
803 crate::jetstream::consumer::StreamErrorKind::Other => {
804 WatchError::with_source(WatchErrorKind::Other, err)
805 }
806 })?,
807 prefix: self.prefix.clone(),
808 bucket: self.name.clone(),
809 seen_current,
810 })
811 }
812
813 /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys, which yields
814 /// values whenever there are changes in the bucket.
815 ///
816 /// # Examples
817 ///
818 /// ```no_run
819 /// # #[tokio::main]
820 /// # async fn main() -> Result<(), async_nats::Error> {
821 /// use futures_util::StreamExt;
822 /// let client = async_nats::connect("demo.nats.io:4222").await?;
823 /// let jetstream = async_nats::jetstream::new(client);
824 /// let kv = jetstream
825 /// .create_key_value(async_nats::jetstream::kv::Config {
826 /// bucket: "kv".to_string(),
827 /// history: 10,
828 /// ..Default::default()
829 /// })
830 /// .await?;
831 /// let mut entries = kv.watch_all().await?;
832 /// while let Some(entry) = entries.next().await {
833 /// println!("entry: {:?}", entry);
834 /// }
835 /// # Ok(())
836 /// # }
837 /// ```
838 pub async fn watch_all(&self) -> Result<Watch, WatchError> {
839 self.watch(ALL_KEYS).await
840 }
841
842 /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys starting
843 /// from a provider revision. This can be useful when resuming watching over a big bucket
844 /// without the need to replay all the history.
845 ///
846 /// # Examples
847 ///
848 /// ```no_run
849 /// # #[tokio::main]
850 /// # async fn main() -> Result<(), async_nats::Error> {
851 /// use futures_util::StreamExt;
852 /// let client = async_nats::connect("demo.nats.io:4222").await?;
853 /// let jetstream = async_nats::jetstream::new(client);
854 /// let kv = jetstream
855 /// .create_key_value(async_nats::jetstream::kv::Config {
856 /// bucket: "kv".to_string(),
857 /// history: 10,
858 /// ..Default::default()
859 /// })
860 /// .await?;
861 /// let mut entries = kv.watch_all_from_revision(40).await?;
862 /// while let Some(entry) = entries.next().await {
863 /// println!("entry: {:?}", entry);
864 /// }
865 /// # Ok(())
866 /// # }
867 /// ```
868 pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
869 self.watch_from_revision(ALL_KEYS, revision).await
870 }
871
872 /// Retrieves the [Entry] for a given key from a bucket.
873 ///
874 /// # Examples
875 ///
876 /// ```no_run
877 /// # #[tokio::main]
878 /// # async fn main() -> Result<(), async_nats::Error> {
879 /// let client = async_nats::connect("demo.nats.io:4222").await?;
880 /// let jetstream = async_nats::jetstream::new(client);
881 /// let kv = jetstream
882 /// .create_key_value(async_nats::jetstream::kv::Config {
883 /// bucket: "kv".to_string(),
884 /// history: 10,
885 /// ..Default::default()
886 /// })
887 /// .await?;
888 /// let value = kv.get("key").await?;
889 /// match value {
890 /// Some(bytes) => {
891 /// let value_str = std::str::from_utf8(&bytes)?;
892 /// println!("Value: {}", value_str);
893 /// }
894 /// None => {
895 /// println!("Key not found or value not set");
896 /// }
897 /// }
898 /// # Ok(())
899 /// # }
900 /// ```
901 pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
902 match self.entry(key).await {
903 Ok(Some(entry)) => match entry.operation {
904 Operation::Put => Ok(Some(entry.value)),
905 _ => Ok(None),
906 },
907 Ok(None) => Ok(None),
908 Err(err) => Err(err),
909 }
910 }
911
912 /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
913 /// the bucket.
914 ///
915 /// # Examples
916 ///
917 /// ```no_run
918 /// # #[tokio::main]
919 /// # async fn main() -> Result<(), async_nats::Error> {
920 /// use futures_util::StreamExt;
921 /// let client = async_nats::connect("demo.nats.io:4222").await?;
922 /// let jetstream = async_nats::jetstream::new(client);
923 /// let kv = jetstream
924 /// .create_key_value(async_nats::jetstream::kv::Config {
925 /// bucket: "kv".to_string(),
926 /// history: 10,
927 /// ..Default::default()
928 /// })
929 /// .await?;
930 /// let revision = kv.put("key", "value".into()).await?;
931 /// kv.update("key", "updated".into(), revision).await?;
932 /// # Ok(())
933 /// # }
934 /// ```
935 pub async fn update<T: AsRef<str>>(
936 &self,
937 key: T,
938 value: Bytes,
939 revision: u64,
940 ) -> Result<u64, UpdateError> {
941 self.update_maybe_ttl(key, value, revision, None).await
942 }
943
944 async fn update_maybe_ttl<T: AsRef<str>>(
945 &self,
946 key: T,
947 value: Bytes,
948 revision: u64,
949 ttl: Option<Duration>,
950 ) -> Result<u64, UpdateError> {
951 if !is_valid_key(key.as_ref()) {
952 return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
953 }
954 let mut subject = String::new();
955 if self.use_jetstream_prefix {
956 subject.push_str(&self.stream.context.prefix);
957 subject.push('.');
958 }
959 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
960 subject.push_str(key.as_ref());
961
962 let mut headers = crate::HeaderMap::default();
963 headers.insert(
964 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
965 HeaderValue::from(revision),
966 );
967
968 if let Some(ttl) = ttl {
969 headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
970 }
971
972 self.stream
973 .context
974 .publish_with_headers(subject, headers, value)
975 .await?
976 .await
977 .map_err(|err| err.into())
978 .map(|publish_ack| publish_ack.sequence)
979 }
980
981 /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
982 ///
983 /// # Examples
984 ///
985 /// ```no_run
986 /// # #[tokio::main]
987 /// # async fn main() -> Result<(), async_nats::Error> {
988 /// use futures_util::StreamExt;
989 /// let client = async_nats::connect("demo.nats.io:4222").await?;
990 /// let jetstream = async_nats::jetstream::new(client);
991 /// let kv = jetstream
992 /// .create_key_value(async_nats::jetstream::kv::Config {
993 /// bucket: "kv".to_string(),
994 /// history: 10,
995 /// ..Default::default()
996 /// })
997 /// .await?;
998 /// kv.put("key", "value".into()).await?;
999 /// kv.delete("key").await?;
1000 /// # Ok(())
1001 /// # }
1002 /// ```
1003 pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
1004 self.delete_expect_revision(key, None).await
1005 }
1006
1007 /// Deletes a given key if the revision matches. This is a non-destructive operation, which
1008 /// sets a `DELETE` marker.
1009 ///
1010 /// # Examples
1011 ///
1012 /// ```no_run
1013 /// # #[tokio::main]
1014 /// # async fn main() -> Result<(), async_nats::Error> {
1015 /// use futures_util::StreamExt;
1016 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1017 /// let jetstream = async_nats::jetstream::new(client);
1018 /// let kv = jetstream
1019 /// .create_key_value(async_nats::jetstream::kv::Config {
1020 /// bucket: "kv".to_string(),
1021 /// history: 10,
1022 /// ..Default::default()
1023 /// })
1024 /// .await?;
1025 /// let revision = kv.put("key", "value".into()).await?;
1026 /// kv.delete_expect_revision("key", Some(revision)).await?;
1027 /// # Ok(())
1028 /// # }
1029 /// ```
1030 pub async fn delete_expect_revision<T: AsRef<str>>(
1031 &self,
1032 key: T,
1033 revison: Option<u64>,
1034 ) -> Result<(), DeleteError> {
1035 if !is_valid_key(key.as_ref()) {
1036 return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
1037 }
1038 let mut subject = String::new();
1039 if self.use_jetstream_prefix {
1040 subject.push_str(&self.stream.context.prefix);
1041 subject.push('.');
1042 }
1043 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1044 subject.push_str(key.as_ref());
1045
1046 let mut headers = crate::HeaderMap::default();
1047 // TODO: figure out which headers k/v should be where.
1048 headers.insert(
1049 KV_OPERATION,
1050 KV_OPERATION_DELETE
1051 .parse::<HeaderValue>()
1052 .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
1053 );
1054
1055 if let Some(revision) = revison {
1056 headers.insert(
1057 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1058 HeaderValue::from(revision),
1059 );
1060 }
1061
1062 self.stream
1063 .context
1064 .publish_with_headers(subject, headers, "".into())
1065 .await?
1066 .await?;
1067 Ok(())
1068 }
1069
1070 /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
1071 ///
1072 /// # Examples
1073 ///
1074 /// ```no_run
1075 /// # #[tokio::main]
1076 /// # async fn main() -> Result<(), async_nats::Error> {
1077 /// use futures_util::StreamExt;
1078 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1079 /// let jetstream = async_nats::jetstream::new(client);
1080 /// let kv = jetstream
1081 /// .create_key_value(async_nats::jetstream::kv::Config {
1082 /// bucket: "kv".to_string(),
1083 /// history: 10,
1084 /// ..Default::default()
1085 /// })
1086 /// .await?;
1087 /// kv.put("key", "value".into()).await?;
1088 /// kv.put("key", "another".into()).await?;
1089 /// kv.purge("key").await?;
1090 /// # Ok(())
1091 /// # }
1092 /// ```
1093 pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
1094 self.purge_expect_revision(key, None).await
1095 }
1096
1097 /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
1098 /// The purge entry will remain valid for the given `ttl`.
1099 ///
1100 /// # Examples
1101 ///
1102 /// ```no_run
1103 /// # #[tokio::main]
1104 /// # async fn main() -> Result<(), async_nats::Error> {
1105 /// use futures_util::StreamExt;
1106 /// use std::time::Duration;
1107 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1108 /// let jetstream = async_nats::jetstream::new(client);
1109 /// let kv = jetstream
1110 /// .create_key_value(async_nats::jetstream::kv::Config {
1111 /// bucket: "kv".to_string(),
1112 /// history: 10,
1113 /// ..Default::default()
1114 /// })
1115 /// .await?;
1116 /// kv.put("key", "value".into()).await?;
1117 /// kv.put("key", "another".into()).await?;
1118 /// kv.purge_with_ttl("key", Duration::from_secs(10)).await?;
1119 /// # Ok(())
1120 /// # }
1121 /// ```
1122 pub async fn purge_with_ttl<T: AsRef<str>>(
1123 &self,
1124 key: T,
1125 ttl: Duration,
1126 ) -> Result<(), PurgeError> {
1127 self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
1128 .await
1129 }
1130
1131 /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1132 /// purge entry in-place.
1133 ///
1134 /// # Examples
1135 ///
1136 /// ```no_run
1137 /// # #[tokio::main]
1138 /// # async fn main() -> Result<(), async_nats::Error> {
1139 /// use futures_util::StreamExt;
1140 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1141 /// let jetstream = async_nats::jetstream::new(client);
1142 /// let kv = jetstream
1143 /// .create_key_value(async_nats::jetstream::kv::Config {
1144 /// bucket: "kv".to_string(),
1145 /// history: 10,
1146 /// ..Default::default()
1147 /// })
1148 /// .await?;
1149 /// kv.put("key", "value".into()).await?;
1150 /// let revision = kv.put("key", "another".into()).await?;
1151 /// kv.purge_expect_revision("key", Some(revision)).await?;
1152 /// # Ok(())
1153 /// # }
1154 /// ```
1155 pub async fn purge_expect_revision<T: AsRef<str>>(
1156 &self,
1157 key: T,
1158 revision: Option<u64>,
1159 ) -> Result<(), PurgeError> {
1160 self.purge_expect_revision_maybe_ttl(key, revision, None)
1161 .await
1162 }
1163
1164 /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1165 /// purge entry in-place. The purge entry will remain valid for the given `ttl`.
1166 ///
1167 /// # Examples
1168 ///
1169 /// ```no_run
1170 /// # #[tokio::main]
1171 /// # async fn main() -> Result<(), async_nats::Error> {
1172 /// use futures_util::StreamExt;
1173 /// use std::time::Duration;
1174 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1175 /// let jetstream = async_nats::jetstream::new(client);
1176 /// let kv = jetstream
1177 /// .create_key_value(async_nats::jetstream::kv::Config {
1178 /// bucket: "kv".to_string(),
1179 /// history: 10,
1180 /// ..Default::default()
1181 /// })
1182 /// .await?;
1183 /// kv.put("key", "value".into()).await?;
1184 /// let revision = kv.put("key", "another".into()).await?;
1185 /// kv.purge_expect_revision_with_ttl("key", revision, Duration::from_secs(10))
1186 /// .await?;
1187 /// # Ok(())
1188 /// # }
1189 /// ```
1190 pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
1191 &self,
1192 key: T,
1193 revision: u64,
1194 ttl: Duration,
1195 ) -> Result<(), PurgeError> {
1196 self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
1197 .await
1198 }
1199
1200 async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
1201 &self,
1202 key: T,
1203 revision: Option<u64>,
1204 ttl: Option<Duration>,
1205 ) -> Result<(), PurgeError> {
1206 if !is_valid_key(key.as_ref()) {
1207 return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1208 }
1209
1210 let mut subject = String::new();
1211 if self.use_jetstream_prefix {
1212 subject.push_str(&self.stream.context.prefix);
1213 subject.push('.');
1214 }
1215 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1216 subject.push_str(key.as_ref());
1217
1218 let mut headers = crate::HeaderMap::default();
1219 headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1220 headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1221 if let Some(ttl) = ttl {
1222 headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
1223 }
1224
1225 if let Some(revision) = revision {
1226 headers.insert(
1227 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1228 HeaderValue::from(revision),
1229 );
1230 }
1231
1232 self.stream
1233 .context
1234 .publish_with_headers(subject, headers, "".into())
1235 .await?
1236 .await?;
1237 Ok(())
1238 }
1239
1240 /// Returns a [futures_util::Stream] that allows iterating over all [Operations][Operation] that
1241 /// happen for given key.
1242 ///
1243 /// # Examples
1244 ///
1245 /// ```no_run
1246 /// # #[tokio::main]
1247 /// # async fn main() -> Result<(), async_nats::Error> {
1248 /// use futures_util::StreamExt;
1249 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1250 /// let jetstream = async_nats::jetstream::new(client);
1251 /// let kv = jetstream
1252 /// .create_key_value(async_nats::jetstream::kv::Config {
1253 /// bucket: "kv".to_string(),
1254 /// history: 10,
1255 /// ..Default::default()
1256 /// })
1257 /// .await?;
1258 /// let mut entries = kv.history("kv").await?;
1259 /// while let Some(entry) = entries.next().await {
1260 /// println!("entry: {:?}", entry);
1261 /// }
1262 /// # Ok(())
1263 /// # }
1264 /// ```
1265 pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1266 if !is_valid_key(key.as_ref()) {
1267 return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1268 }
1269 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1270
1271 let consumer = self
1272 .stream
1273 .create_consumer(super::consumer::push::OrderedConfig {
1274 deliver_subject: self.stream.context.client.new_inbox(),
1275 description: Some("kv history consumer".to_string()),
1276 filter_subject: subject,
1277 replay_policy: super::consumer::ReplayPolicy::Instant,
1278 ..Default::default()
1279 })
1280 .await?;
1281
1282 Ok(History {
1283 subscription: consumer.messages().await?,
1284 done: false,
1285 prefix: self.prefix.clone(),
1286 bucket: self.name.clone(),
1287 })
1288 }
1289
1290 /// Returns a [futures_util::Stream] that allows iterating over all keys in the bucket.
1291 ///
1292 /// # Examples
1293 ///
1294 /// Iterating over each each key individually
1295 ///
1296 /// ```no_run
1297 /// # #[tokio::main]
1298 /// # async fn main() -> Result<(), async_nats::Error> {
1299 /// use futures_util::{StreamExt, TryStreamExt};
1300 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1301 /// let jetstream = async_nats::jetstream::new(client);
1302 /// let kv = jetstream
1303 /// .create_key_value(async_nats::jetstream::kv::Config {
1304 /// bucket: "kv".to_string(),
1305 /// history: 10,
1306 /// ..Default::default()
1307 /// })
1308 /// .await?;
1309 /// let mut keys = kv.keys().await?.boxed();
1310 /// while let Some(key) = keys.try_next().await? {
1311 /// println!("key: {:?}", key);
1312 /// }
1313 /// # Ok(())
1314 /// # }
1315 /// ```
1316 ///
1317 /// Collecting it into a vector of keys
1318 ///
1319 /// ```no_run
1320 /// # #[tokio::main]
1321 /// # async fn main() -> Result<(), async_nats::Error> {
1322 /// use futures_util::TryStreamExt;
1323 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1324 /// let jetstream = async_nats::jetstream::new(client);
1325 /// let kv = jetstream
1326 /// .create_key_value(async_nats::jetstream::kv::Config {
1327 /// bucket: "kv".to_string(),
1328 /// history: 10,
1329 /// ..Default::default()
1330 /// })
1331 /// .await?;
1332 /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1333 /// println!("Keys: {:?}", keys);
1334 /// # Ok(())
1335 /// # }
1336 /// ```
1337 pub async fn keys(&self) -> Result<Keys, HistoryError> {
1338 let subject = format!("{}>", self.prefix.as_str());
1339
1340 let consumer = self
1341 .stream
1342 .create_consumer(super::consumer::push::OrderedConfig {
1343 deliver_subject: self.stream.context.client.new_inbox(),
1344 description: Some("kv history consumer".to_string()),
1345 filter_subject: subject,
1346 headers_only: true,
1347 replay_policy: super::consumer::ReplayPolicy::Instant,
1348 // We only need to know the latest state for each key, not the whole history
1349 deliver_policy: DeliverPolicy::LastPerSubject,
1350 ..Default::default()
1351 })
1352 .await?;
1353
1354 let entries = History {
1355 done: consumer.info.num_pending == 0,
1356 subscription: consumer.messages().await?,
1357 prefix: self.prefix.clone(),
1358 bucket: self.name.clone(),
1359 };
1360
1361 Ok(Keys { inner: entries })
1362 }
1363}
1364
1365/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
1366pub struct Watch {
1367 seen_current: bool,
1368 subscription: super::consumer::push::Ordered,
1369 prefix: String,
1370 bucket: String,
1371}
1372
1373impl futures_util::Stream for Watch {
1374 type Item = Result<Entry, WatcherError>;
1375
1376 fn poll_next(
1377 mut self: std::pin::Pin<&mut Self>,
1378 cx: &mut std::task::Context<'_>,
1379 ) -> std::task::Poll<Option<Self::Item>> {
1380 match self.subscription.poll_next_unpin(cx) {
1381 Poll::Ready(message) => match message {
1382 None => Poll::Ready(None),
1383 Some(message) => {
1384 let message = message?;
1385 let info = message.info().map_err(|err| {
1386 WatcherError::with_source(
1387 WatcherErrorKind::Other,
1388 format!("failed to parse message metadata: {err}"),
1389 )
1390 })?;
1391
1392 let operation =
1393 kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1394
1395 let key = message
1396 .subject
1397 .strip_prefix(&self.prefix)
1398 .map(|s| s.to_string())
1399 .unwrap();
1400
1401 if !self.seen_current && info.pending == 0 {
1402 self.seen_current = true;
1403 }
1404
1405 Poll::Ready(Some(Ok(Entry {
1406 bucket: self.bucket.clone(),
1407 key,
1408 value: message.payload.clone(),
1409 revision: info.stream_sequence,
1410 created: info.published,
1411 delta: info.pending,
1412 operation,
1413 seen_current: self.seen_current,
1414 })))
1415 }
1416 },
1417 std::task::Poll::Pending => Poll::Pending,
1418 }
1419 }
1420
1421 fn size_hint(&self) -> (usize, Option<usize>) {
1422 (0, None)
1423 }
1424}
1425
1426/// A structure representing the history of a key-value bucket, yielding past values.
1427pub struct History {
1428 subscription: super::consumer::push::Ordered,
1429 done: bool,
1430 prefix: String,
1431 bucket: String,
1432}
1433
1434impl futures_util::Stream for History {
1435 type Item = Result<Entry, WatcherError>;
1436
1437 fn poll_next(
1438 mut self: std::pin::Pin<&mut Self>,
1439 cx: &mut std::task::Context<'_>,
1440 ) -> std::task::Poll<Option<Self::Item>> {
1441 if self.done {
1442 return Poll::Ready(None);
1443 }
1444 match self.subscription.poll_next_unpin(cx) {
1445 Poll::Ready(message) => match message {
1446 None => Poll::Ready(None),
1447 Some(message) => {
1448 let message = message?;
1449 let info = message.info().map_err(|err| {
1450 WatcherError::with_source(
1451 WatcherErrorKind::Other,
1452 format!("failed to parse message metadata: {err}"),
1453 )
1454 })?;
1455 if info.pending == 0 {
1456 self.done = true;
1457 }
1458
1459 let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1460
1461 let key = message
1462 .subject
1463 .strip_prefix(&self.prefix)
1464 .map(|s| s.to_string())
1465 .unwrap();
1466
1467 Poll::Ready(Some(Ok(Entry {
1468 bucket: self.bucket.clone(),
1469 key,
1470 value: message.payload.clone(),
1471 revision: info.stream_sequence,
1472 created: info.published,
1473 delta: info.pending,
1474 operation,
1475 seen_current: self.done,
1476 })))
1477 }
1478 },
1479 std::task::Poll::Pending => Poll::Pending,
1480 }
1481 }
1482
1483 fn size_hint(&self) -> (usize, Option<usize>) {
1484 (0, None)
1485 }
1486}
1487
1488pub struct Keys {
1489 inner: History,
1490}
1491
1492impl futures_util::Stream for Keys {
1493 type Item = Result<String, WatcherError>;
1494
1495 fn poll_next(
1496 mut self: std::pin::Pin<&mut Self>,
1497 cx: &mut std::task::Context<'_>,
1498 ) -> std::task::Poll<Option<Self::Item>> {
1499 loop {
1500 match self.inner.poll_next_unpin(cx) {
1501 Poll::Ready(None) => return Poll::Ready(None),
1502 Poll::Ready(Some(res)) => match res {
1503 Ok(entry) => {
1504 // Skip purged and deleted keys
1505 if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1506 // Try to poll again if we skip this one
1507 continue;
1508 } else {
1509 return Poll::Ready(Some(Ok(entry.key)));
1510 }
1511 }
1512 Err(e) => return Poll::Ready(Some(Err(e))),
1513 },
1514 Poll::Pending => return Poll::Pending,
1515 }
1516 }
1517 }
1518}
1519
1520/// An entry in a key-value bucket.
1521#[derive(Debug, Clone, PartialEq, Eq)]
1522pub struct Entry {
1523 /// Name of the bucket the entry is in.
1524 pub bucket: String,
1525 /// The key that was retrieved.
1526 pub key: String,
1527 /// The value that was retrieved.
1528 pub value: Bytes,
1529 /// A unique sequence for this value.
1530 pub revision: u64,
1531 /// Distance from the latest value.
1532 pub delta: u64,
1533 /// The time the data was put in the bucket.
1534 pub created: OffsetDateTime,
1535 /// The kind of operation that caused this entry.
1536 pub operation: Operation,
1537 /// Set to true after all historical messages have been received, and
1538 /// now all Entries are the new ones.
1539 pub seen_current: bool,
1540}
1541
1542#[derive(Clone, Debug, PartialEq)]
1543pub enum StatusErrorKind {
1544 JetStream(crate::jetstream::Error),
1545 TimedOut,
1546}
1547
1548impl Display for StatusErrorKind {
1549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1550 match self {
1551 Self::JetStream(err) => write!(f, "jetstream request failed: {err}"),
1552 Self::TimedOut => write!(f, "timed out"),
1553 }
1554 }
1555}
1556
1557pub type StatusError = Error<StatusErrorKind>;
1558
1559#[derive(Clone, Copy, Debug, PartialEq)]
1560pub enum CreateErrorKind {
1561 AlreadyExists,
1562 InvalidKey,
1563 Publish,
1564 Ack,
1565 Other,
1566}
1567
1568impl From<UpdateError> for CreateError {
1569 fn from(error: UpdateError) -> Self {
1570 match error.kind() {
1571 UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1572 UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1573 UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
1574 UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1575 }
1576 }
1577}
1578
1579impl From<PutError> for CreateError {
1580 fn from(error: PutError) -> Self {
1581 match error.kind() {
1582 PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1583 PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1584 PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1585 }
1586 }
1587}
1588
1589impl From<EntryError> for CreateError {
1590 fn from(error: EntryError) -> Self {
1591 match error.kind() {
1592 EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1593 EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1594 EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1595 }
1596 }
1597}
1598
1599impl Display for CreateErrorKind {
1600 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1601 match self {
1602 Self::AlreadyExists => write!(f, "key already exists"),
1603 Self::Publish => write!(f, "failed to create key in store"),
1604 Self::Ack => write!(f, "ack error"),
1605 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1606 Self::Other => write!(f, "other error"),
1607 }
1608 }
1609}
1610
1611pub type CreateError = Error<CreateErrorKind>;
1612
1613#[derive(Clone, Copy, Debug, PartialEq)]
1614pub enum PutErrorKind {
1615 InvalidKey,
1616 Publish,
1617 Ack,
1618}
1619
1620impl Display for PutErrorKind {
1621 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1622 match self {
1623 Self::Publish => write!(f, "failed to put key into store"),
1624 Self::Ack => write!(f, "ack error"),
1625 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1626 }
1627 }
1628}
1629
1630pub type PutError = Error<PutErrorKind>;
1631
1632#[derive(Clone, Copy, Debug, PartialEq)]
1633pub enum EntryErrorKind {
1634 InvalidKey,
1635 TimedOut,
1636 Other,
1637}
1638
1639impl Display for EntryErrorKind {
1640 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1641 match self {
1642 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1643 Self::TimedOut => write!(f, "timed out"),
1644 Self::Other => write!(f, "failed getting entry"),
1645 }
1646 }
1647}
1648
1649pub type EntryError = Error<EntryErrorKind>;
1650
1651crate::from_with_timeout!(
1652 EntryError,
1653 EntryErrorKind,
1654 DirectGetError,
1655 DirectGetErrorKind
1656);
1657
1658#[derive(Clone, Copy, Debug, PartialEq)]
1659pub enum WatchErrorKind {
1660 InvalidKey,
1661 TimedOut,
1662 ConsumerCreate,
1663 Other,
1664}
1665
1666impl Display for WatchErrorKind {
1667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1668 match self {
1669 Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1670 Self::Other => write!(f, "watch failed"),
1671 Self::TimedOut => write!(f, "timed out"),
1672 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1673 }
1674 }
1675}
1676
1677pub type WatchError = Error<WatchErrorKind>;
1678
1679crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1680crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1681
1682#[derive(Clone, Copy, Debug, PartialEq)]
1683pub enum UpdateErrorKind {
1684 InvalidKey,
1685 TimedOut,
1686 WrongLastRevision,
1687 Other,
1688}
1689
1690impl Display for UpdateErrorKind {
1691 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1692 match self {
1693 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1694 Self::TimedOut => write!(f, "timed out"),
1695 Self::WrongLastRevision => write!(f, "wrong last revision"),
1696 Self::Other => write!(f, "failed getting entry"),
1697 }
1698 }
1699}
1700
1701pub type UpdateError = Error<UpdateErrorKind>;
1702
1703impl From<PublishError> for UpdateError {
1704 fn from(err: PublishError) -> Self {
1705 match err.kind() {
1706 PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1707 PublishErrorKind::WrongLastSequence => {
1708 Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1709 }
1710 _ => Self::with_source(UpdateErrorKind::Other, err),
1711 }
1712 }
1713}
1714
1715#[derive(Clone, Copy, Debug, PartialEq)]
1716pub enum WatcherErrorKind {
1717 Consumer,
1718 Other,
1719}
1720
1721impl Display for WatcherErrorKind {
1722 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1723 match self {
1724 Self::Consumer => write!(f, "watcher consumer error"),
1725 Self::Other => write!(f, "watcher error"),
1726 }
1727 }
1728}
1729
1730pub type WatcherError = Error<WatcherErrorKind>;
1731
1732impl From<OrderedError> for WatcherError {
1733 fn from(err: OrderedError) -> Self {
1734 WatcherError::with_source(WatcherErrorKind::Consumer, err)
1735 }
1736}
1737
1738pub type DeleteError = UpdateError;
1739pub type DeleteErrorKind = UpdateErrorKind;
1740
1741pub type PurgeError = UpdateError;
1742pub type PurgeErrorKind = UpdateErrorKind;
1743
1744pub type HistoryError = WatchError;
1745pub type HistoryErrorKind = WatchErrorKind;