omnia_wasi_keyvalue/host/
default_impl.rs1use std::sync::Arc;
6
7use anyhow::Result;
8use futures::FutureExt;
9use moka::sync::Cache;
10use omnia::Backend;
11use tracing::instrument;
12
13use crate::host::WasiKeyValueCtx;
14use crate::host::resource::{Bucket, FutureResult};
15
16type BucketCache = Cache<String, Vec<u8>>;
17
18#[derive(Debug, Clone, Default)]
19pub struct ConnectOptions;
20
21impl omnia::FromEnv for ConnectOptions {
22 fn from_env() -> Result<Self> {
23 Ok(Self)
24 }
25}
26
27#[derive(Clone)]
29pub struct KeyValueDefault {
30 store: Cache<String, BucketCache>,
31}
32
33impl std::fmt::Debug for KeyValueDefault {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("KeyValueDefault").finish_non_exhaustive()
36 }
37}
38
39impl Backend for KeyValueDefault {
40 type ConnectOptions = ConnectOptions;
41
42 #[instrument]
43 async fn connect_with(options: Self::ConnectOptions) -> Result<Self> {
44 tracing::debug!("initializing in-memory key-value store");
45 Ok(Self {
46 store: Cache::builder().build(),
47 })
48 }
49}
50
51impl WasiKeyValueCtx for KeyValueDefault {
52 fn open_bucket(&self, identifier: String) -> FutureResult<Arc<dyn Bucket>> {
53 tracing::debug!("opening bucket: {identifier}");
54
55 let cache = self.store.get_with(identifier.clone(), || Cache::builder().build());
56
57 let bucket = InMemBucket {
58 name: identifier,
59 cache,
60 };
61
62 async move { Ok(Arc::new(bucket) as Arc<dyn Bucket>) }.boxed()
63 }
64}
65
66#[derive(Clone)]
67struct InMemBucket {
68 name: String,
69 cache: BucketCache,
70}
71
72impl std::fmt::Debug for InMemBucket {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("InMemBucket").field("name", &self.name).finish_non_exhaustive()
75 }
76}
77
78impl Bucket for InMemBucket {
79 fn name(&self) -> &'static str {
80 Box::leak(self.name.clone().into_boxed_str())
83 }
84
85 fn get(&self, key: String) -> FutureResult<Option<Vec<u8>>> {
86 tracing::debug!("getting key: {key} from bucket: {}", self.name);
87 let result = self.cache.get(&key);
88 async move { Ok(result) }.boxed()
89 }
90
91 fn set(&self, key: String, value: Vec<u8>) -> FutureResult<()> {
92 tracing::debug!("setting key: {key} in bucket: {}", self.name);
93 self.cache.insert(key, value);
94 async move { Ok(()) }.boxed()
95 }
96
97 fn delete(&self, key: String) -> FutureResult<()> {
98 tracing::debug!("deleting key: {key} from bucket: {}", self.name);
99 self.cache.invalidate(&key);
100 async move { Ok(()) }.boxed()
101 }
102
103 fn exists(&self, key: String) -> FutureResult<bool> {
104 tracing::debug!("checking existence of key: {key} in bucket: {}", self.name);
105 let exists = self.cache.contains_key(&key);
106 async move { Ok(exists) }.boxed()
107 }
108
109 fn keys(&self) -> FutureResult<Vec<String>> {
110 tracing::debug!("listing keys in bucket: {}", self.name);
111 let keys = self.cache.iter().map(|(k, _)| (*k).clone()).collect();
112 async move { Ok(keys) }.boxed()
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119
120 #[tokio::test]
121 async fn bucket_operations() {
122 let ctx = KeyValueDefault::connect_with(ConnectOptions).await.expect("connect");
123
124 let bucket = ctx.open_bucket("test-bucket".to_string()).await.expect("open bucket");
125
126 bucket.set("key1".to_string(), b"value1".to_vec()).await.expect("set");
128 let value = bucket.get("key1".to_string()).await.expect("get");
129 assert_eq!(value, Some(b"value1".to_vec()));
130
131 assert!(bucket.exists("key1".to_string()).await.expect("exists"));
133 assert!(!bucket.exists("key2".to_string()).await.expect("exists"));
134
135 bucket.set("key2".to_string(), b"value2".to_vec()).await.expect("set");
137 let mut keys = bucket.keys().await.expect("keys");
138 keys.sort();
139 assert_eq!(keys, vec!["key1".to_string(), "key2".to_string()]);
140
141 bucket.delete("key1".to_string()).await.expect("delete");
143 assert!(!bucket.exists("key1".to_string()).await.expect("exists"));
144 }
145}