Struct wasmcloud_control_interface::kv::CachedKvStore  
source · pub struct CachedKvStore { /* private fields */ }Expand description
A KV store that caches all link definitions and claims in memory as it receives updates from the NATS KV bucket. This store is recommended for use in situations where there are many data lookups (an example of this is Wadm).
Implementations§
source§impl CachedKvStore
 
impl CachedKvStore
sourcepub async fn new(
    nc: Client,
    lattice_prefix: &str,
    js_domain: Option<String>
) -> Result<Self, Box<dyn Error + Send + Sync>>
 
pub async fn new( nc: Client, lattice_prefix: &str, js_domain: Option<String> ) -> Result<Self, Box<dyn Error + Send + Sync>>
Create a new KV store with the given configuration. This function will do an initial fetch of all claims and linkdefs from the store and then start a watcher to keep the cache up to date. All data fetched from this store will be from the in-memory cache.
Methods from Deref<Target = Store>§
sourcepub async fn status(&self) -> Result<Status, StatusError>
 
pub async fn status(&self) -> Result<Status, StatusError>
Queries the server and returns status from the server.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let status = kv.status().await?;
println!("status: {:?}", status);sourcepub async fn put<T>(&self, key: T, value: Bytes) -> Result<u64, PutError>where
    T: AsRef<str>,
 
pub async fn put<T>(&self, key: T, value: Bytes) -> Result<u64, PutError>where T: AsRef<str>,
Puts new key value pair into the bucket. If key didn’t exist, it is created. If it did exist, a new value with a new version is added.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let status = kv.put("key", "value".into()).await?;sourcepub async fn entry<T>(&self, key: T) -> Result<Option<Entry>, EntryError>where
    T: Into<String>,
 
pub async fn entry<T>(&self, key: T) -> Result<Option<Entry>, EntryError>where T: Into<String>,
Retrieves the last Entry for a given key from a bucket.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let status = kv.put("key", "value".into()).await?;
let entry = kv.entry("key").await?;
println!("entry: {:?}", entry);sourcepub async fn watch<T>(&self, key: T) -> Result<Watch<'_>, WatchError>where
    T: AsRef<str>,
 
pub async fn watch<T>(&self, key: T) -> Result<Watch<'_>, WatchError>where T: AsRef<str>,
Creates a [futures::Stream] over Entries for a given key in the bucket, which yields values whenever there are changes for that key.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.watch("kv").await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}sourcepub async fn watch_with_history<T>(
    &self,
    key: T
) -> Result<Watch<'_>, WatchError>where
    T: AsRef<str>,
 
pub async fn watch_with_history<T>( &self, key: T ) -> Result<Watch<'_>, WatchError>where T: AsRef<str>,
Creates a [futures::Stream] over Entries for a given key in the bucket, which yields values whenever there are changes for that key, as well as the last value.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.watch_with_history("kv").await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}sourcepub async fn watch_all(&self) -> Result<Watch<'_>, WatchError>
 
pub async fn watch_all(&self) -> Result<Watch<'_>, WatchError>
Creates a [futures::Stream] over Entries for all keys, which yields values whenever there are changes in the bucket.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.watch_all().await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}sourcepub async fn get<T>(&self, key: T) -> Result<Option<Bytes>, EntryError>where
    T: Into<String>,
 
pub async fn get<T>(&self, key: T) -> Result<Option<Bytes>, EntryError>where T: Into<String>,
Retrieves the value of the latest Entry for a given key from a bucket. Returns None if the key is not found or its value is not set.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let value = kv.get("key").await?;
match value {
    Some(bytes) => {
        let value_str = std::str::from_utf8(&bytes)?;
        println!("Value: {}", value_str);
    }
    None => {
        println!("Key not found or value not set");
    }
}sourcepub async fn update<T>(
    &self,
    key: T,
    value: Bytes,
    revision: u64
) -> Result<u64, UpdateError>where
    T: AsRef<str>,
 
pub async fn update<T>( &self, key: T, value: Bytes, revision: u64 ) -> Result<u64, UpdateError>where T: AsRef<str>,
Updates the value for a given key, but only if the passed revision is the latest revision in
the bucket.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let revision = kv.put("key", "value".into()).await?;
kv.update("key", "updated".into(), revision).await?;sourcepub async fn delete<T>(&self, key: T) -> Result<(), UpdateError>where
    T: AsRef<str>,
 
