wrpc_wasi_keyvalue_mem/
lib.rs

1use core::num::ParseIntError;
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use tokio::sync::RwLock;
8use tracing::{instrument, trace};
9use wrpc_transport::{ResourceBorrow, ResourceOwn};
10use wrpc_wasi_keyvalue::exports::wasi::keyvalue::{atomics, batch, store};
11
12pub type Result<T, E = store::Error> = core::result::Result<T, E>;
13
14pub type Bucket = Arc<RwLock<HashMap<String, Bytes>>>;
15
16#[derive(Clone, Debug, Default)]
17pub struct Handler(pub Arc<RwLock<HashMap<Bytes, Bucket>>>);
18
19impl Handler {
20    async fn bucket(&self, bucket: impl AsRef<[u8]>) -> Result<Bucket> {
21        trace!("looking up bucket");
22        let store = self.0.read().await;
23        store
24            .get(bucket.as_ref())
25            .ok_or(store::Error::NoSuchStore)
26            .cloned()
27    }
28}
29
30impl<C: Send + Sync> store::Handler<C> for Handler {
31    // NOTE: Resource handle returned is just the `identifier` itself
32    #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
33    async fn open(
34        &self,
35        _cx: C,
36        identifier: String,
37    ) -> anyhow::Result<Result<ResourceOwn<store::Bucket>>> {
38        let identifier = Bytes::from(identifier);
39        {
40            // first, optimistically try read-only lock
41            let store = self.0.read().await;
42            if store.contains_key(&identifier) {
43                return Ok(Ok(ResourceOwn::from(identifier)));
44            }
45        }
46        let mut store = self.0.write().await;
47        store.entry(identifier.clone()).or_default();
48        Ok(Ok(ResourceOwn::from(identifier)))
49    }
50}
51
52impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
53    #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
54    async fn get(
55        &self,
56        _cx: C,
57        bucket: ResourceBorrow<store::Bucket>,
58        key: String,
59    ) -> anyhow::Result<Result<Option<Bytes>>> {
60        let bucket = self.bucket(bucket).await?;
61        let bucket = bucket.read().await;
62        Ok(Ok(bucket.get(&key).cloned()))
63    }
64
65    #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
66    async fn set(
67        &self,
68        _cx: C,
69        bucket: ResourceBorrow<store::Bucket>,
70        key: String,
71        value: Bytes,
72    ) -> anyhow::Result<Result<()>> {
73        let bucket = self.bucket(bucket).await?;
74        let mut bucket = bucket.write().await;
75        bucket.insert(key, value);
76        Ok(Ok(()))
77    }
78
79    #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
80    async fn delete(
81        &self,
82        _cx: C,
83        bucket: ResourceBorrow<store::Bucket>,
84        key: String,
85    ) -> anyhow::Result<Result<()>> {
86        let bucket = self.bucket(bucket).await?;
87        let mut bucket = bucket.write().await;
88        bucket.remove(&key);
89        Ok(Ok(()))
90    }
91
92    #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
93    async fn exists(
94        &self,
95        _cx: C,
96        bucket: ResourceBorrow<store::Bucket>,
97        key: String,
98    ) -> anyhow::Result<Result<bool>> {
99        let bucket = self.bucket(bucket).await?;
100        let bucket = bucket.read().await;
101        Ok(Ok(bucket.contains_key(&key)))
102    }
103
104    #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
105    async fn list_keys(
106        &self,
107        _cx: C,
108        bucket: ResourceBorrow<store::Bucket>,
109        cursor: Option<String>,
110    ) -> anyhow::Result<Result<store::KeyResponse>> {
111        let bucket = self.bucket(bucket).await?;
112        let bucket = bucket.read().await;
113        let bucket = bucket.keys();
114        let keys = if let Some(cursor) = cursor {
115            let cursor = cursor
116                .parse()
117                .map_err(|err: ParseIntError| store::Error::Other(err.to_string()))?;
118            bucket.skip(cursor).cloned().collect()
119        } else {
120            bucket.cloned().collect()
121        };
122        Ok(Ok(store::KeyResponse { keys, cursor: None }))
123    }
124}
125
126impl<C: Send + Sync> atomics::Handler<C> for Handler {
127    async fn increment(
128        &self,
129        _cx: C,
130        _bucket: ResourceBorrow<store::Bucket>,
131        _key: String,
132        _delta: i64,
133    ) -> anyhow::Result<Result<i64>> {
134        Ok(Err(store::Error::Other("not supported".to_string())))
135    }
136
137    async fn swap(
138        &self,
139        _cx: C,
140        _cas: ResourceOwn<atomics::Cas>,
141        _value: Bytes,
142    ) -> anyhow::Result<Result<(), atomics::CasError>> {
143        Ok(Err(atomics::CasError::StoreError(store::Error::Other(
144            "not supported".to_string(),
145        ))))
146    }
147}
148
149impl<C: Send + Sync> atomics::HandlerCas<C> for Handler {
150    async fn new(
151        &self,
152        _cx: C,
153        _bucket: ResourceBorrow<store::Bucket>,
154        _key: String,
155    ) -> anyhow::Result<Result<ResourceOwn<atomics::Cas>>> {
156        Ok(Err(store::Error::Other("not supported".to_string())))
157    }
158
159    async fn current(
160        &self,
161        _cx: C,
162        _bucket: ResourceBorrow<atomics::Cas>,
163    ) -> anyhow::Result<Result<Option<Bytes>>> {
164        Ok(Err(store::Error::Other("not supported".to_string())))
165    }
166}
167
168impl<C: Send + Sync> batch::Handler<C> for Handler {
169    async fn get_many(
170        &self,
171        _cx: C,
172        _bucket: ResourceBorrow<store::Bucket>,
173        _keys: Vec<String>,
174    ) -> anyhow::Result<Result<Vec<Option<(String, Bytes)>>>> {
175        Ok(Err(store::Error::Other("not supported".to_string())))
176    }
177
178    async fn set_many(
179        &self,
180        _cx: C,
181        _bucket: ResourceBorrow<store::Bucket>,
182        _key_values: Vec<(String, Bytes)>,
183    ) -> anyhow::Result<Result<()>> {
184        Ok(Err(store::Error::Other("not supported".to_string())))
185    }
186
187    async fn delete_many(
188        &self,
189        _cx: C,
190        _bucket: ResourceBorrow<store::Bucket>,
191        _keys: Vec<String>,
192    ) -> anyhow::Result<Result<()>> {
193        Ok(Err(store::Error::Other("not supported".to_string())))
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[allow(unused)]
202    async fn bound(s: &impl wrpc_transport::Serve) -> anyhow::Result<()> {
203        wrpc_wasi_keyvalue::serve(s, Handler::default()).await?;
204        Ok(())
205    }
206}