wasmcloud_runtime/component/
keyvalue.rs1use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget};
2
3use crate::capability::keyvalue::{atomics, batch, store};
4use crate::capability::wrpc;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use bytes::Bytes;
9use std::sync::Arc;
10use tracing::{debug, instrument, trace};
11use wasmtime::component::Resource;
12
13type Result<T, E = store::Error> = core::result::Result<T, E>;
14
15pub mod keyvalue_watcher_bindings {
16 wasmtime::component::bindgen!({
17 world: "watcher",
18 async: true,
19 trappable_imports: true,
20 with: {
21 "wasi:keyvalue/store" : crate::capability::keyvalue::store,
22 }
23 });
24}
25
26impl From<wrpc::wrpc::keyvalue::store::Error> for store::Error {
27 fn from(value: wrpc::wrpc::keyvalue::store::Error) -> Self {
28 match value {
29 wrpc::wrpc::keyvalue::store::Error::NoSuchStore => Self::NoSuchStore,
30 wrpc::wrpc::keyvalue::store::Error::AccessDenied => Self::AccessDenied,
31 wrpc::wrpc::keyvalue::store::Error::Other(other) => Self::Other(other),
32 }
33 }
34}
35
36#[async_trait]
37impl<H> atomics::Host for Ctx<H>
38where
39 H: Handler,
40{
41 #[instrument(level = "debug", skip_all)]
42 async fn increment(
43 &mut self,
44 bucket: Resource<store::Bucket>,
45 key: String,
46 delta: u64,
47 ) -> anyhow::Result<Result<u64>> {
48 self.attach_parent_context();
49 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
50 match wrpc::wrpc::keyvalue::atomics::increment(
51 &self.handler,
52 Some(ReplacedInstanceTarget::KeyvalueAtomics),
53 bucket,
54 &key,
55 delta,
56 )
57 .await?
58 {
59 Ok(n) => Ok(Ok(n)),
60 Err(err) => Ok(Err(err.into())),
61 }
62 }
63}
64
65#[async_trait]
66impl<H> store::Host for Ctx<H>
67where
68 H: Handler,
69{
70 #[instrument]
71 async fn open(&mut self, name: String) -> anyhow::Result<Result<Resource<store::Bucket>>> {
72 self.attach_parent_context();
73 let bucket = self
74 .table
75 .push(Arc::from(name))
76 .context("failed to open bucket")?;
77 Ok(Ok(bucket))
78 }
79}
80
81#[async_trait]
82impl<H> batch::Host for Ctx<H>
83where
84 H: Handler,
85{
86 #[instrument(skip_all, fields(num_keys = keys.len()))]
87 async fn get_many(
88 &mut self,
89 bucket: Resource<store::Bucket>,
90 keys: Vec<String>,
91 ) -> anyhow::Result<Result<Vec<Option<(String, Vec<u8>)>>>> {
92 self.attach_parent_context();
93 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
94 let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
97
98 match wrpc::wrpc::keyvalue::batch::get_many(
99 &self.handler,
100 Some(ReplacedInstanceTarget::KeyvalueBatch),
101 bucket,
102 &keys,
103 )
104 .await?
105 {
106 Ok(res) => Ok(Ok(res
107 .into_iter()
108 .map(|opt| opt.map(|(k, v)| (k, Vec::from(v))))
109 .collect())),
110 Err(err) => Err(err.into()),
111 }
112 }
113
114 #[instrument(skip_all, fields(num_entries = entries.len()))]
115 async fn set_many(
116 &mut self,
117 bucket: Resource<store::Bucket>,
118 entries: Vec<(String, Vec<u8>)>,
119 ) -> anyhow::Result<Result<()>> {
120 self.attach_parent_context();
121 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
122 let entries = entries
123 .into_iter()
124 .map(|(k, v)| (k, Bytes::from(v)))
125 .collect::<Vec<_>>();
126 let massaged = entries
127 .iter()
128 .map(|(k, v)| (k.as_str(), v))
129 .collect::<Vec<_>>();
130 match wrpc::wrpc::keyvalue::batch::set_many(
131 &self.handler,
132 Some(ReplacedInstanceTarget::KeyvalueBatch),
133 bucket,
134 &massaged,
135 )
136 .await?
137 {
138 Ok(()) => Ok(Ok(())),
139 Err(err) => Err(err.into()),
140 }
141 }
142
143 #[instrument(skip_all, fields(num_keys = keys.len()))]
144 async fn delete_many(
145 &mut self,
146 bucket: Resource<store::Bucket>,
147 keys: Vec<String>,
148 ) -> anyhow::Result<Result<()>> {
149 self.attach_parent_context();
150 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
151 let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
152 match wrpc::wrpc::keyvalue::batch::delete_many(
153 &self.handler,
154 Some(ReplacedInstanceTarget::KeyvalueBatch),
155 bucket,
156 &keys,
157 )
158 .await?
159 {
160 Ok(()) => Ok(Ok(())),
161 Err(err) => Err(err.into()),
162 }
163 }
164}
165
166#[async_trait]
167impl<H> store::HostBucket for Ctx<H>
168where
169 H: Handler,
170{
171 #[instrument]
172 async fn get(
173 &mut self,
174 bucket: Resource<store::Bucket>,
175 key: String,
176 ) -> anyhow::Result<Result<Option<Vec<u8>>>> {
177 self.attach_parent_context();
178 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
179 match wrpc::wrpc::keyvalue::store::get(
180 &self.handler,
181 Some(ReplacedInstanceTarget::KeyvalueStore),
182 bucket,
183 &key,
184 )
185 .await?
186 {
187 Ok(buf) => Ok(Ok(buf.map(Into::into))),
188 Err(err) => Ok(Err(err.into())),
189 }
190 }
191
192 #[instrument]
193 async fn set(
194 &mut self,
195 bucket: Resource<store::Bucket>,
196 key: String,
197 outgoing_value: Vec<u8>,
198 ) -> anyhow::Result<Result<()>> {
199 self.attach_parent_context();
200 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
201 match wrpc::wrpc::keyvalue::store::set(
202 &self.handler,
203 Some(ReplacedInstanceTarget::KeyvalueStore),
204 bucket,
205 &key,
206 &Bytes::from(outgoing_value),
207 )
208 .await?
209 {
210 Ok(()) => Ok(Ok(())),
211 Err(err) => Err(err.into()),
212 }
213 }
214
215 #[instrument]
216 async fn delete(
217 &mut self,
218 bucket: Resource<store::Bucket>,
219 key: String,
220 ) -> anyhow::Result<Result<()>> {
221 self.attach_parent_context();
222 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
223 match wrpc::wrpc::keyvalue::store::delete(
224 &self.handler,
225 Some(ReplacedInstanceTarget::KeyvalueStore),
226 bucket,
227 &key,
228 )
229 .await?
230 {
231 Ok(()) => Ok(Ok(())),
232 Err(err) => Err(err.into()),
233 }
234 }
235
236 #[instrument]
237 async fn exists(
238 &mut self,
239 bucket: Resource<store::Bucket>,
240 key: String,
241 ) -> anyhow::Result<Result<bool>> {
242 self.attach_parent_context();
243 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
244 match wrpc::wrpc::keyvalue::store::exists(
245 &self.handler,
246 Some(ReplacedInstanceTarget::KeyvalueStore),
247 bucket,
248 &key,
249 )
250 .await?
251 {
252 Ok(ok) => Ok(Ok(ok)),
253 Err(err) => Err(err.into()),
254 }
255 }
256
257 #[instrument]
258 async fn list_keys(
259 &mut self,
260 bucket: Resource<store::Bucket>,
261 cursor: Option<u64>,
262 ) -> anyhow::Result<Result<store::KeyResponse>> {
263 self.attach_parent_context();
264 let bucket = self.table.get(&bucket).context("failed to get bucket")?;
265 match wrpc::wrpc::keyvalue::store::list_keys(
266 &self.handler,
267 Some(ReplacedInstanceTarget::KeyvalueStore),
268 bucket,
269 cursor,
270 )
271 .await?
272 {
273 Ok(wrpc::wrpc::keyvalue::store::KeyResponse { keys, cursor }) => {
274 Ok(Ok(store::KeyResponse { keys, cursor }))
275 }
276 Err(err) => Err(err.into()),
277 }
278 }
279
280 #[instrument]
281 async fn drop(&mut self, bucket: Resource<store::Bucket>) -> anyhow::Result<()> {
282 self.attach_parent_context();
283 self.table
284 .delete(bucket)
285 .context("failed to delete bucket")?;
286 Ok(())
287 }
288}
289
290impl<H, C> wrpc::exports::wrpc::keyvalue::watcher::Handler<C> for Instance<H, C>
291where
292 H: Handler,
293 C: Send,
294{
295 #[instrument(level = "info", skip_all)]
296 async fn on_set(
297 &self,
298 _cx: C,
299 bucket: String,
300 key: String,
301 value: bytes::Bytes,
302 ) -> anyhow::Result<(), anyhow::Error> {
303 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
304 let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
305 .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
306 trace!("instantiating `wasi:keyvalue/watcher`");
307 let bindings = pre
308 .instantiate_async(&mut store)
309 .await
310 .context("failed to instantiate `wasi:keyvalue/watcher.on_set`")?;
311 let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
312 let new_bucket = Resource::new_own(bucket_repr);
313 debug!("invoking `wasi:keyvalue/watcher.on_set`");
314 bindings
315 .wasi_keyvalue_watcher()
316 .call_on_set(&mut store, new_bucket, &key, &value)
317 .await
318 .context("failed to call `wasi:keyvalue/watcher.on_set`")?;
319 Ok(())
320 }
321
322 #[instrument(level = "info", skip_all)]
323 async fn on_delete(
324 &self,
325 _cx: C,
326 bucket: String,
327 key: String,
328 ) -> anyhow::Result<(), anyhow::Error> {
329 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
330 let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
331 .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
332 trace!("instantiating `wasi:keyvalue/watcher`");
333 let bindings = pre
334 .instantiate_async(&mut store)
335 .await
336 .context("failed to instantiate `wasi:keyvalue/watcher.on_delete`")?;
337 let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
338 let new_bucket = Resource::new_own(bucket_repr);
339 debug!("invoking `wasi:keyvalue/watcher.on_delete`");
340 bindings
341 .wasi_keyvalue_watcher()
342 .call_on_delete(&mut store, new_bucket, &key)
343 .await
344 .context("failed to call `wasi:keyvalue/watcher.on_delete`")?;
345 Ok(())
346 }
347}