1use super::types::{PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle};
4use crate::session::PeekableTask;
5use crate::session::{PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask};
6
7use {
8 crate::{
9 body::Body,
10 error::Error,
11 object_store::{ObjectKey, ObjectStoreError},
12 session::Session,
13 wiggle_abi::{
14 fastly_object_store::FastlyObjectStore,
15 types::{BodyHandle, ObjectStoreHandle},
16 },
17 },
18 wiggle::{GuestMemory, GuestPtr},
19};
20
21#[wiggle::async_trait]
22impl FastlyObjectStore for Session {
23 fn open(
24 &mut self,
25 memory: &mut GuestMemory<'_>,
26 name: GuestPtr<str>,
27 ) -> Result<ObjectStoreHandle, Error> {
28 let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?;
29 if self.kv_store().store_exists(name)? {
30 Ok(self.kv_store_handle(name).into())
31 } else {
32 Err(Error::ObjectStoreError(
33 ObjectStoreError::UnknownObjectStore(name.to_owned()),
34 ))
35 }
36 }
37
38 fn lookup(
39 &mut self,
40 memory: &mut GuestMemory<'_>,
41 store: ObjectStoreHandle,
42 key: GuestPtr<str>,
43 opt_body_handle_out: GuestPtr<BodyHandle>,
44 ) -> Result<(), Error> {
45 let store = self.get_kv_store_key(store.into()).unwrap();
46 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
47 match self.obj_lookup(store.clone(), key) {
48 Ok(Some(obj)) => {
49 let new_handle = self.insert_body(Body::from(obj.body));
50 memory.write(opt_body_handle_out, new_handle)?;
51 Ok(())
52 }
53 Ok(None) => Ok(()),
57 Err(err) => Err(err.into()),
58 }
59 }
60
61 async fn lookup_async(
62 &mut self,
63 memory: &mut GuestMemory<'_>,
64 store: ObjectStoreHandle,
65 key: GuestPtr<str>,
66 opt_pending_body_handle_out: GuestPtr<PendingKvLookupHandle>,
67 ) -> Result<(), Error> {
68 let store = self.get_kv_store_key(store.into()).unwrap();
69 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
70 let fut = futures::future::ok(self.obj_lookup(store.clone(), key));
72 let task = PeekableTask::spawn(fut).await;
73 memory.write(
74 opt_pending_body_handle_out,
75 self.insert_pending_kv_lookup(PendingKvLookupTask::new(task)),
76 )?;
77 Ok(())
78 }
79
80 async fn pending_lookup_wait(
81 &mut self,
82 memory: &mut wiggle::GuestMemory<'_>,
83 pending_body_handle: PendingKvLookupHandle,
84 opt_body_handle_out: GuestPtr<BodyHandle>,
85 ) -> Result<(), Error> {
86 let pending_obj = self
87 .take_pending_kv_lookup(pending_body_handle)?
88 .task()
89 .recv()
90 .await?;
91 match pending_obj {
93 Ok(Some(obj)) => {
94 let new_handle = self.insert_body(Body::from(obj.body));
95 memory.write(opt_body_handle_out, new_handle)?;
96 Ok(())
97 }
98 Ok(None) => Ok(()),
99 Err(err) => Err(err.into()),
100 }
101 }
102
103 async fn insert(
104 &mut self,
105 memory: &mut wiggle::GuestMemory<'_>,
106 store: ObjectStoreHandle,
107 key: GuestPtr<str>,
108 body_handle: BodyHandle,
109 ) -> Result<(), Error> {
110 let store = self.get_kv_store_key(store.into()).unwrap().clone();
111 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
112 let bytes = self.take_body(body_handle)?.read_into_vec().await?;
113 self.kv_insert(store, key, bytes, None, None, None, None)?;
114
115 Ok(())
116 }
117
118 async fn insert_async(
119 &mut self,
120 memory: &mut GuestMemory<'_>,
121 store: ObjectStoreHandle,
122 key: GuestPtr<str>,
123 body_handle: BodyHandle,
124 opt_pending_body_handle_out: GuestPtr<PendingKvInsertHandle>,
125 ) -> Result<(), Error> {
126 let store = self.get_kv_store_key(store.into()).unwrap().clone();
127 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
128 let bytes = self.take_body(body_handle)?.read_into_vec().await?;
129 let fut = futures::future::ok(self.kv_insert(store, key, bytes, None, None, None, None));
130 let task = PeekableTask::spawn(fut).await;
131 memory.write(
132 opt_pending_body_handle_out,
133 self.insert_pending_kv_insert(PendingKvInsertTask::new(task))
134 .into(),
135 )?;
136 Ok(())
137 }
138
139 async fn pending_insert_wait(
140 &mut self,
141 _memory: &mut GuestMemory<'_>,
142 pending_insert_handle: PendingKvInsertHandle,
143 ) -> Result<(), Error> {
144 Ok((self
145 .take_pending_kv_insert(pending_insert_handle)?
146 .task()
147 .recv()
148 .await?)?)
149 }
150
151 async fn delete_async(
152 &mut self,
153 memory: &mut wiggle::GuestMemory<'_>,
154 store: ObjectStoreHandle,
155 key: GuestPtr<str>,
156 opt_pending_delete_handle_out: GuestPtr<PendingKvDeleteHandle>,
157 ) -> Result<(), Error> {
158 let store = self.get_kv_store_key(store.into()).unwrap().clone();
159 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
160 let fut = futures::future::ok(self.kv_delete(store, key));
161 let task = PeekableTask::spawn(fut).await;
162 memory.write(
163 opt_pending_delete_handle_out,
164 self.insert_pending_kv_delete(PendingKvDeleteTask::new(task)),
165 )?;
166 Ok(())
167 }
168
169 async fn pending_delete_wait(
170 &mut self,
171 _memory: &mut GuestMemory<'_>,
172 pending_delete_handle: PendingKvDeleteHandle,
173 ) -> Result<(), Error> {
174 if !(self
175 .take_pending_kv_delete(pending_delete_handle)?
176 .task()
177 .recv()
178 .await?)?
179 {
180 Err(Error::ValueAbsent)
181 } else {
182 Ok(())
183 }
184 }
185}