async_nats/jetstream/kv/mod.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19 fmt::{self, Display},
20 str::FromStr,
21 task::Poll,
22 time::Duration,
23};
24
25use crate::HeaderValue;
26use bytes::Bytes;
27use futures_util::StreamExt;
28use regex::Regex;
29use std::sync::LazyLock;
30use time::OffsetDateTime;
31use tracing::debug;
32
33use crate::error::Error;
34use crate::header;
35
36use self::bucket::Status;
37
38use super::{
39 consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
40 context::{PublishError, PublishErrorKind},
41 message::StreamMessage,
42 stream::{
43 self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
44 Source, StorageType, Stream,
45 },
46};
47
48fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
49 if let Some(op) = message.headers.get(KV_OPERATION) {
50 Operation::from_str(op.as_str())
51 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
52 } else if let Some(reason) = message.headers.get(header::NATS_MARKER_REASON) {
53 match reason.as_str() {
54 "MaxAge" | "Purge" => Ok(Operation::Purge),
55 "Remove" => Ok(Operation::Delete),
56 _ => Err(EntryError::with_source(
57 EntryErrorKind::Other,
58 "invalid marker reason",
59 )),
60 }
61 } else {
62 Err(EntryError::with_source(
63 EntryErrorKind::Other,
64 "missing operation",
65 ))
66 }
67}
68fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
69 let headers = match message.headers.as_ref() {
70 Some(headers) => headers,
71 None => return Ok(Operation::Put),
72 };
73 if let Some(op) = headers.get(KV_OPERATION) {
74 Operation::from_str(op.as_str())
75 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
76 } else if let Some(reason) = headers.get(header::NATS_MARKER_REASON) {
77 match reason.as_str() {
78 "MaxAge" | "Purge" => Ok(Operation::Purge),
79 "Remove" => Ok(Operation::Delete),
80 _ => Err(EntryError::with_source(
81 EntryErrorKind::Other,
82 "invalid marker reason",
83 )),
84 }
85 } else {
86 Ok(Operation::Put)
87 }
88}
89
// Matches bucket names made of one or more ASCII letters, digits, `_` or `-`.
static VALID_BUCKET_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Matches keys made of one or more ASCII letters, digits, or any of `-`, `/`, `_`, `=`, `.`.
static VALID_KEY_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the upper bound accepted for `Config::history`;
// it is not used in the visible part of the file - confirm against the bucket creation code.
pub(crate) const MAX_HISTORY: i64 = 64;
// Key passed by `watch_all` and `watch_all_from_revision` to watch every key.
const ALL_KEYS: &str = ">";

// Header that carries the KV operation of a message, followed by its accepted values.
const KV_OPERATION: &str = "KV-Operation";
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// NOTE(review): rollup header name and value; not referenced in the visible part
// of the file - presumably used when purging a key.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
105
106pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
107 VALID_BUCKET_RE.is_match(bucket_name)
108}
109
110pub(crate) fn is_valid_key(key: &str) -> bool {
111 if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
112 return false;
113 }
114
115 VALID_KEY_RE.is_match(key)
116}
117
118/// Configuration values for key value stores.
119#[derive(Debug, Clone, Default)]
120pub struct Config {
121 /// Name of the bucket
122 pub bucket: String,
123 /// Human readable description.
124 pub description: String,
125 /// Maximum size of a single value.
126 pub max_value_size: i32,
127 /// Maximum historical entries.
128 pub history: i64,
129 /// Maximum age of any entry in the bucket, expressed in nanoseconds
130 pub max_age: std::time::Duration,
131 /// How large the bucket may become in total bytes before the configured discard policy kicks in
132 pub max_bytes: i64,
133 /// The type of storage backend, `File` (default) and `Memory`
134 pub storage: StorageType,
135 /// How many replicas to keep for each entry in a cluster.
136 pub num_replicas: usize,
137 /// Republish is for republishing messages once persistent in the Key Value Bucket.
138 pub republish: Option<Republish>,
139 /// Bucket mirror configuration.
140 pub mirror: Option<Source>,
141 /// Bucket sources configuration.
142 pub sources: Option<Vec<Source>>,
143 /// Allow mirrors using direct API.
144 pub mirror_direct: bool,
145 /// Compression
146 #[cfg(feature = "server_2_10")]
147 pub compression: bool,
148 /// Cluster and tag placement for the bucket.
149 pub placement: Option<stream::Placement>,
150 /// Enables per-message TTL and delete marker TTL for a bucket.
151 #[cfg(feature = "server_2_11")]
152 pub limit_markers: Option<Duration>,
153}
154
/// Describes what kind of operation an entry represents.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket
    Put,
    /// A value was deleted from a bucket
    Delete,
    /// A value was purged from a bucket
    Purge,
}
165
166impl FromStr for Operation {
167 type Err = ParseOperationError;
168
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 match s {
171 KV_OPERATION_DELETE => Ok(Operation::Delete),
172 KV_OPERATION_PURGE => Ok(Operation::Purge),
173 KV_OPERATION_PUT => Ok(Operation::Put),
174 _ => Err(ParseOperationError),
175 }
176 }
177}
178
179#[derive(Debug, Clone)]
180pub struct ParseOperationError;
181
182impl fmt::Display for ParseOperationError {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
185 }
186}
187
188impl std::error::Error for ParseOperationError {}
189
/// A struct used as a handle for the bucket.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. It is prepended to a key to form the
    /// subject used for lookups and watches.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs. When unset,
    /// `prefix` is used for writes as well.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When set, the context
    /// prefix followed by a dot is prepended to the subject of every write.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
