use crate::kv_store;
use crate::Body;
use serde::Deserialize;
pub use self::handle::InsertMode;
pub use self::handle::KVStoreError;
pub use self::handle::ListMode;
pub use self::handle::StoreHandle;
pub use self::handle::PendingDeleteHandle;
pub use self::handle::PendingInsertHandle;
pub use self::handle::PendingListHandle;
pub use self::handle::PendingLookupHandle;
#[doc(hidden)]
pub mod handle;
/// Result of looking up a key in a [`KVStore`].
///
/// Holds the value body (until it is taken), optional metadata bytes, and a
/// generation number for the entry.
pub struct LookupResponse {
/// Value body; becomes `None` once removed by `take_body` / `try_take_body`.
body: Option<Body>,
/// Metadata bytes returned alongside the value, if any.
metadata: Option<bytes::Bytes>,
/// Generation number for the entry, exposed through `current_generation`.
generation: u64,
}
impl LookupResponse {
    /// Removes the body from the response and returns it.
    ///
    /// If the body is absent (for example because it was already taken), a
    /// fresh body created with `Body::new` is returned instead.
    pub fn take_body(&mut self) -> Body {
        match self.body.take() {
            Some(body) => body,
            None => Body::new(),
        }
    }

    /// Removes the body from the response, returning `None` if there is no
    /// body left to take.
    pub fn try_take_body(&mut self) -> Option<Body> {
        self.body.take()
    }

    /// Removes the body from the response and converts it into bytes.
    ///
    /// Returns an empty vector when there is no body left to take.
    pub fn take_body_bytes(&mut self) -> Vec<u8> {
        self.try_take_body()
            .map(|body| body.into_bytes())
            .unwrap_or_default()
    }

    /// Returns a copy of the metadata attached to this response, if any.
    pub fn metadata(&self) -> Option<bytes::Bytes> {
        self.metadata.clone()
    }

    /// Always returns `0`; kept only for backward compatibility.
    ///
    /// Use [`LookupResponse::current_generation`] to read the stored value.
    #[deprecated(
        since = "0.11.0",
        note = "`generation` has a bug in this version of the SDK, and will always return 0"
    )]
    pub fn generation(&self) -> u32 {
        0
    }

    /// Returns the generation number stored in this response.
    pub fn current_generation(&self) -> u64 {
        self.generation
    }
}
/// Metadata describing one page of a key listing.
///
/// Deserialized with serde; no rename attributes are present, so the field
/// names double as the serialized key names.
#[derive(Deserialize, Debug, Clone)]
struct ListMetadata {
/// Page size limit associated with the listing.
limit: u32,
/// Cursor for the following page; `None` when no further cursor is available.
next_cursor: Option<String>,
/// Key prefix associated with the listing, if any.
prefix: Option<String>,
/// Consistency name; `ListPage::mode` maps `None` and "strong" to
/// `ListMode::Strong`, "eventual" to `ListMode::Eventual`, and anything else
/// to `ListMode::Other`.
consistency: Option<String>,
}
/// One page of a key listing: the keys plus the metadata describing the page.
///
/// Deserialized with serde from a document with `data` and `meta` entries.
#[derive(Deserialize, Debug, Clone)]
pub struct ListPage {
/// Keys contained in this page.
data: Vec<String>,
/// Cursor, prefix, limit and consistency information for this page.
meta: ListMetadata,
}
impl ListPage {
    /// Borrows the keys contained in this page.
    pub fn keys(&self) -> &[String] {
        &self.data
    }

    /// Consumes the page and returns its keys.
    pub fn into_keys(self) -> Vec<String> {
        let Self { data, .. } = self;
        data
    }

    /// Returns a copy of the cursor for the following page, if there is one.
    pub fn next_cursor(&self) -> Option<String> {
        self.meta.next_cursor.as_ref().cloned()
    }

    /// Returns the key prefix recorded in the page metadata, if any.
    pub fn prefix(&self) -> Option<&str> {
        self.meta.prefix.as_ref().map(String::as_str)
    }

    /// Returns the page size limit recorded in the page metadata.
    pub fn limit(&self) -> u32 {
        self.meta.limit
    }

