use std::{borrow::Cow, marker::PhantomData, ops::Range};
use async_trait::async_trait;
use serde::Serialize;
use crate::{
document::{Document, Header},
schema::{self, map::MappedDocument, Map},
transaction::{self, Command, Operation, OperationResult, Transaction},
Error,
};
/// The operations a database connection exposes: document CRUD, view
/// queries, and raw transaction access.
///
/// `insert`, `update` and `delete` have default implementations that build a
/// single-operation [`Transaction`] and hand it to
/// [`apply_transaction`](Self::apply_transaction); implementors supply the
/// remaining methods.
#[async_trait]
pub trait Connection<'a>: Send + Sync {
    /// Returns a [`Collection`] handle for the schema collection `C`.
    ///
    /// # Errors
    ///
    /// Implementation-defined; the conditions are not visible from this trait.
    fn collection<C: schema::Collection + 'static>(
        &'a self,
    ) -> Result<Collection<'a, Self, C>, Error>
    where
        Self: Sized;

    /// Inserts a new document with the given `contents` into collection `C`.
    ///
    /// Submits a one-operation transaction containing [`Command::Insert`] and
    /// returns a clone of the [`Header`] carried by the resulting
    /// [`OperationResult::DocumentUpdated`] entry.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by
    /// [`apply_transaction`](Self::apply_transaction).
    ///
    /// # Panics
    ///
    /// Panics if the first transaction result is missing or is not a
    /// `DocumentUpdated` entry.
    async fn insert<C: schema::Collection>(&self, contents: Vec<u8>) -> Result<Header, Error> {
        let mut tx = Transaction::default();
        tx.push(Operation {
            collection: C::id(),
            command: Command::Insert {
                contents: Cow::from(contents),
            },
        });
        let results = self.apply_transaction(tx).await?;
        // `first()` instead of `[0]` so an empty result reports the broken
        // invariant rather than a bare index-out-of-bounds panic.
        match results.first() {
            Some(OperationResult::DocumentUpdated { header, .. }) => Ok(header.clone()),
            _ => unreachable!(
                "apply_transaction on a single insert should yield a single DocumentUpdated entry"
            ),
        }
    }

    /// Writes the current contents of `doc` back to the database.
    ///
    /// Submits a one-operation transaction containing [`Command::Update`]
    /// built from copies of `doc`'s header and contents. On success,
    /// `doc.header` is replaced with the header reported by the resulting
    /// [`OperationResult::DocumentUpdated`] entry.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by
    /// [`apply_transaction`](Self::apply_transaction).
    ///
    /// # Panics
    ///
    /// Panics if the first transaction result is missing or is not a
    /// `DocumentUpdated` entry.
    async fn update(&self, doc: &mut Document<'_>) -> Result<(), Error> {
        let mut tx = Transaction::default();
        tx.push(Operation {
            collection: doc.collection.clone(),
            command: Command::Update {
                header: Cow::Owned(doc.header.as_ref().clone()),
                contents: Cow::Owned(doc.contents.to_vec()),
            },
        });
        let results = self.apply_transaction(tx).await?;
        match results.first() {
            Some(OperationResult::DocumentUpdated { header, .. }) => {
                doc.header = Cow::Owned(header.clone());
                Ok(())
            }
            _ => unreachable!(
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
            ),
        }
    }

    /// Looks up a single document by `id` in collection `C`.
    ///
    /// The `Option` in the return type allows for no document being
    /// returned; the exact conditions are up to the implementor.
    async fn get<C: schema::Collection>(&self, id: u64)
        -> Result<Option<Document<'static>>, Error>;

    /// Looks up the documents identified by `ids` in collection `C`.
    // NOTE(review): how ids with no matching document are treated is not
    // visible here -- confirm against the implementations.
    async fn get_multiple<C: schema::Collection>(
        &self,
        ids: &[u64],
    ) -> Result<Vec<Document<'static>>, Error>;

    /// Deletes `doc` from the database.
    ///
    /// Submits a one-operation transaction containing [`Command::Delete`]
    /// built from a copy of `doc`'s header.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by
    /// [`apply_transaction`](Self::apply_transaction).
    ///
    /// # Panics
    ///
    /// Panics if the first transaction result is missing or is not an
    /// [`OperationResult::DocumentDeleted`] entry.
    async fn delete(&self, doc: &Document<'_>) -> Result<(), Error> {
        let mut tx = Transaction::default();
        tx.push(Operation {
            collection: doc.collection.clone(),
            command: Command::Delete {
                header: Cow::Owned(doc.header.as_ref().clone()),
            },
        });
        let results = self.apply_transaction(tx).await?;
        match results.first() {
            Some(OperationResult::DocumentDeleted { .. }) => Ok(()),
            _ => unreachable!(
                "apply_transaction on a single delete should yield a single DocumentDeleted entry"
            ),
        }
    }

    /// Starts building a query against view `V` on this connection.
    ///
    /// The returned [`View`] can be refined with its `with_*` methods and
    /// then executed.
    #[must_use]
    fn view<V: schema::View>(&'a self) -> View<'a, Self, V>
    where
        Self: Sized,
    {
        View::new(self)
    }

    /// Executes `query` and returns the matching view entries.
    // NOTE(review): the `'k` lifetime parameter is not referenced by the
    // signature; it is kept so existing implementors continue to match.
    #[must_use]
    async fn query<'k, V: schema::View>(
        &self,
        query: View<'a, Self, V>,
    ) -> Result<Vec<Map<V::Key, V::Value>>, Error>
    where
        Self: Sized;

    /// Executes `query` and returns the matching view entries as
    /// [`MappedDocument`] values.
    #[must_use]
    async fn query_with_docs<'k, V: schema::View>(
        &self,
        query: View<'a, Self, V>,
    ) -> Result<Vec<MappedDocument<V::Key, V::Value>>, Error>
    where
        Self: Sized;

    /// Executes `query` and returns a single `V::Value` for it.
    #[must_use]
    async fn reduce<'k, V: schema::View>(
        &self,
        query: View<'a, Self, V>,
    ) -> Result<V::Value, Error>
    where
        Self: Sized;

    /// Applies `transaction` and returns its [`OperationResult`]s.
    ///
    /// The default `insert`, `update` and `delete` implementations rely on
    /// the first returned entry corresponding to the single operation they
    /// submitted.
    async fn apply_transaction(
        &self,
        transaction: Transaction<'static>,
    ) -> Result<Vec<OperationResult>, Error>;

    /// Lists previously executed transactions.
    // NOTE(review): presumably `starting_id` is the first transaction id to
    // return and `result_limit` caps the number of entries -- confirm
    // against the implementations.
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<usize>,
    ) -> Result<Vec<transaction::Executed<'static>>, Error>;
}
/// A handle tying a connection of type `Cn` to the schema collection `Cl`.
///
/// `Cl` is only a type-level marker: no value of it is stored.
pub struct Collection<'a, Cn, Cl> {
// Borrowed connection that every operation on this handle is forwarded to.
connection: &'a Cn,
// Zero-sized marker carrying the collection type `Cl`.
_phantom: PhantomData<Cl>,
}
impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
where
    Cn: Connection<'a>,
    Cl: schema::Collection,
{
    /// Creates a collection handle that forwards its operations to
    /// `connection`.
    pub fn new(connection: &'a Cn) -> Self {
        Self {
            connection,
            // `PhantomData` is a unit struct; no constructor call is needed.
            _phantom: PhantomData,
        }
    }

    /// Serializes `item` as CBOR and inserts it as a new document in this
    /// collection, returning the new document's [`Header`].
    ///
    /// # Errors
    ///
    /// Returns an error if `serde_cbor::to_vec` fails to serialize `item`,
    /// or if the connection's `insert` call fails.
    pub async fn push<S: Serialize + Sync>(&self, item: &S) -> Result<Header, crate::Error> {
        let contents = serde_cbor::to_vec(item)?;
        // The error types already match, so the result is returned directly.
        self.connection.insert::<Cl>(contents).await
    }

    /// Looks up the document with `id` in this collection by delegating to
    /// the connection's `get`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the connection.
    pub async fn get(&self, id: u64) -> Result<Option<Document<'static>>, Error> {
        self.connection.get::<Cl>(id).await
    }
}
/// The parameters of a query against view `V`, bound to a connection of
/// type `Cn`.
///
/// The `V: schema::View` bound is needed on the struct itself because the
/// `key` field names the associated type `V::Key`.
pub struct View<'a, Cn, V: schema::View> {
// Borrowed connection that executes the query.
connection: &'a Cn,
/// Key filter for the query; `None` until one of the `with_key*` builder
/// methods sets it.
pub key: Option<QueryKey<V::Key>>,
/// Access policy for the query; `View::new` initializes it to
/// `AccessPolicy::UpdateBefore`.
pub access_policy: AccessPolicy,
}
impl<'a, Cn, V> View<'a, Cn, V>
where
    V: schema::View,
    Cn: Connection<'a>,
{
    /// Creates a query with no key filter and the
    /// [`AccessPolicy::UpdateBefore`] policy.
    fn new(connection: &'a Cn) -> Self {
        Self {
            connection,
            key: None,
            access_policy: AccessPolicy::UpdateBefore,
        }
    }

    /// Sets the key filter to [`QueryKey::Matches`] with `key`, replacing
    /// any previously set filter.
    #[must_use]
    pub fn with_key(mut self, key: V::Key) -> Self {
        self.key = Some(QueryKey::Matches(key));
        self
    }

    /// Sets the key filter to [`QueryKey::Multiple`] with `keys`, replacing
    /// any previously set filter.
    #[must_use]
    pub fn with_keys(mut self, keys: Vec<V::Key>) -> Self {
        self.key = Some(QueryKey::Multiple(keys));
        self
    }

    /// Sets the key filter to [`QueryKey::Range`] with `range`, replacing
    /// any previously set filter.
    #[must_use]
    pub fn with_key_range(mut self, range: Range<V::Key>) -> Self {
        self.key = Some(QueryKey::Range(range));
        self
    }

    /// Sets the [`AccessPolicy`] used for this query.
    // Marked `#[must_use]` like the other builder methods: dropping the
    // returned value silently discards the whole query.
    #[must_use]
    pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
        self.access_policy = policy;
        self
    }

    /// Executes the query through the connection's `query` method.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the connection.
    pub async fn query(self) -> Result<Vec<Map<V::Key, V::Value>>, Error> {
        self.connection.query(self).await
    }

    /// Executes the query through the connection's `query_with_docs`
    /// method.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the connection.
    pub async fn query_with_docs(self) -> Result<Vec<MappedDocument<V::Key, V::Value>>, Error> {
        self.connection.query_with_docs(self).await
    }

    /// Executes the query through the connection's `reduce` method.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the connection.
    pub async fn reduce(self) -> Result<V::Value, Error> {
        self.connection.reduce(self).await
    }
}
/// The key filter attached to a view query.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryKey<K> {
    /// A single key.
    Matches(K),
    /// A range of keys, expressed as a [`Range`].
    Range(Range<K>),
    /// An explicit list of keys.
    Multiple(Vec<K>),
}
/// The access policy attached to a view query.
// NOTE(review): the variant names suggest they control whether the view is
// brought up to date before the query, after it, or not at all -- confirm
// against the code that consumes this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPolicy {
    /// The policy a newly created view query starts with.
    UpdateBefore,
    /// Presumably: update after the query has been served -- TODO confirm.
    UpdateAfter,
    /// Presumably: never trigger an update for this query -- TODO confirm.
    NoUpdate,
}