viceroy_lib/wiggle_abi/
obj_store_impl.rs

1//! fastly_obj_store` hostcall implementations.
2
3use super::types::{PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle};
4use crate::session::PeekableTask;
5use crate::session::{PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask};
6
7use {
8    crate::{
9        body::Body,
10        error::Error,
11        object_store::{ObjectKey, ObjectStoreError},
12        session::Session,
13        wiggle_abi::{
14            fastly_object_store::FastlyObjectStore,
15            types::{BodyHandle, ObjectStoreHandle},
16        },
17    },
18    wiggle::{GuestMemory, GuestPtr},
19};
20
21#[wiggle::async_trait]
22impl FastlyObjectStore for Session {
23    fn open(
24        &mut self,
25        memory: &mut GuestMemory<'_>,
26        name: GuestPtr<str>,
27    ) -> Result<ObjectStoreHandle, Error> {
28        let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?;
29        if self.kv_store().store_exists(name)? {
30            Ok(self.kv_store_handle(name).into())
31        } else {
32            Err(Error::ObjectStoreError(
33                ObjectStoreError::UnknownObjectStore(name.to_owned()),
34            ))
35        }
36    }
37
38    fn lookup(
39        &mut self,
40        memory: &mut GuestMemory<'_>,
41        store: ObjectStoreHandle,
42        key: GuestPtr<str>,
43        opt_body_handle_out: GuestPtr<BodyHandle>,
44    ) -> Result<(), Error> {
45        let store = self.get_kv_store_key(store.into()).unwrap();
46        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
47        match self.obj_lookup(store.clone(), key) {
48            Ok(Some(obj)) => {
49                let new_handle = self.insert_body(Body::from(obj.body));
50                memory.write(opt_body_handle_out, new_handle)?;
51                Ok(())
52            }
53            // Don't write to the invalid handle as the SDK will return Ok(None)
54            // if the object does not exist. We need to return `Ok(())` here to
55            // make sure Viceroy does not crash
56            Ok(None) => Ok(()),
57            Err(err) => Err(err.into()),
58        }
59    }
60
61    async fn lookup_async(
62        &mut self,
63        memory: &mut GuestMemory<'_>,
64        store: ObjectStoreHandle,
65        key: GuestPtr<str>,
66        opt_pending_body_handle_out: GuestPtr<PendingKvLookupHandle>,
67    ) -> Result<(), Error> {
68        let store = self.get_kv_store_key(store.into()).unwrap();
69        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
70        // just create a future that's already ready
71        let fut = futures::future::ok(self.obj_lookup(store.clone(), key));
72        let task = PeekableTask::spawn(fut).await;
73        memory.write(
74            opt_pending_body_handle_out,
75            self.insert_pending_kv_lookup(PendingKvLookupTask::new(task)),
76        )?;
77        Ok(())
78    }
79
80    async fn pending_lookup_wait(
81        &mut self,
82        memory: &mut wiggle::GuestMemory<'_>,
83        pending_body_handle: PendingKvLookupHandle,
84        opt_body_handle_out: GuestPtr<BodyHandle>,
85    ) -> Result<(), Error> {
86        let pending_obj = self
87            .take_pending_kv_lookup(pending_body_handle)?
88            .task()
89            .recv()
90            .await?;
91        // proceed with the normal match from lookup()
92        match pending_obj {
93            Ok(Some(obj)) => {
94                let new_handle = self.insert_body(Body::from(obj.body));
95                memory.write(opt_body_handle_out, new_handle)?;
96                Ok(())
97            }
98            Ok(None) => Ok(()),
99            Err(err) => Err(err.into()),
100        }
101    }
102
103    async fn insert(
104        &mut self,
105        memory: &mut wiggle::GuestMemory<'_>,
106        store: ObjectStoreHandle,
107        key: GuestPtr<str>,
108        body_handle: BodyHandle,
109    ) -> Result<(), Error> {
110        let store = self.get_kv_store_key(store.into()).unwrap().clone();
111        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
112        let bytes = self.take_body(body_handle)?.read_into_vec().await?;
113        self.kv_insert(store, key, bytes, None, None, None, None)?;
114
115        Ok(())
116    }
117
118    async fn insert_async(
119        &mut self,
120        memory: &mut GuestMemory<'_>,
121        store: ObjectStoreHandle,
122        key: GuestPtr<str>,
123        body_handle: BodyHandle,
124        opt_pending_body_handle_out: GuestPtr<PendingKvInsertHandle>,
125    ) -> Result<(), Error> {
126        let store = self.get_kv_store_key(store.into()).unwrap().clone();
127        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
128        let bytes = self.take_body(body_handle)?.read_into_vec().await?;
129        let fut = futures::future::ok(self.kv_insert(store, key, bytes, None, None, None, None));
130        let task = PeekableTask::spawn(fut).await;
131        memory.write(
132            opt_pending_body_handle_out,
133            self.insert_pending_kv_insert(PendingKvInsertTask::new(task))
134                .into(),
135        )?;
136        Ok(())
137    }
138
139    async fn pending_insert_wait(
140        &mut self,
141        _memory: &mut GuestMemory<'_>,
142        pending_insert_handle: PendingKvInsertHandle,
143    ) -> Result<(), Error> {
144        Ok((self
145            .take_pending_kv_insert(pending_insert_handle)?
146            .task()
147            .recv()
148            .await?)?)
149    }
150
151    async fn delete_async(
152        &mut self,
153        memory: &mut wiggle::GuestMemory<'_>,
154        store: ObjectStoreHandle,
155        key: GuestPtr<str>,
156        opt_pending_delete_handle_out: GuestPtr<PendingKvDeleteHandle>,
157    ) -> Result<(), Error> {
158        let store = self.get_kv_store_key(store.into()).unwrap().clone();
159        let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
160        let fut = futures::future::ok(self.kv_delete(store, key));
161        let task = PeekableTask::spawn(fut).await;
162        memory.write(
163            opt_pending_delete_handle_out,
164            self.insert_pending_kv_delete(PendingKvDeleteTask::new(task)),
165        )?;
166        Ok(())
167    }
168
169    async fn pending_delete_wait(
170        &mut self,
171        _memory: &mut GuestMemory<'_>,
172        pending_delete_handle: PendingKvDeleteHandle,
173    ) -> Result<(), Error> {
174        if !(self
175            .take_pending_kv_delete(pending_delete_handle)?
176            .task()
177            .recv()
178            .await?)?
179        {
180            Err(Error::ValueAbsent)
181        } else {
182            Ok(())
183        }
184    }
185}