pub struct CachedKvStore { /* private fields */ }
Expand description

A KV store that caches all link definitions and claims in memory as it receives updates from the NATS KV bucket. This store is recommended for use in situations where there are many data lookups (an example of this is Wadm).

Implementations§

source§

impl CachedKvStore

source

pub async fn new( nc: Client, lattice_prefix: &str, js_domain: Option<String> ) -> Result<Self, Box<dyn Error + Send + Sync>>

Create a new KV store with the given configuration. This function will do an initial fetch of all claims and linkdefs from the store and then start a watcher to keep the cache up to date. All data fetched from this store will be from the in memory cache

Methods from Deref<Target = Store>§

source

pub async fn status(&self) -> Result<Status, StatusError>

Queries the server and returns status from the server.

Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let status = kv.status().await?;
println!("status: {:?}", status);
source

pub async fn put<T>(&self, key: T, value: Bytes) -> Result<u64, PutError>where T: AsRef<str>,

Puts new key value pair into the bucket. If key didn’t exist, it is created. If it did exist, a new value with a new version is added.

Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let status = kv.put("key", "value".into()).await?;
source

pub async fn entry<T>(&self, key: T) -> Result<Option<Entry>, EntryError>where T: Into<String>,

Retrieves the last Entry for a given key from a bucket.

Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let status = kv.put("key", "value".into()).await?;
let entry = kv.entry("key").await?;
println!("entry: {:?}", entry);
source

pub async fn watch<T>(&self, key: T) -> Result<Watch<'_>, WatchError>where T: AsRef<str>,

Creates a [futures::Stream] over Entries a given key in the bucket, which yields values whenever there are changes for that key.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.watch("kv").await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}
source

pub async fn watch_with_history<T>( &self, key: T ) -> Result<Watch<'_>, WatchError>where T: AsRef<str>,

Creates a [futures::Stream] over Entries a given key in the bucket, which yields values whenever there are changes for that key with as well as last value.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.watch_with_history("kv").await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}
source

pub async fn watch_all(&self) -> Result<Watch<'_>, WatchError>

Creates a [futures::Stream] over Entries for all keys, which yields values whenever there are changes in the bucket.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.watch_all().await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}
source

pub async fn get<T>(&self, key: T) -> Result<Option<Bytes>, EntryError>where T: Into<String>,

Retrieves the Entry for a given key from a bucket.

Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let value = kv.get("key").await?;
match value {
    Some(bytes) => {
        let value_str = std::str::from_utf8(&bytes)?;
        println!("Value: {}", value_str);
    }
    None => {
        println!("Key not found or value not set");
    }
}
source

pub async fn update<T>( &self, key: T, value: Bytes, revision: u64 ) -> Result<u64, UpdateError>where T: AsRef<str>,

Updates a value for a given key, but only if passed revision is the last revision in the bucket.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let revision = kv.put("key", "value".into()).await?;
kv.update("key", "updated".into(), revision).await?;
source

pub async fn delete<T>(&self, key: T) -> Result<(), UpdateError>where T: AsRef<str>,

Deletes a given key. This is a non-destructive operation, which sets a DELETE marker.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
kv.put("key", "value".into()).await?;
kv.delete("key").await?;
source

pub async fn purge<T>(&self, key: T) -> Result<(), UpdateError>where T: AsRef<str>,

Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
kv.put("key", "value".into()).await?;
kv.put("key", "another".into()).await?;
kv.purge("key").await?;
source

pub async fn history<T>(&self, key: T) -> Result<History<'_>, WatchError>where T: AsRef<str>,

Returns a [futures::Stream] that allows iterating over all Operations that happen for given key.

Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.history("kv").await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}
source

pub async fn keys(&self) -> Result<Keys<'_>, WatchError>

Returns a [futures::Stream] that allows iterating over all keys in the bucket.

Examples

Iterating over each each key individually

use futures::{StreamExt, TryStreamExt};
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut keys = kv.keys().await?.boxed();
while let Some(key) = keys.try_next().await? {
    println!("key: {:?}", key);
}

Collecting it into a vector of keys

use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
println!("Keys: {:?}", keys);

Trait Implementations§

source§

impl AsRef<Store> for CachedKvStore

source§

fn as_ref(&self) -> &Store

Converts this type into a shared reference of the (usually inferred) input type.
source§

impl Build for CachedKvStore

source§

fn build<'life0, 'async_trait>( nc: Client, lattice_prefix: &'life0 str, js_domain: Option<String> ) -> Pin<Box<dyn Future<Output = Result<Self, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Builds a KvStore using the given NATS client and lattice prefix
source§

impl Clone for CachedKvStore

source§

fn clone(&self) -> CachedKvStore

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl KvStore for CachedKvStore

Return a copy of all link definitions in the store

source§

fn get_all_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Return a copy of all claims in the store

source§

fn get_provider_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Return a copy of all provider claims in the store

source§

fn get_actor_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,

Return a copy of all actor claims in the store

A convenience function to get a list of link definitions filtered using the given filter function

Get a link definition for a specific ID (actor_id, contract_id, link_name)

source§

fn get_claims<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 str ) -> Pin<Box<dyn Future<Output = Result<Option<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get claims for a specific provider or actor id

Adds a link definition to the store
Deletes a link definition from the store
source§

impl Deref for CachedKvStore

§

type Target = Store

The resulting type after dereferencing.
source§

fn deref(&self) -> &Self::Target

Dereferences the value.

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FutureExt for T

§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T> ToOwned for Twhere T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for Twhere V: MultiLane<T>,

§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more