206
207impl Store {
208 /// Queries the server and returns status from the server.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// # #[tokio::main]
214 /// # async fn main() -> Result<(), async_nats::Error> {
215 /// let client = async_nats::connect("demo.nats.io:4222").await?;
216 /// let jetstream = async_nats::jetstream::new(client);
217 /// let kv = jetstream
218 /// .create_key_value(async_nats::jetstream::kv::Config {
219 /// bucket: "kv".to_string(),
220 /// history: 10,
221 /// ..Default::default()
222 /// })
223 /// .await?;
224 /// let status = kv.status().await?;
225 /// println!("status: {:?}", status);
226 /// # Ok(())
227 /// # }
228 /// ```
229 pub async fn status(&self) -> Result<Status, StatusError> {
230 // TODO: should we poll for fresh info here? probably yes.
231 let info = self.stream.info.clone();
232
233 Ok(Status {
234 info,
235 bucket: self.name.to_string(),
236 })
237 }
238
239 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
240 ///
241 /// # Examples
242 ///
243 /// ```no_run
244 /// # #[tokio::main]
245 /// # async fn main() -> Result<(), async_nats::Error> {
246 /// let client = async_nats::connect("demo.nats.io:4222").await?;
247 /// let jetstream = async_nats::jetstream::new(client);
248 /// let kv = jetstream
249 /// .create_key_value(async_nats::jetstream::kv::Config {
250 /// bucket: "kv".to_string(),
251 /// history: 10,
252 /// ..Default::default()
253 /// })
254 /// .await?;
255 ///
256 /// let status = kv.create("key", "value".into()).await;
257 /// assert!(status.is_ok());
258 ///
259 /// let status = kv.create("key", "value".into()).await;
260 /// assert!(status.is_err());
261 ///
262 /// # Ok(())
263 /// # }
264 /// ```
265 pub async fn create<T: AsRef<str>>(
266 &self,
267 key: T,
268 value: bytes::Bytes,
269 ) -> Result<u64, CreateError> {
270 self.create_maybe_ttl(key, value, None).await
271 }
272
273 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
274 /// It will set a TTL specific for that key.
275 ///
276 /// # Examples
277 ///
278 /// ```no_run
279 /// # #[tokio::main]
280 /// # async fn main() -> Result<(), async_nats::Error> {
281 /// use std::time::Duration;
282 /// let client = async_nats::connect("demo.nats.io:4222").await?;
283 /// let jetstream = async_nats::jetstream::new(client);
284 /// let kv = jetstream
285 /// .create_key_value(async_nats::jetstream::kv::Config {
286 /// bucket: "kv".to_string(),
287 /// history: 10,
288 /// ..Default::default()
289 /// })
290 /// .await?;
291 ///
292 /// let status = kv
293 /// .create_with_ttl("key", "value".into(), Duration::from_secs(10))
294 /// .await;
295 /// assert!(status.is_ok());
296 ///
297 /// # Ok(())
298 /// # }
299 /// ```
300 pub async fn create_with_ttl<T: AsRef<str>>(
301 &self,
302 key: T,
303 value: bytes::Bytes,
304 ttl: Duration,
305 ) -> Result<u64, CreateError> {
306 self.create_maybe_ttl(key, value, Some(ttl)).await
307 }
308
309 async fn create_maybe_ttl<T: AsRef<str>>(
310 &self,
311 key: T,
312 value: bytes::Bytes,
313 ttl: Option<Duration>,
314 ) -> Result<u64, CreateError> {
315 let update_err = match self
316 .update_maybe_ttl(key.as_ref(), value.clone(), 0, ttl)
317 .await
318 {
319 Ok(revision) => return Ok(revision),
320 Err(err) => err,
321 };
322
323 match self.entry(key.as_ref()).await? {
324 // Deleted or Purged key, we can create it again.
325 Some(Entry {
326 operation: Operation::Delete | Operation::Purge,
327 revision,
328 ..
329 }) => {
330 let revision = self.update(key, value, revision).await?;
331 Ok(revision)
332 }
333
334 // key already exists.
335 Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
336
337 // Something went wrong with the initial update, return that error
338 None => Err(update_err.into()),
339 }
340 }
341
342 /// Puts new key value pair into the bucket.
343 /// If key didn't exist, it is created. If it did exist, a new value with a new version is
344 /// added.
345 ///
346 /// # Examples
347 ///
348 /// ```no_run
349 /// # #[tokio::main]
350 /// # async fn main() -> Result<(), async_nats::Error> {
351 /// let client = async_nats::connect("demo.nats.io:4222").await?;
352 /// let jetstream = async_nats::jetstream::new(client);
353 /// let kv = jetstream
354 /// .create_key_value(async_nats::jetstream::kv::Config {
355 /// bucket: "kv".to_string(),
356 /// history: 10,
357 /// ..Default::default()
358 /// })
359 /// .await?;
360 /// let status = kv.put("key", "value".into()).await?;
361 /// # Ok(())
362 /// # }
363 /// ```
364 pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
365 if !is_valid_key(key.as_ref()) {
366 return Err(PutError::new(PutErrorKind::InvalidKey));
367 }
368 let mut subject = String::new();
369 if self.use_jetstream_prefix {
370 subject.push_str(&self.stream.context.prefix);
371 subject.push('.');
372 }
373 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
374 subject.push_str(key.as_ref());
375
376 let publish_ack = self
377 .stream
378 .context
379 .publish(subject, value)
380 .await
381 .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
382 let ack = publish_ack
383 .await
384 .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
385
386 Ok(ack.sequence)
387 }
388
389 async fn entry_maybe_revision<T: Into<String>>(
390 &self,
391 key: T,
392 revision: Option<u64>,
393 ) -> Result<Option<Entry>, EntryError> {
394 let key: String = key.into();
395 if !is_valid_key(key.as_ref()) {
396 return Err(EntryError::new(EntryErrorKind::InvalidKey));
397 }
398
399 let subject = format!("{}{}", self.prefix.as_str(), &key);
400
401 let result: Option<(StreamMessage, Operation)> = {
402 if self.stream.info.config.allow_direct {
403 let message = match revision {
404 Some(revision) => {
405 let message = self.stream.direct_get(revision).await;
406 if let Ok(message) = message.as_ref() {
407 if message.subject.as_str() != subject {
408 println!("subject mismatch {}", message.subject);
409 return Ok(None);
410 }
411 }
412 message
413 }
414 None => {
415 self.stream
416 .direct_get_last_for_subject(subject.as_str())
417 .await
418 }
419 };
420
421 match message {
422 Ok(message) => {
423 let operation =
424 kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
425
426 Some((message, operation))
427 }
428 Err(err) => {
429 if err.kind() == DirectGetErrorKind::NotFound {
430 None
431 } else {
432 return Err(err.into());
433 }
434 }
435 }
436 } else {
437 let raw_message = match revision {
438 Some(revision) => {
439 let message = self.stream.get_raw_message(revision).await;
440 if let Ok(message) = message.as_ref() {
441 if message.subject.as_str() != subject {
442 return Ok(None);
443 }
444 }
445 message
446 }
447 None => {
448 self.stream
449 .get_last_raw_message_by_subject(subject.as_str())
450 .await
451 }
452 };
453 match raw_message {
454 Ok(raw_message) => {
455 let operation = kv_operation_from_stream_message(&raw_message)
456 .unwrap_or(Operation::Put);
457 // TODO: unnecessary expensive, cloning whole Message.
458 Some((raw_message, operation))
459 }
460 Err(err) => match err.kind() {
461 crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
462 crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
463 return Err(EntryError::new(EntryErrorKind::InvalidKey))
464 }
465 crate::jetstream::stream::LastRawMessageErrorKind::Other => {
466 return Err(EntryError::with_source(EntryErrorKind::Other, err))
467 }
468 crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
469 return Err(EntryError::with_source(EntryErrorKind::Other, err))
470 }
471 },
472 }
473 }
474 };
475
476 match result {
477 Some((message, operation)) => {
478 let entry = Entry {
479 bucket: self.name.clone(),
480 key,
481 value: message.payload,
482 revision: message.sequence,
483 created: message.time,
484 operation,
485 delta: 0,
486 seen_current: false,
487 };
488 Ok(Some(entry))
489 }
490 // TODO: remember to touch this when Errors are in place.
491 None => Ok(None),
492 }
493 }
494
495 /// Retrieves the last [Entry] for a given key from a bucket.
496 ///
497 /// # Examples
498 ///
499 /// ```no_run
500 /// # #[tokio::main]
501 /// # async fn main() -> Result<(), async_nats::Error> {
502 /// let client = async_nats::connect("demo.nats.io:4222").await?;
503 /// let jetstream = async_nats::jetstream::new(client);
504 /// let kv = jetstream
505 /// .create_key_value(async_nats::jetstream::kv::Config {
506 /// bucket: "kv".to_string(),
507 /// history: 10,
508 /// ..Default::default()
509 /// })
510 /// .await?;
511 /// let status = kv.put("key", "value".into()).await?;
512 /// let entry = kv.entry("key").await?;
513 /// println!("entry: {:?}", entry);
514 /// # Ok(())
515 /// # }
516 /// ```
517 pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
518 self.entry_maybe_revision(key, None).await
519 }
520
521 /// Retrieves the [Entry] for a given key revision from a bucket.
522 ///
523 /// # Examples
524 ///
525 /// ```no_run
526 /// # #[tokio::main]
527 /// # async fn main() -> Result<(), async_nats::Error> {
528 /// let client = async_nats::connect("demo.nats.io:4222").await?;
529 /// let jetstream = async_nats::jetstream::new(client);
530 /// let kv = jetstream
531 /// .create_key_value(async_nats::jetstream::kv::Config {
532 /// bucket: "kv".to_string(),
533 /// history: 10,
534 /// ..Default::default()
535 /// })
536 /// .await?;
537 /// let status = kv.put("key", "value".into()).await?;
538 /// let status = kv.put("key", "value2".into()).await?;
539 /// let entry = kv.entry_for_revision("key", 2).await?;
540 /// println!("entry: {:?}", entry);
541 /// # Ok(())
542 /// # }
543 /// ```
544 pub async fn entry_for_revision<T: Into<String>>(
545 &self,
546 key: T,
547 revision: u64,
548 ) -> Result<Option<Entry>, EntryError> {
549 self.entry_maybe_revision(key, Some(revision)).await
550 }
551
552 /// Creates a [futures_util::Stream] over [Entries][Entry] a given key in the bucket, which yields
553 /// values whenever there are changes for that key.
554 ///
555 /// # Examples
556 ///
557 /// ```no_run
558 /// # #[tokio::main]
559 /// # async fn main() -> Result<(), async_nats::Error> {
560 /// use futures_util::StreamExt;
561 /// let client = async_nats::connect("demo.nats.io:4222").await?;
562 /// let jetstream = async_nats::jetstream::new(client);
563 /// let kv = jetstream
564 /// .create_key_value(async_nats::jetstream::kv::Config {
565 /// bucket: "kv".to_string(),
566 /// history: 10,
567 /// ..Default::default()
568 /// })
569 /// .await?;
570 /// let mut entries = kv.watch("kv").await?;
571 /// while let Some(entry) = entries.next().await {
572 /// println!("entry: {:?}", entry);
573 /// }
574 /// # Ok(())
575 /// # }
576 /// ```
577 pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
578 self.watch_with_deliver_policy(key, DeliverPolicy::New)
579 .await
580 }
581
/// Creates a [futures_util::Stream] over [Entries][Entry] in the bucket, which
/// yields values whenever there are changes for any of the given keys.
///
/// The underlying consumer is created with `DeliverPolicy::New`. Only
/// available with the `server_2_10` feature.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures_util::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
///     .create_key_value(async_nats::jetstream::kv::Config {
///         bucket: "kv".to_string(),
///         history: 10,
///         ..Default::default()
///     })
///     .await?;
/// let mut entries = kv.watch_many(["foo", "bar"]).await?;
/// while let Some(entry) = entries.next().await {
///     println!("entry: {:?}", entry);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "server_2_10")]
pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
where
    T: AsRef<str>,
    K: IntoIterator<Item = T>,
{
    let policy = DeliverPolicy::New;
    self.watch_many_with_deliver_policy(keys, policy).await
}
616
617 /// Creates a [futures_util::Stream] over [Entries][Entry] for a given key in the bucket, starting from
618 /// provided revision. This is useful to resume watching over big KV buckets without a need to
619 /// replay all the history.
620 ///
621 /// # Examples
622 ///
623 /// ```no_run
624 /// # #[tokio::main]
625 /// # async fn main() -> Result<(), async_nats::Error> {
626 /// use futures_util::StreamExt;
627 /// let client = async_nats::connect("demo.nats.io:4222").await?;
628 /// let jetstream = async_nats::jetstream::new(client);
629 /// let kv = jetstream
630 /// .create_key_value(async_nats::jetstream::kv::Config {
631 /// bucket: "kv".to_string(),
632 /// history: 10,
633 /// ..Default::default()
634 /// })
635 /// .await?;
636 /// let mut entries = kv.watch_from_revision("kv", 5).await?;
637 /// while let Some(entry) = entries.next().await {
638 /// println!("entry: {:?}", entry);
639 /// }
640 /// # Ok(())
641 /// # }
642 /// ```
643 pub async fn watch_from_revision<T: AsRef<str>>(
644 &self,
645 key: T,
646 revision: u64,
647 ) -> Result<Watch, WatchError> {
648 self.watch_with_deliver_policy(
649 key,
650 DeliverPolicy::ByStartSequence {
651 start_sequence: revision,
652 },
653 )
654 .await
655 }
656
657 /// Creates a [futures_util::Stream] over [Entries][Entry] a given key in the bucket, which yields
658 /// values whenever there are changes for that key with as well as last value.
659 ///
660 /// # Examples
661 ///
662 /// ```no_run
663 /// # #[tokio::main]
664 /// # async fn main() -> Result<(), async_nats::Error> {
665 /// use futures_util::StreamExt;
666 /// let client = async_nats::connect("demo.nats.io:4222").await?;
667 /// let jetstream = async_nats::jetstream::new(client);
668 /// let kv = jetstream
669 /// .create_key_value(async_nats::jetstream::kv::Config {
670 /// bucket: "kv".to_string(),
671 /// history: 10,
672 /// ..Default::default()
673 /// })
674 /// .await?;
675 /// let mut entries = kv.watch_with_history("kv").await?;
676 /// while let Some(entry) = entries.next().await {
677 /// println!("entry: {:?}", entry);
678 /// }
679 /// # Ok(())
680 /// # }
681 /// ```
682 pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
683 self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
684 .await
685 }
686
/// Creates a [futures_util::Stream] over [Entries][Entry] for the given keys in
/// the bucket, which yields the last values as well as values whenever there
/// are changes for those keys.
///
/// Only available with the `server_2_10` feature, as it uses consumers with
/// multiple subject filters.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures_util::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
///     .create_key_value(async_nats::jetstream::kv::Config {
///         bucket: "kv".to_string(),
///         history: 10,
///         ..Default::default()
///     })
///     .await?;
/// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
/// while let Some(entry) = entries.next().await {
///     println!("entry: {:?}", entry);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "server_2_10")]
pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
    &self,
    keys: K,
) -> Result<Watch, WatchError> {
    let policy = DeliverPolicy::LastPerSubject;
    self.watch_many_with_deliver_policy(keys, policy).await
}
721
/// Creates a [Watch] over several keys using the given deliver policy.
///
/// Each key is prefixed with the store prefix and used as a filter subject of
/// an ordered push consumer. The returned [Watch] starts with `seen_current`
/// set when the consumer's cached info reports no pending messages.
///
/// # Errors
///
/// Timeouts while creating the consumer or its message stream are reported as
/// `WatchErrorKind::TimedOut`; every other failure as `WatchErrorKind::Other`.
#[cfg(feature = "server_2_10")]
async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
    &self,
    keys: K,
    deliver_policy: DeliverPolicy,
) -> Result<Watch, WatchError> {
    let key_prefix = self.prefix.as_str();
    let mut filter_subjects = Vec::new();
    for key in keys {
        filter_subjects.push(format!("{}{}", key_prefix, key.as_ref()));
    }

    debug!("initial consumer creation");
    let config = super::consumer::push::OrderedConfig {
        deliver_subject: self.stream.context.client.new_inbox(),
        description: Some("kv watch consumer".to_string()),
        filter_subjects,
        replay_policy: super::consumer::ReplayPolicy::Instant,
        deliver_policy,
        ..Default::default()
    };

    let created = self.stream.create_consumer(config).await;
    let consumer = created.map_err(|err| match err.kind() {
        ConsumerErrorKind::TimedOut => WatchError::new(WatchErrorKind::TimedOut),
        _ => WatchError::with_source(WatchErrorKind::Other, err),
    })?;

    let seen_current = consumer.cached_info().num_pending == 0;

    let subscription = consumer.messages().await.map_err(|err| match err.kind() {
        StreamErrorKind::TimedOut => WatchError::new(WatchErrorKind::TimedOut),
        StreamErrorKind::Other => WatchError::with_source(WatchErrorKind::Other, err),
    })?;

    Ok(Watch {
        subscription,
        prefix: self.prefix.clone(),
        bucket: self.name.clone(),
        seen_current,
    })
}
771
772 async fn watch_with_deliver_policy<T: AsRef<str>>(
773 &self,
774 key: T,
775 deliver_policy: DeliverPolicy,
776 ) -> Result<Watch, WatchError> {
777 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
778
779 debug!("initial consumer creation");
780 let consumer = self
781 .stream
782 .create_consumer(super::consumer::push::OrderedConfig {
783 deliver_subject: self.stream.context.client.new_inbox(),
784 description: Some("kv watch consumer".to_string()),
785 filter_subject: subject,
786 replay_policy: super::consumer::ReplayPolicy::Instant,
787 deliver_policy,
788 ..Default::default()
789 })
790 .await
791 .map_err(|err| match err.kind() {
792 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
793 WatchError::new(WatchErrorKind::TimedOut)
794 }
795 _ => WatchError::with_source(WatchErrorKind::Other, err),
796 })?;
797
798 let seen_current = consumer.cached_info().num_pending == 0;
799
800 Ok(Watch {
801 subscription: consumer.messages().await.map_err(|err| match err.kind() {
802 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
803 WatchError::new(WatchErrorKind::TimedOut)
804 }
805 crate::jetstream::consumer::StreamErrorKind::Other => {
806 WatchError::with_source(WatchErrorKind::Other, err)
807 }
808 })?,
809 prefix: self.prefix.clone(),
810 bucket: self.name.clone(),
811 seen_current,
812 })
813 }
814
815 /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys, which yields
816 /// values whenever there are changes in the bucket.
817 ///
818 /// # Examples
819 ///
820 /// ```no_run
821 /// # #[tokio::main]
822 /// # async fn main() -> Result<(), async_nats::Error> {
823 /// use futures_util::StreamExt;
824 /// let client = async_nats::connect("demo.nats.io:4222").await?;
825 /// let jetstream = async_nats::jetstream::new(client);
826 /// let kv = jetstream
827 /// .create_key_value(async_nats::jetstream::kv::Config {
828 /// bucket: "kv".to_string(),
829 /// history: 10,
830 /// ..Default::default()
831 /// })
832 /// .await?;
833 /// let mut entries = kv.watch_all().await?;
834 /// while let Some(entry) = entries.next().await {
835 /// println!("entry: {:?}", entry);
836 /// }
837 /// # Ok(())
838 /// # }
839 /// ```
840 pub async fn watch_all(&self) -> Result<Watch, WatchError> {
841 self.watch(ALL_KEYS).await
842 }
843
844 /// Creates a [futures_util::Stream] over [Entries][Entry] for all keys starting
845 /// from a provider revision. This can be useful when resuming watching over a big bucket
846 /// without the need to replay all the history.
847 ///
848 /// # Examples
849 ///
850 /// ```no_run
851 /// # #[tokio::main]
852 /// # async fn main() -> Result<(), async_nats::Error> {
853 /// use futures_util::StreamExt;
854 /// let client = async_nats::connect("demo.nats.io:4222").await?;
855 /// let jetstream = async_nats::jetstream::new(client);
856 /// let kv = jetstream
857 /// .create_key_value(async_nats::jetstream::kv::Config {
858 /// bucket: "kv".to_string(),
859 /// history: 10,
860 /// ..Default::default()
861 /// })
862 /// .await?;
863 /// let mut entries = kv.watch_all_from_revision(40).await?;
864 /// while let Some(entry) = entries.next().await {
865 /// println!("entry: {:?}", entry);
866 /// }
867 /// # Ok(())
868 /// # }
869 /// ```
870 pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
871 self.watch_from_revision(ALL_KEYS, revision).await
872 }
873
874 /// Retrieves the [Entry] for a given key from a bucket.
875 ///
876 /// # Examples
877 ///
878 /// ```no_run
879 /// # #[tokio::main]
880 /// # async fn main() -> Result<(), async_nats::Error> {
881 /// let client = async_nats::connect("demo.nats.io:4222").await?;
882 /// let jetstream = async_nats::jetstream::new(client);
883 /// let kv = jetstream
884 /// .create_key_value(async_nats::jetstream::kv::Config {
885 /// bucket: "kv".to_string(),
886 /// history: 10,
887 /// ..Default::default()
888 /// })
889 /// .await?;
890 /// let value = kv.get("key").await?;
891 /// match value {
892 /// Some(bytes) => {
893 /// let value_str = std::str::from_utf8(&bytes)?;
894 /// println!("Value: {}", value_str);
895 /// }
896 /// None => {
897 /// println!("Key not found or value not set");
898 /// }
899 /// }
900 /// # Ok(())
901 /// # }
902 /// ```
903 pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
904 match self.entry(key).await {
905 Ok(Some(entry)) => match entry.operation {
906 Operation::Put => Ok(Some(entry.value)),
907 _ => Ok(None),
908 },
909 Ok(None) => Ok(None),
910 Err(err) => Err(err),
911 }
912 }
913
914 /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
915 /// the bucket.
916 ///
917 /// # Examples
918 ///
919 /// ```no_run
920 /// # #[tokio::main]
921 /// # async fn main() -> Result<(), async_nats::Error> {
922 /// use futures_util::StreamExt;
923 /// let client = async_nats::connect("demo.nats.io:4222").await?;
924 /// let jetstream = async_nats::jetstream::new(client);
925 /// let kv = jetstream
926 /// .create_key_value(async_nats::jetstream::kv::Config {
927 /// bucket: "kv".to_string(),
928 /// history: 10,
929 /// ..Default::default()
930 /// })
931 /// .await?;
932 /// let revision = kv.put("key", "value".into()).await?;
933 /// kv.update("key", "updated".into(), revision).await?;
934 /// # Ok(())
935 /// # }
936 /// ```
937 pub async fn update<T: AsRef<str>>(
938 &self,
939 key: T,
940 value: Bytes,
941 revision: u64,
942 ) -> Result<u64, UpdateError> {
943 self.update_maybe_ttl(key, value, revision, None).await
944 }
945
946 async fn update_maybe_ttl<T: AsRef<str>>(
947 &self,
948 key: T,
949 value: Bytes,
950 revision: u64,
951 ttl: Option<Duration>,
952 ) -> Result<u64, UpdateError> {
953 if !is_valid_key(key.as_ref()) {
954 return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
955 }
956 let mut subject = String::new();
957 if self.use_jetstream_prefix {
958 subject.push_str(&self.stream.context.prefix);
959 subject.push('.');
960 }
961 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
962 subject.push_str(key.as_ref());
963
964 let mut headers = crate::HeaderMap::default();
965 headers.insert(
966 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
967 HeaderValue::from(revision),
968 );
969
970 if let Some(ttl) = ttl {
971 headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
972 }
973
974 self.stream
975 .context
976 .publish_with_headers(subject, headers, value)
977 .await?
978 .await
979 .map_err(|err| err.into())
980 .map(|publish_ack| publish_ack.sequence)
981 }
982
983 /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
984 ///
985 /// # Examples
986 ///
987 /// ```no_run
988 /// # #[tokio::main]
989 /// # async fn main() -> Result<(), async_nats::Error> {
990 /// use futures_util::StreamExt;
991 /// let client = async_nats::connect("demo.nats.io:4222").await?;
992 /// let jetstream = async_nats::jetstream::new(client);
993 /// let kv = jetstream
994 /// .create_key_value(async_nats::jetstream::kv::Config {
995 /// bucket: "kv".to_string(),
996 /// history: 10,
997 /// ..Default::default()
998 /// })
999 /// .await?;
1000 /// kv.put("key", "value".into()).await?;
1001 /// kv.delete("key").await?;
1002 /// # Ok(())
1003 /// # }
1004 /// ```
1005 pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
1006 self.delete_expect_revision(key, None).await
1007 }
1008
1009 /// Deletes a given key if the revision matches. This is a non-destructive operation, which
1010 /// sets a `DELETE` marker.
1011 ///
1012 /// # Examples
1013 ///
1014 /// ```no_run
1015 /// # #[tokio::main]
1016 /// # async fn main() -> Result<(), async_nats::Error> {
1017 /// use futures_util::StreamExt;
1018 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1019 /// let jetstream = async_nats::jetstream::new(client);
1020 /// let kv = jetstream
1021 /// .create_key_value(async_nats::jetstream::kv::Config {
1022 /// bucket: "kv".to_string(),
1023 /// history: 10,
1024 /// ..Default::default()
1025 /// })
1026 /// .await?;
1027 /// let revision = kv.put("key", "value".into()).await?;
1028 /// kv.delete_expect_revision("key", Some(revision)).await?;
1029 /// # Ok(())
1030 /// # }
1031 /// ```
1032 pub async fn delete_expect_revision<T: AsRef<str>>(
1033 &self,
1034 key: T,
1035 revison: Option<u64>,
1036 ) -> Result<(), DeleteError> {
1037 if !is_valid_key(key.as_ref()) {
1038 return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
1039 }
1040 let mut subject = String::new();
1041 if self.use_jetstream_prefix {
1042 subject.push_str(&self.stream.context.prefix);
1043 subject.push('.');
1044 }
1045 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1046 subject.push_str(key.as_ref());
1047
1048 let mut headers = crate::HeaderMap::default();
1049 // TODO: figure out which headers k/v should be where.
1050 headers.insert(
1051 KV_OPERATION,
1052 KV_OPERATION_DELETE
1053 .parse::<HeaderValue>()
1054 .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
1055 );
1056
1057 if let Some(revision) = revison {
1058 headers.insert(
1059 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1060 HeaderValue::from(revision),
1061 );
1062 }
1063
1064 self.stream
1065 .context
1066 .publish_with_headers(subject, headers, "".into())
1067 .await?
1068 .await?;
1069 Ok(())
1070 }
1071
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
1073 ///
1074 /// # Examples
1075 ///
1076 /// ```no_run
1077 /// # #[tokio::main]
1078 /// # async fn main() -> Result<(), async_nats::Error> {
1079 /// use futures_util::StreamExt;
1080 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1081 /// let jetstream = async_nats::jetstream::new(client);
1082 /// let kv = jetstream
1083 /// .create_key_value(async_nats::jetstream::kv::Config {
1084 /// bucket: "kv".to_string(),
1085 /// history: 10,
1086 /// ..Default::default()
1087 /// })
1088 /// .await?;
1089 /// kv.put("key", "value".into()).await?;
1090 /// kv.put("key", "another".into()).await?;
1091 /// kv.purge("key").await?;
1092 /// # Ok(())
1093 /// # }
1094 /// ```
1095 pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
1096 self.purge_expect_revision(key, None).await
1097 }
1098
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
1100 /// The purge entry will remain valid for the given `ttl`.
1101 ///
1102 /// # Examples
1103 ///
1104 /// ```no_run
1105 /// # #[tokio::main]
1106 /// # async fn main() -> Result<(), async_nats::Error> {
1107 /// use futures_util::StreamExt;
1108 /// use std::time::Duration;
1109 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1110 /// let jetstream = async_nats::jetstream::new(client);
1111 /// let kv = jetstream
1112 /// .create_key_value(async_nats::jetstream::kv::Config {
1113 /// bucket: "kv".to_string(),
1114 /// history: 10,
1115 /// ..Default::default()
1116 /// })
1117 /// .await?;
1118 /// kv.put("key", "value".into()).await?;
1119 /// kv.put("key", "another".into()).await?;
1120 /// kv.purge_with_ttl("key", Duration::from_secs(10)).await?;
1121 /// # Ok(())
1122 /// # }
1123 /// ```
1124 pub async fn purge_with_ttl<T: AsRef<str>>(
1125 &self,
1126 key: T,
1127 ttl: Duration,
1128 ) -> Result<(), PurgeError> {
1129 self.purge_expect_revision_maybe_ttl(key, None, Some(ttl))
1130 .await
1131 }
1132
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
1134 /// purge entry in-place.
1135 ///
1136 /// # Examples
1137 ///
1138 /// ```no_run
1139 /// # #[tokio::main]
1140 /// # async fn main() -> Result<(), async_nats::Error> {
1141 /// use futures_util::StreamExt;
1142 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1143 /// let jetstream = async_nats::jetstream::new(client);
1144 /// let kv = jetstream
1145 /// .create_key_value(async_nats::jetstream::kv::Config {
1146 /// bucket: "kv".to_string(),
1147 /// history: 10,
1148 /// ..Default::default()
1149 /// })
1150 /// .await?;
1151 /// kv.put("key", "value".into()).await?;
1152 /// let revision = kv.put("key", "another".into()).await?;
1153 /// kv.purge_expect_revision("key", Some(revision)).await?;
1154 /// # Ok(())
1155 /// # }
1156 /// ```
1157 pub async fn purge_expect_revision<T: AsRef<str>>(
1158 &self,
1159 key: T,
1160 revision: Option<u64>,
1161 ) -> Result<(), PurgeError> {
1162 self.purge_expect_revision_maybe_ttl(key, revision, None)
1163 .await
1164 }
1165
    /// Purges all the revisions of an entry destructively if the revision matches, leaving behind a single