    /// Returns the consistency mode recorded in the page metadata.
    ///
    /// A missing value and `"strong"` both map to [`ListMode::Strong`],
    /// `"eventual"` maps to [`ListMode::Eventual`], and any other string is
    /// carried through as [`ListMode::Other`].
    pub fn mode(&self) -> ListMode {
        match &self.meta.consistency {
            None => ListMode::Strong,
            Some(level) => match level.as_str() {
                "strong" => ListMode::Strong,
                "eventual" => ListMode::Eventual,
                other => ListMode::Other(other.to_string()),
            },
        }
    }
}
/// Iterator state for walking a key listing page by page.
///
/// Each step issues a list request through `store_handle` using the cursor,
/// limit, prefix and consistency stored in `page`.
#[derive(Debug)]
pub struct ListResponse<'a> {
/// Handle used to issue each list request.
store_handle: &'a kv_store::StoreHandle,
/// Most recently fetched page; before the first fetch it has no keys and
/// carries the initial request parameters.
page: ListPage,
/// Initialised to `false` by `new`; `Iterator::next` returns `None` whenever
/// it is `true`.
iterator_did_error: bool,
}
impl<'a> ListResponse<'a> {
    /// Creates the initial iterator state for a listing.
    ///
    /// The page starts with no keys; `cursor`, `prefix` and the limit are
    /// stored as the parameters of the first request. A missing `limit`
    /// defaults to 1000.
    ///
    /// NOTE(review): only `ListMode::Eventual` is recorded as a consistency
    /// string. `ListMode::Other(_)` is stored as "no consistency", so it reads
    /// back as `ListMode::Strong` when the mode is reconstructed from the page
    /// metadata — confirm this is intended.
    fn new(
        handle: &'a kv_store::StoreHandle,
        mode: ListMode,
        cursor: Option<String>,
        limit: Option<u32>,
        prefix: Option<String>,
    ) -> Self {
        let consistency = match mode {
            ListMode::Eventual => Some(String::from("eventual")),
            ListMode::Strong | ListMode::Other(_) => None,
        };
        let meta = ListMetadata {
            limit: limit.unwrap_or(1000),
            next_cursor: cursor,
            prefix,
            consistency,
        };
        let page = ListPage {
            data: Vec::new(),
            meta,
        };
        Self {
            store_handle: handle,
            page,
            iterator_did_error: false,
        }
    }

    /// Consumes the response and returns the page it currently holds.
    pub fn into_page(self) -> ListPage {
        let Self { page, .. } = self;
        page
    }
}
impl Iterator for ListResponse<'_> {
type Item = Result<ListPage, KVStoreError>;
fn next(&mut self) -> Option<Self::Item> {
let cursor = self.page.meta.next_cursor.to_owned();
if self.iterator_did_error {
return None;
}
if self.page.meta.next_cursor.is_none() && !self.page.data.is_empty() {
return None;
}
let mode = match self.page.meta.consistency.as_deref() {
None => ListMode::Strong, Some("strong") => ListMode::Strong,
Some("eventual") => ListMode::Eventual,
Some(other) => ListMode::Other(other.to_string()),
};
let limit = Some(self.page.meta.limit);
let prefix = self.page.meta.prefix.to_owned();
let h_res = self.store_handle.list(mode, cursor, limit, prefix);
if let Err(e) = h_res {
return Some(Err(e));
}
let handle = h_res.unwrap();
let w_res = self.store_handle.pending_list_wait(handle);
if let Err(e) = w_res {
return Some(Err(e));
}
let out = w_res.unwrap().into_page();
let keys = out.keys();
if keys.is_empty() {
return None;
}
self.page.data = keys.to_vec();
self.page.meta.next_cursor = out.next_cursor();
Some(Ok(self.page.clone()))
}
}
/// Builder for an insert operation with non-default options.
///
/// Created by `KVStore::build_insert`; every field is forwarded to the store
/// handle's `insert` when the builder is executed.
pub struct InsertBuilder<'a> {
/// Store the insert is executed against.
store: &'a KVStore,
/// Insert mode; `KVStore::build_insert` starts with `InsertMode::Overwrite`.
mode: InsertMode,
/// Background-fetch flag; starts as `false`.
background_fetch: bool,
/// Generation value to match, if one was requested.
if_generation_match: Option<u64>,
/// Metadata string sent with the insert; starts empty.
metadata: String,
/// Time to live for the entry. Despite the `_sec` suffix this holds a full
/// `Duration`, not a number of seconds.
time_to_live_sec: Option<std::time::Duration>,
}
impl InsertBuilder<'_> {
pub fn mode(self, mode: InsertMode) -> Self {
InsertBuilder { mode, ..self }
}
pub fn background_fetch(self) -> Self {
InsertBuilder {
background_fetch: true,
..self
}
}
pub fn if_generation_match(self, gen: u64) -> Self {
InsertBuilder {
if_generation_match: Some(gen),
..self
}
}
pub fn metadata(self, data: &str) -> Self {
InsertBuilder {
metadata: data.to_string(),
..self
}
}
pub fn time_to_live(self, ttl: std::time::Duration) -> Self {
InsertBuilder {
time_to_live_sec: Some(ttl),
..self
}
}
pub fn execute(self, key: &str, value: impl Into<Body>) -> Result<(), KVStoreError> {
let handle = self.store.handle.insert(
key,
value.into().into_handle(),
self.mode,
self.background_fetch,
self.if_generation_match,
self.metadata.clone(),
self.time_to_live_sec,
)?;
self.store.pending_insert_wait(handle)
}
pub fn execute_async(
&self,
key: &str,
value: impl Into<Body>,
) -> Result<PendingInsertHandle, KVStoreError> {
self.store.handle.insert(
key,
value.into().into_handle(),
self.mode,
self.background_fetch,
self.if_generation_match,
self.metadata.clone(),
self.time_to_live_sec,
)
}
}
/// Builder for a lookup operation; created by `KVStore::build_lookup`.
pub struct LookupBuilder<'a> {
/// Store the lookup is executed against.
store: &'a KVStore,
}
impl LookupBuilder<'_> {
    /// Starts a lookup of `key` and waits for its result.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the lookup is
    /// started or while waiting for it to complete.
    pub fn execute(&self, key: &str) -> Result<LookupResponse, KVStoreError> {
        let store_handle = &self.store.handle;
        let pending = store_handle.lookup(key.as_bytes())?;
        store_handle.pending_lookup_wait(pending)
    }

    /// Starts a lookup of `key` without waiting for it.
    ///
    /// The returned handle can be passed to `KVStore::pending_lookup_wait`.
    pub fn execute_async(&self, key: &str) -> Result<PendingLookupHandle, KVStoreError> {
        let key_bytes = key.as_bytes();
        self.store.handle.lookup(key_bytes)
    }
}
/// Builder for a delete operation; created by `KVStore::build_delete`.
pub struct DeleteBuilder<'a> {
/// Store the delete is executed against.
store: &'a KVStore,
}
impl DeleteBuilder<'_> {
    /// Starts the deletion of `key` and waits for it to finish.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the delete is
    /// started or while waiting for it to complete.
    pub fn execute(&self, key: &str) -> Result<(), KVStoreError> {
        let store = self.store;
        let pending = store.handle.delete(key)?;
        store.pending_delete_wait(pending)
    }

    /// Starts the deletion of `key` without waiting for it.
    ///
    /// The returned handle can be passed to `KVStore::pending_delete_wait`.
    pub fn execute_async(&self, key: &str) -> Result<PendingDeleteHandle, KVStoreError> {
        let store = self.store;
        store.handle.delete(key)
    }
}
/// Builder for a list operation; created by `KVStore::build_list`.
pub struct ListBuilder<'a> {
/// Store the listing is executed against.
store: &'a KVStore,
/// Consistency mode; `KVStore::build_list` starts with `ListMode::Strong`.
mode: ListMode,
/// Cursor to start listing from, if one was set.
cursor: Option<String>,
/// Page size limit, if one was set.
limit: Option<u32>,
/// Key prefix forwarded with the list request, if one was set.
prefix: Option<String>,
}
impl<'a> ListBuilder<'a> {
    /// Switches the listing to [`ListMode::Eventual`].
    pub fn eventual_consistency(mut self) -> Self {
        self.mode = ListMode::Eventual;
        self
    }

