async_nats_wrpc/jetstream/kv/mod.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19 fmt::{self, Display},
20 str::FromStr,
21 task::Poll,
22};
23
24use crate::{HeaderValue, StatusCode};
25use bytes::Bytes;
26use futures::StreamExt;
27use once_cell::sync::Lazy;
28use regex::Regex;
29use time::{format_description::well_known::Rfc3339, OffsetDateTime};
30use tracing::debug;
31
32use crate::error::Error;
33use crate::{header, Message};
34
35use self::bucket::Status;
36
37use super::{
38 consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
39 context::{PublishError, PublishErrorKind},
40 stream::{
41 self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, RawMessage,
42 Republish, Source, StorageType, Stream,
43 },
44};
45
46fn kv_operation_from_stream_message(message: &RawMessage) -> Operation {
47 match message.headers.as_deref() {
48 Some(headers) => headers.parse().unwrap_or(Operation::Put),
49 None => Operation::Put,
50 }
51}
52
53fn kv_operation_from_message(message: &Message) -> Result<Operation, EntryError> {
54 let headers = message
55 .headers
56 .as_ref()
57 .ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?;
58
59 if let Some(op) = headers.get(KV_OPERATION) {
60 Operation::from_str(op.as_str())
61 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
62 } else {
63 Err(EntryError::with_source(
64 EntryErrorKind::Other,
65 "missing operation",
66 ))
67 }
68}
69
70static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
71static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
72
73pub(crate) const MAX_HISTORY: i64 = 64;
74const ALL_KEYS: &str = ">";
75
76const KV_OPERATION: &str = "KV-Operation";
77const KV_OPERATION_DELETE: &str = "DEL";
78const KV_OPERATION_PURGE: &str = "PURGE";
79const KV_OPERATION_PUT: &str = "PUT";
80
81const NATS_ROLLUP: &str = "Nats-Rollup";
82const ROLLUP_SUBJECT: &str = "sub";
83
84pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
85 VALID_BUCKET_RE.is_match(bucket_name)
86}
87
88pub(crate) fn is_valid_key(key: &str) -> bool {
89 if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
90 return false;
91 }
92
93 VALID_KEY_RE.is_match(key)
94}
95
96/// Configuration values for key value stores.
97#[derive(Debug, Default)]
98pub struct Config {
99 /// Name of the bucket
100 pub bucket: String,
101 /// Human readable description.
102 pub description: String,
103 /// Maximum size of a single value.
104 pub max_value_size: i32,
105 /// Maximum historical entries.
106 pub history: i64,
107 /// Maximum age of any entry in the bucket, expressed in nanoseconds
108 pub max_age: std::time::Duration,
109 /// How large the bucket may become in total bytes before the configured discard policy kicks in
110 pub max_bytes: i64,
111 /// The type of storage backend, `File` (default) and `Memory`
112 pub storage: StorageType,
113 /// How many replicas to keep for each entry in a cluster.
114 pub num_replicas: usize,
115 /// Republish is for republishing messages once persistent in the Key Value Bucket.
116 pub republish: Option<Republish>,
117 /// Bucket mirror configuration.
118 pub mirror: Option<Source>,
119 /// Bucket sources configuration.
120 pub sources: Option<Vec<Source>>,
121 /// Allow mirrors using direct API.
122 pub mirror_direct: bool,
123 /// Compression
124 #[cfg(feature = "server_2_10")]
125 pub compression: bool,
126 /// Cluster and tag placement for the bucket.
127 pub placement: Option<stream::Placement>,
128}
129
130/// Describes what kind of operation and entry represents
131#[derive(Debug, Clone, Copy, Eq, PartialEq)]
132pub enum Operation {
133 /// A value was put into the bucket
134 Put,
135 /// A value was deleted from a bucket
136 Delete,
137 /// A value was purged from a bucket
138 Purge,
139}
140
141impl FromStr for Operation {
142 type Err = ParseOperationError;
143
144 fn from_str(s: &str) -> Result<Self, Self::Err> {
145 match s {
146 KV_OPERATION_DELETE => Ok(Operation::Delete),
147 KV_OPERATION_PURGE => Ok(Operation::Purge),
148 KV_OPERATION_PUT => Ok(Operation::Put),
149 _ => Err(ParseOperationError),
150 }
151 }
152}
153
154#[derive(Debug, Clone)]
155pub struct ParseOperationError;
156
157impl fmt::Display for ParseOperationError {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
160 }
161}
162
163impl std::error::Error for ParseOperationError {}
164
165/// A struct used as a handle for the bucket.
166#[derive(Debug, Clone)]
167pub struct Store {
168 /// The name of the Store.
169 pub name: String,
170 /// The name of the stream associated with the Store.
171 pub stream_name: String,
172 /// The prefix for keys in the Store.
173 pub prefix: String,
174 /// The optional prefix to use when putting new key-value pairs.
175 pub put_prefix: Option<String>,
176 /// Indicates whether to use the JetStream prefix.
177 pub use_jetstream_prefix: bool,
178 /// The stream associated with the Store.
179 pub stream: Stream,
180}
181
182impl Store {
183 /// Queries the server and returns status from the server.
184 ///
185 /// # Examples
186 ///
187 /// ```no_run
188 /// # #[tokio::main]
189 /// # async fn main() -> Result<(), async_nats::Error> {
190 /// let client = async_nats::connect("demo.nats.io:4222").await?;
191 /// let jetstream = async_nats::jetstream::new(client);
192 /// let kv = jetstream
193 /// .create_key_value(async_nats::jetstream::kv::Config {
194 /// bucket: "kv".to_string(),
195 /// history: 10,
196 /// ..Default::default()
197 /// })
198 /// .await?;
199 /// let status = kv.status().await?;
200 /// println!("status: {:?}", status);
201 /// # Ok(())
202 /// # }
203 /// ```
204 pub async fn status(&self) -> Result<Status, StatusError> {
205 // TODO: should we poll for fresh info here? probably yes.
206 let info = self.stream.info.clone();
207
208 Ok(Status {
209 info,
210 bucket: self.name.to_string(),
211 })
212 }
213
214 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
215 ///
216 /// # Examples
217 ///
218 /// ```no_run
219 /// # #[tokio::main]
220 /// # async fn main() -> Result<(), async_nats::Error> {
221 /// let client = async_nats::connect("demo.nats.io:4222").await?;
222 /// let jetstream = async_nats::jetstream::new(client);
223 /// let kv = jetstream
224 /// .create_key_value(async_nats::jetstream::kv::Config {
225 /// bucket: "kv".to_string(),
226 /// history: 10,
227 /// ..Default::default()
228 /// })
229 /// .await?;
230 ///
231 /// let status = kv.create("key", "value".into()).await;
232 /// assert!(status.is_ok());
233 ///
234 /// let status = kv.create("key", "value".into()).await;
235 /// assert!(status.is_err());
236 ///
237 /// # Ok(())
238 /// # }
239 /// ```
240 pub async fn create<T: AsRef<str>>(
241 &self,
242 key: T,
243 value: bytes::Bytes,
244 ) -> Result<u64, CreateError> {
245 let update_err = match self.update(key.as_ref(), value.clone(), 0).await {
246 Ok(revision) => return Ok(revision),
247 Err(err) => err,
248 };
249
250 match self.entry(key.as_ref()).await? {
251 // Deleted or Purged key, we can create it again.
252 Some(Entry {
253 operation: Operation::Delete | Operation::Purge,
254 ..
255 }) => {
256 let revision = self.put(key, value).await?;
257 Ok(revision)
258 }
259
260 // key already exists.
261 Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
262
263 // Something went wrong with the initial update, return that error
264 None => Err(update_err.into()),
265 }
266 }
267
268 /// Puts new key value pair into the bucket.
269 /// If key didn't exist, it is created. If it did exist, a new value with a new version is
270 /// added.
271 ///
272 /// # Examples
273 ///
274 /// ```no_run
275 /// # #[tokio::main]
276 /// # async fn main() -> Result<(), async_nats::Error> {
277 /// let client = async_nats::connect("demo.nats.io:4222").await?;
278 /// let jetstream = async_nats::jetstream::new(client);
279 /// let kv = jetstream
280 /// .create_key_value(async_nats::jetstream::kv::Config {
281 /// bucket: "kv".to_string(),
282 /// history: 10,
283 /// ..Default::default()
284 /// })
285 /// .await?;
286 /// let status = kv.put("key", "value".into()).await?;
287 /// # Ok(())
288 /// # }
289 /// ```
290 pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
291 if !is_valid_key(key.as_ref()) {
292 return Err(PutError::new(PutErrorKind::InvalidKey));
293 }
294 let mut subject = String::new();
295 if self.use_jetstream_prefix {
296 subject.push_str(&self.stream.context.prefix);
297 subject.push('.');
298 }
299 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
300 subject.push_str(key.as_ref());
301
302 let publish_ack = self
303 .stream
304 .context
305 .publish(subject, value)
306 .await
307 .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
308 let ack = publish_ack
309 .await
310 .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
311
312 Ok(ack.sequence)
313 }
314
315 /// Retrieves the last [Entry] for a given key from a bucket.
316 ///
317 /// # Examples
318 ///
319 /// ```no_run
320 /// # #[tokio::main]
321 /// # async fn main() -> Result<(), async_nats::Error> {
322 /// let client = async_nats::connect("demo.nats.io:4222").await?;
323 /// let jetstream = async_nats::jetstream::new(client);
324 /// let kv = jetstream
325 /// .create_key_value(async_nats::jetstream::kv::Config {
326 /// bucket: "kv".to_string(),
327 /// history: 10,
328 /// ..Default::default()
329 /// })
330 /// .await?;
331 /// let status = kv.put("key", "value".into()).await?;
332 /// let entry = kv.entry("key").await?;
333 /// println!("entry: {:?}", entry);
334 /// # Ok(())
335 /// # }
336 /// ```
337 pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
338 let key: String = key.into();
339 if !is_valid_key(key.as_ref()) {
340 return Err(EntryError::new(EntryErrorKind::InvalidKey));
341 }
342
343 let subject = format!("{}{}", self.prefix.as_str(), &key);
344
345 let result: Option<(Message, Operation, u64, OffsetDateTime)> = {
346 if self.stream.info.config.allow_direct {
347 let message = self
348 .stream
349 .direct_get_last_for_subject(subject.as_str())
350 .await;
351
352 match message {
353 Ok(message) => {
354 let headers = message.headers.as_ref().ok_or_else(|| {
355 EntryError::with_source(EntryErrorKind::Other, "missing headers")
356 })?;
357
358 let operation =
359 kv_operation_from_message(&message).unwrap_or(Operation::Put);
360
361 let sequence = headers
362 .get_last(header::NATS_SEQUENCE)
363 .ok_or_else(|| {
364 EntryError::with_source(
365 EntryErrorKind::Other,
366 "missing sequence headers",
367 )
368 })?
369 .as_str()
370 .parse()
371 .map_err(|err| {
372 EntryError::with_source(
373 EntryErrorKind::Other,
374 format!("failed to parse headers sequence value: {}", err),
375 )
376 })?;
377
378 let created = headers
379 .get_last(header::NATS_TIME_STAMP)
380 .ok_or_else(|| {
381 EntryError::with_source(
382 EntryErrorKind::Other,
383 "did not found timestamp header",
384 )
385 })
386 .and_then(|created| {
387 OffsetDateTime::parse(created.as_str(), &Rfc3339).map_err(|err| {
388 EntryError::with_source(
389 EntryErrorKind::Other,
390 format!(
391 "failed to parse headers timestampt value: {}",
392 err
393 ),
394 )
395 })
396 })?;
397
398 Some((message.message, operation, sequence, created))
399 }
400 Err(err) => {
401 if err.kind() == DirectGetErrorKind::NotFound {
402 None
403 } else {
404 return Err(err.into());
405 }
406 }
407 }
408 } else {
409 let raw_message = self
410 .stream
411 .get_last_raw_message_by_subject(subject.as_str())
412 .await;
413 match raw_message {
414 Ok(raw_message) => {
415 let operation = kv_operation_from_stream_message(&raw_message);
416 // TODO: unnecessary expensive, cloning whole Message.
417 let nats_message = Message::try_from(raw_message.clone())
418 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))?;
419 Some((
420 nats_message,
421 operation,
422 raw_message.sequence,
423 raw_message.time,
424 ))
425 }
426 Err(err) => match err.kind() {
427 crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
428 crate::jetstream::stream::LastRawMessageErrorKind::Other => {
429 return Err(EntryError::with_source(EntryErrorKind::Other, err))
430 }
431 crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
432 return Err(EntryError::with_source(EntryErrorKind::Other, err))
433 }
434 },
435 }
436 }
437 };
438
439 match result {
440 Some((message, operation, revision, created)) => {
441 if message.status == Some(StatusCode::NO_RESPONDERS) {
442 return Ok(None);
443 }
444
445 let entry = Entry {
446 bucket: self.name.clone(),
447 key,
448 value: message.payload,
449 revision,
450 created,
451 operation,
452 delta: 0,
453 };
454 Ok(Some(entry))
455 }
456 // TODO: remember to touch this when Errors are in place.
457 None => Ok(None),
458 }
459 }
460
461 /// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields
462 /// values whenever there are changes for that key.
463 ///
464 /// # Examples
465 ///
466 /// ```no_run
467 /// # #[tokio::main]
468 /// # async fn main() -> Result<(), async_nats::Error> {
469 /// use futures::StreamExt;
470 /// let client = async_nats::connect("demo.nats.io:4222").await?;
471 /// let jetstream = async_nats::jetstream::new(client);
472 /// let kv = jetstream
473 /// .create_key_value(async_nats::jetstream::kv::Config {
474 /// bucket: "kv".to_string(),
475 /// history: 10,
476 /// ..Default::default()
477 /// })
478 /// .await?;
479 /// let mut entries = kv.watch("kv").await?;
480 /// while let Some(entry) = entries.next().await {
481 /// println!("entry: {:?}", entry);
482 /// }
483 /// # Ok(())
484 /// # }
485 /// ```
486 pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
487 self.watch_with_deliver_policy(key, DeliverPolicy::New)
488 .await
489 }
490
491 /// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, starting from
492 /// provided revision. This is useful to resume watching over big KV buckets without a need to
493 /// replay all the history.
494 ///
495 /// # Examples
496 ///
497 /// ```no_run
498 /// # #[tokio::main]
499 /// # async fn main() -> Result<(), async_nats::Error> {
500 /// use futures::StreamExt;
501 /// let client = async_nats::connect("demo.nats.io:4222").await?;
502 /// let jetstream = async_nats::jetstream::new(client);
503 /// let kv = jetstream
504 /// .create_key_value(async_nats::jetstream::kv::Config {
505 /// bucket: "kv".to_string(),
506 /// history: 10,
507 /// ..Default::default()
508 /// })
509 /// .await?;
510 /// let mut entries = kv.watch_from_revision("kv", 5).await?;
511 /// while let Some(entry) = entries.next().await {
512 /// println!("entry: {:?}", entry);
513 /// }
514 /// # Ok(())
515 /// # }
516 /// ```
517 pub async fn watch_from_revision<T: AsRef<str>>(
518 &self,
519 key: T,
520 revision: u64,
521 ) -> Result<Watch, WatchError> {
522 self.watch_with_deliver_policy(
523 key,
524 DeliverPolicy::ByStartSequence {
525 start_sequence: revision,
526 },
527 )
528 .await
529 }
530
531 /// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields
532 /// values whenever there are changes for that key with as well as last value.
533 ///
534 /// # Examples
535 ///
536 /// ```no_run
537 /// # #[tokio::main]
538 /// # async fn main() -> Result<(), async_nats::Error> {
539 /// use futures::StreamExt;
540 /// let client = async_nats::connect("demo.nats.io:4222").await?;
541 /// let jetstream = async_nats::jetstream::new(client);
542 /// let kv = jetstream
543 /// .create_key_value(async_nats::jetstream::kv::Config {
544 /// bucket: "kv".to_string(),
545 /// history: 10,
546 /// ..Default::default()
547 /// })
548 /// .await?;
549 /// let mut entries = kv.watch_with_history("kv").await?;
550 /// while let Some(entry) = entries.next().await {
551 /// println!("entry: {:?}", entry);
552 /// }
553 /// # Ok(())
554 /// # }
555 /// ```
556 pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
557 self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
558 .await
559 }
560
561 async fn watch_with_deliver_policy<T: AsRef<str>>(
562 &self,
563 key: T,
564 deliver_policy: DeliverPolicy,
565 ) -> Result<Watch, WatchError> {
566 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
567
568 debug!("initial consumer creation");
569 let consumer = self
570 .stream
571 .create_consumer(super::consumer::push::OrderedConfig {
572 deliver_subject: self.stream.context.client.new_inbox(),
573 description: Some("kv watch consumer".to_string()),
574 filter_subject: subject,
575 replay_policy: super::consumer::ReplayPolicy::Instant,
576 deliver_policy,
577 ..Default::default()
578 })
579 .await
580 .map_err(|err| match err.kind() {
581 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
582 WatchError::new(WatchErrorKind::TimedOut)
583 }
584 _ => WatchError::with_source(WatchErrorKind::Other, err),
585 })?;
586
587 Ok(Watch {
588 subscription: consumer.messages().await.map_err(|err| match err.kind() {
589 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
590 WatchError::new(WatchErrorKind::TimedOut)
591 }
592 crate::jetstream::consumer::StreamErrorKind::Other => {
593 WatchError::with_source(WatchErrorKind::Other, err)
594 }
595 })?,
596 prefix: self.prefix.clone(),
597 bucket: self.name.clone(),
598 })
599 }
600
601 /// Creates a [futures::Stream] over [Entries][Entry] for all keys, which yields
602 /// values whenever there are changes in the bucket.
603 ///
604 /// # Examples
605 ///
606 /// ```no_run
607 /// # #[tokio::main]
608 /// # async fn main() -> Result<(), async_nats::Error> {
609 /// use futures::StreamExt;
610 /// let client = async_nats::connect("demo.nats.io:4222").await?;
611 /// let jetstream = async_nats::jetstream::new(client);
612 /// let kv = jetstream
613 /// .create_key_value(async_nats::jetstream::kv::Config {
614 /// bucket: "kv".to_string(),
615 /// history: 10,
616 /// ..Default::default()
617 /// })
618 /// .await?;
619 /// let mut entries = kv.watch_all().await?;
620 /// while let Some(entry) = entries.next().await {
621 /// println!("entry: {:?}", entry);
622 /// }
623 /// # Ok(())
624 /// # }
625 /// ```
626 pub async fn watch_all(&self) -> Result<Watch, WatchError> {
627 self.watch(ALL_KEYS).await
628 }
629
630 /// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
631 /// from a provider revision. This can be useful when resuming watching over a big bucket
632 /// without the need to replay all the history.
633 ///
634 /// # Examples
635 ///
636 /// ```no_run
637 /// # #[tokio::main]
638 /// # async fn main() -> Result<(), async_nats::Error> {
639 /// use futures::StreamExt;
640 /// let client = async_nats::connect("demo.nats.io:4222").await?;
641 /// let jetstream = async_nats::jetstream::new(client);
642 /// let kv = jetstream
643 /// .create_key_value(async_nats::jetstream::kv::Config {
644 /// bucket: "kv".to_string(),
645 /// history: 10,
646 /// ..Default::default()
647 /// })
648 /// .await?;
649 /// let mut entries = kv.watch_all_from_revision(40).await?;
650 /// while let Some(entry) = entries.next().await {
651 /// println!("entry: {:?}", entry);
652 /// }
653 /// # Ok(())
654 /// # }
655 /// ```
656 pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
657 self.watch_from_revision(ALL_KEYS, revision).await
658 }
659
660 /// Retrieves the [Entry] for a given key from a bucket.
661 ///
662 /// # Examples
663 ///
664 /// ```no_run
665 /// # #[tokio::main]
666 /// # async fn main() -> Result<(), async_nats::Error> {
667 /// let client = async_nats::connect("demo.nats.io:4222").await?;
668 /// let jetstream = async_nats::jetstream::new(client);
669 /// let kv = jetstream
670 /// .create_key_value(async_nats::jetstream::kv::Config {
671 /// bucket: "kv".to_string(),
672 /// history: 10,
673 /// ..Default::default()
674 /// })
675 /// .await?;
676 /// let value = kv.get("key").await?;
677 /// match value {
678 /// Some(bytes) => {
679 /// let value_str = std::str::from_utf8(&bytes)?;
680 /// println!("Value: {}", value_str);
681 /// }
682 /// None => {
683 /// println!("Key not found or value not set");
684 /// }
685 /// }
686 /// # Ok(())
687 /// # }
688 /// ```
689 pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
690 match self.entry(key).await {
691 Ok(Some(entry)) => match entry.operation {
692 Operation::Put => Ok(Some(entry.value)),
693 _ => Ok(None),
694 },
695 Ok(None) => Ok(None),
696 Err(err) => Err(err),
697 }
698 }
699
700 /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
701 /// the bucket.
702 ///
703 /// # Examples
704 ///
705 /// ```no_run
706 /// # #[tokio::main]
707 /// # async fn main() -> Result<(), async_nats::Error> {
708 /// use futures::StreamExt;
709 /// let client = async_nats::connect("demo.nats.io:4222").await?;
710 /// let jetstream = async_nats::jetstream::new(client);
711 /// let kv = jetstream
712 /// .create_key_value(async_nats::jetstream::kv::Config {
713 /// bucket: "kv".to_string(),
714 /// history: 10,
715 /// ..Default::default()
716 /// })
717 /// .await?;
718 /// let revision = kv.put("key", "value".into()).await?;
719 /// kv.update("key", "updated".into(), revision).await?;
720 /// # Ok(())
721 /// # }
722 /// ```
723 pub async fn update<T: AsRef<str>>(
724 &self,
725 key: T,
726 value: Bytes,
727 revision: u64,
728 ) -> Result<u64, UpdateError> {
729 if !is_valid_key(key.as_ref()) {
730 return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
731 }
732 let mut subject = String::new();
733 if self.use_jetstream_prefix {
734 subject.push_str(&self.stream.context.prefix);
735 subject.push('.');
736 }
737 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
738 subject.push_str(key.as_ref());
739
740 let mut headers = crate::HeaderMap::default();
741 headers.insert(
742 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
743 HeaderValue::from(revision),
744 );
745
746 self.stream
747 .context
748 .publish_with_headers(subject, headers, value)
749 .await?
750 .await
751 .map_err(|err| err.into())
752 .map(|publish_ack| publish_ack.sequence)
753 }
754
755 /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
756 ///
757 /// # Examples
758 ///
759 /// ```no_run
760 /// # #[tokio::main]
761 /// # async fn main() -> Result<(), async_nats::Error> {
762 /// use futures::StreamExt;
763 /// let client = async_nats::connect("demo.nats.io:4222").await?;
764 /// let jetstream = async_nats::jetstream::new(client);
765 /// let kv = jetstream
766 /// .create_key_value(async_nats::jetstream::kv::Config {
767 /// bucket: "kv".to_string(),
768 /// history: 10,
769 /// ..Default::default()
770 /// })
771 /// .await?;
772 /// kv.put("key", "value".into()).await?;
773 /// kv.delete("key").await?;
774 /// # Ok(())
775 /// # }
776 /// ```
777 pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
778 self.delete_expect_revision(key, None).await
779 }
780
781 /// Deletes a given key if the revision matches. This is a non-destructive operation, which
782 /// sets a `DELETE` marker.
783 ///
784 /// # Examples
785 ///
786 /// ```no_run
787 /// # #[tokio::main]
788 /// # async fn main() -> Result<(), async_nats::Error> {
789 /// use futures::StreamExt;
790 /// let client = async_nats::connect("demo.nats.io:4222").await?;
791 /// let jetstream = async_nats::jetstream::new(client);
792 /// let kv = jetstream
793 /// .create_key_value(async_nats::jetstream::kv::Config {
794 /// bucket: "kv".to_string(),
795 /// history: 10,
796 /// ..Default::default()
797 /// })
798 /// .await?;
799 /// let revision = kv.put("key", "value".into()).await?;
800 /// kv.delete_expect_revision("key", Some(revision)).await?;
801 /// # Ok(())
802 /// # }
803 /// ```
804 pub async fn delete_expect_revision<T: AsRef<str>>(
805 &self,
806 key: T,
807 revison: Option<u64>,
808 ) -> Result<(), DeleteError> {
809 if !is_valid_key(key.as_ref()) {
810 return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
811 }
812 let mut subject = String::new();
813 if self.use_jetstream_prefix {
814 subject.push_str(&self.stream.context.prefix);
815 subject.push('.');
816 }
817 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
818 subject.push_str(key.as_ref());
819
820 let mut headers = crate::HeaderMap::default();
821 // TODO: figure out which headers k/v should be where.
822 headers.insert(
823 KV_OPERATION,
824 KV_OPERATION_DELETE
825 .parse::<HeaderValue>()
826 .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
827 );
828
829 if let Some(revision) = revison {
830 headers.insert(
831 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
832 HeaderValue::from(revision),
833 );
834 }
835
836 self.stream
837 .context
838 .publish_with_headers(subject, headers, "".into())
839 .await?
840 .await?;
841 Ok(())
842 }
843
844 /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
845 ///
846 /// # Examples
847 ///
848 /// ```no_run
849 /// # #[tokio::main]
850 /// # async fn main() -> Result<(), async_nats::Error> {
851 /// use futures::StreamExt;
852 /// let client = async_nats::connect("demo.nats.io:4222").await?;
853 /// let jetstream = async_nats::jetstream::new(client);
854 /// let kv = jetstream
855 /// .create_key_value(async_nats::jetstream::kv::Config {
856 /// bucket: "kv".to_string(),
857 /// history: 10,
858 /// ..Default::default()
859 /// })
860 /// .await?;
861 /// kv.put("key", "value".into()).await?;
862 /// kv.put("key", "another".into()).await?;
863 /// kv.purge("key").await?;
864 /// # Ok(())
865 /// # }
866 /// ```
867 pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
868 self.purge_expect_revision(key, None).await
869 }
870
871 /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
872 /// purge entry in-place.
873 ///
874 /// # Examples
875 ///
876 /// ```no_run
877 /// # #[tokio::main]
878 /// # async fn main() -> Result<(), async_nats::Error> {
879 /// use futures::StreamExt;
880 /// let client = async_nats::connect("demo.nats.io:4222").await?;
881 /// let jetstream = async_nats::jetstream::new(client);
882 /// let kv = jetstream
883 /// .create_key_value(async_nats::jetstream::kv::Config {
884 /// bucket: "kv".to_string(),
885 /// history: 10,
886 /// ..Default::default()
887 /// })
888 /// .await?;
889 /// kv.put("key", "value".into()).await?;
890 /// let revision = kv.put("key", "another".into()).await?;
891 /// kv.purge_expect_revision("key", Some(revision)).await?;
892 /// # Ok(())
893 /// # }
894 /// ```
895 pub async fn purge_expect_revision<T: AsRef<str>>(
896 &self,
897 key: T,
898 revison: Option<u64>,
899 ) -> Result<(), PurgeError> {
900 if !is_valid_key(key.as_ref()) {
901 return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
902 }
903
904 let mut subject = String::new();
905 if self.use_jetstream_prefix {
906 subject.push_str(&self.stream.context.prefix);
907 subject.push('.');
908 }
909 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
910 subject.push_str(key.as_ref());
911
912 let mut headers = crate::HeaderMap::default();
913 headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
914 headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
915
916 if let Some(revision) = revison {
917 headers.insert(
918 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
919 HeaderValue::from(revision),
920 );
921 }
922
923 self.stream
924 .context
925 .publish_with_headers(subject, headers, "".into())
926 .await?
927 .await?;
928 Ok(())
929 }
930
931 /// Returns a [futures::Stream] that allows iterating over all [Operations][Operation] that
932 /// happen for given key.
933 ///
934 /// # Examples
935 ///
936 /// ```no_run
937 /// # #[tokio::main]
938 /// # async fn main() -> Result<(), async_nats::Error> {
939 /// use futures::StreamExt;
940 /// let client = async_nats::connect("demo.nats.io:4222").await?;
941 /// let jetstream = async_nats::jetstream::new(client);
942 /// let kv = jetstream
943 /// .create_key_value(async_nats::jetstream::kv::Config {
944 /// bucket: "kv".to_string(),
945 /// history: 10,
946 /// ..Default::default()
947 /// })
948 /// .await?;
949 /// let mut entries = kv.history("kv").await?;
950 /// while let Some(entry) = entries.next().await {
951 /// println!("entry: {:?}", entry);
952 /// }
953 /// # Ok(())
954 /// # }
955 /// ```
956 pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
957 if !is_valid_key(key.as_ref()) {
958 return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
959 }
960 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
961
962 let consumer = self
963 .stream
964 .create_consumer(super::consumer::push::OrderedConfig {
965 deliver_subject: self.stream.context.client.new_inbox(),
966 description: Some("kv history consumer".to_string()),
967 filter_subject: subject,
968 replay_policy: super::consumer::ReplayPolicy::Instant,
969 ..Default::default()
970 })
971 .await?;
972
973 Ok(History {
974 subscription: consumer.messages().await?,
975 done: false,
976 prefix: self.prefix.clone(),
977 bucket: self.name.clone(),
978 })
979 }
980
981 /// Returns a [futures::Stream] that allows iterating over all keys in the bucket.
982 ///
983 /// # Examples
984 ///
985 /// Iterating over each each key individually
986 ///
987 /// ```no_run
988 /// # #[tokio::main]
989 /// # async fn main() -> Result<(), async_nats::Error> {
990 /// use futures::{StreamExt, TryStreamExt};
991 /// let client = async_nats::connect("demo.nats.io:4222").await?;
992 /// let jetstream = async_nats::jetstream::new(client);
993 /// let kv = jetstream
994 /// .create_key_value(async_nats::jetstream::kv::Config {
995 /// bucket: "kv".to_string(),
996 /// history: 10,
997 /// ..Default::default()
998 /// })
999 /// .await?;
1000 /// let mut keys = kv.keys().await?.boxed();
1001 /// while let Some(key) = keys.try_next().await? {
1002 /// println!("key: {:?}", key);
1003 /// }
1004 /// # Ok(())
1005 /// # }
1006 /// ```
1007 ///
1008 /// Collecting it into a vector of keys
1009 ///
1010 /// ```no_run
1011 /// # #[tokio::main]
1012 /// # async fn main() -> Result<(), async_nats::Error> {
1013 /// use futures::TryStreamExt;
1014 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1015 /// let jetstream = async_nats::jetstream::new(client);
1016 /// let kv = jetstream
1017 /// .create_key_value(async_nats::jetstream::kv::Config {
1018 /// bucket: "kv".to_string(),
1019 /// history: 10,
1020 /// ..Default::default()
1021 /// })
1022 /// .await?;
1023 /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1024 /// println!("Keys: {:?}", keys);
1025 /// # Ok(())
1026 /// # }
1027 /// ```
1028 pub async fn keys(&self) -> Result<Keys, HistoryError> {
1029 let subject = format!("{}>", self.prefix.as_str());
1030
1031 let consumer = self
1032 .stream
1033 .create_consumer(super::consumer::push::OrderedConfig {
1034 deliver_subject: self.stream.context.client.new_inbox(),
1035 description: Some("kv history consumer".to_string()),
1036 filter_subject: subject,
1037 headers_only: true,
1038 replay_policy: super::consumer::ReplayPolicy::Instant,
1039 // We only need to know the latest state for each key, not the whole history
1040 deliver_policy: DeliverPolicy::LastPerSubject,
1041 ..Default::default()
1042 })
1043 .await?;
1044
1045 let entries = History {
1046 done: consumer.info.num_pending == 0,
1047 subscription: consumer.messages().await?,
1048 prefix: self.prefix.clone(),
1049 bucket: self.name.clone(),
1050 };
1051
1052 Ok(Keys { inner: entries })
1053 }
1054}
1055
1056/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
1057pub struct Watch {
1058 subscription: super::consumer::push::Ordered,
1059 prefix: String,
1060 bucket: String,
1061}
1062
1063impl futures::Stream for Watch {
1064 type Item = Result<Entry, WatcherError>;
1065
1066 fn poll_next(
1067 mut self: std::pin::Pin<&mut Self>,
1068 cx: &mut std::task::Context<'_>,
1069 ) -> std::task::Poll<Option<Self::Item>> {
1070 match self.subscription.poll_next_unpin(cx) {
1071 Poll::Ready(message) => match message {
1072 None => Poll::Ready(None),
1073 Some(message) => {
1074 let message = message?;
1075 let info = message.info().map_err(|err| {
1076 WatcherError::with_source(
1077 WatcherErrorKind::Other,
1078 format!("failed to parse message metadata: {}", err),
1079 )
1080 })?;
1081
1082 let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1083
1084 let key = message
1085 .subject
1086 .strip_prefix(&self.prefix)
1087 .map(|s| s.to_string())
1088 .unwrap();
1089
1090 Poll::Ready(Some(Ok(Entry {
1091 bucket: self.bucket.clone(),
1092 key,
1093 value: message.payload.clone(),
1094 revision: info.stream_sequence,
1095 created: info.published,
1096 delta: info.pending,
1097 operation,
1098 })))
1099 }
1100 },
1101 std::task::Poll::Pending => Poll::Pending,
1102 }
1103 }
1104
1105 fn size_hint(&self) -> (usize, Option<usize>) {
1106 (0, None)
1107 }
1108}
1109
1110/// A structure representing the history of a key-value bucket, yielding past values.
1111pub struct History {
1112 subscription: super::consumer::push::Ordered,
1113 done: bool,
1114 prefix: String,
1115 bucket: String,
1116}
1117
1118impl futures::Stream for History {
1119 type Item = Result<Entry, WatcherError>;
1120
1121 fn poll_next(
1122 mut self: std::pin::Pin<&mut Self>,
1123 cx: &mut std::task::Context<'_>,
1124 ) -> std::task::Poll<Option<Self::Item>> {
1125 if self.done {
1126 return Poll::Ready(None);
1127 }
1128 match self.subscription.poll_next_unpin(cx) {
1129 Poll::Ready(message) => match message {
1130 None => Poll::Ready(None),
1131 Some(message) => {
1132 let message = message?;
1133 let info = message.info().map_err(|err| {
1134 WatcherError::with_source(
1135 WatcherErrorKind::Other,
1136 format!("failed to parse message metadata: {}", err),
1137 )
1138 })?;
1139 if info.pending == 0 {
1140 self.done = true;
1141 }
1142
1143 let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1144
1145 let key = message
1146 .subject
1147 .strip_prefix(&self.prefix)
1148 .map(|s| s.to_string())
1149 .unwrap();
1150
1151 Poll::Ready(Some(Ok(Entry {
1152 bucket: self.bucket.clone(),
1153 key,
1154 value: message.payload.clone(),
1155 revision: info.stream_sequence,
1156 created: info.published,
1157 delta: info.pending,
1158 operation,
1159 })))
1160 }
1161 },
1162 std::task::Poll::Pending => Poll::Pending,
1163 }
1164 }
1165
1166 fn size_hint(&self) -> (usize, Option<usize>) {
1167 (0, None)
1168 }
1169}
1170
1171pub struct Keys {
1172 inner: History,
1173}
1174
1175impl futures::Stream for Keys {
1176 type Item = Result<String, WatcherError>;
1177
1178 fn poll_next(
1179 mut self: std::pin::Pin<&mut Self>,
1180 cx: &mut std::task::Context<'_>,
1181 ) -> std::task::Poll<Option<Self::Item>> {
1182 loop {
1183 match self.inner.poll_next_unpin(cx) {
1184 Poll::Ready(None) => return Poll::Ready(None),
1185 Poll::Ready(Some(res)) => match res {
1186 Ok(entry) => {
1187 // Skip purged and deleted keys
1188 if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1189 // Try to poll again if we skip this one
1190 continue;
1191 } else {
1192 return Poll::Ready(Some(Ok(entry.key)));
1193 }
1194 }
1195 Err(e) => return Poll::Ready(Some(Err(e))),
1196 },
1197 Poll::Pending => return Poll::Pending,
1198 }
1199 }
1200 }
1201}
1202
1203/// An entry in a key-value bucket.
1204#[derive(Debug, Clone)]
1205pub struct Entry {
1206 /// Name of the bucket the entry is in.
1207 pub bucket: String,
1208 /// The key that was retrieved.
1209 pub key: String,
1210 /// The value that was retrieved.
1211 pub value: Bytes,
1212 /// A unique sequence for this value.
1213 pub revision: u64,
1214 /// Distance from the latest value.
1215 pub delta: u64,
1216 /// The time the data was put in the bucket.
1217 pub created: OffsetDateTime,
1218 /// The kind of operation that caused this entry.
1219 pub operation: Operation,
1220}
1221
1222#[derive(Clone, Debug, PartialEq)]
1223pub enum StatusErrorKind {
1224 JetStream(crate::jetstream::Error),
1225 TimedOut,
1226}
1227
1228impl Display for StatusErrorKind {
1229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1230 match self {
1231 Self::JetStream(err) => write!(f, "jetstream request failed: {}", err),
1232 Self::TimedOut => write!(f, "timed out"),
1233 }
1234 }
1235}
1236
1237pub type StatusError = Error<StatusErrorKind>;
1238
1239#[derive(Clone, Copy, Debug, PartialEq)]
1240pub enum CreateErrorKind {
1241 AlreadyExists,
1242 InvalidKey,
1243 Publish,
1244 Ack,
1245 Other,
1246}
1247
1248impl From<UpdateError> for CreateError {
1249 fn from(error: UpdateError) -> Self {
1250 match error.kind() {
1251 UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1252 UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1253 UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1254 }
1255 }
1256}
1257
1258impl From<PutError> for CreateError {
1259 fn from(error: PutError) -> Self {
1260 match error.kind() {
1261 PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1262 PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1263 PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1264 }
1265 }
1266}
1267
1268impl From<EntryError> for CreateError {
1269 fn from(error: EntryError) -> Self {
1270 match error.kind() {
1271 EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1272 EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1273 EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1274 }
1275 }
1276}
1277
1278impl Display for CreateErrorKind {
1279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1280 match self {
1281 Self::AlreadyExists => write!(f, "key already exists"),
1282 Self::Publish => write!(f, "failed to create key in store"),
1283 Self::Ack => write!(f, "ack error"),
1284 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1285 Self::Other => write!(f, "other error"),
1286 }
1287 }
1288}
1289
1290pub type CreateError = Error<CreateErrorKind>;
1291
1292#[derive(Clone, Copy, Debug, PartialEq)]
1293pub enum PutErrorKind {
1294 InvalidKey,
1295 Publish,
1296 Ack,
1297}
1298
1299impl Display for PutErrorKind {
1300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1301 match self {
1302 Self::Publish => write!(f, "failed to put key into store"),
1303 Self::Ack => write!(f, "ack error"),
1304 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1305 }
1306 }
1307}
1308
1309pub type PutError = Error<PutErrorKind>;
1310
1311#[derive(Clone, Copy, Debug, PartialEq)]
1312pub enum EntryErrorKind {
1313 InvalidKey,
1314 TimedOut,
1315 Other,
1316}
1317
1318impl Display for EntryErrorKind {
1319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1320 match self {
1321 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1322 Self::TimedOut => write!(f, "timed out"),
1323 Self::Other => write!(f, "failed getting entry"),
1324 }
1325 }
1326}
1327
1328pub type EntryError = Error<EntryErrorKind>;
1329
1330crate::from_with_timeout!(
1331 EntryError,
1332 EntryErrorKind,
1333 DirectGetError,
1334 DirectGetErrorKind
1335);
1336
1337#[derive(Clone, Copy, Debug, PartialEq)]
1338pub enum WatchErrorKind {
1339 InvalidKey,
1340 TimedOut,
1341 ConsumerCreate,
1342 Other,
1343}
1344
1345impl Display for WatchErrorKind {
1346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1347 match self {
1348 Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1349 Self::Other => write!(f, "watch failed"),
1350 Self::TimedOut => write!(f, "timed out"),
1351 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1352 }
1353 }
1354}
1355
1356pub type WatchError = Error<WatchErrorKind>;
1357
1358crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1359crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1360
1361#[derive(Clone, Copy, Debug, PartialEq)]
1362pub enum UpdateErrorKind {
1363 InvalidKey,
1364 TimedOut,
1365 Other,
1366}
1367
1368impl Display for UpdateErrorKind {
1369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1370 match self {
1371 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1372 Self::TimedOut => write!(f, "timed out"),
1373 Self::Other => write!(f, "failed getting entry"),
1374 }
1375 }
1376}
1377
1378pub type UpdateError = Error<UpdateErrorKind>;
1379
1380crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind);
1381
1382#[derive(Clone, Copy, Debug, PartialEq)]
1383pub enum WatcherErrorKind {
1384 Consumer,
1385 Other,
1386}
1387
1388impl Display for WatcherErrorKind {
1389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1390 match self {
1391 Self::Consumer => write!(f, "watcher consumer error"),
1392 Self::Other => write!(f, "watcher error"),
1393 }
1394 }
1395}
1396
1397pub type WatcherError = Error<WatcherErrorKind>;
1398
1399impl From<OrderedError> for WatcherError {
1400 fn from(err: OrderedError) -> Self {
1401 WatcherError::with_source(WatcherErrorKind::Consumer, err)
1402 }
1403}
1404
1405pub type DeleteError = UpdateError;
1406pub type DeleteErrorKind = UpdateErrorKind;
1407
1408pub type PurgeError = UpdateError;
1409pub type PurgeErrorKind = UpdateErrorKind;
1410
1411pub type HistoryError = WatchError;
1412pub type HistoryErrorKind = WatchErrorKind;