1167 /// purge entry in-place. The purge entry will remain valid for the given `ttl`.
1168 ///
1169 /// # Examples
1170 ///
1171 /// ```no_run
1172 /// # #[tokio::main]
1173 /// # async fn main() -> Result<(), async_nats::Error> {
1174 /// use futures_util::StreamExt;
1175 /// use std::time::Duration;
1176 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1177 /// let jetstream = async_nats::jetstream::new(client);
1178 /// let kv = jetstream
1179 /// .create_key_value(async_nats::jetstream::kv::Config {
1180 /// bucket: "kv".to_string(),
1181 /// history: 10,
1182 /// ..Default::default()
1183 /// })
1184 /// .await?;
1185 /// kv.put("key", "value".into()).await?;
1186 /// let revision = kv.put("key", "another".into()).await?;
1187 /// kv.purge_expect_revision_with_ttl("key", revision, Duration::from_secs(10))
1188 /// .await?;
1189 /// # Ok(())
1190 /// # }
1191 /// ```
1192 pub async fn purge_expect_revision_with_ttl<T: AsRef<str>>(
1193 &self,
1194 key: T,
1195 revision: u64,
1196 ttl: Duration,
1197 ) -> Result<(), PurgeError> {
1198 self.purge_expect_revision_maybe_ttl(key, Some(revision), Some(ttl))
1199 .await
1200 }
1201
1202 async fn purge_expect_revision_maybe_ttl<T: AsRef<str>>(
1203 &self,
1204 key: T,
1205 revision: Option<u64>,
1206 ttl: Option<Duration>,
1207 ) -> Result<(), PurgeError> {
1208 if !is_valid_key(key.as_ref()) {
1209 return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1210 }
1211
1212 let mut subject = String::new();
1213 if self.use_jetstream_prefix {
1214 subject.push_str(&self.stream.context.prefix);
1215 subject.push('.');
1216 }
1217 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1218 subject.push_str(key.as_ref());
1219
1220 let mut headers = crate::HeaderMap::default();
1221 headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1222 headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1223 if let Some(ttl) = ttl {
1224 headers.insert(header::NATS_MESSAGE_TTL, HeaderValue::from(ttl.as_secs()));
1225 }
1226
1227 if let Some(revision) = revision {
1228 headers.insert(
1229 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1230 HeaderValue::from(revision),
1231 );
1232 }
1233
1234 self.stream
1235 .context
1236 .publish_with_headers(subject, headers, "".into())
1237 .await?
1238 .await?;
1239 Ok(())
1240 }
1241
1242 /// Returns a [futures_util::Stream] that allows iterating over all [Operations][Operation] that
1243 /// happen for given key.
1244 ///
1245 /// # Examples
1246 ///
1247 /// ```no_run
1248 /// # #[tokio::main]
1249 /// # async fn main() -> Result<(), async_nats::Error> {
1250 /// use futures_util::StreamExt;
1251 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1252 /// let jetstream = async_nats::jetstream::new(client);
1253 /// let kv = jetstream
1254 /// .create_key_value(async_nats::jetstream::kv::Config {
1255 /// bucket: "kv".to_string(),
1256 /// history: 10,
1257 /// ..Default::default()
1258 /// })
1259 /// .await?;
1260 /// let mut entries = kv.history("kv").await?;
1261 /// while let Some(entry) = entries.next().await {
1262 /// println!("entry: {:?}", entry);
1263 /// }
1264 /// # Ok(())
1265 /// # }
1266 /// ```
1267 pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1268 if !is_valid_key(key.as_ref()) {
1269 return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1270 }
1271 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1272
1273 let consumer = self
1274 .stream
1275 .create_consumer(super::consumer::push::OrderedConfig {
1276 deliver_subject: self.stream.context.client.new_inbox(),
1277 description: Some("kv history consumer".to_string()),
1278 filter_subject: subject,
1279 replay_policy: super::consumer::ReplayPolicy::Instant,
1280 ..Default::default()
1281 })
1282 .await?;
1283
1284 Ok(History {
1285 subscription: consumer.messages().await?,
1286 done: false,
1287 prefix: self.prefix.clone(),
1288 bucket: self.name.clone(),
1289 })
1290 }
1291
1292 /// Returns a [futures_util::Stream] that allows iterating over all keys in the bucket.
1293 ///
1294 /// # Examples
1295 ///
    /// Iterating over each key individually
