1use crate::object_store::KvStoreError;
4use crate::session::PeekableTask;
5use crate::session::{
6 PendingKvDeleteTask, PendingKvInsertTask, PendingKvListTask, PendingKvLookupTask,
7};
8
9use {
10 crate::{
11 error::Error,
12 object_store::{ObjectKey, ObjectStoreError},
13 session::Session,
14 wiggle_abi::{
15 fastly_kv_store::FastlyKvStore,
16 types::{
17 BodyHandle, KvDeleteConfig, KvDeleteConfigOptions, KvError, KvInsertConfig,
18 KvInsertConfigOptions, KvListConfig, KvListConfigOptions, KvLookupConfig,
19 KvLookupConfigOptions, KvStoreDeleteHandle, KvStoreHandle, KvStoreInsertHandle,
20 KvStoreListHandle, KvStoreLookupHandle,
21 },
22 },
23 },
24 wiggle::{GuestMemory, GuestPtr},
25};
26
27#[wiggle::async_trait]
28impl FastlyKvStore for Session {
29 fn open(
30 &mut self,
31 memory: &mut GuestMemory<'_>,
32 name: GuestPtr<str>,
33 ) -> Result<KvStoreHandle, Error> {
34 let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?;
35 if self.kv_store().store_exists(&name)? {
36 Ok(self.kv_store_handle(&name))
37 } else {
38 Err(Error::ObjectStoreError(
39 ObjectStoreError::UnknownObjectStore(name.to_owned()),
40 ))
41 }
42 }
43
44 async fn lookup(
45 &mut self,
46 memory: &mut GuestMemory<'_>,
47 store: KvStoreHandle,
48 key: GuestPtr<str>,
49 _lookup_config_mask: KvLookupConfigOptions,
50 _lookup_configuration: GuestPtr<KvLookupConfig>,
51 handle_out: GuestPtr<KvStoreLookupHandle>,
52 ) -> Result<(), Error> {
53 let store = self.get_kv_store_key(store).unwrap();
54 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())
55 .map_err(|_| KvStoreError::BadRequest)?;
56 let fut = futures::future::ok(self.obj_lookup(store.clone(), key));
58 let task = PeekableTask::spawn(fut).await;
59 memory.write(
60 handle_out,
61 self.insert_pending_kv_lookup(PendingKvLookupTask::new(task))
62 .into(),
63 )?;
64 Ok(())
65 }
66
67 async fn lookup_wait(
68 &mut self,
69 memory: &mut GuestMemory<'_>,
70 pending_kv_lookup_handle: KvStoreLookupHandle,
71 body_handle_out: GuestPtr<BodyHandle>,
72 metadata_buf: GuestPtr<u8>,
73 metadata_buf_len: u32,
74 nwritten_out: GuestPtr<u32>,
75 generation_out: GuestPtr<u32>,
76 kv_error_out: GuestPtr<KvError>,
77 ) -> Result<(), Error> {
78 let resp = self
79 .take_pending_kv_lookup(pending_kv_lookup_handle.into())?
80 .task()
81 .recv()
82 .await?;
83
84 match resp {
85 Ok(Some(value)) => {
86 let body_handle = self.insert_body(value.body.into());
87
88 memory.write(body_handle_out, body_handle)?;
89 match value.metadata_len {
90 0 => memory.write(nwritten_out, 0)?,
91 len => {
92 let meta_len_u32 =
93 u32::try_from(len).expect("metadata len is outside the bounds of u32");
94 memory.write(nwritten_out, meta_len_u32)?;
95 if meta_len_u32 > metadata_buf_len {
96 return Err(Error::BufferLengthError {
97 buf: "metadata",
98 len: "specified length",
99 });
100 }
101 memory.copy_from_slice(
102 value.metadata.as_bytes(),
103 metadata_buf.as_array(meta_len_u32),
104 )?;
105 }
106 }
107 memory.write(generation_out, 0)?;
108 memory.write(kv_error_out, KvError::Ok)?;
109 Ok(())
110 }
111 Ok(None) => {
112 memory.write(kv_error_out, KvError::NotFound)?;
113 Ok(())
114 }
115 Err(e) => {
116 memory.write(kv_error_out, (&e).into())?;
117 Ok(())
118 }
119 }
120 }
121
122 async fn lookup_wait_v2(
123 &mut self,
124 memory: &mut GuestMemory<'_>,
125 pending_kv_lookup_handle: KvStoreLookupHandle,
126 body_handle_out: GuestPtr<BodyHandle>,
127 metadata_buf: GuestPtr<u8>,
128 metadata_buf_len: u32,
129 nwritten_out: GuestPtr<u32>,
130 generation_out: GuestPtr<u64>,
131 kv_error_out: GuestPtr<KvError>,
132 ) -> Result<(), Error> {
133 let resp = self
134 .take_pending_kv_lookup(pending_kv_lookup_handle.into())?
135 .task()
136 .recv()
137 .await?;
138
139 match resp {
140 Ok(Some(value)) => {
141 let body_handle = self.insert_body(value.body.into());
142
143 memory.write(body_handle_out, body_handle)?;
144 match value.metadata_len {
145 0 => memory.write(nwritten_out, 0)?,
146 len => {
147 let meta_len_u32 =
148 u32::try_from(len).expect("metadata len is outside the bounds of u32");
149 memory.write(nwritten_out, meta_len_u32)?;
150 if meta_len_u32 > metadata_buf_len {
151 return Err(Error::BufferLengthError {
152 buf: "metadata",
153 len: "specified length",
154 });
155 }
156 memory.copy_from_slice(
157 value.metadata.as_bytes(),
158 metadata_buf.as_array(meta_len_u32),
159 )?;
160 }
161 }
162 memory.write(generation_out, value.generation)?;
163 memory.write(kv_error_out, KvError::Ok)?;
164 Ok(())
165 }
166 Ok(None) => {
167 memory.write(kv_error_out, KvError::NotFound)?;
168 Ok(())
169 }
170 Err(e) => {
171 memory.write(kv_error_out, (&e).into())?;
172 Ok(())
173 }
174 }
175 }
176
177 async fn insert(
178 &mut self,
179 memory: &mut GuestMemory<'_>,
180 store: KvStoreHandle,
181 key: GuestPtr<str>,
182 body_handle: BodyHandle,
183 insert_config_mask: KvInsertConfigOptions,
184 insert_configuration: GuestPtr<KvInsertConfig>,
185 pending_handle_out: GuestPtr<KvStoreInsertHandle>,
186 ) -> Result<(), Error> {
187 let store = self.get_kv_store_key(store).unwrap().clone();
188 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())
189 .map_err(|_| KvStoreError::BadRequest)?;
190 let body = self.take_body(body_handle)?.read_into_vec().await?;
191
192 let config = memory.read(insert_configuration)?;
193
194 let config_str_or_none = |flag, str_field: GuestPtr<u8>, len_field| {
195 if insert_config_mask.contains(flag) {
196 if len_field == 0 {
197 return Err(Error::InvalidArgument);
198 }
199
200 Ok(Some(memory.to_vec(str_field.as_array(len_field))?))
201 } else {
202 Ok(None)
203 }
204 };
205
206 let mode = config.mode;
207
208 let igm = if insert_config_mask.contains(KvInsertConfigOptions::IF_GENERATION_MATCH) {
212 Some(config.if_generation_match)
213 } else {
214 None
215 };
216
217 let meta = config_str_or_none(
218 KvInsertConfigOptions::METADATA,
219 config.metadata,
220 config.metadata_len,
221 )?;
222 let meta = if let Some(meta) = meta {
223 Some(String::from_utf8(meta).map_err(|_| Error::InvalidArgument)?)
224 } else {
225 None
226 };
227
228 let ttl = if insert_config_mask.contains(KvInsertConfigOptions::TIME_TO_LIVE_SEC) {
229 Some(std::time::Duration::from_secs(
230 config.time_to_live_sec as u64,
231 ))
232 } else {
233 None
234 };
235
236 let fut = futures::future::ok(self.kv_insert(store, key, body, Some(mode), igm, meta, ttl));
237 let task = PeekableTask::spawn(fut).await;
238 memory.write(
239 pending_handle_out,
240 self.insert_pending_kv_insert(PendingKvInsertTask::new(task)),
241 )?;
242
243 Ok(())
244 }
245
246 async fn insert_wait(
247 &mut self,
248 memory: &mut GuestMemory<'_>,
249 pending_insert_handle: KvStoreInsertHandle,
250 kv_error_out: GuestPtr<KvError>,
251 ) -> Result<(), Error> {
252 let resp = self
253 .take_pending_kv_insert(pending_insert_handle.into())?
254 .task()
255 .recv()
256 .await?;
257
258 match resp {
259 Ok(_) => {
260 memory.write(kv_error_out, KvError::Ok)?;
261 Ok(())
262 }
263 Err(e) => {
264 memory.write(kv_error_out, (&e).into())?;
265 Ok(())
266 }
267 }
268 }
269
270 async fn delete(
271 &mut self,
272 memory: &mut GuestMemory<'_>,
273 store: KvStoreHandle,
274 key: GuestPtr<str>,
275 _delete_config_mask: KvDeleteConfigOptions,
276 _delete_configuration: GuestPtr<KvDeleteConfig>,
277 pending_handle_out: GuestPtr<KvStoreDeleteHandle>,
278 ) -> Result<(), Error> {
279 let store = self.get_kv_store_key(store).unwrap().clone();
280 let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())
281 .map_err(|_| KvStoreError::BadRequest)?;
282 let fut = futures::future::ok(self.kv_delete(store, key));
283 let task = PeekableTask::spawn(fut).await;
284 memory.write(
285 pending_handle_out,
286 self.insert_pending_kv_delete(PendingKvDeleteTask::new(task))
287 .into(),
288 )?;
289 Ok(())
290 }
291
292 async fn delete_wait(
293 &mut self,
294 memory: &mut GuestMemory<'_>,
295 pending_delete_handle: KvStoreDeleteHandle,
296 kv_error_out: GuestPtr<KvError>,
297 ) -> Result<(), Error> {
298 let resp = self
299 .take_pending_kv_delete(pending_delete_handle.into())?
300 .task()
301 .recv()
302 .await?;
303
304 match resp {
305 Ok(_) => {
306 memory.write(kv_error_out, KvError::Ok)?;
307 Ok(())
308 }
309 Err(e) => {
310 memory.write(kv_error_out, (&e).into())?;
311 Ok(())
312 }
313 }
314 }
315
316 async fn list(
317 &mut self,
318 memory: &mut GuestMemory<'_>,
319 store: KvStoreHandle,
320 list_config_mask: KvListConfigOptions,
321 list_configuration: GuestPtr<KvListConfig>,
322 pending_handle_out: GuestPtr<KvStoreListHandle>,
323 ) -> Result<(), Error> {
324 let store = self.get_kv_store_key(store).unwrap().clone();
325
326 let config = memory.read(list_configuration)?;
327
328 let config_string_or_none = |flag, str_field: GuestPtr<u8>, len_field| {
329 if list_config_mask.contains(flag) {
330 if len_field == 0 {
331 return Err(Error::InvalidArgument);
332 }
333
334 let byte_vec = memory.to_vec(str_field.as_array(len_field))?;
335
336 Ok(Some(
337 String::from_utf8(byte_vec).map_err(|_| Error::InvalidArgument)?,
338 ))
339 } else {
340 Ok(None)
341 }
342 };
343
344 let cursor = config_string_or_none(
345 KvListConfigOptions::CURSOR,
346 config.cursor,
347 config.cursor_len,
348 )?;
349
350 let prefix = config_string_or_none(
351 KvListConfigOptions::PREFIX,
352 config.prefix,
353 config.prefix_len,
354 )?;
355
356 let limit = match list_config_mask.contains(KvListConfigOptions::LIMIT) {
357 true => Some(config.limit),
358 false => None,
359 };
360
361 let fut = futures::future::ok(self.kv_list(store, cursor, prefix, limit));
362 let task = PeekableTask::spawn(fut).await;
363 memory.write(
364 pending_handle_out,
365 self.insert_pending_kv_list(PendingKvListTask::new(task))
366 .into(),
367 )?;
368 Ok(())
369 }
370
371 async fn list_wait(
372 &mut self,
373 memory: &mut GuestMemory<'_>,
374 pending_kv_list_handle: KvStoreListHandle,
375 body_handle_out: GuestPtr<BodyHandle>,
376 kv_error_out: GuestPtr<KvError>,
377 ) -> Result<(), Error> {
378 let resp = self
379 .take_pending_kv_list(pending_kv_list_handle.into())?
380 .task()
381 .recv()
382 .await?;
383
384 match resp {
385 Ok(value) => {
386 let body_handle = self.insert_body(value.into()).into();
387
388 memory.write(body_handle_out, body_handle)?;
389
390 memory.write(kv_error_out, KvError::Ok)?;
391 Ok(())
392 }
393 Err(e) => {
394 memory.write(kv_error_out, (&e).into())?;
395 Ok(())
396 }
397 }
398 }
399}