use crate::handle::BodyHandle;
use crate::kv_store::{ListResponse, LookupResponse};
use bytes::BytesMut;
use fastly_shared::{
FastlyStatus, INVALID_BODY_HANDLE, INVALID_KV_PENDING_DELETE_HANDLE,
INVALID_KV_PENDING_INSERT_HANDLE, INVALID_KV_PENDING_LIST_HANDLE,
INVALID_KV_PENDING_LOOKUP_HANDLE, INVALID_KV_STORE_HANDLE,
};
use fastly_sys::fastly_kv_store as sys;
use fastly_sys::ListModeInternal;
pub use fastly_sys::{
DeleteConfig, DeleteConfigOptions, InsertConfig, InsertConfigOptions, InsertMode, ListConfig,
ListConfigOptions, ListMode, LookupConfig, LookupConfigOptions,
};
use serde_json;
use sys::KvError as KvSysError;
/// Size, in bytes, of the metadata buffer used by this module.
///
/// [`StoreHandle::pending_lookup_wait`] allocates a zeroed buffer of exactly this many bytes
/// and passes its length to the host as the capacity available for an item's metadata.
pub const METADATA_MAX_BYTES: usize = 2000;
/// Errors produced by the KV Store handle operations in this module.
///
/// Values are built either from a host [`FastlyStatus`] (see the `From<FastlyStatus>` impl and
/// the per-call `map_err` translations) or from the per-operation `KvError` code reported by
/// the `*_wait` calls (see the `From<KvSysError>` impl).
#[derive(Clone, Debug, Eq, PartialEq, thiserror::Error)]
#[non_exhaustive]
pub enum KVStoreError {
/// The key was rejected as invalid.
#[error("Invalid KV Store key")]
InvalidKey,
/// The host reported `FastlyStatus::BADF` for the handle passed to an operation.
#[error("Invalid KV Store handle")]
InvalidStoreHandle,
/// The options supplied for a store operation were rejected as invalid.
#[error("Invalid KV Store options")]
InvalidStoreOptions,
/// The host reported `FastlyStatus::INVAL` for an item operation, or the operation
/// completed with `KvError::BadRequest`.
#[error("KV Store item request was bad")]
ItemBadRequest,
/// The operation completed with `KvError::NotFound`.
#[error("KV Store item not found")]
ItemNotFound,
/// The operation completed with `KvError::PreconditionFailed`.
#[error("KV Store item precondition failed")]
ItemPreconditionFailed,
/// The operation completed with `KvError::PayloadTooLarge`.
#[error("KV Store item exceeded payload limit")]
ItemPayloadTooLarge,
/// The named store could not be opened; carries the name that was requested.
/// Returned by `StoreHandle::open` when the host reports `FastlyStatus::INVAL`.
#[error("KV Store {0:?} not found")]
StoreNotFound(String),
/// The host reported `FastlyStatus::LIMITEXCEEDED` (mapped in `pending_insert_wait`).
#[error("Too many KV Store requests")]
TooManyRequests,
/// Any status without a more specific mapping; carries the originating status.
#[error("Unexpected KV Store error: {0:?}")]
Unexpected(FastlyStatus),
}
impl From<FastlyStatus> for KVStoreError {
fn from(st: FastlyStatus) -> Self {
KVStoreError::Unexpected(st)
}
}
impl From<KvSysError> for KVStoreError {
fn from(err: KvSysError) -> Self {
match err {
KvSysError::Uninitialized => KVStoreError::Unexpected(FastlyStatus::ERROR),
KvSysError::Ok => KVStoreError::Unexpected(FastlyStatus::ERROR),
KvSysError::BadRequest => KVStoreError::ItemBadRequest,
KvSysError::NotFound => KVStoreError::ItemNotFound,
KvSysError::PreconditionFailed => KVStoreError::ItemPreconditionFailed,
KvSysError::PayloadTooLarge => KVStoreError::ItemPayloadTooLarge,
KvSysError::InternalError => KVStoreError::Unexpected(FastlyStatus::ERROR),
}
}
}
/// Handle to an in-flight KV Store lookup.
///
/// Produced by `StoreHandle::lookup` and consumed by `StoreHandle::pending_lookup_wait`.
pub struct PendingLookupHandle {
/// Raw `u32` handle value as written by the host.
pub(super) handle: u32,
}
impl PendingLookupHandle {
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub fn as_u32(&self) -> u32 {
self.handle
}
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn from_u32(handle: u32) -> Self {
Self { handle }
}
}
/// Handle to an in-flight KV Store insert.
///
/// Produced by `StoreHandle::insert` and consumed by `StoreHandle::pending_insert_wait`.
pub struct PendingInsertHandle {
/// Raw `u32` handle value as written by the host.
pub(super) handle: u32,
}
impl PendingInsertHandle {
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub fn as_u32(&self) -> u32 {
self.handle
}
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn from_u32(handle: u32) -> Self {
Self { handle }
}
}
/// Handle to an in-flight KV Store delete.
///
/// Produced by `StoreHandle::delete` and consumed by `StoreHandle::pending_delete_wait`.
pub struct PendingDeleteHandle {
/// Raw `u32` handle value as written by the host.
pub(super) handle: u32,
}
impl PendingDeleteHandle {
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub fn as_u32(&self) -> u32 {
self.handle
}
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn from_u32(handle: u32) -> Self {
Self { handle }
}
}
/// Handle to an in-flight KV Store list operation.
///
/// Produced by `StoreHandle::list` and consumed by `StoreHandle::pending_list_wait`.
pub struct PendingListHandle {
/// Raw `u32` handle value as written by the host.
pub(super) handle: u32,
}
impl PendingListHandle {
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub fn as_u32(&self) -> u32 {
self.handle
}
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn from_u32(handle: u32) -> Self {
Self { handle }
}
}
/// Low-level handle to an opened KV Store.
///
/// Obtained from `StoreHandle::open`. Because of `#[repr(transparent)]` the type has the same
/// layout as the single `u32` it wraps.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct StoreHandle {
/// Raw `u32` store handle as written by the host.
handle: u32,
}
impl StoreHandle {
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub fn as_u32(&self) -> u32 {
self.handle
}
/// Opens the KV Store called `name`.
///
/// Returns `Ok(Some(handle))` on success and `Ok(None)` when the host call succeeds but
/// leaves the out-handle at `INVALID_KV_STORE_HANDLE`.
///
/// # Errors
///
/// * [`KVStoreError::StoreNotFound`] (carrying `name`) when the host reports
///   `FastlyStatus::INVAL`.
/// * [`KVStoreError::Unexpected`] for any other failing status.
pub fn open(name: &str) -> Result<Option<StoreHandle>, KVStoreError> {
    let mut raw_handle = INVALID_KV_STORE_HANDLE;
    // SAFETY: the pointer/length pair comes from a live `&str` and the out-parameter is a
    // local that outlives the call.
    let outcome = unsafe { sys::open_v2(name.as_ptr(), name.len(), &mut raw_handle) }.result();
    if let Err(st) = outcome {
        return Err(match st {
            FastlyStatus::INVAL => KVStoreError::StoreNotFound(name.to_owned()),
            other => KVStoreError::from(other),
        });
    }
    // A successful call that did not write a handle is reported as "no store".
    if raw_handle == INVALID_KV_STORE_HANDLE {
        return Ok(None);
    }
    Ok(Some(StoreHandle { handle: raw_handle }))
}
pub fn lookup(&self, key: impl AsRef<[u8]>) -> Result<PendingLookupHandle, KVStoreError> {
let mut pending_lookup_handle_out = INVALID_KV_PENDING_LOOKUP_HANDLE;
let key = key.as_ref();
let config_options = LookupConfigOptions::empty();
let config = LookupConfig::default();
let status = unsafe {
sys::lookup_v2(
self.as_u32(),
key.as_ptr(),
key.len(),
config_options,
&config,
&mut pending_lookup_handle_out,
)
};
status.result().map_err(|st| match st {
FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
FastlyStatus::INVAL => KVStoreError::ItemBadRequest,
_ => st.into(),
})?;
if pending_lookup_handle_out == INVALID_KV_PENDING_LOOKUP_HANDLE {
Err(KVStoreError::Unexpected(FastlyStatus::ERROR))
} else {
Ok(unsafe { PendingLookupHandle::from_u32(pending_lookup_handle_out) })
}
}
pub fn pending_lookup_wait(
&self,
pending_lookup_handle: PendingLookupHandle,
) -> Result<LookupResponse, KVStoreError> {
let mut body_handle_out = INVALID_BODY_HANDLE;
let metadata_buf_len = METADATA_MAX_BYTES;
let mut metadata_buf = BytesMut::zeroed(metadata_buf_len);
let mut metadata_len_out = 0usize;
let mut generation = 0u64;
let mut kv_sys_error = KvSysError::Uninitialized;
let status = unsafe {
sys::lookup_wait_v2(
pending_lookup_handle.as_u32(),
&mut body_handle_out,
metadata_buf.as_mut_ptr(),
metadata_buf_len,
&mut metadata_len_out,
&mut generation,
&mut kv_sys_error,
)
};
status.result().map_err(|st| match st {
FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
FastlyStatus::INVAL => KVStoreError::ItemBadRequest,
_ => st.into(),
})?;
let metadata = match metadata_len_out {
0 => None,
_ => {
unsafe {
metadata_buf.set_len(metadata_len_out);
}
Some(metadata_buf.freeze())
}
};
if kv_sys_error != KvSysError::Ok {
return Err(kv_sys_error.into());
}
if body_handle_out == INVALID_BODY_HANDLE {
Err(KVStoreError::Unexpected(FastlyStatus::ERROR))
} else {
Ok(LookupResponse {
body: unsafe { Some(BodyHandle::from_u32(body_handle_out).into()) },
metadata,
generation,
})
}
}
pub fn insert(
&self,
key: impl AsRef<str>,
value: BodyHandle,
mode: InsertMode,
background_fetch: bool,
if_generation_match: Option<u64>,
metadata: impl AsRef<str>,
time_to_live_sec: Option<std::time::Duration>,
) -> Result<PendingInsertHandle, KVStoreError> {
let key = key.as_ref();
let metadata = metadata.as_ref();
let mut config_options = InsertConfigOptions::empty();
let mut config = InsertConfig::default();
config.mode = mode;
if background_fetch {
config_options.insert(InsertConfigOptions::BACKGROUND_FETCH);
}
if let Some(igm) = if_generation_match {
config.if_generation_match = igm;
config_options.insert(InsertConfigOptions::IF_GENERATION_MATCH);
}
if !metadata.is_empty() {
config.metadata = metadata.as_ptr();
config.metadata_len = metadata.len() as u32;
config_options.insert(InsertConfigOptions::METADATA);
}
if let Some(ttl) = time_to_live_sec {
config.time_to_live_sec = ttl.as_secs().try_into().unwrap_or_default();
config_options.insert(InsertConfigOptions::TIME_TO_LIVE_SEC);
}
let mut pending_insert_handle_out = INVALID_KV_PENDING_INSERT_HANDLE;
let status = unsafe {
sys::insert_v2(
self.as_u32(),
key.as_ptr(),
key.len(),
value.into_u32(),
config_options,
&config,
&mut pending_insert_handle_out,
)
};
status.result().map_err(|st| match st {
FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
FastlyStatus::INVAL => KVStoreError::ItemBadRequest,
_ => st.into(),
})?;
if pending_insert_handle_out == INVALID_KV_PENDING_INSERT_HANDLE {
Err(KVStoreError::Unexpected(FastlyStatus::ERROR))
} else {
Ok(unsafe { PendingInsertHandle::from_u32(pending_insert_handle_out) })
}
}
/// Waits for a pending insert to finish.
///
/// # Errors
///
/// * [`KVStoreError::InvalidStoreHandle`] when the host reports `FastlyStatus::BADF`.
/// * [`KVStoreError::ItemBadRequest`] when the host reports `FastlyStatus::INVAL`.
/// * [`KVStoreError::TooManyRequests`] when the host reports `FastlyStatus::LIMITEXCEEDED`.
/// * [`KVStoreError::Unexpected`] for any other failing status.
/// * The [`KVStoreError`] corresponding to the operation's `KvError` when the call itself
///   succeeds but that code is not `Ok`.
pub fn pending_insert_wait(
    &self,
    pending_insert_handle: PendingInsertHandle,
) -> Result<(), KVStoreError> {
    let mut op_error = KvSysError::Uninitialized;
    // SAFETY: the out-parameter is a local that outlives the call.
    let outcome = unsafe {
        sys::pending_insert_wait_v2(pending_insert_handle.as_u32(), &mut op_error)
    }
    .result();
    if let Err(st) = outcome {
        return Err(match st {
            FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
            FastlyStatus::INVAL => KVStoreError::ItemBadRequest,
            FastlyStatus::LIMITEXCEEDED => KVStoreError::TooManyRequests,
            other => KVStoreError::from(other),
        });
    }
    if op_error == KvSysError::Ok {
        Ok(())
    } else {
        Err(KVStoreError::from(op_error))
    }
}
pub fn delete(&self, key: impl AsRef<str>) -> Result<PendingDeleteHandle, KVStoreError> {
let mut pending_delete_handle_out = INVALID_KV_PENDING_DELETE_HANDLE;
let key = key.as_ref();
let status = unsafe {
sys::delete_v2(
self.as_u32(),
key.as_ptr(),
key.len(),
DeleteConfigOptions::empty(),
&DeleteConfig::default(),
&mut pending_delete_handle_out,
)
};
status.result().map_err(|st| match st {
FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
FastlyStatus::INVAL => KVStoreError::ItemBadRequest,
_ => st.into(),
})?;
Ok(unsafe { PendingDeleteHandle::from_u32(pending_delete_handle_out) })
}
/// Waits for a pending delete to finish.
///
/// # Errors
///
/// * [`KVStoreError::InvalidStoreHandle`] when the host reports `FastlyStatus::BADF`.
/// * [`KVStoreError::ItemBadRequest`] when the host reports `FastlyStatus::INVAL`.
/// * [`KVStoreError::Unexpected`] for any other failing status.
/// * The [`KVStoreError`] corresponding to the operation's `KvError` when the call itself
///   succeeds but that code is not `Ok`.
pub fn pending_delete_wait(
    &self,
    pending_delete_handle: PendingDeleteHandle,
) -> Result<(), KVStoreError> {
    let mut op_error = KvSysError::Uninitialized;
    // SAFETY: the out-parameter is a local that outlives the call.
    let outcome = unsafe {
        sys::pending_delete_wait_v2(pending_delete_handle.as_u32(), &mut op_error)
    }
    .result();
    if let Err(st) = outcome {
        return Err(match st {
            FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
            FastlyStatus::INVAL => KVStoreError::ItemBadRequest,
            other => KVStoreError::from(other),
        });
    }
    if op_error == KvSysError::Ok {
        Ok(())
    } else {
        Err(KVStoreError::from(op_error))
    }
}
pub fn list(
&self,
mode: ListMode,
cursor: Option<String>,
limit: Option<u32>,
prefix: Option<String>,
) -> Result<PendingListHandle, KVStoreError> {
let mut pending_list_handle_out = INVALID_KV_PENDING_LIST_HANDLE;
let prefix = prefix.as_ref();
let mut config_options = ListConfigOptions::empty();
let mut config = ListConfig::default();
config.mode = match mode {
ListMode::Strong | ListMode::Other(_) => ListModeInternal::Strong,
ListMode::Eventual => ListModeInternal::Eventual,
};
if let Some(c) = &cursor {
if !c.is_empty() {
config.cursor = c.as_ptr();
config.cursor_len = c.len() as u32;
config_options.insert(ListConfigOptions::CURSOR);
}
}
if let Some(l) = limit {
config.limit = l;
config_options.insert(ListConfigOptions::LIMIT);
}
if let Some(p) = prefix {
if !p.is_empty() {
config.prefix = p.as_ptr();
config.prefix_len = p.len() as u32;
config_options.insert(ListConfigOptions::PREFIX);
}
}
let status = unsafe {
sys::list_v2(
self.as_u32(),
config_options,
&config,
&mut pending_list_handle_out,
)
};
status.result().map_err(|st| match st {
FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
_ => st.into(),
})?;
Ok(unsafe { PendingListHandle::from_u32(pending_list_handle_out) })
}
pub fn pending_list_wait(
&self,
pending_list_handle: PendingListHandle,
) -> Result<ListResponse<'_>, KVStoreError> {
let mut kv_sys_error = KvSysError::Uninitialized;
let mut body_handle_out = INVALID_BODY_HANDLE;
let status = unsafe {
sys::pending_list_wait_v2(
pending_list_handle.as_u32(),
&mut body_handle_out,
&mut kv_sys_error,
)
};
status.result().map_err(|st| match st {
FastlyStatus::BADF => KVStoreError::InvalidStoreHandle,
_ => st.into(),
})?;
if kv_sys_error != KvSysError::Ok {
return Err(kv_sys_error.into());
}
if body_handle_out == INVALID_BODY_HANDLE {
Err(KVStoreError::Unexpected(FastlyStatus::ERROR))
} else {
let body = unsafe { BodyHandle::from_u32(body_handle_out) };
let lrp = serde_json::from_reader(body)
.map_err(|_| KVStoreError::Unexpected(FastlyStatus::ERROR))?;
Ok(ListResponse {
store_handle: self,
page: lrp,
iterator_did_error: false,
})
}
}
}