use std::{borrow::Cow, marker::PhantomData, ops::Range};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::{
document::{Document, Header},
schema::{
self,
map::MappedDocument,
view::{self, map::MappedValue},
Key, Map,
},
transaction::{self, Command, Operation, OperationResult, Transaction},
Error,
};
#[async_trait]
pub trait Connection: Send + Sync {
fn collection<'a, C: schema::Collection + 'static>(&'a self) -> Collection<'a, Self, C>
where
Self: Sized,
{
Collection::new(self)
}
async fn insert<C: schema::Collection>(&self, contents: Vec<u8>) -> Result<Header, Error> {
let mut tx = Transaction::default();
tx.push(Operation {
collection: C::collection_id(),
command: Command::Insert {
contents: Cow::from(contents),
},
});
let results = self.apply_transaction(tx).await?;
if let OperationResult::DocumentUpdated { header, .. } = &results[0] {
Ok(header.clone())
} else {
unreachable!(
"apply_transaction on a single insert should yield a single DocumentUpdated entry"
)
}
}
async fn update(&self, doc: &mut Document<'_>) -> Result<(), Error> {
let mut tx = Transaction::default();
tx.push(Operation {
collection: doc.collection.clone(),
command: Command::Update {
header: Cow::Owned(doc.header.as_ref().clone()),
contents: Cow::Owned(doc.contents.to_vec()),
},
});
let results = self.apply_transaction(tx).await?;
if let OperationResult::DocumentUpdated { header, .. } = &results[0] {
doc.header = Cow::Owned(header.clone());
Ok(())
} else {
unreachable!(
"apply_transaction on a single update should yield a single DocumentUpdated entry"
)
}
}
async fn get<C: schema::Collection>(&self, id: u64)
-> Result<Option<Document<'static>>, Error>;
async fn get_multiple<C: schema::Collection>(
&self,
ids: &[u64],
) -> Result<Vec<Document<'static>>, Error>;
async fn delete(&self, doc: &Document<'_>) -> Result<(), Error> {
let mut tx = Transaction::default();
tx.push(Operation {
collection: doc.collection.clone(),
command: Command::Delete {
header: Cow::Owned(doc.header.as_ref().clone()),
},
});
let results = self.apply_transaction(tx).await?;
if let OperationResult::DocumentDeleted { .. } = &results[0] {
Ok(())
} else {
unreachable!(
"apply_transaction on a single update should yield a single DocumentUpdated entry"
)
}
}
#[must_use]
fn view<V: schema::View>(&'_ self) -> View<'_, Self, V>
where
Self: Sized,
{
View::new(self)
}
#[must_use]
async fn query<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
access_policy: AccessPolicy,
) -> Result<Vec<Map<V::Key, V::Value>>, Error>
where
Self: Sized;
#[must_use]
async fn query_with_docs<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedDocument<V::Key, V::Value>>, Error>
where
Self: Sized;
#[must_use]
async fn reduce<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
access_policy: AccessPolicy,
) -> Result<V::Value, Error>
where
Self: Sized;
#[must_use]
async fn reduce_grouped<V: schema::View>(
&self,
key: Option<QueryKey<V::Key>>,
access_policy: AccessPolicy,
) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error>
where
Self: Sized;
async fn apply_transaction(
&self,
transaction: Transaction<'static>,
) -> Result<Vec<OperationResult>, Error>;
async fn list_executed_transactions(
&self,
starting_id: Option<u64>,
result_limit: Option<usize>,
) -> Result<Vec<transaction::Executed<'static>>, Error>;
async fn last_transaction_id(&self) -> Result<Option<u64>, Error>;
}
pub struct Collection<'a, Cn, Cl> {
connection: &'a Cn,
_phantom: PhantomData<Cl>,
}
impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
where
Cn: Connection,
Cl: schema::Collection,
{
pub fn new(connection: &'a Cn) -> Self {
Self {
connection,
_phantom: PhantomData::default(),
}
}
pub async fn push<S: Serialize + Sync>(&self, item: &S) -> Result<Header, crate::Error> {
let contents = serde_cbor::to_vec(item)?;
Ok(self.connection.insert::<Cl>(contents).await?)
}
pub async fn get(&self, id: u64) -> Result<Option<Document<'static>>, Error> {
self.connection.get::<Cl>(id).await
}
}
pub struct View<'a, Cn, V: schema::View> {
connection: &'a Cn,
pub key: Option<QueryKey<V::Key>>,
pub access_policy: AccessPolicy,
}
impl<'a, Cn, V> View<'a, Cn, V>
where
V: schema::View,
Cn: Connection,
{
fn new(connection: &'a Cn) -> Self {
Self {
connection,
key: None,
access_policy: AccessPolicy::UpdateBefore,
}
}
#[must_use]
pub fn with_key(mut self, key: V::Key) -> Self {
self.key = Some(QueryKey::Matches(key));
self
}
#[must_use]
pub fn with_keys(mut self, keys: Vec<V::Key>) -> Self {
self.key = Some(QueryKey::Multiple(keys));
self
}
#[must_use]
pub fn with_key_range(mut self, range: Range<V::Key>) -> Self {
self.key = Some(QueryKey::Range(range));
self
}
pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
self.access_policy = policy;
self
}
pub async fn query(self) -> Result<Vec<Map<V::Key, V::Value>>, Error> {
self.connection
.query::<V>(self.key, self.access_policy)
.await
}
pub async fn query_with_docs(self) -> Result<Vec<MappedDocument<V::Key, V::Value>>, Error> {
self.connection
.query_with_docs::<V>(self.key, self.access_policy)
.await
}
pub async fn reduce(self) -> Result<V::Value, Error> {
self.connection
.reduce::<V>(self.key, self.access_policy)
.await
}
pub async fn reduce_grouped(self) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error> {
self.connection
.reduce_grouped::<V>(self.key, self.access_policy)
.await
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum QueryKey<K> {
Matches(K),
Range(Range<K>),
Multiple(Vec<K>),
}
#[allow(clippy::use_self)]
impl<K: Key> QueryKey<K> {
pub fn serialized(&self) -> Result<QueryKey<Vec<u8>>, Error> {
match self {
Self::Matches(key) => key
.as_big_endian_bytes()
.map_err(|err| Error::Storage(view::Error::KeySerialization(err).to_string()))
.map(|v| QueryKey::Matches(v.to_vec())),
Self::Range(range) => {
let start = range
.start
.as_big_endian_bytes()
.map_err(|err| Error::Storage(view::Error::KeySerialization(err).to_string()))?
.to_vec();
let end = range
.end
.as_big_endian_bytes()
.map_err(|err| Error::Storage(view::Error::KeySerialization(err).to_string()))?
.to_vec();
Ok(QueryKey::Range(start..end))
}
Self::Multiple(keys) => {
let keys = keys
.iter()
.map(|key| {
key.as_big_endian_bytes()
.map(|key| key.to_vec())
.map_err(|err| {
Error::Storage(view::Error::KeySerialization(err).to_string())
})
})
.collect::<Result<Vec<_>, Error>>()?;
Ok(QueryKey::Multiple(keys))
}
}
}
}
#[allow(clippy::use_self)]
impl QueryKey<Vec<u8>> {
pub fn deserialized<K: Key>(&self) -> Result<QueryKey<K>, Error> {
match self {
Self::Matches(key) => K::from_big_endian_bytes(key)
.map_err(|err| Error::Storage(view::Error::KeySerialization(err).to_string()))
.map(QueryKey::Matches),
Self::Range(range) => {
let start = K::from_big_endian_bytes(&range.start).map_err(|err| {
Error::Storage(view::Error::KeySerialization(err).to_string())
})?;
let end = K::from_big_endian_bytes(&range.end).map_err(|err| {
Error::Storage(view::Error::KeySerialization(err).to_string())
})?;
Ok(QueryKey::Range(start..end))
}
Self::Multiple(keys) => {
let keys = keys
.iter()
.map(|key| {
K::from_big_endian_bytes(key).map_err(|err| {
Error::Storage(view::Error::KeySerialization(err).to_string())
})
})
.collect::<Result<Vec<_>, Error>>()?;
Ok(QueryKey::Multiple(keys))
}
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum AccessPolicy {
UpdateBefore,
UpdateAfter,
NoUpdate,
}