use crate::engine::wasm::bindings::astrid::kv::host::{self as kv, ErrorCode, KeyPage};
use crate::engine::wasm::host::util;
use crate::engine::wasm::host_state::HostState;
/// Translates a store failure into the `ErrorCode` reported to the caller.
///
/// The error is rendered through `Display` and classified by case-sensitive
/// substring checks, evaluated in this order (first match wins):
///
/// 1. `"invalid key"` or `"validation"` -> `ErrorCode::InvalidKey`
/// 2. `"quota"` -> `ErrorCode::Quota`
/// 3. `"too large"` -> `ErrorCode::TooLarge`
///
/// Anything else becomes `ErrorCode::Unknown` carrying `"<op>: <message>"`,
/// where `op` names the operation that failed.
fn store_err(op: &str, msg: impl std::fmt::Display) -> ErrorCode {
    let s = msg.to_string();
    let mentions = |needle: &str| s.contains(needle);

    if mentions("invalid key") || mentions("validation") {
        return ErrorCode::InvalidKey;
    }
    if mentions("quota") {
        return ErrorCode::Quota;
    }
    if mentions("too large") {
        return ErrorCode::TooLarge;
    }

    // Unrecognised failure: keep the operation name so the message is traceable.
    ErrorCode::Unknown(format!("{op}: {s}"))
}
/// Upper bound on the number of keys a single listing call may return.
///
/// `kv_list_keys` rejects result sets larger than this, and
/// `kv_list_keys_page` clamps the requested page size to it.
const MAX_KEYS_PER_LIST: usize = 1024;

/// `kv::Host` bindings implemented on top of the host's key-value store.
///
/// Every method clones the handle returned by `effective_kv()`, drives the
/// store's async operation to completion with `util::bounded_block_on`, and
/// converts store failures into an `ErrorCode` via `store_err`.
impl kv::Host for HostState {
    /// Looks up `key` and returns the store's result unchanged
    /// (presumably `None` when the key is absent — confirm against the store).
    ///
    /// # Errors
    /// Store failures are mapped through `store_err("kv_get", ..)`.
    fn kv_get(&mut self, key: String) -> Result<Option<Vec<u8>>, ErrorCode> {
        let kv = self.effective_kv().clone();
        util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
            kv.get(&key).await
        })
        .map_err(|e| store_err("kv_get", e))
    }

    /// Stores `value` under `key`.
    ///
    /// # Errors
    /// Store failures are mapped through `store_err("kv_set", ..)`.
    fn kv_set(&mut self, key: String, value: Vec<u8>) -> Result<(), ErrorCode> {
        let kv = self.effective_kv().clone();
        util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
            kv.set(&key, value).await
        })
        .map_err(|e| store_err("kv_set", e))
    }

    /// Deletes `key`.
    ///
    /// Whatever success value the store reports is discarded; callers only
    /// learn whether the operation failed.
    ///
    /// # Errors
    /// Store failures are mapped through `store_err("kv_delete", ..)`.
    fn kv_delete(&mut self, key: String) -> Result<(), ErrorCode> {
        let kv = self.effective_kv().clone();
        util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
            kv.delete(&key).await
        })
        .map(|_| ())
        .map_err(|e| store_err("kv_delete", e))
    }

    /// Lists every key starting with `prefix` in a single response.
    ///
    /// # Errors
    /// * `ErrorCode::TooLarge` when more than `MAX_KEYS_PER_LIST` keys match.
    /// * Store failures are mapped through `store_err("kv_list_keys", ..)`.
    fn kv_list_keys(&mut self, prefix: String) -> Result<Vec<String>, ErrorCode> {
        let kv = self.effective_kv().clone();
        let keys = util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
            kv.list_keys_with_prefix(&prefix).await
        })
        .map_err(|e| store_err("kv_list_keys", e))?;

        if keys.len() > MAX_KEYS_PER_LIST {
            return Err(ErrorCode::TooLarge);
        }
        Ok(keys)
    }

    /// Lists keys starting with `prefix` one page at a time, in sorted order.
    ///
    /// * `cursor` — when present, the page starts at the first key strictly
    ///   greater than it; when absent, the page starts at the first key.
    /// * `limit` — requested page size; `0` means "use the maximum", and any
    ///   value is clamped to `MAX_KEYS_PER_LIST`.
    ///
    /// The returned `next_cursor` is the last key of the page when further
    /// keys remain, and `None` once the listing is exhausted.
    ///
    /// Each call fetches and sorts the complete prefix listing before
    /// slicing out the page.
    ///
    /// # Errors
    /// Store failures are mapped through `store_err("kv_list_keys_page", ..)`.
    fn kv_list_keys_page(
        &mut self,
        prefix: String,
        cursor: Option<String>,
        limit: u32,
    ) -> Result<KeyPage, ErrorCode> {
        let kv = self.effective_kv().clone();
        let mut keys =
            util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
                kv.list_keys_with_prefix(&prefix).await
            })
            .map_err(|e| store_err("kv_list_keys_page", e))?;
        // Equal strings are indistinguishable, so the unstable sort produces
        // the same order as a stable one.
        keys.sort_unstable();

        // A `u32` that does not fit in `usize` is necessarily above the cap.
        let page_size = match usize::try_from(limit) {
            Ok(0) | Err(_) => MAX_KEYS_PER_LIST,
            Ok(n) => n.min(MAX_KEYS_PER_LIST),
        };

        // First index whose key sorts strictly after the cursor.
        let start = cursor
            .as_deref()
            .map_or(0, |after| keys.partition_point(|k| k.as_str() <= after));
        let end = start.saturating_add(page_size).min(keys.len());
        let has_more = end < keys.len();

        // Move the page out of the full listing instead of cloning each key;
        // the remainder of `keys` is dropped on return anyway.
        let page_keys: Vec<_> = keys.drain(start..end).collect();
        let next_cursor = if has_more {
            page_keys.last().cloned()
        } else {
            None
        };

        Ok(KeyPage {
            keys: page_keys,
            next_cursor,
        })
    }

    /// Clears every key starting with `prefix` and returns the `u64` reported
    /// by the store (presumably the number of keys removed — confirm against
    /// the store).
    ///
    /// # Errors
    /// Store failures are mapped through `store_err("kv_clear_prefix", ..)`.
    fn kv_clear_prefix(&mut self, prefix: String) -> Result<u64, ErrorCode> {
        let kv = self.effective_kv().clone();
        util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
            kv.clear_prefix(&prefix).await
        })
        .map_err(|e| store_err("kv_clear_prefix", e))
    }

    /// Compare-and-swap: asks the store to replace the value under `key` with
    /// `new`, given the `expected` current value (`None` is forwarded to the
    /// store as-is).
    ///
    /// # Errors
    /// * `ErrorCode::CasMismatch` when the store reports that no swap happened.
    /// * Store failures are mapped through `store_err("kv_cas", ..)`.
    fn kv_cas(
        &mut self,
        key: String,
        expected: Option<Vec<u8>>,
        new: Vec<u8>,
    ) -> Result<(), ErrorCode> {
        let kv = self.effective_kv().clone();
        let swapped =
            util::bounded_block_on(&self.runtime_handle, &self.blocking_semaphore, async {
                kv.compare_and_swap(&key, expected.as_deref(), new).await
            })
            .map_err(|e| store_err("kv_cas", e))?;

        if swapped {
            Ok(())
        } else {
            Err(ErrorCode::CasMismatch)
        }
    }
}