use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use async_nats::jetstream::kv::{Entry, EntryError, Operation};
use async_nats::{jetstream::kv::Store, Client};
use futures::StreamExt;
use futures::TryStreamExt;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{debug, error, trace, Instrument};
use crate::LinkDefinition;
use crate::Result;
use super::{
delete_link, ld_hash, ld_hash_raw, put_link, Build, KvStore, CLAIMS_PREFIX, LINKDEF_PREFIX,
SUBJECT_KEY,
};
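
/// Map of claim subject (public key) to that claim's fields.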
type ClaimsMap = HashMap<String, HashMap<String, String>>;
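
/// A [`Store`] wrapper that keeps an in-memory cache of link definitions and
/// claims. A background task watches the bucket and keeps the cache up to
/// date; reads are served from memory while writes go through to the store.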
#[derive(Clone)]
pub struct CachedKvStore {
store: Store,
linkdefs: Arc<RwLock<HashMap<String, LinkDefinition>>>,
claims: Arc<RwLock<ClaimsMap>>,
_handle: Arc<WrappedHandle>,
}
impl AsRef<Store> for CachedKvStore {
fn as_ref(&self) -> &Store {
&self.store
}
}
impl Deref for CachedKvStore {
type Target = Store;
fn deref(&self) -> &Self::Target {
&self.store
}
}
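
/// Aborts the background watcher task once the last clone of the cache has
/// been dropped.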
struct WrappedHandle {
handle: JoinHandle<()>,
}
impl Drop for WrappedHandle {
fn drop(&mut self) {
self.handle.abort();
}
}
impl CachedKvStore {
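    /// Creates a cached store for the given lattice and spawns the background
    /// watcher task. Returns once the initial snapshot has been loaded into
    /// the cache, or an error if that initial load fails.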
pub async fn new(nc: Client, lattice_prefix: &str, js_domain: Option<String>) -> Result<Self> {
let store = super::get_kv_store(nc, lattice_prefix, js_domain).await?;
let linkdefs = Arc::new(RwLock::new(HashMap::new()));
let claims = Arc::new(RwLock::new(ClaimsMap::default()));
let linkdefs_clone = Arc::clone(&linkdefs);
let claims_clone = Arc::clone(&claims);
let cloned_store = store.clone();
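        // One-shot channel the watcher task uses to report the result of the
        // initial cache load back to this constructor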
let (tx, rx) = tokio::sync::oneshot::channel::<Result<()>>();
let handle = tokio::spawn(
async move {
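                // Start the watch before fetching the initial snapshot so that
                // updates arriving during the load aren't missed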
let mut watcher = match cloned_store.watch_all().await {
Ok(w) => w,
Err(e) => {
error!(error = %e, "Unable to start watcher");
                        // A failed send means the caller went away (e.g. the
                        // constructor was cancelled), so exit rather than panic
                        let _ = tx.send(Err(e.into()));
return;
}
};
debug!("Getting initial data from store");
let keys = match cloned_store.keys().await {
Ok(k) => k,
Err(e) => {
error!(error = %e, "Unable to get keys from store");
                        let _ = tx.send(Err(e.into()));
return;
}
};
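                // Turn the stream of keys into one entry-fetch future per key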
let futs = match keys
.map_ok(|k| cloned_store.entry(k))
.try_collect::<Vec<_>>()
.await
{
Ok(f) => f,
Err(e) => {
                        error!(error = %e, "Unable to collect keys from store");
                        let _ = tx.send(Err(e.into()));
return;
}
};
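                // Fetch all entries concurrently, dropping keys that no longer
                // have a value and bailing out on the first fetch error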
let all_entries = match futures::future::join_all(futs)
.await
.into_iter()
.filter_map(|res| res.transpose())
.collect::<std::result::Result<Vec<_>, EntryError>>()
{
Ok(entries) => entries,
Err(e) => {
error!(error = %e, "Unable to get values from store");
                        let _ = tx.send(Err(e.into()));
return;
}
};
                debug!(num_entries = %all_entries.len(), "Finished fetching initial data, adding data to cache");
                for entry in all_entries {
                    handle_entry(entry, Arc::clone(&linkdefs_clone), Arc::clone(&claims_clone)).await;
                }
                // Signal readiness only after the initial snapshot has been
                // applied so callers never observe a partially populated cache
                let _ = tx.send(Ok(()));
trace!("Beginning watch on store");
while let Some(event) = watcher.next().await {
let entry = match event {
Ok(en) => en,
Err(e) => {
error!(error = %e, "Error from latticedata watcher");
continue;
}
};
trace!(key = %entry.key, bucket = %entry.bucket, operation = ?entry.operation, "Received entry from watcher, handling");
handle_entry(entry, Arc::clone(&linkdefs_clone), Arc::clone(&claims_clone)).in_current_span().await;
trace!("Finished handling entry from watcher");
}
error!("Cache watcher has exited");
}
.instrument(tracing::trace_span!("kvstore-watcher", %lattice_prefix)),
);
let kvstore = CachedKvStore {
store,
linkdefs,
claims,
_handle: Arc::new(WrappedHandle { handle }),
};
rx.await??;
Ok(kvstore)
}
}
#[async_trait::async_trait]
impl Build for CachedKvStore {
async fn build(nc: Client, lattice_prefix: &str, js_domain: Option<String>) -> Result<Self> {
CachedKvStore::new(nc, lattice_prefix, js_domain).await
}
}
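
// All reads are served from the in-memory cache. Writes go to the backing
// store first and then update the cache directly; the watcher will also see
// the same change, which is harmless since the updates are idempotent.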
#[async_trait::async_trait]
impl KvStore for CachedKvStore {
async fn get_links(&self) -> Result<Vec<LinkDefinition>> {
Ok(self.linkdefs.read().await.values().cloned().collect())
}
async fn get_all_claims(&self) -> Result<Vec<HashMap<String, String>>> {
Ok(self.claims.read().await.values().cloned().collect())
}
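    // Capability provider public keys (and thus their claim subjects) are
    // nkeys service keys, which start with 'V'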
async fn get_provider_claims(&self) -> Result<Vec<HashMap<String, String>>> {
Ok(self
.claims
.read()
.await
.iter()
.filter_map(|(key, values)| key.starts_with('V').then_some(values))
.cloned()
.collect())
}
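    // Actor public keys are nkeys module keys, which start with 'M'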
async fn get_actor_claims(&self) -> Result<Vec<HashMap<String, String>>> {
Ok(self
.claims
.read()
.await
.iter()
.filter_map(|(key, values)| key.starts_with('M').then_some(values))
.cloned()
.collect())
}
async fn get_filtered_links<F>(&self, mut filter_fn: F) -> Result<Vec<LinkDefinition>>
where
F: FnMut(&LinkDefinition) -> bool + Send,
{
Ok(self
.linkdefs
.read()
.await
.values()
.filter(|ld| filter_fn(ld))
.cloned()
.collect())
}
async fn get_link(
&self,
actor_id: &str,
link_name: &str,
contract_id: &str,
) -> Result<Option<LinkDefinition>> {
Ok(self
.linkdefs
.read()
.await
.get(&ld_hash_raw(actor_id, contract_id, link_name))
.cloned())
}
async fn get_claims(&self, id: &str) -> Result<Option<HashMap<String, String>>> {
Ok(self.claims.read().await.get(id).cloned())
}
async fn put_link(&self, ld: LinkDefinition) -> Result<()> {
put_link(&self.store, &ld).await?;
self.linkdefs.write().await.insert(ld_hash(&ld), ld);
Ok(())
}
async fn delete_link(&self, actor_id: &str, contract_id: &str, link_name: &str) -> Result<()> {
delete_link(&self.store, actor_id, contract_id, link_name).await?;
self.linkdefs
.write()
.await
.remove(&ld_hash_raw(actor_id, contract_id, link_name));
Ok(())
}
}
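
/// Dispatches a bucket entry to the linkdef or claims handler based on the
/// entry's key prefix.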
async fn handle_entry(
entry: Entry,
linkdefs: Arc<RwLock<HashMap<String, LinkDefinition>>>,
claims: Arc<RwLock<ClaimsMap>>,
) {
if entry.key.starts_with(LINKDEF_PREFIX) {
handle_linkdef(entry, linkdefs).in_current_span().await;
} else if entry.key.starts_with(CLAIMS_PREFIX) {
handle_claim(entry, claims).in_current_span().await;
} else {
debug!(key = %entry.key, "Ignoring entry with unrecognized key");
}
}
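
/// Applies a single linkdef entry to the cache: puts insert or update the
/// deserialized [`LinkDefinition`], while deletes and purges remove it.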
async fn handle_linkdef(entry: Entry, linkdefs: Arc<RwLock<HashMap<String, LinkDefinition>>>) {
match entry.operation {
Operation::Delete | Operation::Purge => {
trace!("Handling linkdef delete entry");
let mut linkdefs = linkdefs.write().await;
linkdefs.remove(entry.key.trim_start_matches(LINKDEF_PREFIX));
trace!(num_entries = %linkdefs.len(), "Finished handling linkdef delete entry");
}
Operation::Put => {
trace!("Handling linkdef put entry");
let ld: LinkDefinition = match serde_json::from_slice(&entry.value) {
Ok(ld) => ld,
Err(e) => {
error!(error = %e, "Unable to deserialize as link definition");
return;
}
};
let key = entry.key.trim_start_matches(LINKDEF_PREFIX).to_owned();
let mut lds = linkdefs.write().await;
match lds.insert(key, ld) {
Some(_) => {
trace!("Updated linkdef with new information");
}
None => {
trace!("Added new linkdef");
}
}
trace!(num_entries = %lds.len(), "Finished handling linkdef put entry");
}
}
}
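
/// Applies a single claims entry to the cache. Puts are keyed by the claim's
/// `sub` field, so deletes rely on the KV key suffix matching that subject.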
async fn handle_claim(entry: Entry, claims: Arc<RwLock<ClaimsMap>>) {
match entry.operation {
Operation::Delete | Operation::Purge => {
trace!("Handling claim delete entry");
let mut claims = claims.write().await;
claims.remove(entry.key.trim_start_matches(CLAIMS_PREFIX));
trace!(num_entries = %claims.len(), "Finished handling claim delete entry");
}
Operation::Put => {
trace!("Handling claim put entry");
let json: HashMap<String, String> = match serde_json::from_slice(&entry.value) {
Ok(j) => j,
Err(e) => {
error!(error = %e, "Unable to deserialize claim as json");
return;
}
};
let sub = match json.get(SUBJECT_KEY) {
Some(s) => s.to_owned(),
None => {
debug!("Ignoring claim without sub");
return;
}
};
let mut c = claims.write().await;
match c.insert(sub, json) {
Some(_) => {
trace!("Updated claim with new information");
}
None => {
trace!("Added new claim");
}
}
trace!(num_entries = %c.len(), "Finished handling claim put entry");
}
}
}