    /// Sets the cursor the listing starts from (stored as an owned copy).
    pub fn cursor(mut self, cursor: &str) -> Self {
        self.cursor = Some(cursor.to_string());
        self
    }

    /// Sets the page size limit forwarded with the list request.
    pub fn limit(mut self, limit: u32) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Sets the key prefix forwarded with the list request (stored as an
    /// owned copy).
    pub fn prefix(mut self, prefix: &str) -> Self {
        self.prefix = Some(prefix.to_string());
        self
    }

    /// Starts a single list request and waits for the resulting page.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the request is
    /// started or while waiting for it to complete.
    pub fn execute(self) -> Result<ListPage, KVStoreError> {
        let Self {
            store,
            mode,
            cursor,
            limit,
            prefix,
        } = self;
        let pending = store.handle.list(mode, cursor, limit, prefix)?;
        store.pending_list_wait(pending)
    }

    /// Turns the builder into a [`ListResponse`] iterator that fetches the
    /// listing page by page, starting from the configured parameters.
    pub fn iter(self) -> ListResponse<'a> {
        let Self {
            store,
            mode,
            cursor,
            limit,
            prefix,
        } = self;
        ListResponse::new(&store.handle, mode, cursor, limit, prefix)
    }

    /// Starts a single list request without waiting for it.
    ///
    /// The builder is only borrowed, so its parameters are copied into the
    /// request. The returned handle can be passed to
    /// `KVStore::pending_list_wait`.
    pub fn execute_async(&self) -> Result<PendingListHandle, KVStoreError> {
        let mode = self.mode.clone();
        let cursor = self.cursor.clone();
        let prefix = self.prefix.clone();
        self.store.handle.list(mode, cursor, self.limit, prefix)
    }
}
/// A key-value store, wrapping the lower-level [`StoreHandle`] that performs
/// the actual lookup, insert, delete and list operations.
pub struct KVStore {
/// Underlying handle every operation is delegated to.
handle: StoreHandle,
}
impl KVStore {
    /// Borrows the underlying [`StoreHandle`].
    #[doc(hidden)]
    pub fn as_handle(&self) -> &StoreHandle {
        &self.handle
    }

