use std::borrow::Cow;
use std::cmp::Ordering;
use std::num::NonZeroU32;
use std::pin::Pin;
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use futures::Stream;
use num_bigint::BigInt;
use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;
use crate::codec::canonicalize_f64;
pub type WatchStream =
Pin<Box<dyn Stream<Item = Result<Vec<WatchKeyOutput>, anyhow::Error>>>>;
#[async_trait(?Send)]
pub trait Database: Clone + Sized {
type QMH: QueueMessageHandle + 'static;
async fn snapshot_read(
&self,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, anyhow::Error>;
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, anyhow::Error>;
async fn dequeue_next_message(
&self,
) -> Result<Option<Self::QMH>, anyhow::Error>;
fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;
fn close(&self);
}
#[async_trait(?Send)]
pub trait QueueMessageHandle {
async fn take_payload(&mut self) -> Result<Vec<u8>, anyhow::Error>;
async fn finish(&self, success: bool) -> Result<(), anyhow::Error>;
}
#[async_trait(?Send)]
impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
async fn take_payload(&mut self) -> Result<Vec<u8>, anyhow::Error> {
(**self).take_payload().await
}
async fn finish(&self, success: bool) -> Result<(), anyhow::Error> {
(**self).finish(success).await
}
}
#[derive(Clone, Debug)]
pub struct SnapshotReadOptions {
pub consistency: Consistency,
}
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum Consistency {
Strong,
Eventual,
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
pub struct Key(pub Vec<KeyPart>);
#[derive(Clone, Debug)]
pub enum KeyPart {
Bytes(Vec<u8>),
String(String),
Int(BigInt),
Float(f64),
False,
True,
}
impl KeyPart {
fn tag_ordering(&self) -> u8 {
match self {
KeyPart::Bytes(_) => 0,
KeyPart::String(_) => 1,
KeyPart::Int(_) => 2,
KeyPart::Float(_) => 3,
KeyPart::False => 4,
KeyPart::True => 5,
}
}
}
impl Eq for KeyPart {}
impl PartialEq for KeyPart {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl Ord for KeyPart {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2),
(KeyPart::String(s1), KeyPart::String(s2)) => {
s1.as_bytes().cmp(s2.as_bytes())
}
(KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2),
(KeyPart::Float(f1), KeyPart::Float(f2)) => {
canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2))
}
_ => self.tag_ordering().cmp(&other.tag_ordering()),
}
}
}
impl PartialOrd for KeyPart {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(Clone, Debug)]
pub struct ReadRange {
pub start: Vec<u8>,
pub end: Vec<u8>,
pub limit: NonZeroU32,
pub reverse: bool,
}
#[derive(Debug)]
pub struct ReadRangeOutput {
pub entries: Vec<KvEntry>,
}
pub type Versionstamp = [u8; 10];
#[derive(Debug)]
pub struct KvEntry {
pub key: Vec<u8>,
pub value: KvValue,
pub versionstamp: Versionstamp,
}
#[derive(Debug)]
pub enum KvValue {
V8(Vec<u8>),
Bytes(Vec<u8>),
U64(u64),
}
pub struct AtomicWrite {
pub checks: Vec<Check>,
pub mutations: Vec<Mutation>,
pub enqueues: Vec<Enqueue>,
}
pub struct Check {
pub key: Vec<u8>,
pub versionstamp: Option<Versionstamp>,
}
pub struct Mutation {
pub key: Vec<u8>,
pub kind: MutationKind,
pub expire_at: Option<DateTime<Utc>>,
}
pub struct Enqueue {
pub payload: Vec<u8>,
pub deadline: DateTime<Utc>,
pub keys_if_undelivered: Vec<Vec<u8>>,
pub backoff_schedule: Option<Vec<u32>>,
}
#[derive(Debug)]
pub enum MutationKind {
Set(KvValue),
Delete,
Sum {
value: KvValue,
min_v8: Vec<u8>,
max_v8: Vec<u8>,
clamp: bool,
},
Min(KvValue),
Max(KvValue),
SetSuffixVersionstampedKey(KvValue),
}
impl MutationKind {
pub fn value(&self) -> Option<&KvValue> {
match self {
MutationKind::Set(value) => Some(value),
MutationKind::Sum { value, .. } => Some(value),
MutationKind::Min(value) => Some(value),
MutationKind::Max(value) => Some(value),
MutationKind::SetSuffixVersionstampedKey(value) => Some(value),
MutationKind::Delete => None,
}
}
}
#[derive(Debug)]
pub struct CommitResult {
pub versionstamp: Versionstamp,
}
#[derive(Debug)]
pub enum WatchKeyOutput {
Unchanged,
Changed { entry: Option<KvEntry> },
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MetadataExchangeRequest {
#[serde(default)]
pub supported_versions: Vec<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DatabaseMetadata {
pub version: u64,
pub database_id: Uuid,
pub endpoints: Vec<EndpointInfo>,
pub token: Cow<'static, str>,
pub expires_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EndpointInfo {
pub url: Cow<'static, str>,
pub consistency: Cow<'static, str>,
}
pub const VALUE_ENCODING_V8: i64 = 1;
pub const VALUE_ENCODING_LE64: i64 = 2;
pub const VALUE_ENCODING_BYTES: i64 = 3;
pub fn decode_value(value: Vec<u8>, encoding: i64) -> Option<KvValue> {
let value = match encoding {
VALUE_ENCODING_V8 => KvValue::V8(value),
VALUE_ENCODING_BYTES => KvValue::Bytes(value),
VALUE_ENCODING_LE64 => {
let mut buf = [0; 8];
buf.copy_from_slice(&value);
KvValue::U64(u64::from_le_bytes(buf))
}
_ => return None,
};
Some(value)
}
pub fn encode_value(value: &KvValue) -> (Cow<'_, [u8]>, i64) {
match value {
KvValue::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8),
KvValue::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES),
KvValue::U64(value) => {
let mut buf = [0; 8];
buf.copy_from_slice(&value.to_le_bytes());
(Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64)
}
}
}
pub fn encode_value_owned(value: KvValue) -> (Vec<u8>, i64) {
match value {
KvValue::V8(value) => (value, VALUE_ENCODING_V8),
KvValue::Bytes(value) => (value, VALUE_ENCODING_BYTES),
KvValue::U64(value) => {
let mut buf = [0; 8];
buf.copy_from_slice(&value.to_le_bytes());
(buf.to_vec(), VALUE_ENCODING_LE64)
}
}
}