#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use pas_external::epoch::{
Cache as SvCache, CompositeEpochRevocation, EpochRevocation, EpochRevocationError, FetchError,
Fetcher, SharedCacheCache,
};
use ppoppo_infra::{Cache as InfraCache, Result as InfraResult};
use ppoppo_token::sv_cache_key;
use serde_json::Value as Json;
#[derive(Default)]
struct MemCache {
store: Mutex<HashMap<String, Json>>,
last_set_ttl: Mutex<Option<i32>>,
set_calls: Mutex<u32>,
get_calls: Mutex<u32>,
}
#[async_trait]
impl InfraCache for MemCache {
async fn get(&self, key: &str) -> InfraResult<Option<Json>> {
*self.get_calls.lock().unwrap() += 1;
Ok(self.store.lock().unwrap().get(key).cloned())
}
async fn set(
&self,
key: &str,
value: &Json,
ttl_seconds: Option<i32>,
) -> InfraResult<()> {
*self.set_calls.lock().unwrap() += 1;
*self.last_set_ttl.lock().unwrap() = ttl_seconds;
self.store
.lock()
.unwrap()
.insert(key.to_string(), value.clone());
Ok(())
}
async fn del(&self, _: &str) -> InfraResult<bool> {
unimplemented!("not exercised")
}
async fn exists(&self, _: &str) -> InfraResult<bool> {
unimplemented!("not exercised")
}
async fn ttl(&self, _: &str) -> InfraResult<Option<i32>> {
unimplemented!("not exercised")
}
async fn mset(&self, _: &[(&str, Json, Option<i32>)]) -> InfraResult<usize> {
unimplemented!("not exercised")
}
async fn mget(&self, _: &[&str]) -> InfraResult<Vec<(String, Option<Json>)>> {
unimplemented!("not exercised")
}
async fn mdel(&self, _: &[&str]) -> InfraResult<usize> {
unimplemented!("not exercised")
}
async fn keys(&self, _: &str, _: i32) -> InfraResult<Vec<String>> {
unimplemented!("not exercised")
}
}
struct CountingFetcher {
response: Mutex<Result<i64, &'static str>>,
calls: Mutex<u32>,
}
#[async_trait]
impl Fetcher for CountingFetcher {
async fn fetch(&self, _sub: &str) -> Result<i64, FetchError> {
*self.calls.lock().unwrap() += 1;
match *self.response.lock().unwrap() {
Ok(v) => Ok(v),
Err(e) => Err(FetchError::Other(e.to_string())),
}
}
}
const SUB: &str = "01HSAB00000000000000000000";
#[tokio::test]
async fn shared_cache_hit_short_circuits_fetcher() {
let mem = Arc::new(MemCache::default());
mem.store
.lock()
.unwrap()
.insert(sv_cache_key(SUB), serde_json::json!(7));
let cache: Arc<dyn SvCache> = Arc::new(SharedCacheCache::new(mem.clone() as Arc<dyn InfraCache>));
let fetcher = Arc::new(CountingFetcher {
response: Mutex::new(Ok(99)),
calls: Mutex::new(0),
});
let composer = CompositeEpochRevocation::new(cache, fetcher.clone());
assert_eq!(composer.current(SUB).await.unwrap(), 7);
assert_eq!(*fetcher.calls.lock().unwrap(), 0);
assert_eq!(*mem.get_calls.lock().unwrap(), 1);
}
#[tokio::test]
async fn shared_cache_miss_fetches_and_writes_back_through_adapter() {
let mem = Arc::new(MemCache::default());
let cache: Arc<dyn SvCache> = Arc::new(SharedCacheCache::new(mem.clone() as Arc<dyn InfraCache>));
let fetcher = Arc::new(CountingFetcher {
response: Mutex::new(Ok(42)),
calls: Mutex::new(0),
});
let composer = CompositeEpochRevocation::new(cache, fetcher.clone());
assert_eq!(composer.current(SUB).await.unwrap(), 42);
assert_eq!(*fetcher.calls.lock().unwrap(), 1);
assert_eq!(*mem.set_calls.lock().unwrap(), 1);
let stored = mem.store.lock().unwrap().get(&sv_cache_key(SUB)).cloned();
assert_eq!(stored, Some(serde_json::json!(42)));
}
#[tokio::test]
async fn shared_cache_set_ttl_clamps_subsecond_to_one_second() {
let mem = Arc::new(MemCache::default());
let cache = SharedCacheCache::new(mem.clone() as Arc<dyn InfraCache>);
cache
.set(&sv_cache_key(SUB), 1, std::time::Duration::from_millis(500))
.await;
assert_eq!(*mem.last_set_ttl.lock().unwrap(), Some(1));
}
#[tokio::test]
async fn shared_cache_fetcher_transient_collapses_to_engine_port_transient() {
let mem = Arc::new(MemCache::default());
let cache: Arc<dyn SvCache> = Arc::new(SharedCacheCache::new(mem as Arc<dyn InfraCache>));
let fetcher = Arc::new(CountingFetcher {
response: Mutex::new(Err("substrate down")),
calls: Mutex::new(0),
});
let composer = CompositeEpochRevocation::new(cache, fetcher);
let err = composer.current(SUB).await.unwrap_err();
match err {
EpochRevocationError::Transient(detail) => {
assert!(detail.contains("substrate down"), "{detail}");
}
}
}