    /// Opens the store called `name`.
    ///
    /// Returns `Ok(None)` when `StoreHandle::open` yields no handle
    /// (presumably because no store with that name exists — confirm against
    /// the handle API).
    ///
    /// # Errors
    ///
    /// Propagates the error reported by `StoreHandle::open`.
    pub fn open(name: &str) -> Result<Option<Self>, KVStoreError> {
        let opened = StoreHandle::open(name)?;
        Ok(opened.map(|handle| Self { handle }))
    }

    /// Looks up `key` and waits for the result.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the lookup is
    /// started or while waiting for it to complete.
    pub fn lookup(&self, key: &str) -> Result<LookupResponse, KVStoreError> {
        let pending = self.handle.lookup(key.as_bytes())?;
        self.handle.pending_lookup_wait(pending)
    }

    /// Creates a [`LookupBuilder`] bound to this store.
    pub fn build_lookup(&self) -> LookupBuilder<'_> {
        LookupBuilder { store: self }
    }

    /// Waits for a previously started lookup and returns its response.
    pub fn pending_lookup_wait(
        &self,
        pending_request_handle: PendingLookupHandle,
    ) -> Result<LookupResponse, KVStoreError> {
        let store_handle = &self.handle;
        store_handle.pending_lookup_wait(pending_request_handle)
    }

    /// Inserts `value` under `key` with default options and waits for the
    /// insert to finish.
    ///
    /// Defaults: [`InsertMode::Overwrite`], background fetch off, no
    /// generation match, empty metadata, no time to live.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the insert is
    /// started or while waiting for it to complete.
    pub fn insert(&self, key: &str, value: impl Into<Body>) -> Result<(), KVStoreError> {
        let pending = self.handle.insert(
            key,
            value.into().into_handle(),
            InsertMode::Overwrite,
            false,
            None,
            "",
            None,
        )?;
        self.pending_insert_wait(pending)
    }

    /// Creates an [`InsertBuilder`] bound to this store, pre-populated with
    /// the same defaults `insert` uses.
    pub fn build_insert(&self) -> InsertBuilder<'_> {
        InsertBuilder {
            store: self,
            mode: InsertMode::Overwrite,
            background_fetch: false,
            if_generation_match: None,
            metadata: String::new(),
            time_to_live_sec: None,
        }
    }

    /// Waits for a previously started insert, discarding any value the handle
    /// returns on success.
    pub fn pending_insert_wait(
        &self,
        pending_insert_handle: PendingInsertHandle,
    ) -> Result<(), KVStoreError> {
        let _ = self.handle.pending_insert_wait(pending_insert_handle)?;
        Ok(())
    }

    /// Deletes `key` and waits for the deletion to finish.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the delete is
    /// started or while waiting for it to complete.
    pub fn delete(&self, key: &str) -> Result<(), KVStoreError> {
        let pending = self.handle.delete(key)?;
        self.pending_delete_wait(pending)
    }

    /// Creates a [`DeleteBuilder`] bound to this store.
    pub fn build_delete(&self) -> DeleteBuilder<'_> {
        DeleteBuilder { store: self }
    }

    /// Waits for a previously started delete, discarding any value the handle
    /// returns on success.
    pub fn pending_delete_wait(
        &self,
        pending_delete_handle: PendingDeleteHandle,
    ) -> Result<(), KVStoreError> {
        let _ = self.handle.pending_delete_wait(pending_delete_handle)?;
        Ok(())
    }

    /// Fetches one page of keys using [`ListMode::Strong`] and no cursor,
    /// limit or prefix, waiting for the result.
    ///
    /// # Errors
    ///
    /// Returns the [`KVStoreError`] reported either when the request is
    /// started or while waiting for it to complete.
    pub fn list(&self) -> Result<ListPage, KVStoreError> {
        let pending = self.handle.list(ListMode::Strong, None, None, None)?;
        self.pending_list_wait(pending)
    }

    /// Creates a [`ListBuilder`] bound to this store, starting with
    /// [`ListMode::Strong`] and no cursor, limit or prefix.
    pub fn build_list(&self) -> ListBuilder<'_> {
        ListBuilder {
            store: self,
            mode: ListMode::Strong,
            cursor: None,
            limit: None,
            prefix: None,
        }
    }

    /// Waits for a previously started list request and returns its page.
    pub fn pending_list_wait(
        &self,
        pending_request_handle: PendingListHandle,
    ) -> Result<ListPage, KVStoreError> {
        let response = self.handle.pending_list_wait(pending_request_handle)?;
        Ok(response.into_page())
    }
}