wrpc_wasi_keyvalue_mem/
lib.rs1use core::num::ParseIntError;
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use tokio::sync::RwLock;
8use tracing::{instrument, trace};
9use wrpc_transport::{ResourceBorrow, ResourceOwn};
10use wrpc_wasi_keyvalue::exports::wasi::keyvalue::{atomics, batch, store};
11
12pub type Result<T, E = store::Error> = core::result::Result<T, E>;
13
14pub type Bucket = Arc<RwLock<HashMap<String, Bytes>>>;
15
16#[derive(Clone, Debug, Default)]
17pub struct Handler(pub Arc<RwLock<HashMap<Bytes, Bucket>>>);
18
19impl Handler {
20 async fn bucket(&self, bucket: impl AsRef<[u8]>) -> Result<Bucket> {
21 trace!("looking up bucket");
22 let store = self.0.read().await;
23 store
24 .get(bucket.as_ref())
25 .ok_or(store::Error::NoSuchStore)
26 .cloned()
27 }
28}
29
30impl<C: Send + Sync> store::Handler<C> for Handler {
31 #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
33 async fn open(
34 &self,
35 _cx: C,
36 identifier: String,
37 ) -> anyhow::Result<Result<ResourceOwn<store::Bucket>>> {
38 let identifier = Bytes::from(identifier);
39 {
40 let store = self.0.read().await;
42 if store.contains_key(&identifier) {
43 return Ok(Ok(ResourceOwn::from(identifier)));
44 }
45 }
46 let mut store = self.0.write().await;
47 store.entry(identifier.clone()).or_default();
48 Ok(Ok(ResourceOwn::from(identifier)))
49 }
50}
51
52impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
53 #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
54 async fn get(
55 &self,
56 _cx: C,
57 bucket: ResourceBorrow<store::Bucket>,
58 key: String,
59 ) -> anyhow::Result<Result<Option<Bytes>>> {
60 let bucket = self.bucket(bucket).await?;
61 let bucket = bucket.read().await;
62 Ok(Ok(bucket.get(&key).cloned()))
63 }
64
65 #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
66 async fn set(
67 &self,
68 _cx: C,
69 bucket: ResourceBorrow<store::Bucket>,
70 key: String,
71 value: Bytes,
72 ) -> anyhow::Result<Result<()>> {
73 let bucket = self.bucket(bucket).await?;
74 let mut bucket = bucket.write().await;
75 bucket.insert(key, value);
76 Ok(Ok(()))
77 }
78
79 #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
80 async fn delete(
81 &self,
82 _cx: C,
83 bucket: ResourceBorrow<store::Bucket>,
84 key: String,
85 ) -> anyhow::Result<Result<()>> {
86 let bucket = self.bucket(bucket).await?;
87 let mut bucket = bucket.write().await;
88 bucket.remove(&key);
89 Ok(Ok(()))
90 }
91
92 #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
93 async fn exists(
94 &self,
95 _cx: C,
96 bucket: ResourceBorrow<store::Bucket>,
97 key: String,
98 ) -> anyhow::Result<Result<bool>> {
99 let bucket = self.bucket(bucket).await?;
100 let bucket = bucket.read().await;
101 Ok(Ok(bucket.contains_key(&key)))
102 }
103
104 #[instrument(level = "trace", skip(_cx), ret(level = "trace"))]
105 async fn list_keys(
106 &self,
107 _cx: C,
108 bucket: ResourceBorrow<store::Bucket>,
109 cursor: Option<String>,
110 ) -> anyhow::Result<Result<store::KeyResponse>> {
111 let bucket = self.bucket(bucket).await?;
112 let bucket = bucket.read().await;
113 let bucket = bucket.keys();
114 let keys = if let Some(cursor) = cursor {
115 let cursor = cursor
116 .parse()
117 .map_err(|err: ParseIntError| store::Error::Other(err.to_string()))?;
118 bucket.skip(cursor).cloned().collect()
119 } else {
120 bucket.cloned().collect()
121 };
122 Ok(Ok(store::KeyResponse { keys, cursor: None }))
123 }
124}
125
126impl<C: Send + Sync> atomics::Handler<C> for Handler {
127 async fn increment(
128 &self,
129 _cx: C,
130 _bucket: ResourceBorrow<store::Bucket>,
131 _key: String,
132 _delta: i64,
133 ) -> anyhow::Result<Result<i64>> {
134 Ok(Err(store::Error::Other("not supported".to_string())))
135 }
136
137 async fn swap(
138 &self,
139 _cx: C,
140 _cas: ResourceOwn<atomics::Cas>,
141 _value: Bytes,
142 ) -> anyhow::Result<Result<(), atomics::CasError>> {
143 Ok(Err(atomics::CasError::StoreError(store::Error::Other(
144 "not supported".to_string(),
145 ))))
146 }
147}
148
149impl<C: Send + Sync> atomics::HandlerCas<C> for Handler {
150 async fn new(
151 &self,
152 _cx: C,
153 _bucket: ResourceBorrow<store::Bucket>,
154 _key: String,
155 ) -> anyhow::Result<Result<ResourceOwn<atomics::Cas>>> {
156 Ok(Err(store::Error::Other("not supported".to_string())))
157 }
158
159 async fn current(
160 &self,
161 _cx: C,
162 _bucket: ResourceBorrow<atomics::Cas>,
163 ) -> anyhow::Result<Result<Option<Bytes>>> {
164 Ok(Err(store::Error::Other("not supported".to_string())))
165 }
166}
167
168impl<C: Send + Sync> batch::Handler<C> for Handler {
169 async fn get_many(
170 &self,
171 _cx: C,
172 _bucket: ResourceBorrow<store::Bucket>,
173 _keys: Vec<String>,
174 ) -> anyhow::Result<Result<Vec<Option<(String, Bytes)>>>> {
175 Ok(Err(store::Error::Other("not supported".to_string())))
176 }
177
178 async fn set_many(
179 &self,
180 _cx: C,
181 _bucket: ResourceBorrow<store::Bucket>,
182 _key_values: Vec<(String, Bytes)>,
183 ) -> anyhow::Result<Result<()>> {
184 Ok(Err(store::Error::Other("not supported".to_string())))
185 }
186
187 async fn delete_many(
188 &self,
189 _cx: C,
190 _bucket: ResourceBorrow<store::Bucket>,
191 _keys: Vec<String>,
192 ) -> anyhow::Result<Result<()>> {
193 Ok(Err(store::Error::Other("not supported".to_string())))
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Passes a default [`Handler`] to `wrpc_wasi_keyvalue::serve`.
    ///
    /// NOTE(review): nothing in this module calls the function, so it presumably
    /// serves as a compile-time check that `Handler` satisfies the bounds `serve`
    /// requires — confirm.
    #[allow(unused)]
    async fn bound(s: &impl wrpc_transport::Serve) -> anyhow::Result<()> {
        let served = wrpc_wasi_keyvalue::serve(s, Handler::default()).await;
        served?;
        Ok(())
    }
}