use super::types::{PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle};
use crate::session::PeekableTask;
use crate::session::{PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask};
use {
crate::{
body::Body,
error::Error,
object_store::{ObjectKey, ObjectStoreError},
session::Session,
wiggle_abi::{
fastly_object_store::FastlyObjectStore,
types::{BodyHandle, ObjectStoreHandle},
},
},
wiggle::{GuestMemory, GuestPtr},
};
impl FastlyObjectStore for Session {
fn open(
&mut self,
memory: &mut GuestMemory<'_>,
name: GuestPtr<str>,
) -> Result<ObjectStoreHandle, Error> {
let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?;
if self.kv_store().store_exists(name)? {
Ok(self.kv_store_handle(name).into())
} else {
Err(Error::ObjectStoreError(
ObjectStoreError::UnknownObjectStore(name.to_owned()),
))
}
}
fn lookup(
&mut self,
memory: &mut GuestMemory<'_>,
store: ObjectStoreHandle,
key: GuestPtr<str>,
opt_body_handle_out: GuestPtr<BodyHandle>,
) -> Result<(), Error> {
let store = self.get_kv_store_key(store.into()).unwrap();
let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
match self.obj_lookup(store.clone(), key) {
Ok(Some(obj)) => {
let new_handle = self.insert_body(Body::from(obj.body));
memory.write(opt_body_handle_out, new_handle)?;
Ok(())
}
Ok(None) => Ok(()),
Err(err) => Err(err.into()),
}
}
async fn lookup_async(
&mut self,
memory: &mut GuestMemory<'_>,
store: ObjectStoreHandle,
key: GuestPtr<str>,
opt_pending_body_handle_out: GuestPtr<PendingKvLookupHandle>,
) -> Result<(), Error> {
let store = self.get_kv_store_key(store.into()).unwrap();
let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
let fut = futures::future::ok(self.obj_lookup(store.clone(), key));
let task = PeekableTask::spawn(fut).await;
memory.write(
opt_pending_body_handle_out,
self.insert_pending_kv_lookup(PendingKvLookupTask::new(task)),
)?;
Ok(())
}
async fn pending_lookup_wait(
&mut self,
memory: &mut wiggle::GuestMemory<'_>,
pending_body_handle: PendingKvLookupHandle,
opt_body_handle_out: GuestPtr<BodyHandle>,
) -> Result<(), Error> {
let pending_obj = self
.take_pending_kv_lookup(pending_body_handle)?
.task()
.recv()
.await?;
match pending_obj {
Ok(Some(obj)) => {
let new_handle = self.insert_body(Body::from(obj.body));
memory.write(opt_body_handle_out, new_handle)?;
Ok(())
}
Ok(None) => Ok(()),
Err(err) => Err(err.into()),
}
}
async fn insert(
&mut self,
memory: &mut wiggle::GuestMemory<'_>,
store: ObjectStoreHandle,
key: GuestPtr<str>,
body_handle: BodyHandle,
) -> Result<(), Error> {
let store = self.get_kv_store_key(store.into()).unwrap().clone();
let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
let bytes = self.take_body(body_handle)?.read_into_vec().await?;
self.kv_insert(store, key, bytes, None, None, None, None)?;
Ok(())
}
async fn insert_async(
&mut self,
memory: &mut GuestMemory<'_>,
store: ObjectStoreHandle,
key: GuestPtr<str>,
body_handle: BodyHandle,
opt_pending_body_handle_out: GuestPtr<PendingKvInsertHandle>,
) -> Result<(), Error> {
let store = self.get_kv_store_key(store.into()).unwrap().clone();
let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
let bytes = self.take_body(body_handle)?.read_into_vec().await?;
let fut = futures::future::ok(self.kv_insert(store, key, bytes, None, None, None, None));
let task = PeekableTask::spawn(fut).await;
memory.write(
opt_pending_body_handle_out,
self.insert_pending_kv_insert(PendingKvInsertTask::new(task))
.into(),
)?;
Ok(())
}
async fn pending_insert_wait(
&mut self,
_memory: &mut GuestMemory<'_>,
pending_insert_handle: PendingKvInsertHandle,
) -> Result<(), Error> {
Ok((self
.take_pending_kv_insert(pending_insert_handle)?
.task()
.recv()
.await?)?)
}
async fn delete_async(
&mut self,
memory: &mut wiggle::GuestMemory<'_>,
store: ObjectStoreHandle,
key: GuestPtr<str>,
opt_pending_delete_handle_out: GuestPtr<PendingKvDeleteHandle>,
) -> Result<(), Error> {
let store = self.get_kv_store_key(store.into()).unwrap().clone();
let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?;
let fut = futures::future::ok(self.kv_delete(store, key));
let task = PeekableTask::spawn(fut).await;
memory.write(
opt_pending_delete_handle_out,
self.insert_pending_kv_delete(PendingKvDeleteTask::new(task)),
)?;
Ok(())
}
async fn pending_delete_wait(
&mut self,
_memory: &mut GuestMemory<'_>,
pending_delete_handle: PendingKvDeleteHandle,
) -> Result<(), Error> {
if !(self
.take_pending_kv_delete(pending_delete_handle)?
.task()
.recv()
.await?)?
{
Err(Error::ValueAbsent)
} else {
Ok(())
}
}
}