pub async fn delete<T>(&self, key: T) -> Result<(), UpdateError>where T: AsRef<str>,
Deletes a given key. This is a non-destructive operation, which sets a DELETE marker.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
kv.put("key", "value".into()).await?;
kv.delete("key").await?;sourcepub async fn purge<T>(&self, key: T) -> Result<(), UpdateError>where
    T: AsRef<str>,
 
pub async fn purge<T>(&self, key: T) -> Result<(), UpdateError>where T: AsRef<str>,
Purges all the revisions of an entry destructively, leaving behind a single purge entry in-place.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
kv.put("key", "value".into()).await?;
kv.put("key", "another".into()).await?;
kv.purge("key").await?;sourcepub async fn history<T>(&self, key: T) -> Result<History<'_>, WatchError>where
    T: AsRef<str>,
 
pub async fn history<T>(&self, key: T) -> Result<History<'_>, WatchError>where T: AsRef<str>,
Returns a [futures::Stream] that allows iterating over all Operations that happen for a given key.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut entries = kv.history("kv").await?;
while let Some(entry) = entries.next().await {
    println!("entry: {:?}", entry);
}sourcepub async fn keys(&self) -> Result<Keys<'_>, WatchError>
 
pub async fn keys(&self) -> Result<Keys<'_>, WatchError>
Returns a [futures::Stream] that allows iterating over all keys in the bucket.
Examples
Iterating over each key individually
use futures::{StreamExt, TryStreamExt};
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let mut keys = kv.keys().await?.boxed();
while let Some(key) = keys.try_next().await? {
    println!("key: {:?}", key);
}
Collecting it into a vector of keys
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
    .create_key_value(async_nats::jetstream::kv::Config {
        bucket: "kv".to_string(),
        history: 10,
        ..Default::default()
    })
    .await?;
let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
println!("Keys: {:?}", keys);
Trait Implementations§
source§impl AsRef<Store> for CachedKvStore
 
impl AsRef<Store> for CachedKvStore
source§impl Build for CachedKvStore
 
impl Build for CachedKvStore
source§fn build<'life0, 'async_trait>(
    nc: Client,
    lattice_prefix: &'life0 str,
    js_domain: Option<String>
) -> Pin<Box<dyn Future<Output = Result<Self, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
 
fn build<'life0, 'async_trait>( nc: Client, lattice_prefix: &'life0 str, js_domain: Option<String> ) -> Pin<Box<dyn Future<Output = Result<Self, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Builds a KvStore using the given NATS client and lattice prefix.
source§impl Clone for CachedKvStore
 
impl Clone for CachedKvStore
source§fn clone(&self) -> CachedKvStore
 
fn clone(&self) -> CachedKvStore
1.0.0 · source§fn clone_from(&mut self, source: &Self)
 
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
source§impl KvStore for CachedKvStore
 
impl KvStore for CachedKvStore
source§fn get_links<'life0, 'async_trait>(
    &'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
 
fn get_links<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all link definitions in the store
source§fn get_all_claims<'life0, 'async_trait>(
    &'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
 
fn get_all_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all claims in the store
source§fn get_provider_claims<'life0, 'async_trait>(
    &'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
 
fn get_provider_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all provider claims in the store
source§fn get_actor_claims<'life0, 'async_trait>(
    &'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
 
fn get_actor_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all actor claims in the store
source§fn get_filtered_links<'life0, 'async_trait, F>(
    &'life0 self,
    filter_fn: F
) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    F: FnMut(&LinkDefinition) -> bool + Send + 'async_trait,
    Self: 'async_trait,
    'life0: 'async_trait,
 
fn get_filtered_links<'life0, 'async_trait, F>( &'life0 self, filter_fn: F ) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where F: FnMut(&LinkDefinition) -> bool + Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait,
A convenience function to get a list of link definitions filtered using the given filter function
source§fn get_link<'life0, 'life1, 'life2, 'life3, 'async_trait>(
    &'life0 self,
    actor_id: &'life1 str,
    link_name: &'life2 str,
    contract_id: &'life3 str
) -> Pin<Box<dyn Future<Output = Result<Option<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
    'life3: 'async_trait,
 
fn get_link<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, actor_id: &'life1 str, link_name: &'life2 str, contract_id: &'life3 str ) -> Pin<Box<dyn Future<Output = Result<Option<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,
Get a link definition for a specific ID (actor_id, contract_id, link_name)
source§fn get_claims<'life0, 'life1, 'async_trait>(
    &'life0 self,
    id: &'life1 str
) -> Pin<Box<dyn Future<Output = Result<Option<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
 
fn get_claims<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 str ) -> Pin<Box<dyn Future<Output = Result<Option<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,
Get claims for a specific provider or actor id