1297 ///
1298 /// ```no_run
1299 /// # #[tokio::main]
1300 /// # async fn main() -> Result<(), async_nats::Error> {
1301 /// use futures_util::{StreamExt, TryStreamExt};
1302 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1303 /// let jetstream = async_nats::jetstream::new(client);
1304 /// let kv = jetstream
1305 /// .create_key_value(async_nats::jetstream::kv::Config {
1306 /// bucket: "kv".to_string(),
1307 /// history: 10,
1308 /// ..Default::default()
1309 /// })
1310 /// .await?;
1311 /// let mut keys = kv.keys().await?.boxed();
1312 /// while let Some(key) = keys.try_next().await? {
1313 /// println!("key: {:?}", key);
1314 /// }
1315 /// # Ok(())
1316 /// # }
1317 /// ```
1318 ///
1319 /// Collecting it into a vector of keys
1320 ///
1321 /// ```no_run
1322 /// # #[tokio::main]
1323 /// # async fn main() -> Result<(), async_nats::Error> {
1324 /// use futures_util::TryStreamExt;
1325 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1326 /// let jetstream = async_nats::jetstream::new(client);
1327 /// let kv = jetstream
1328 /// .create_key_value(async_nats::jetstream::kv::Config {
1329 /// bucket: "kv".to_string(),
1330 /// history: 10,
1331 /// ..Default::default()
1332 /// })
1333 /// .await?;
1334 /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1335 /// println!("Keys: {:?}", keys);
1336 /// # Ok(())
1337 /// # }
1338 /// ```
1339 pub async fn keys(&self) -> Result<Keys, HistoryError> {
1340 let subject = format!("{}>", self.prefix.as_str());
1341
1342 let consumer = self
1343 .stream
1344 .create_consumer(super::consumer::push::OrderedConfig {
1345 deliver_subject: self.stream.context.client.new_inbox(),
1346 description: Some("kv history consumer".to_string()),
1347 filter_subject: subject,
1348 headers_only: true,
1349 replay_policy: super::consumer::ReplayPolicy::Instant,
1350 // We only need to know the latest state for each key, not the whole history
1351 deliver_policy: DeliverPolicy::LastPerSubject,
1352 ..Default::default()
1353 })
1354 .await?;
1355
1356 let entries = History {
1357 done: consumer.info.num_pending == 0,
1358 subscription: consumer.messages().await?,
1359 prefix: self.prefix.clone(),
1360 bucket: self.name.clone(),
1361 };
1362
1363 Ok(Keys { inner: entries })
1364 }
1365}
1366
/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch {
    // Becomes `true` once a message reporting zero pending messages has been observed.
    seen_current: bool,
    // Ordered push consumer subscription the entries are read from.
    subscription: super::consumer::push::Ordered,
    // Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    // Bucket name copied into every yielded `Entry`.
    bucket: String,
}
1374
1375impl futures_util::Stream for Watch {
1376 type Item = Result<Entry, WatcherError>;
1377
1378 fn poll_next(
1379 mut self: std::pin::Pin<&mut Self>,
1380 cx: &mut std::task::Context<'_>,
1381 ) -> std::task::Poll<Option<Self::Item>> {
1382 match self.subscription.poll_next_unpin(cx) {
1383 Poll::Ready(message) => match message {
1384 None => Poll::Ready(None),
1385 Some(message) => {
1386 let message = message?;
1387 let info = message.info().map_err(|err| {
1388 WatcherError::with_source(
1389 WatcherErrorKind::Other,
1390 format!("failed to parse message metadata: {err}"),
1391 )
1392 })?;
1393
1394 let operation =
1395 kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1396
1397 let key = message
1398 .subject
1399 .strip_prefix(&self.prefix)
1400 .map(|s| s.to_string())
1401 .unwrap();
1402
1403 if !self.seen_current && info.pending == 0 {
1404 self.seen_current = true;
1405 }
1406
1407 Poll::Ready(Some(Ok(Entry {
1408 bucket: self.bucket.clone(),
1409 key,
1410 value: message.payload.clone(),
1411 revision: info.stream_sequence,
1412 created: info.published,
1413 delta: info.pending,
1414 operation,
1415 seen_current: self.seen_current,
1416 })))
1417 }
1418 },
1419 std::task::Poll::Pending => Poll::Pending,
1420 }
1421 }
1422
1423 fn size_hint(&self) -> (usize, Option<usize>) {
1424 (0, None)
1425 }
1426}
1427
/// A structure representing the history of a key-value bucket, yielding past values.
pub struct History {
    // Ordered push consumer subscription the entries are read from.
    subscription: super::consumer::push::Ordered,
    // When `true`, the stream yields `None` without polling the subscription again.
    done: bool,
    // Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    // Bucket name copied into every yielded `Entry`.
    bucket: String,
}
1435
1436impl futures_util::Stream for History {
1437 type Item = Result<Entry, WatcherError>;
1438
1439 fn poll_next(
1440 mut self: std::pin::Pin<&mut Self>,
1441 cx: &mut std::task::Context<'_>,
1442 ) -> std::task::Poll<Option<Self::Item>> {
1443 if self.done {
1444 return Poll::Ready(None);
1445 }
1446 match self.subscription.poll_next_unpin(cx) {
1447 Poll::Ready(message) => match message {
1448 None => Poll::Ready(None),
1449 Some(message) => {
1450 let message = message?;
1451 let info = message.info().map_err(|err| {
1452 WatcherError::with_source(
1453 WatcherErrorKind::Other,
1454 format!("failed to parse message metadata: {err}"),
1455 )
1456 })?;
1457 if info.pending == 0 {
1458 self.done = true;
1459 }
1460
1461 let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1462
1463 let key = message
1464 .subject
1465 .strip_prefix(&self.prefix)
1466 .map(|s| s.to_string())
1467 .unwrap();
1468
1469 Poll::Ready(Some(Ok(Entry {
1470 bucket: self.bucket.clone(),
1471 key,
1472 value: message.payload.clone(),
1473 revision: info.stream_sequence,
1474 created: info.published,
1475 delta: info.pending,
1476 operation,
1477 seen_current: self.done,
1478 })))
1479 }
1480 },
1481 std::task::Poll::Pending => Poll::Pending,
1482 }
1483 }
1484
1485 fn size_hint(&self) -> (usize, Option<usize>) {
1486 (0, None)
1487 }
1488}
1489
/// A stream of key names, built on top of a [`History`] stream.
///
/// Entries whose operation is a delete or a purge are skipped.
pub struct Keys {
    // Entry stream the keys are extracted from.
    inner: History,
}
1493
1494impl futures_util::Stream for Keys {
1495 type Item = Result<String, WatcherError>;
1496
1497 fn poll_next(
1498 mut self: std::pin::Pin<&mut Self>,
1499 cx: &mut std::task::Context<'_>,
1500 ) -> std::task::Poll<Option<Self::Item>> {
1501 loop {
1502 match self.inner.poll_next_unpin(cx) {
1503 Poll::Ready(None) => return Poll::Ready(None),
1504 Poll::Ready(Some(res)) => match res {
1505 Ok(entry) => {
1506 // Skip purged and deleted keys
1507 if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1508 // Try to poll again if we skip this one
1509 continue;
1510 } else {
1511 return Poll::Ready(Some(Ok(entry.key)));
1512 }
1513 }
1514 Err(e) => return Poll::Ready(Some(Err(e))),
1515 },
1516 Poll::Pending => return Poll::Pending,
1517 }
1518 }
1519 }
1520}
1521
/// An entry in a key-value bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved (the message subject with the bucket prefix removed).
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value (the stream sequence of the underlying message).
    pub revision: u64,
    /// Distance from the latest value (the number of messages still pending after this one).
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
    /// Set to true after all historical messages have been received, and
    /// now all Entries are the new ones.
    pub seen_current: bool,
}
1543
1544#[derive(Clone, Debug, PartialEq)]
1545pub enum StatusErrorKind {
1546 JetStream(crate::jetstream::Error),
1547 TimedOut,
1548}
1549
1550impl Display for StatusErrorKind {
1551 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1552 match self {
1553 Self::JetStream(err) => write!(f, "jetstream request failed: {err}"),
1554 Self::TimedOut => write!(f, "timed out"),
1555 }
1556 }
1557}
1558
1559pub type StatusError = Error<StatusErrorKind>;
1560
/// Kinds of failure carried by a [`CreateError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
    /// The key already exists.
    AlreadyExists,
    /// The key failed validation.
    InvalidKey,
    /// Creating the key in the store failed.
    Publish,
    /// Acknowledgement error.
    Ack,
    /// Any other failure.
    Other,
}
1569
1570impl From<UpdateError> for CreateError {
1571 fn from(error: UpdateError) -> Self {
1572 match error.kind() {
1573 UpdateErrorKind::InvalidKey => {
1574 CreateError::with_source(CreateErrorKind::InvalidKey, error)
1575 }
1576 UpdateErrorKind::TimedOut => CreateError::with_source(CreateErrorKind::Publish, error),
1577 UpdateErrorKind::WrongLastRevision => {
1578 CreateError::with_source(CreateErrorKind::AlreadyExists, error)
1579 }
1580 UpdateErrorKind::Other => CreateError::with_source(CreateErrorKind::Other, error),
1581 }
1582 }
1583}
1584
1585impl From<PutError> for CreateError {
1586 fn from(error: PutError) -> Self {
1587 match error.kind() {
1588 PutErrorKind::InvalidKey => {
1589 CreateError::with_source(CreateErrorKind::InvalidKey, error)
1590 }
1591 PutErrorKind::Publish => CreateError::with_source(CreateErrorKind::Publish, error),
1592 PutErrorKind::Ack => CreateError::with_source(CreateErrorKind::Ack, error),
1593 }
1594 }
1595}
1596
1597impl From<EntryError> for CreateError {
1598 fn from(error: EntryError) -> Self {
1599 match error.kind() {
1600 EntryErrorKind::InvalidKey => {
1601 CreateError::with_source(CreateErrorKind::InvalidKey, error)
1602 }
1603 EntryErrorKind::TimedOut => CreateError::with_source(CreateErrorKind::Publish, error),
1604 EntryErrorKind::Other => CreateError::with_source(CreateErrorKind::Other, error),
1605 }
1606 }
1607}
1608
1609impl Display for CreateErrorKind {
1610 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1611 match self {
1612 Self::AlreadyExists => write!(f, "key already exists"),
1613 Self::Publish => write!(f, "failed to create key in store"),
1614 Self::Ack => write!(f, "ack error"),
1615 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1616 Self::Other => write!(f, "other error"),
1617 }
1618 }
1619}
1620
1621pub type CreateError = Error<CreateErrorKind>;
1622
/// Kinds of failure that a put operation can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The key failed validation.
    InvalidKey,
    /// Putting the key into the store failed.
    Publish,
    /// Acknowledgement error.
    Ack,
}

