wasmcloud_runtime/component/
keyvalue.rs

1use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget};
2
3use crate::capability::keyvalue::{atomics, batch, store};
4use crate::capability::wrpc;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use bytes::Bytes;
9use std::sync::Arc;
10use tracing::{debug, instrument, trace};
11use wasmtime::component::Resource;
12
13type Result<T, E = store::Error> = core::result::Result<T, E>;
14
15pub mod keyvalue_watcher_bindings {
16    wasmtime::component::bindgen!({
17        world: "watcher",
18        async: true,
19        trappable_imports: true,
20        with: {
21            "wasi:keyvalue/store" : crate::capability::keyvalue::store,
22        }
23    });
24}
25
26impl From<wrpc::wrpc::keyvalue::store::Error> for store::Error {
27    fn from(value: wrpc::wrpc::keyvalue::store::Error) -> Self {
28        match value {
29            wrpc::wrpc::keyvalue::store::Error::NoSuchStore => Self::NoSuchStore,
30            wrpc::wrpc::keyvalue::store::Error::AccessDenied => Self::AccessDenied,
31            wrpc::wrpc::keyvalue::store::Error::Other(other) => Self::Other(other),
32        }
33    }
34}
35
36#[async_trait]
37impl<H> atomics::Host for Ctx<H>
38where
39    H: Handler,
40{
41    #[instrument(level = "debug", skip_all)]
42    async fn increment(
43        &mut self,
44        bucket: Resource<store::Bucket>,
45        key: String,
46        delta: u64,
47    ) -> anyhow::Result<Result<u64>> {
48        self.attach_parent_context();
49        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
50        match wrpc::wrpc::keyvalue::atomics::increment(
51            &self.handler,
52            Some(ReplacedInstanceTarget::KeyvalueAtomics),
53            bucket,
54            &key,
55            delta,
56        )
57        .await?
58        {
59            Ok(n) => Ok(Ok(n)),
60            Err(err) => Ok(Err(err.into())),
61        }
62    }
63}
64
65#[async_trait]
66impl<H> store::Host for Ctx<H>
67where
68    H: Handler,
69{
70    #[instrument]
71    async fn open(&mut self, name: String) -> anyhow::Result<Result<Resource<store::Bucket>>> {
72        self.attach_parent_context();
73        let bucket = self
74            .table
75            .push(Arc::from(name))
76            .context("failed to open bucket")?;
77        Ok(Ok(bucket))
78    }
79}
80
81#[async_trait]
82impl<H> batch::Host for Ctx<H>
83where
84    H: Handler,
85{
86    #[instrument(skip_all, fields(num_keys = keys.len()))]
87    async fn get_many(
88        &mut self,
89        bucket: Resource<store::Bucket>,
90        keys: Vec<String>,
91    ) -> anyhow::Result<Result<Vec<Option<(String, Vec<u8>)>>>> {
92        self.attach_parent_context();
93        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
94        // NOTE(thomastaylor312): I don't like allocating a new vec, but I need borrowed strings to
95        // have the right type
96        let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
97
98        match wrpc::wrpc::keyvalue::batch::get_many(
99            &self.handler,
100            Some(ReplacedInstanceTarget::KeyvalueBatch),
101            bucket,
102            &keys,
103        )
104        .await?
105        {
106            Ok(res) => Ok(Ok(res
107                .into_iter()
108                .map(|opt| opt.map(|(k, v)| (k, Vec::from(v))))
109                .collect())),
110            Err(err) => Err(err.into()),
111        }
112    }
113
114    #[instrument(skip_all, fields(num_entries = entries.len()))]
115    async fn set_many(
116        &mut self,
117        bucket: Resource<store::Bucket>,
118        entries: Vec<(String, Vec<u8>)>,
119    ) -> anyhow::Result<Result<()>> {
120        self.attach_parent_context();
121        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
122        let entries = entries
123            .into_iter()
124            .map(|(k, v)| (k, Bytes::from(v)))
125            .collect::<Vec<_>>();
126        let massaged = entries
127            .iter()
128            .map(|(k, v)| (k.as_str(), v))
129            .collect::<Vec<_>>();
130        match wrpc::wrpc::keyvalue::batch::set_many(
131            &self.handler,
132            Some(ReplacedInstanceTarget::KeyvalueBatch),
133            bucket,
134            &massaged,
135        )
136        .await?
137        {
138            Ok(()) => Ok(Ok(())),
139            Err(err) => Err(err.into()),
140        }
141    }
142
143    #[instrument(skip_all, fields(num_keys = keys.len()))]
144    async fn delete_many(
145        &mut self,
146        bucket: Resource<store::Bucket>,
147        keys: Vec<String>,
148    ) -> anyhow::Result<Result<()>> {
149        self.attach_parent_context();
150        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
151        let keys = keys.iter().map(String::as_str).collect::<Vec<_>>();
152        match wrpc::wrpc::keyvalue::batch::delete_many(
153            &self.handler,
154            Some(ReplacedInstanceTarget::KeyvalueBatch),
155            bucket,
156            &keys,
157        )
158        .await?
159        {
160            Ok(()) => Ok(Ok(())),
161            Err(err) => Err(err.into()),
162        }
163    }
164}
165
166#[async_trait]
167impl<H> store::HostBucket for Ctx<H>
168where
169    H: Handler,
170{
171    #[instrument]
172    async fn get(
173        &mut self,
174        bucket: Resource<store::Bucket>,
175        key: String,
176    ) -> anyhow::Result<Result<Option<Vec<u8>>>> {
177        self.attach_parent_context();
178        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
179        match wrpc::wrpc::keyvalue::store::get(
180            &self.handler,
181            Some(ReplacedInstanceTarget::KeyvalueStore),
182            bucket,
183            &key,
184        )
185        .await?
186        {
187            Ok(buf) => Ok(Ok(buf.map(Into::into))),
188            Err(err) => Ok(Err(err.into())),
189        }
190    }
191
192    #[instrument]
193    async fn set(
194        &mut self,
195        bucket: Resource<store::Bucket>,
196        key: String,
197        outgoing_value: Vec<u8>,
198    ) -> anyhow::Result<Result<()>> {
199        self.attach_parent_context();
200        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
201        match wrpc::wrpc::keyvalue::store::set(
202            &self.handler,
203            Some(ReplacedInstanceTarget::KeyvalueStore),
204            bucket,
205            &key,
206            &Bytes::from(outgoing_value),
207        )
208        .await?
209        {
210            Ok(()) => Ok(Ok(())),
211            Err(err) => Err(err.into()),
212        }
213    }
214
215    #[instrument]
216    async fn delete(
217        &mut self,
218        bucket: Resource<store::Bucket>,
219        key: String,
220    ) -> anyhow::Result<Result<()>> {
221        self.attach_parent_context();
222        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
223        match wrpc::wrpc::keyvalue::store::delete(
224            &self.handler,
225            Some(ReplacedInstanceTarget::KeyvalueStore),
226            bucket,
227            &key,
228        )
229        .await?
230        {
231            Ok(()) => Ok(Ok(())),
232            Err(err) => Err(err.into()),
233        }
234    }
235
236    #[instrument]
237    async fn exists(
238        &mut self,
239        bucket: Resource<store::Bucket>,
240        key: String,
241    ) -> anyhow::Result<Result<bool>> {
242        self.attach_parent_context();
243        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
244        match wrpc::wrpc::keyvalue::store::exists(
245            &self.handler,
246            Some(ReplacedInstanceTarget::KeyvalueStore),
247            bucket,
248            &key,
249        )
250        .await?
251        {
252            Ok(ok) => Ok(Ok(ok)),
253            Err(err) => Err(err.into()),
254        }
255    }
256
257    #[instrument]
258    async fn list_keys(
259        &mut self,
260        bucket: Resource<store::Bucket>,
261        cursor: Option<u64>,
262    ) -> anyhow::Result<Result<store::KeyResponse>> {
263        self.attach_parent_context();
264        let bucket = self.table.get(&bucket).context("failed to get bucket")?;
265        match wrpc::wrpc::keyvalue::store::list_keys(
266            &self.handler,
267            Some(ReplacedInstanceTarget::KeyvalueStore),
268            bucket,
269            cursor,
270        )
271        .await?
272        {
273            Ok(wrpc::wrpc::keyvalue::store::KeyResponse { keys, cursor }) => {
274                Ok(Ok(store::KeyResponse { keys, cursor }))
275            }
276            Err(err) => Err(err.into()),
277        }
278    }
279
280    #[instrument]
281    async fn drop(&mut self, bucket: Resource<store::Bucket>) -> anyhow::Result<()> {
282        self.attach_parent_context();
283        self.table
284            .delete(bucket)
285            .context("failed to delete bucket")?;
286        Ok(())
287    }
288}
289
290impl<H, C> wrpc::exports::wrpc::keyvalue::watcher::Handler<C> for Instance<H, C>
291where
292    H: Handler,
293    C: Send,
294{
295    #[instrument(level = "info", skip_all)]
296    async fn on_set(
297        &self,
298        _cx: C,
299        bucket: String,
300        key: String,
301        value: bytes::Bytes,
302    ) -> anyhow::Result<(), anyhow::Error> {
303        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
304        let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
305            .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
306        trace!("instantiating `wasi:keyvalue/watcher`");
307        let bindings = pre
308            .instantiate_async(&mut store)
309            .await
310            .context("failed to instantiate `wasi:keyvalue/watcher.on_set`")?;
311        let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
312        let new_bucket = Resource::new_own(bucket_repr);
313        debug!("invoking `wasi:keyvalue/watcher.on_set`");
314        bindings
315            .wasi_keyvalue_watcher()
316            .call_on_set(&mut store, new_bucket, &key, &value)
317            .await
318            .context("failed to call `wasi:keyvalue/watcher.on_set`")?;
319        Ok(())
320    }
321
322    #[instrument(level = "info", skip_all)]
323    async fn on_delete(
324        &self,
325        _cx: C,
326        bucket: String,
327        key: String,
328    ) -> anyhow::Result<(), anyhow::Error> {
329        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
330        let pre = keyvalue_watcher_bindings::WatcherPre::new(self.pre.clone())
331            .context("failed to pre-instantiate `wasi:keyvalue/watcher`")?;
332        trace!("instantiating `wasi:keyvalue/watcher`");
333        let bindings = pre
334            .instantiate_async(&mut store)
335            .await
336            .context("failed to instantiate `wasi:keyvalue/watcher.on_delete`")?;
337        let bucket_repr: u32 = bucket.parse().context("failed to parse bucket as u32")?;
338        let new_bucket = Resource::new_own(bucket_repr);
339        debug!("invoking `wasi:keyvalue/watcher.on_delete`");
340        bindings
341            .wasi_keyvalue_watcher()
342            .call_on_delete(&mut store, new_bucket, &key)
343            .await
344            .context("failed to call `wasi:keyvalue/watcher.on_delete`")?;
345        Ok(())
346    }
347}