viceroy_lib/wiggle_abi/
kv_store_impl.rs

1//! fastly_obj_store` hostcall implementations.
2
3use crate::object_store::KvStoreError;
4use crate::session::PeekableTask;
5use crate::session::{
6    PendingKvDeleteTask, PendingKvInsertTask, PendingKvListTask, PendingKvLookupTask,
7};
8
9use {
10    crate::{
11        error::Error,
12        object_store::{ObjectKey, ObjectStoreError},
13        session::Session,
14        wiggle_abi::{
15            fastly_kv_store::FastlyKvStore,
16            types::{
17                BodyHandle, KvDeleteConfig, KvDeleteConfigOptions, KvError, KvInsertConfig,
18                KvInsertConfigOptions, KvListConfig, KvListConfigOptions, KvLookupConfig,
19                KvLookupConfigOptions, KvStoreDeleteHandle, KvStoreHandle, KvStoreInsertHandle,
20                KvStoreListHandle, KvStoreLookupHandle,
21            },
22        },
23    },
24    wiggle::{GuestMemory, GuestPtr},
25};
26
27#[wiggle::async_trait]
28impl FastlyKvStore for Session {
29    fn open(
30        &mut self,
31        memory: &mut GuestMemory<'_>,
32        name: GuestPtr<str>,
33    ) -> Result<KvStoreHandle, Error> {
34        let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?;
35        if self.kv_store().store_exists(&name)? {
36            Ok(self.kv_store_handle(&name))
37        } else {
38            Err(Error::ObjectStoreError(
39                ObjectStoreError::UnknownObjectStore(name.to_owned()),
40            ))
41        }
42    }
43
44    async fn lookup(
45        &mut self,
46        memory: &mut GuestMemory<'_>,
47        store: KvStoreHandle,
48        key: GuestPtr<str>,
49        _lookup_config_mask: KvLookupConfigOptions,
50        _lookup_configuration: GuestPtr<KvLookupConfig>,
51        handle_out: GuestPtr<KvStoreLookupHandle>,
52    ) -> Result<(), Error> {
53        let store = self.get_kv_store_key(store).unwrap();
54        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())
55            .map_err(|_| KvStoreError::BadRequest)?;
56        // just create a future that's already ready
57        let fut = futures::future::ok(self.obj_lookup(store.clone(), key));
58        let task = PeekableTask::spawn(fut).await;
59        memory.write(
60            handle_out,
61            self.insert_pending_kv_lookup(PendingKvLookupTask::new(task))
62                .into(),
63        )?;
64        Ok(())
65    }
66
67    async fn lookup_wait(
68        &mut self,
69        memory: &mut GuestMemory<'_>,
70        pending_kv_lookup_handle: KvStoreLookupHandle,
71        body_handle_out: GuestPtr<BodyHandle>,
72        metadata_buf: GuestPtr<u8>,
73        metadata_buf_len: u32,
74        nwritten_out: GuestPtr<u32>,
75        generation_out: GuestPtr<u32>,
76        kv_error_out: GuestPtr<KvError>,
77    ) -> Result<(), Error> {
78        let resp = self
79            .take_pending_kv_lookup(pending_kv_lookup_handle.into())?
80            .task()
81            .recv()
82            .await?;
83
84        match resp {
85            Ok(Some(value)) => {
86                let body_handle = self.insert_body(value.body.into());
87
88                memory.write(body_handle_out, body_handle)?;
89                match value.metadata_len {
90                    0 => memory.write(nwritten_out, 0)?,
91                    len => {
92                        let meta_len_u32 =
93                            u32::try_from(len).expect("metadata len is outside the bounds of u32");
94                        memory.write(nwritten_out, meta_len_u32)?;
95                        if meta_len_u32 > metadata_buf_len {
96                            return Err(Error::BufferLengthError {
97                                buf: "metadata",
98                                len: "specified length",
99                            });
100                        }
101                        memory.copy_from_slice(
102                            value.metadata.as_bytes(),
103                            metadata_buf.as_array(meta_len_u32),
104                        )?;
105                    }
106                }
107                memory.write(generation_out, 0)?;
108                memory.write(kv_error_out, KvError::Ok)?;
109                Ok(())
110            }
111            Ok(None) => {
112                memory.write(kv_error_out, KvError::NotFound)?;
113                Ok(())
114            }
115            Err(e) => {
116                memory.write(kv_error_out, (&e).into())?;
117                Ok(())
118            }
119        }
120    }
121
122    async fn lookup_wait_v2(
123        &mut self,
124        memory: &mut GuestMemory<'_>,
125        pending_kv_lookup_handle: KvStoreLookupHandle,
126        body_handle_out: GuestPtr<BodyHandle>,
127        metadata_buf: GuestPtr<u8>,
128        metadata_buf_len: u32,
129        nwritten_out: GuestPtr<u32>,
130        generation_out: GuestPtr<u64>,
131        kv_error_out: GuestPtr<KvError>,
132    ) -> Result<(), Error> {
133        let resp = self
134            .take_pending_kv_lookup(pending_kv_lookup_handle.into())?
135            .task()
136            .recv()
137            .await?;
138
139        match resp {
140            Ok(Some(value)) => {
141                let body_handle = self.insert_body(value.body.into());
142
143                memory.write(body_handle_out, body_handle)?;
144                match value.metadata_len {
145                    0 => memory.write(nwritten_out, 0)?,
146                    len => {
147                        let meta_len_u32 =
148                            u32::try_from(len).expect("metadata len is outside the bounds of u32");
149                        memory.write(nwritten_out, meta_len_u32)?;
150                        if meta_len_u32 > metadata_buf_len {
151                            return Err(Error::BufferLengthError {
152                                buf: "metadata",
153                                len: "specified length",
154                            });
155                        }
156                        memory.copy_from_slice(
157                            value.metadata.as_bytes(),
158                            metadata_buf.as_array(meta_len_u32),
159                        )?;
160                    }
161                }
162                memory.write(generation_out, value.generation)?;
163                memory.write(kv_error_out, KvError::Ok)?;
164                Ok(())
165            }
166            Ok(None) => {
167                memory.write(kv_error_out, KvError::NotFound)?;
168                Ok(())
169            }
170            Err(e) => {
171                memory.write(kv_error_out, (&e).into())?;
172                Ok(())
173            }
174        }
175    }
176
177    async fn insert(
178        &mut self,
179        memory: &mut GuestMemory<'_>,
180        store: KvStoreHandle,
181        key: GuestPtr<str>,
182        body_handle: BodyHandle,
183        insert_config_mask: KvInsertConfigOptions,
184        insert_configuration: GuestPtr<KvInsertConfig>,
185        pending_handle_out: GuestPtr<KvStoreInsertHandle>,
186    ) -> Result<(), Error> {
187        let store = self.get_kv_store_key(store).unwrap().clone();
188        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())
189            .map_err(|_| KvStoreError::BadRequest)?;
190        let body = self.take_body(body_handle)?.read_into_vec().await?;
191
192        let config = memory.read(insert_configuration)?;
193
194        let config_str_or_none = |flag, str_field: GuestPtr<u8>, len_field| {
195            if insert_config_mask.contains(flag) {
196                if len_field == 0 {
197                    return Err(Error::InvalidArgument);
198                }
199
200                Ok(Some(memory.to_vec(str_field.as_array(len_field))?))
201            } else {
202                Ok(None)
203            }
204        };
205
206        let mode = config.mode;
207
208        // won't actually do anything in viceroy
209        // let bgf = insert_config_mask.contains(KvInsertConfigOptions::BACKGROUND_FETCH);
210
211        let igm = if insert_config_mask.contains(KvInsertConfigOptions::IF_GENERATION_MATCH) {
212            Some(config.if_generation_match)
213        } else {
214            None
215        };
216
217        let meta = config_str_or_none(
218            KvInsertConfigOptions::METADATA,
219            config.metadata,
220            config.metadata_len,
221        )?;
222        let meta = if let Some(meta) = meta {
223            Some(String::from_utf8(meta).map_err(|_| Error::InvalidArgument)?)
224        } else {
225            None
226        };
227
228        let ttl = if insert_config_mask.contains(KvInsertConfigOptions::TIME_TO_LIVE_SEC) {
229            Some(std::time::Duration::from_secs(
230                config.time_to_live_sec as u64,
231            ))
232        } else {
233            None
234        };
235
236        let fut = futures::future::ok(self.kv_insert(store, key, body, Some(mode), igm, meta, ttl));
237        let task = PeekableTask::spawn(fut).await;
238        memory.write(
239            pending_handle_out,
240            self.insert_pending_kv_insert(PendingKvInsertTask::new(task)),
241        )?;
242
243        Ok(())
244    }
245
246    async fn insert_wait(
247        &mut self,
248        memory: &mut GuestMemory<'_>,
249        pending_insert_handle: KvStoreInsertHandle,
250        kv_error_out: GuestPtr<KvError>,
251    ) -> Result<(), Error> {
252        let resp = self
253            .take_pending_kv_insert(pending_insert_handle.into())?
254            .task()
255            .recv()
256            .await?;
257
258        match resp {
259            Ok(_) => {
260                memory.write(kv_error_out, KvError::Ok)?;
261                Ok(())
262            }
263            Err(e) => {
264                memory.write(kv_error_out, (&e).into())?;
265                Ok(())
266            }
267        }
268    }
269
270    async fn delete(
271        &mut self,
272        memory: &mut GuestMemory<'_>,
273        store: KvStoreHandle,
274        key: GuestPtr<str>,
275        _delete_config_mask: KvDeleteConfigOptions,
276        _delete_configuration: GuestPtr<KvDeleteConfig>,
277        pending_handle_out: GuestPtr<KvStoreDeleteHandle>,
278    ) -> Result<(), Error> {
279        let store = self.get_kv_store_key(store).unwrap().clone();
280        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())
281            .map_err(|_| KvStoreError::BadRequest)?;
282        let fut = futures::future::ok(self.kv_delete(store, key));
283        let task = PeekableTask::spawn(fut).await;
284        memory.write(
285            pending_handle_out,
286            self.insert_pending_kv_delete(PendingKvDeleteTask::new(task))
287                .into(),
288        )?;
289        Ok(())
290    }
291
292    async fn delete_wait(
293        &mut self,
294        memory: &mut GuestMemory<'_>,
295        pending_delete_handle: KvStoreDeleteHandle,
296        kv_error_out: GuestPtr<KvError>,
297    ) -> Result<(), Error> {
298        let resp = self
299            .take_pending_kv_delete(pending_delete_handle.into())?
300            .task()
301            .recv()
302            .await?;
303
304        match resp {
305            Ok(_) => {
306                memory.write(kv_error_out, KvError::Ok)?;
307                Ok(())
308            }
309            Err(e) => {
310                memory.write(kv_error_out, (&e).into())?;
311                Ok(())
312            }
313        }
314    }
315
316    async fn list(
317        &mut self,
318        memory: &mut GuestMemory<'_>,
319        store: KvStoreHandle,
320        list_config_mask: KvListConfigOptions,
321        list_configuration: GuestPtr<KvListConfig>,
322        pending_handle_out: GuestPtr<KvStoreListHandle>,
323    ) -> Result<(), Error> {
324        let store = self.get_kv_store_key(store).unwrap().clone();
325
326        let config = memory.read(list_configuration)?;
327
328        let config_string_or_none = |flag, str_field: GuestPtr<u8>, len_field| {
329            if list_config_mask.contains(flag) {
330                if len_field == 0 {
331                    return Err(Error::InvalidArgument);
332                }
333
334                let byte_vec = memory.to_vec(str_field.as_array(len_field))?;
335
336                Ok(Some(
337                    String::from_utf8(byte_vec).map_err(|_| Error::InvalidArgument)?,
338                ))
339            } else {
340                Ok(None)
341            }
342        };
343
344        let cursor = config_string_or_none(
345            KvListConfigOptions::CURSOR,
346            config.cursor,
347            config.cursor_len,
348        )?;
349
350        let prefix = config_string_or_none(
351            KvListConfigOptions::PREFIX,
352            config.prefix,
353            config.prefix_len,
354        )?;
355
356        let limit = match list_config_mask.contains(KvListConfigOptions::LIMIT) {
357            true => Some(config.limit),
358            false => None,
359        };
360
361        let fut = futures::future::ok(self.kv_list(store, cursor, prefix, limit));
362        let task = PeekableTask::spawn(fut).await;
363        memory.write(
364            pending_handle_out,
365            self.insert_pending_kv_list(PendingKvListTask::new(task))
366                .into(),
367        )?;
368        Ok(())
369    }
370
371    async fn list_wait(
372        &mut self,
373        memory: &mut GuestMemory<'_>,
374        pending_kv_list_handle: KvStoreListHandle,
375        body_handle_out: GuestPtr<BodyHandle>,
376        kv_error_out: GuestPtr<KvError>,
377    ) -> Result<(), Error> {
378        let resp = self
379            .take_pending_kv_list(pending_kv_list_handle.into())?
380            .task()
381            .recv()
382            .await?;
383
384        match resp {
385            Ok(value) => {
386                let body_handle = self.insert_body(value.into()).into();
387
388                memory.write(body_handle_out, body_handle)?;
389
390                memory.write(kv_error_out, KvError::Ok)?;
391                Ok(())
392            }
393            Err(e) => {
394                memory.write(kv_error_out, (&e).into())?;
395                Ok(())
396            }
397        }
398    }
399}