impl Display for PutErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::Publish => "failed to put key into store",
            Self::Ack => "ack error",
        };
        f.write_str(description)
    }
}
1639
/// Error type whose kind is a [`PutErrorKind`].
pub type PutError = Error<PutErrorKind>;
1641
/// Kinds of failure that fetching an entry can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EntryErrorKind {
    /// The key failed validation.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Any other failure while getting the entry.
    Other,
}

impl Display for EntryErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::Other => "failed getting entry",
        };
        f.write_str(description)
    }
}
1658
/// Error type whose kind is an [`EntryErrorKind`].
pub type EntryError = Error<EntryErrorKind>;

// NOTE(review): presumably generates `From<DirectGetError> for EntryError`,
// mapping a timeout to `EntryErrorKind::TimedOut` — confirm against the
// `from_with_timeout!` definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1667
/// Kinds of failure that setting up a watch can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The key failed validation.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// Any other watch failure.
    Other,
}

impl Display for WatchErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        };
        f.write_str(description)
    }
}
1686
/// Error type whose kind is a [`WatchErrorKind`].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): presumably generate `From<ConsumerError>` and `From<StreamError>`
// for `WatchError`, mapping timeouts to `WatchErrorKind::TimedOut` — confirm
// against the `from_with_timeout!` definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1691
/// Kinds of failure that an update can report.
///
/// Delete and purge report the same kinds through the `DeleteErrorKind` and
/// `PurgeErrorKind` aliases.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateErrorKind {
    /// The key failed validation.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// The expected revision did not match the last revision of the key.
    WrongLastRevision,
    /// Any other failure while updating the key.
    Other,
}

