Struct wasmcloud_control_interface::kv::CachedKvStore
source · pub struct CachedKvStore { /* private fields */ }Implementations§
source§impl CachedKvStore
impl CachedKvStore
sourcepub async fn new(
nc: Client,
lattice_prefix: &str,
js_domain: Option<String>
) -> Result<Self, Box<dyn Error + Send + Sync>>
pub async fn new( nc: Client, lattice_prefix: &str, js_domain: Option<String> ) -> Result<Self, Box<dyn Error + Send + Sync>>
Create a new KV store with the given configuration. This function will do an initial fetch of all claims and linkdefs from the store and then start a watcher to keep the cache up to date. All data fetched from this store will be from the in memory cache
Methods from Deref<Target = Store>§
sourcepub async fn status(&self) -> impl Future<Output = Result<Status, StatusError>>
pub async fn status(&self) -> impl Future<Output = Result<Status, StatusError>>
Queries the server and returns status from the server.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.status().await?;
println!("status: {:?}", status);sourcepub async fn put<T>(
&self,
key: T,
value: Bytes
) -> impl Future<Output = Result<u64, PutError>>where
T: AsRef<str>,
pub async fn put<T>( &self, key: T, value: Bytes ) -> impl Future<Output = Result<u64, PutError>>where T: AsRef<str>,
Puts new key value pair into the bucket. If key didn’t exist, it is created. If it did exist, a new value with a new version is added.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.put("key", "value".into()).await?;sourcepub async fn entry<T>(
&self,
key: T
) -> impl Future<Output = Result<Option<Entry>, EntryError>>where
T: Into<String>,
pub async fn entry<T>( &self, key: T ) -> impl Future<Output = Result<Option<Entry>, EntryError>>where T: Into<String>,
Retrieves the last Entry for a given key from a bucket.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let status = kv.put("key", "value".into()).await?;
let entry = kv.entry("key").await?;
println!("entry: {:?}", entry);sourcepub async fn watch<T>(
&self,
key: T
) -> impl Future<Output = Result<Watch<'_>, WatchError>>where
T: AsRef<str>,
pub async fn watch<T>( &self, key: T ) -> impl Future<Output = Result<Watch<'_>, WatchError>>where T: AsRef<str>,
Creates a [futures::Stream] over Entries a given key in the bucket, which yields values whenever there are changes for that key.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch("kv").await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}sourcepub async fn watch_with_history<T>(
&self,
key: T
) -> impl Future<Output = Result<Watch<'_>, WatchError>>where
T: AsRef<str>,
pub async fn watch_with_history<T>( &self, key: T ) -> impl Future<Output = Result<Watch<'_>, WatchError>>where T: AsRef<str>,
Creates a [futures::Stream] over Entries a given key in the bucket, which yields values whenever there are changes for that key with as well as last value.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch_with_history("kv").await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}sourcepub async fn watch_all(
&self
) -> impl Future<Output = Result<Watch<'_>, WatchError>>
pub async fn watch_all( &self ) -> impl Future<Output = Result<Watch<'_>, WatchError>>
Creates a [futures::Stream] over Entries for all keys, which yields values whenever there are changes in the bucket.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.watch_all().await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}sourcepub async fn get<T>(
&self,
key: T
) -> impl Future<Output = Result<Option<Bytes>, EntryError>>where
T: Into<String>,
pub async fn get<T>( &self, key: T ) -> impl Future<Output = Result<Option<Bytes>, EntryError>>where T: Into<String>,
Retrieves the Entry for a given key from a bucket.
Examples
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let value = kv.get("key").await?;
match value {
Some(bytes) => {
let value_str = std::str::from_utf8(&bytes)?;
println!("Value: {}", value_str);
}
None => {
println!("Key not found or value not set");
}
}sourcepub async fn update<T>(
&self,
key: T,
value: Bytes,
revision: u64
) -> impl Future<Output = Result<u64, UpdateError>>where
T: AsRef<str>,
pub async fn update<T>( &self, key: T, value: Bytes, revision: u64 ) -> impl Future<Output = Result<u64, UpdateError>>where T: AsRef<str>,
Updates a value for a given key, but only if passed revision is the last revision in
the bucket.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let revision = kv.put("key", "value".into()).await?;
kv.update("key", "updated".into(), revision).await?;sourcepub async fn delete<T>(
&self,
key: T
) -> impl Future<Output = Result<(), UpdateError>>where
T: AsRef<str>,
pub async fn delete<T>( &self, key: T ) -> impl Future<Output = Result<(), UpdateError>>where T: AsRef<str>,
Deletes a given key. This is a non-destructive operation, which sets a DELETE marker.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
kv.put("key", "value".into()).await?;
kv.delete("key").await?;sourcepub async fn purge<T>(
&self,
key: T
) -> impl Future<Output = Result<(), UpdateError>>where
T: AsRef<str>,
pub async fn purge<T>( &self, key: T ) -> impl Future<Output = Result<(), UpdateError>>where T: AsRef<str>,
Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
kv.put("key", "value".into()).await?;
kv.put("key", "another".into()).await?;
kv.purge("key").await?;sourcepub async fn history<T>(
&self,
key: T
) -> impl Future<Output = Result<History<'_>, WatchError>>where
T: AsRef<str>,
pub async fn history<T>( &self, key: T ) -> impl Future<Output = Result<History<'_>, WatchError>>where T: AsRef<str>,
Returns a [futures::Stream] that allows iterating over all Operations that happen for given key.
Examples
use futures::StreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut entries = kv.history("kv").await?;
while let Some(entry) = entries.next().await {
println!("entry: {:?}", entry);
}sourcepub async fn keys(&self) -> impl Future<Output = Result<Keys<'_>, WatchError>>
pub async fn keys(&self) -> impl Future<Output = Result<Keys<'_>, WatchError>>
Returns a [futures::Stream] that allows iterating over all keys in the bucket.
Examples
Iterating over each each key individually
use futures::{StreamExt, TryStreamExt};
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let mut keys = kv.keys().await?.boxed();
while let Some(key) = keys.try_next().await? {
println!("key: {:?}", key);
}Collecting it into a vector of keys
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let kv = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "kv".to_string(),
history: 10,
..Default::default()
})
.await?;
let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
println!("Keys: {:?}", keys);Trait Implementations§
source§impl AsRef<Store> for CachedKvStore
impl AsRef<Store> for CachedKvStore
source§impl Clone for CachedKvStore
impl Clone for CachedKvStore
source§fn clone(&self) -> CachedKvStore
fn clone(&self) -> CachedKvStore
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moresource§impl Debug for CachedKvStore
impl Debug for CachedKvStore
source§impl Deref for CachedKvStore
impl Deref for CachedKvStore
source§impl Drop for CachedKvStore
impl Drop for CachedKvStore
source§impl KvStore for CachedKvStore
impl KvStore for CachedKvStore
source§fn get_links<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_links<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all link definitions in the store
source§fn get_all_claims<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_all_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all claims in the store
source§fn get_provider_claims<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_provider_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all provider claims in the store
source§fn get_actor_claims<'life0, 'async_trait>(
&'life0 self
) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_actor_claims<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait,
Return a copy of all actor claims in the store
source§fn get_filtered_links<'life0, 'async_trait, F>(
&'life0 self,
filter_fn: F
) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
F: FnMut(&LinkDefinition) -> bool + Send + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
fn get_filtered_links<'life0, 'async_trait, F>( &'life0 self, filter_fn: F ) -> Pin<Box<dyn Future<Output = Result<Vec<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where F: FnMut(&LinkDefinition) -> bool + Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait,
A convenience function to get a list of link definitions filtered using the given filter function
source§fn get_link<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
actor_id: &'life1 str,
link_name: &'life2 str,
contract_id: &'life3 str
) -> Pin<Box<dyn Future<Output = Result<Option<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn get_link<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, actor_id: &'life1 str, link_name: &'life2 str, contract_id: &'life3 str ) -> Pin<Box<dyn Future<Output = Result<Option<LinkDefinition>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,
Get a link definition for a specific ID (actor_id, contract_id, link_name)
source§fn get_claims<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 str
) -> Pin<Box<dyn Future<Output = Result<Option<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_claims<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 str ) -> Pin<Box<dyn Future<Output = Result<Option<HashMap<String, String>>, Box<dyn Error + Send + Sync>>> + Send + 'async_trait>>where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,
Get claims for a specific provider or actor id