use async_trait::async_trait;
use std::fmt;
use tokio::sync::mpsc::Sender;
/// Cursor handed to the `*_from` watch methods; a thin newtype over [`VersionToken`].
///
/// The `Default` value and [`WatchCursor::none`] both wrap the unknown token.
// NOTE(review): presumably identifies the point a watch should resume from —
// confirm against the backend implementations.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WatchCursor(VersionToken);

impl WatchCursor {
    /// Builds a cursor wrapping the unknown version token.
    pub fn none() -> Self {
        Self::from_version(VersionToken::unknown())
    }

    /// Returns `true` when the wrapped token is the unknown token.
    pub fn is_none(&self) -> bool {
        self.version().is_unknown()
    }

    /// Wraps an existing version token in a cursor.
    pub fn from_version(token: VersionToken) -> Self {
        Self(token)
    }

    /// Builds a cursor from a numeric revision via `VersionToken::from_u64`.
    pub fn from_u64(rev: u64) -> Self {
        Self::from_version(VersionToken::from_u64(rev))
    }

    /// Returns whatever `VersionToken::as_u64` reports for the wrapped token.
    #[must_use]
    pub fn as_u64(&self) -> Option<u64> {
        let Self(token) = self;
        token.as_u64()
    }

    /// Borrows the wrapped version token (crate-internal).
    pub(crate) fn version(&self) -> &VersionToken {
        let Self(token) = self;
        token
    }
}
/// Error type returned by every fallible method of the key-value traits in this file.
///
/// `Display` text comes from the `#[error(...)]` attributes; variants carrying a
/// `String` append that string to their message.
// NOTE(review): the per-variant notes below are inferred from the messages and
// names only — confirm the exact conditions against the backend implementations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum KvError {
/// The store is not connected.
#[error("store not connected")]
NotConnected,
/// Connecting failed; the payload is a free-form detail string.
#[error("connection failed: {0}")]
ConnectionFailed(String),
/// The requested key does not exist.
#[error("key not found")]
KeyNotFound,
/// The key already exists (presumably reported by create-style writes).
#[error("key already exists")]
AlreadyExists,
/// A revision did not match (presumably the `expected` version of a conditional write).
#[error("revision mismatch")]
RevisionMismatch,
/// Decoding failed; the payload is a free-form detail string.
#[error("deserialization error: {0}")]
DeserializationError(String),
/// Encoding failed; the payload is a free-form detail string.
#[error("serialization error: {0}")]
SerializationError(String),
/// A watch failed; the payload is a free-form detail string.
#[error("watch error: {0}")]
WatchError(String),
/// Catch-all operation failure; the payload is a free-form detail string.
#[error("operation failed: {0}")]
OperationFailed(String),
/// The operation timed out.
#[error("operation timed out")]
Timeout,
/// The watch cursor is no longer usable because it was compacted away.
#[error("watch cursor expired (compacted)")]
CursorExpired,
}
/// Compact version identifier stored inline as 0 to 10 bytes.
///
/// The empty token is the "unknown" version. A token of exactly 8 bytes is
/// interpreted as a big-endian `u64` by [`VersionToken::as_u64`].
#[derive(Clone, Default, PartialEq, Eq, Hash)]
pub struct VersionToken {
    /// Number of meaningful leading bytes in `buf`.
    len: u8,
    /// Inline storage. Every constructor in this block leaves the bytes past
    /// `len` zeroed, which keeps the derived `PartialEq`/`Hash` consistent.
    buf: [u8; 10],
}

impl fmt::Debug for VersionToken {
    /// Renders `VersionToken(u64: N)` for 8-byte tokens, `VersionToken(unknown)`
    /// for the empty token, and `VersionToken([..bytes..])` for anything else.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.as_u64() {
            Some(v) => write!(f, "VersionToken(u64: {v})"),
            None if self.is_unknown() => write!(f, "VersionToken(unknown)"),
            None => {
                let bytes = self.as_bytes();
                write!(f, "VersionToken({bytes:?})")
            }
        }
    }
}

impl VersionToken {
    /// Returns the empty ("unknown") token; identical to `Default::default()`.
    pub fn unknown() -> Self {
        Self {
            len: 0,
            buf: [0u8; 10],
        }
    }