impl Display for UpdateErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::InvalidKey => "key cannot be empty or start/end with `.`",
            Self::TimedOut => "timed out",
            Self::WrongLastRevision => "wrong last revision",
            // Previously read "failed getting entry", copied from
            // `EntryErrorKind`; that text described a read, not a write.
            Self::Other => "failed to update key in store",
        };
        f.write_str(description)
    }
}
1710
/// Error type whose kind is an [`UpdateErrorKind`].
pub type UpdateError = Error<UpdateErrorKind>;
1712
1713impl From<PublishError> for UpdateError {
1714 fn from(err: PublishError) -> Self {
1715 match err.kind() {
1716 PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1717 PublishErrorKind::WrongLastSequence => {
1718 Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1719 }
1720 _ => Self::with_source(UpdateErrorKind::Other, err),
1721 }
1722 }
1723}
1724
/// Kinds of failure yielded by the entry streams of this module.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    Consumer,
    /// Any other watcher failure.
    Other,
}

impl Display for WatcherErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Consumer => "watcher consumer error",
            Self::Other => "watcher error",
        };
        f.write_str(description)
    }
}
1739
/// Error type whose kind is a [`WatcherErrorKind`].
pub type WatcherError = Error<WatcherErrorKind>;
1741
1742impl From<OrderedError> for WatcherError {
1743 fn from(err: OrderedError) -> Self {
1744 WatcherError::with_source(WatcherErrorKind::Consumer, err)
1745 }
1746}
1747
/// Error returned by the delete operations; same type as [`UpdateError`].
pub type DeleteError = UpdateError;
/// Error kinds of [`DeleteError`]; same type as [`UpdateErrorKind`].
pub type DeleteErrorKind = UpdateErrorKind;

/// Error returned by the purge operations; same type as [`UpdateError`].
pub type PurgeError = UpdateError;
/// Error kinds of [`PurgeError`]; same type as [`UpdateErrorKind`].
pub type PurgeErrorKind = UpdateErrorKind;

/// Error returned when creating a history or keys stream; same type as [`WatchError`].
pub type HistoryError = WatchError;
/// Error kinds of [`HistoryError`]; same type as [`WatchErrorKind`].
pub type HistoryErrorKind = WatchErrorKind;