    /// Returns `true` when the token holds no bytes.
    pub fn is_unknown(&self) -> bool {
        0 == self.len
    }

    /// Encodes `rev` as an 8-byte big-endian token.
    pub fn from_u64(rev: u64) -> Self {
        let be = rev.to_be_bytes();
        let mut buf = [0u8; 10];
        buf[..be.len()].copy_from_slice(&be);
        Self { len: 8, buf }
    }

    /// Builds a full 10-byte token from `vs` (compiled for tests only).
    #[cfg(test)]
    pub(crate) fn from_fdb_versionstamp(vs: &[u8; 10]) -> Self {
        let buf = *vs;
        Self { len: 10, buf }
    }

    /// Decodes the token as a big-endian `u64`.
    ///
    /// Returns `None` unless the token is exactly 8 bytes long.
    #[must_use]
    pub fn as_u64(&self) -> Option<u64> {
        if self.len != 8 {
            return None;
        }
        let mut be = [0u8; 8];
        be.copy_from_slice(&self.buf[..8]);
        Some(u64::from_be_bytes(be))
    }

    /// Returns the meaningful bytes of the token (empty for the unknown token).
    pub fn as_bytes(&self) -> &[u8] {
        let used = usize::from(self.len);
        &self.buf[..used]
    }

    /// Builds a token from raw bytes.
    ///
    /// Returns `None` when `bytes` is longer than the 10-byte capacity. An
    /// empty slice yields the unknown token.
    #[must_use]
    pub(crate) fn from_raw(bytes: &[u8]) -> Option<Self> {
        let len = bytes.len();
        let mut buf = [0u8; 10];
        // `get_mut` yields `None` (and `?` returns it) when `len` exceeds the buffer.
        buf.get_mut(..len)?.copy_from_slice(bytes);
        // Lossless cast: `len` is at most 10 once the lookup above has succeeded.
        Some(Self {
            len: len as u8,
            buf,
        })
    }
}
/// A key together with its value bytes and version token.
///
/// Returned by the `KvReader` methods and carried by `KvUpdate::Put`.
#[derive(Debug, Clone)]
pub struct KvEntry {
/// The entry's key.
pub key: String,
/// The raw value bytes stored under `key`.
pub value: Vec<u8>,
/// Version token associated with this entry.
pub version: VersionToken,
}
#[derive(Debug, Clone)]
pub enum KvUpdate {
Put(KvEntry),
Delete { key: String, version: VersionToken },
Purge { key: String, version: VersionToken },
}
impl KvUpdate {
pub fn key(&self) -> &str {
match self {
KvUpdate::Put(e) => &e.key,
KvUpdate::Delete { key, .. } => key,
KvUpdate::Purge { key, .. } => key,
}
}
pub fn version(&self) -> &VersionToken {
match self {
KvUpdate::Put(e) => &e.version,
KvUpdate::Delete { version, .. } => version,
KvUpdate::Purge { version, .. } => version,
}
}
}
/// Read-only access to a key-value store.
///
/// All methods are async (via `async_trait`) and report failures as `KvError`.
// NOTE(review): `get` and `entry` share a signature; how they differ is not
// visible in this file — confirm against the implementations.
#[async_trait]
pub trait KvReader: Send + Sync {
/// Looks up `key`; `Ok(None)` presumably means the key is absent.
async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError>;
/// Lists key names for `prefix` (presumably the keys starting with it).
async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError>;
/// Returns full entries for `prefix` (presumably the entries whose keys start with it).
async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError>;
/// Looks up `key`, returning an optional entry like `get`.
async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError>;
}
/// Change-notification access to a key-value store.
///
/// Each method takes a `tokio` mpsc `Sender<KvUpdate>`; presumably the
/// implementation pushes updates into it (the sending side is not visible in
/// this file). The three `*_from` variants additionally take a `WatchCursor`
/// and come with default implementations that discard it.
#[async_trait]
pub trait KvWatcher: Send + Sync {
/// Watches every key, delivering updates through `tx`.
async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError>;
/// Watches keys under a single `prefix`, delivering updates through `tx`.
async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError>;
/// Watches keys under any of `prefixes`, delivering updates through `tx`.
async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError>;
/// Cursor-aware variant of `watch_all`.
///
/// The default implementation discards `cursor` and delegates to
/// `watch_all`, so an implementor that does not override it behaves exactly
/// like a plain watch.
async fn watch_all_from(
&self,
cursor: &WatchCursor,
tx: Sender<KvUpdate>,
) -> Result<(), KvError> {
// Deliberately unused in the default implementation.
let _ = cursor;
self.watch_all(tx).await
}
/// Cursor-aware variant of `watch_prefix`.
///
/// The default implementation discards `cursor` and delegates to
/// `watch_prefix`.
async fn watch_prefix_from(
&self,
prefix: &str,
cursor: &WatchCursor,
tx: Sender<KvUpdate>,
) -> Result<(), KvError> {
// Deliberately unused in the default implementation.
let _ = cursor;
self.watch_prefix(prefix, tx).await
}
/// Cursor-aware variant of `watch_prefixes`.
///
/// The default implementation discards `cursor` and delegates to
/// `watch_prefixes`.
async fn watch_prefixes_from(
&self,
prefixes: &[&str],
cursor: &WatchCursor,
tx: Sender<KvUpdate>,
) -> Result<(), KvError> {
// Deliberately unused in the default implementation.
let _ = cursor;
self.watch_prefixes(prefixes, tx).await
}
}
/// Write access to a key-value store.
///
/// All methods are async (via `async_trait`) and report failures as `KvError`.
// NOTE(review): the meaning of the returned `VersionToken` / `bool` values and
// the exact error variants are inferred from names — confirm against the
// implementations.
#[async_trait]
pub trait KvWriter: Send + Sync {
/// Stores `value` under `key`; presumably returns the version assigned to the write.
async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError>;
/// Deletes `key`; the `bool` presumably reports whether anything was removed.
async fn delete(&self, key: &str) -> Result<bool, KvError>;
/// Creates `key` with `value`; presumably fails with `KvError::AlreadyExists` if the key is present.
async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError>;
/// Conditional write: stores `value` under `key` given the `expected` version
/// (presumably failing with `KvError::RevisionMismatch` when it does not match).
async fn update(
&self,
key: &str,
value: &[u8],
expected: &VersionToken,
) -> Result<VersionToken, KvError>;
/// Conditional delete: removes `key` given the `expected` version.
async fn delete_with_version(
&self,
key: &str,
expected: &VersionToken,
) -> Result<bool, KvError>;
}
/// Extension of `KvWriter` for stores that accept a time-to-live on writes.
#[async_trait]
pub trait KvTtl: KvWriter {
/// Stores `value` under `key` with the given `ttl`.
///
/// Presumably the entry expires once `ttl` has elapsed and the returned
/// token is the version assigned to the write — confirm against the
/// implementations.
async fn put_with_ttl(
&self,
key: &str,
value: &[u8],
ttl: std::time::Duration,
) -> Result<VersionToken, KvError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Inputs of 10, 8 and 0 bytes are all accepted by `from_raw` and read back as expected.
    #[test]
    fn from_raw_roundtrips_within_capacity() {
        // Full-capacity input comes back byte-for-byte.
        let full: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let from_full = VersionToken::from_raw(&full).expect("10 bytes is within capacity");
        assert_eq!(from_full.as_bytes(), &full);

        // Eight big-endian bytes decode back to the same revision number.
        let revision: u64 = 0x0102_0304_0506_0708;
        let from_revision = VersionToken::from_raw(&revision.to_be_bytes())
            .expect("8 bytes is within capacity");
        assert_eq!(from_revision.as_u64(), Some(revision));

        // An empty slice produces the unknown token.
        let from_empty = VersionToken::from_raw(&[]).expect("empty is within capacity");
        assert!(from_empty.is_unknown());
    }

    /// Eleven bytes exceed the 10-byte capacity and are rejected.
    #[test]
    fn from_raw_rejects_above_capacity() {
        let oversized = [0u8; 11];
        assert!(VersionToken::from_raw(&oversized).is_none());
    }
}