use std::borrow::{Borrow, Cow};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::task::Poll;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{ready, Future, FutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use transmog::{Format, OwnedDeserializer};
use transmog_pot::Pot;
use crate::connection::{self, AsyncConnection, Connection, RangeRef};
use crate::document::{
BorrowedDocument, CollectionDocument, CollectionHeader, Document, DocumentId, Header, KeyId,
OwnedDocument, OwnedDocuments, Revision,
};
use crate::key::{IntoPrefixRange, Key, KeyEncoding};
use crate::schema::{CollectionName, Schematic};
use crate::transaction::{Operation, OperationResult, Transaction};
use crate::Error;
/// A named group of documents that share a single primary key type.
pub trait Collection: Send + Sync {
    /// The type used as the primary key of documents in this collection.
    type PrimaryKey: for<'k> Key<'k> + Eq + Ord;

    /// Returns the [`CollectionName`] identifying this collection.
    fn collection_name() -> CollectionName;

    /// Registers this collection's views on `schema`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports while defining views.
    fn define_views(schema: &mut Schematic) -> Result<(), Error>;

    /// Returns the [`KeyId`] of the encryption key associated with this
    /// collection. The default implementation returns `None`.
    #[must_use]
    fn encryption_key() -> Option<KeyId> {
        None
    }
}
#[async_trait]
pub trait SerializedCollection: Collection {
type Contents: Send + Sync;
type Format: OwnedDeserializer<Self::Contents>;
/// Returns the primary key that `contents` naturally maps to, if any.
///
/// The default implementation always returns `None`.
// The default body never reads `contents`, hence the lint allowance.
#[allow(unused_variables)]
fn natural_id(contents: &Self::Contents) -> Option<Self::PrimaryKey>
where
    Self: Sized,
{
    None
}
/// Returns the format instance used by the default [`Self::serialize`] and
/// [`Self::deserialize`] implementations.
fn format() -> Self::Format;
fn deserialize(data: &[u8]) -> Result<Self::Contents, Error> {
Self::format()
.deserialize_owned(data)
.map_err(|err| crate::Error::other("serialization", err))
}
/// Returns the contents of `doc` by delegating to [`Document::contents`].
///
/// # Errors
///
/// Propagates any error returned by `doc.contents()`.
fn document_contents<D: Document<Self>>(doc: &D) -> Result<Self::Contents, Error>
where
    Self: Sized,
{
    doc.contents()
}
/// Replaces the contents of `doc` with `contents` by delegating to
/// [`Document::set_contents`].
///
/// # Errors
///
/// Propagates any error returned by `doc.set_contents()`.
fn set_document_contents<D: Document<Self>>(
    doc: &mut D,
    contents: Self::Contents,
) -> Result<(), Error>
where
    Self: Sized,
{
    doc.set_contents(contents)
}
fn serialize(item: &Self::Contents) -> Result<Vec<u8>, Error> {
Self::format()
.serialize(item)
.map_err(|err| crate::Error::other("serialization", err))
}
/// Fetches the document stored under `id` through `connection` and converts
/// it into a [`CollectionDocument`].
///
/// Returns `Ok(None)` when the connection reports no document for `id`.
///
/// # Errors
///
/// Propagates errors from the connection lookup and from the conversion.
fn get<C, PrimaryKey>(
    id: &PrimaryKey,
    connection: &C,
) -> Result<Option<CollectionDocument<Self>>, Error>
where
    C: Connection,
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Self: Sized,
{
    let stored = connection.get::<Self, _>(id)?;
    let converted = stored.as_ref().map(TryInto::try_into);
    converted.transpose()
}
/// Asynchronously fetches the document stored under `id` through
/// `connection` and converts it into a [`CollectionDocument`].
///
/// Returns `Ok(None)` when the connection reports no document for `id`.
///
/// # Errors
///
/// Propagates errors from the connection lookup and from the conversion.
async fn get_async<C, PrimaryKey>(
    id: &PrimaryKey,
    connection: &C,
) -> Result<Option<CollectionDocument<Self>>, Error>
where
    C: AsyncConnection,
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Self: Sized,
{
    let stored = connection.get::<Self, _>(id).await?;
    Ok(stored.as_ref().map(TryInto::try_into).transpose()?)
}
/// Fetches the documents identified by `ids` through `connection` and
/// converts them into [`CollectionDocument`]s.
///
/// # Errors
///
/// Propagates errors from the lookup and from the document conversion.
fn get_multiple<'id, C, DocumentIds, PrimaryKey, I>(
    ids: DocumentIds,
    connection: &C,
) -> Result<Vec<CollectionDocument<Self>>, Error>
where
    C: Connection,
    DocumentIds: IntoIterator<Item = &'id PrimaryKey, IntoIter = I> + Send + Sync,
    I: Iterator<Item = &'id PrimaryKey> + Send + Sync,
    PrimaryKey: KeyEncoding<Self::PrimaryKey> + 'id,
    Self: Sized,
{
    let documents = connection.collection::<Self>().get_multiple(ids)?;
    documents.collection_documents()
}
/// Asynchronously fetches the documents identified by `ids` through
/// `connection` and converts them into [`CollectionDocument`]s.
///
/// # Errors
///
/// Propagates errors from the lookup and from the document conversion.
async fn get_multiple_async<'id, C, DocumentIds, PrimaryKey, I>(
    ids: DocumentIds,
    connection: &C,
) -> Result<Vec<CollectionDocument<Self>>, Error>
where
    C: AsyncConnection,
    DocumentIds: IntoIterator<Item = &'id PrimaryKey, IntoIter = I> + Send + Sync,
    I: Iterator<Item = &'id PrimaryKey> + Send + Sync,
    PrimaryKey: KeyEncoding<Self::PrimaryKey> + 'id,
    Self: Sized,
{
    let documents = connection.collection::<Self>().get_multiple(ids).await?;
    documents.collection_documents()
}
/// Builds a [`List`] query over the documents of this collection whose
/// primary keys fall within `ids`.
///
/// Nothing is fetched by this call itself; the returned builder is run
/// through its own methods (for example [`List::query`]).
fn list<'id, R, PrimaryKey, C>(ids: R, connection: &'id C) -> List<'id, C, Self, PrimaryKey>
where
    R: Into<RangeRef<'id, Self::PrimaryKey, PrimaryKey>>,
    C: Connection,
    // `?Sized` mirrors `list_async`, `list_with_prefix` and the `List` type,
    // so unsized key types (e.g. `str`) are accepted here as well.
    PrimaryKey: KeyEncoding<Self::PrimaryKey> + PartialEq + 'id + ?Sized,
    Self::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
    Self: Sized,
{
    let collection = connection::MaybeOwned::Owned(connection.collection::<Self>());
    List(connection::List::new(collection, ids.into()))
}
/// Builds an [`AsyncList`] query over the documents of this collection
/// whose primary keys fall within `ids`.
///
/// Nothing is fetched by this call itself; the returned builder is awaited
/// or run through its own methods.
fn list_async<'id, R, PrimaryKey, C>(
    ids: R,
    connection: &'id C,
) -> AsyncList<'id, C, Self, PrimaryKey>
where
    R: Into<RangeRef<'id, Self::PrimaryKey, PrimaryKey>>,
    C: AsyncConnection,
    PrimaryKey: KeyEncoding<Self::PrimaryKey> + PartialEq + 'id + ?Sized,
    Self::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
    Self: Sized,
{
    let collection = connection::MaybeOwned::Owned(connection.collection::<Self>());
    AsyncList(connection::AsyncList::new(collection, ids.into()))
}
/// Builds a [`List`] query over the range produced by
/// `prefix.to_prefix_range()`.
fn list_with_prefix<'a, PrimaryKey, C>(
    prefix: &'a PrimaryKey,
    connection: &'a C,
) -> List<'a, C, Self, PrimaryKey>
where
    C: Connection,
    Self: Sized,
    PrimaryKey: IntoPrefixRange<'a, Self::PrimaryKey>
        + KeyEncoding<Self::PrimaryKey>
        + PartialEq
        + ?Sized,
    Self::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
{
    let collection = connection::MaybeOwned::Owned(connection.collection::<Self>());
    let range = prefix.to_prefix_range();
    List(connection::List::new(collection, range))
}
/// Builds an [`AsyncList`] query over the range produced by
/// `prefix.to_prefix_range()`.
fn list_with_prefix_async<'a, PrimaryKey, C>(
    prefix: &'a PrimaryKey,
    connection: &'a C,
) -> AsyncList<'a, C, Self, PrimaryKey>
where
    C: AsyncConnection,
    Self: Sized,
    PrimaryKey: IntoPrefixRange<'a, Self::PrimaryKey>
        + KeyEncoding<Self::PrimaryKey>
        + PartialEq
        + ?Sized,
    Self::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
{
    let collection = connection::MaybeOwned::Owned(connection.collection::<Self>());
    let range = prefix.to_prefix_range();
    AsyncList(connection::AsyncList::new(collection, range))
}
/// Builds a [`List`] query spanning the unbounded range (`..`), i.e. every
/// document in this collection.
fn all<C: Connection>(connection: &C) -> List<'_, C, Self, Self::PrimaryKey>
where
    Self: Sized,
{
    let collection = connection::MaybeOwned::Owned(connection.collection::<Self>());
    let everything = RangeRef::from(..);
    List(connection::List::new(collection, everything))
}
/// Builds an [`AsyncList`] query spanning the unbounded range (`..`), i.e.
/// every document in this collection.
fn all_async<C: AsyncConnection>(connection: &C) -> AsyncList<'_, C, Self, Self::PrimaryKey>
where
    Self: Sized,
{
    let collection = connection::MaybeOwned::Owned(connection.collection::<Self>());
    let everything = RangeRef::from(..);
    AsyncList(connection::AsyncList::new(collection, everything))
}
fn push<Cn: Connection>(
contents: Self::Contents,
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
where
Self: Sized + 'static,
{
let header = match connection.collection::<Self>().push(&contents) {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
};
Ok(CollectionDocument { header, contents })
}
async fn push_async<Cn: AsyncConnection>(
contents: Self::Contents,
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
where
Self: Sized + 'static,
Self::Contents: 'async_trait,
{
let header = match connection.collection::<Self>().push(&contents).await {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
};
Ok(CollectionDocument { header, contents })
}
/// Pushes every item of `contents` in a single [`Transaction`] and returns
/// one [`CollectionDocument`] per item, in input order.
///
/// Each document is first created with a placeholder header (default key,
/// zeroed revision); the header is replaced with the one reported by the
/// transaction once it has been applied.
///
/// # Errors
///
/// Fails if an item cannot be serialized into an operation, if applying the
/// transaction fails, or if a returned header cannot be converted.
///
/// # Panics
///
/// Panics if the transaction yields a result other than
/// [`OperationResult::DocumentUpdated`].
fn push_all<Contents: IntoIterator<Item = Self::Contents>, Cn: Connection>(
    contents: Contents,
    connection: &Cn,
) -> Result<Vec<CollectionDocument<Self>>, Error>
where
    Self: Sized + 'static,
    Self::PrimaryKey: Default,
{
    let mut transaction = Transaction::new();
    let items = contents.into_iter();
    let mut documents = Vec::with_capacity(items.size_hint().0);
    for item in items {
        transaction.push(Operation::push_serialized::<Self>(&item)?);
        // Placeholder header; overwritten below from the transaction result.
        let placeholder = CollectionHeader {
            id: <<Self as Collection>::PrimaryKey as Default>::default(),
            revision: Revision {
                id: 0,
                sha256: [0; 32],
            },
        };
        documents.push(CollectionDocument {
            header: placeholder,
            contents: item,
        });
    }

    let outcomes = transaction.apply(connection)?;
    for (document, outcome) in documents.iter_mut().zip(outcomes) {
        if let OperationResult::DocumentUpdated { header, .. } = outcome {
            document.header = CollectionHeader::try_from(header)?;
        } else {
            unreachable!("invalid result from transaction");
        }
    }
    Ok(documents)
}
/// Asynchronously pushes every item of `contents` in a single
/// [`Transaction`] and returns one [`CollectionDocument`] per item, in input
/// order.
///
/// Each document is first created with a placeholder header (default key,
/// zeroed revision); the header is replaced with the one reported by the
/// transaction once it has been applied.
///
/// # Errors
///
/// Fails if an item cannot be serialized into an operation, if applying the
/// transaction fails, or if a returned header cannot be converted.
///
/// # Panics
///
/// Panics if the transaction yields a result other than
/// [`OperationResult::DocumentUpdated`].
async fn push_all_async<
    Contents: IntoIterator<Item = Self::Contents> + Send,
    Cn: AsyncConnection,
>(
    contents: Contents,
    connection: &Cn,
) -> Result<Vec<CollectionDocument<Self>>, Error>
where
    Self: Sized + 'static,
    Self::PrimaryKey: Default,
    Contents::IntoIter: Send,
{
    let mut transaction = Transaction::new();
    let items = contents.into_iter();
    let mut documents = Vec::with_capacity(items.size_hint().0);
    for item in items {
        transaction.push(Operation::push_serialized::<Self>(&item)?);
        // Placeholder header; overwritten below from the transaction result.
        let placeholder = CollectionHeader {
            id: <<Self as Collection>::PrimaryKey as Default>::default(),
            revision: Revision {
                id: 0,
                sha256: [0; 32],
            },
        };
        documents.push(CollectionDocument {
            header: placeholder,
            contents: item,
        });
    }

    let outcomes = transaction.apply_async(connection).await?;
    for (document, outcome) in documents.iter_mut().zip(outcomes) {
        if let OperationResult::DocumentUpdated { header, .. } = outcome {
            document.header = CollectionHeader::try_from(header)?;
        } else {
            unreachable!("invalid result from transaction");
        }
    }
    Ok(documents)
}
/// Pushes `self` into its own collection; shorthand for
/// `Self::push(self, connection)`.
///
/// # Errors
///
/// Returns the same [`InsertError`] as [`Self::push`].
fn push_into<Cn: Connection>(
    self,
    connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
where
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    Self::push(self, connection)
}
/// Asynchronously pushes `self` into its own collection; shorthand for
/// `Self::push_async(self, connection)`.
///
/// # Errors
///
/// Returns the same [`InsertError`] as [`Self::push_async`].
async fn push_into_async<Cn: AsyncConnection>(
    self,
    connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
where
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    Self::push_async(self, connection).await
}
/// Appends an operation that pushes `self` to `transaction`.
///
/// The transaction is only modified when the operation was built
/// successfully.
///
/// # Errors
///
/// Propagates the error from [`Operation::push_serialized`].
fn push_in_transaction(&self, transaction: &mut Transaction) -> Result<(), Error>
where
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    let operation = Operation::push_serialized::<Self>(self)?;
    transaction.push(operation);
    Ok(())
}
fn insert<PrimaryKey, Cn>(
id: &PrimaryKey,
contents: Self::Contents,
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
where
PrimaryKey: KeyEncoding<Self::PrimaryKey>,
Cn: Connection,
Self: Sized + 'static,
{
let header = match connection.collection::<Self>().insert(id, &contents) {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
};
Ok(CollectionDocument { header, contents })
}
async fn insert_async<PrimaryKey, Cn>(
id: &PrimaryKey,
contents: Self::Contents,
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
where
PrimaryKey: KeyEncoding<Self::PrimaryKey>,
Cn: AsyncConnection,
Self: Sized + 'static,
Self::Contents: 'async_trait,
{
let header = match connection.collection::<Self>().insert(id, &contents).await {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
};
Ok(CollectionDocument { header, contents })
}
/// Inserts `self` under the key `id`; shorthand for
/// `Self::insert(id, self, connection)`.
///
/// # Errors
///
/// Returns the same [`InsertError`] as [`Self::insert`].
fn insert_into<PrimaryKey, Cn>(
    self,
    id: &PrimaryKey,
    connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
where
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Cn: Connection,
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    Self::insert(id, self, connection)
}
/// Asynchronously inserts `self` under the key `id`; shorthand for
/// `Self::insert_async(id, self, connection)`.
///
/// # Errors
///
/// Returns the same [`InsertError`] as [`Self::insert_async`].
async fn insert_into_async<PrimaryKey, Cn>(
    self,
    id: &PrimaryKey,
    connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
where
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Cn: AsyncConnection,
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    Self::insert_async(id, self, connection).await
}
/// Appends an operation that inserts `self` under `key` to `transaction`.
///
/// The transaction is only modified when the operation was built
/// successfully.
///
/// # Errors
///
/// Propagates the error from [`Operation::insert_serialized`].
fn insert_in_transaction(
    &self,
    key: &Self::PrimaryKey,
    transaction: &mut Transaction,
) -> Result<(), Error>
where
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    let operation = Operation::insert_serialized::<Self>(Some(key), self)?;
    transaction.push(operation);
    Ok(())
}
fn overwrite<PrimaryKey, Cn>(
id: &PrimaryKey,
contents: Self::Contents,
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
where
PrimaryKey: KeyEncoding<Self::PrimaryKey>,
Cn: Connection,
Self: Sized + 'static,
{
let header = match Self::serialize(&contents) {
Ok(serialized) => match connection.overwrite::<Self, _>(id, serialized) {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
},
Err(error) => return Err(InsertError { contents, error }),
};
Ok(CollectionDocument { header, contents })
}
async fn overwrite_async<PrimaryKey, Cn>(
id: &PrimaryKey,
contents: Self::Contents,
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
where
PrimaryKey: KeyEncoding<Self::PrimaryKey>,
Cn: AsyncConnection,
Self: Sized + 'static,
Self::Contents: 'async_trait,
{
let header = match Self::serialize(&contents) {
Ok(serialized) => match connection.overwrite::<Self, _>(id, serialized).await {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
},
Err(error) => return Err(InsertError { contents, error }),
};
Ok(CollectionDocument { header, contents })
}
/// Overwrites the document at `id` with `self`; shorthand for
/// `Self::overwrite(id, self, connection)`.
///
/// # Errors
///
/// Returns the same [`InsertError`] as [`Self::overwrite`].
fn overwrite_into<Cn: Connection, PrimaryKey>(
    self,
    id: &PrimaryKey,
    connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
where
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    Self::overwrite(id, self, connection)
}
/// Appends an operation that overwrites the document at `id` with `self`
/// to `transaction`.
///
/// The transaction is only modified when the operation was built
/// successfully.
///
/// # Errors
///
/// Propagates the error from [`Operation::overwrite_serialized`].
fn overwrite_in_transaction<PrimaryKey>(
    &self,
    id: &PrimaryKey,
    transaction: &mut Transaction,
) -> Result<(), Error>
where
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    let operation = Operation::overwrite_serialized::<Self, PrimaryKey>(id, self)?;
    transaction.push(operation);
    Ok(())
}
/// Asynchronously overwrites the document at `id` with `self`; shorthand
/// for `Self::overwrite_async(id, self, connection)`.
///
/// # Errors
///
/// Returns the same [`InsertError`] as [`Self::overwrite_async`].
async fn overwrite_into_async<Cn: AsyncConnection, PrimaryKey>(
    self,
    id: &PrimaryKey,
    connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
where
    PrimaryKey: KeyEncoding<Self::PrimaryKey>,
    Self: SerializedCollection<Contents = Self> + Sized + 'static,
{
    Self::overwrite_async(id, self, connection).await
}
}
/// Marker-style trait for collections that want the blanket
/// [`SerializedCollection`] implementation, which stores `Self` using the
/// `Pot` format.
pub trait DefaultSerialization: Collection {
    /// Returns the primary key this value naturally maps to, if any.
    ///
    /// The default implementation always returns `None`.
    fn natural_id(&self) -> Option<Self::PrimaryKey> {
        None
    }
}
/// Blanket implementation: any serde-compatible [`DefaultSerialization`]
/// type is stored as itself using the [`Pot`] format.
impl<T> SerializedCollection for T
where
    T: DefaultSerialization + Serialize + DeserializeOwned,
{
    type Contents = Self;
    type Format = Pot;

    /// Returns a default-constructed [`Pot`] format.
    fn format() -> Self::Format {
        Pot::default()
    }

    /// Forwards to [`DefaultSerialization::natural_id`].
    fn natural_id(contents: &Self::Contents) -> Option<Self::PrimaryKey> {
        // Spelled out so it is clear this is the `DefaultSerialization`
        // method and not a recursive call to this function.
        <T as DefaultSerialization>::natural_id(contents)
    }
}
/// The error type returned by the push/insert/overwrite helpers in this
/// file. It carries the contents that could not be stored back to the
/// caller alongside the underlying error; its `Display` output is that of
/// `error`.
#[derive(thiserror::Error, Debug)]
#[error("{error}")]
pub struct InsertError<T> {
/// The contents that were passed to the failed operation.
pub contents: T,
/// The error that caused the operation to fail.
pub error: Error,
}
#[async_trait]
pub trait NamedCollection: Collection + Unpin {
type ByNameView: crate::schema::SerializedView<Key = String, Collection = Self>;
/// Loads the document referred to by `id` and converts it into a
/// [`CollectionDocument`].
///
/// Returns `Ok(None)` when [`Self::load_document`] finds nothing.
///
/// # Errors
///
/// Propagates errors from the lookup and from the conversion.
fn load<'name, N: Nameable<'name, Self::PrimaryKey> + Send + Sync, C: Connection>(
    id: N,
    connection: &C,
) -> Result<Option<CollectionDocument<Self>>, Error>
where
    Self: SerializedCollection + Sized + 'static,
{
    match Self::load_document(id, connection)? {
        Some(document) => CollectionDocument::try_from(&document).map(Some),
        None => Ok(None),
    }
}
/// Asynchronously loads the document referred to by `id` and converts it
/// into a [`CollectionDocument`].
///
/// Returns `Ok(None)` when [`Self::load_document_async`] finds nothing.
///
/// # Errors
///
/// Propagates errors from the lookup and from the conversion.
async fn load_async<
    'name,
    N: Nameable<'name, Self::PrimaryKey> + Send + Sync,
    C: AsyncConnection,
>(
    id: N,
    connection: &C,
) -> Result<Option<CollectionDocument<Self>>, Error>
where
    Self: SerializedCollection + Sized + 'static,
{
    let stored = Self::load_document_async(id, connection).await?;
    Ok(stored
        .as_ref()
        .map(CollectionDocument::try_from)
        .transpose()?)
}
/// Starts an [`Entry`] builder for the document referred to by `id`.
///
/// The builder starts without insert or update callbacks and with a retry
/// limit of `0`; nothing is executed until [`Entry::execute`] is called.
fn entry<
    'connection,
    'name,
    N: Into<NamedReference<'name, Self::PrimaryKey>> + Send + Sync,
    C: Connection,
>(
    id: N,
    connection: &'connection C,
) -> Entry<'connection, 'name, C, Self, (), ()>
where
    Self: SerializedCollection + Sized,
{
    Entry {
        name: id.into(),
        connection,
        insert: None,
        update: None,
        retry_limit: 0,
        _collection: PhantomData,
    }
}
/// Starts an [`AsyncEntry`] builder for the document referred to by `id`.
///
/// The builder starts in the pending state without insert or update
/// callbacks and with a retry limit of `0`; work begins when the returned
/// value is polled as a future.
fn entry_async<
    'connection,
    'name,
    N: Into<NamedReference<'name, Self::PrimaryKey>> + Send + Sync,
    C: AsyncConnection,
>(
    id: N,
    connection: &'connection C,
) -> AsyncEntry<'connection, 'name, C, Self, (), ()>
where
    Self: SerializedCollection + Sized,
{
    let builder = EntryBuilder {
        name: id.into(),
        connection,
        insert: None,
        update: None,
        retry_limit: 0,
        _collection: PhantomData,
    };
    AsyncEntry {
        state: EntryState::Pending(Some(builder)),
    }
}
/// Loads the raw [`OwnedDocument`] referred to by `name`.
///
/// A reference by id or key is fetched directly from the collection; a
/// reference by name is resolved by querying [`Self::ByNameView`] with the
/// name as key and taking the first returned document.
///
/// # Errors
///
/// Propagates errors from resolving `name` and from the lookup.
fn load_document<'name, N: Nameable<'name, Self::PrimaryKey> + Send + Sync, C: Connection>(
    name: N,
    connection: &C,
) -> Result<Option<OwnedDocument>, Error>
where
    Self: SerializedCollection + Sized,
{
    match name.name()? {
        NamedReference::Id(id) => connection.collection::<Self>().get(&id),
        NamedReference::Key(key) => connection.collection::<Self>().get(&key),
        NamedReference::Name(name) => {
            let mapped = connection
                .view::<Self::ByNameView>()
                .with_key(name.as_ref())
                .query_with_docs()?;
            let first = mapped.documents.into_iter().next();
            Ok(first.map(|(_, document)| document))
        }
    }
}
/// Asynchronously loads the raw [`OwnedDocument`] referred to by `name`.
///
/// A reference by id or key is fetched directly from the collection; a
/// reference by name is resolved by querying [`Self::ByNameView`] with the
/// name as key and taking the first returned document.
///
/// # Errors
///
/// Propagates errors from resolving `name` and from the lookup.
async fn load_document_async<
    'name,
    N: Nameable<'name, Self::PrimaryKey> + Send + Sync,
    C: AsyncConnection,
>(
    name: N,
    connection: &C,
) -> Result<Option<OwnedDocument>, Error>
where
    Self: SerializedCollection + Sized,
{
    match name.name()? {
        NamedReference::Id(id) => connection.collection::<Self>().get(&id).await,
        NamedReference::Key(key) => connection.collection::<Self>().get(&key).await,
        NamedReference::Name(name) => {
            let mapped = connection
                .view::<Self::ByNameView>()
                .with_key(name.as_ref())
                .query_with_docs()
                .await?;
            let first = mapped.documents.into_iter().next();
            Ok(first.map(|(_, document)| document))
        }
    }
}
/// Deletes the documents that [`Self::ByNameView`] maps to `name`.
///
/// Returns `true` when at least one document was deleted.
///
/// # Errors
///
/// Propagates the error from the view's `delete_docs` call.
fn delete_by_name<C: Connection>(name: &str, connection: &C) -> Result<bool, Error>
where
    Self: SerializedCollection + Sized,
{
    let deleted = connection
        .view::<Self::ByNameView>()
        .with_key(name)
        .delete_docs()?;
    Ok(deleted > 0)
}
/// Asynchronously deletes the documents that [`Self::ByNameView`] maps to
/// `name`.
///
/// Returns `true` when at least one document was deleted.
///
/// # Errors
///
/// Propagates the error from the view's `delete_docs` call.
async fn delete_by_name_async<C: AsyncConnection>(
    name: &str,
    connection: &C,
) -> Result<bool, Error>
where
    Self: SerializedCollection + Sized,
{
    let deleted = connection
        .view::<Self::ByNameView>()
        .with_key(name)
        .delete_docs()
        .await?;
    Ok(deleted > 0)
}
}
/// A reference to a document in a named collection, given either by name,
/// by serialized document id, or by primary key value.
#[derive(Clone, Eq, PartialEq, Deserialize, Serialize, Debug)]
#[must_use]
pub enum NamedReference<'a, Id> {
/// A name, resolved in this file by querying the collection's by-name view.
Name(Cow<'a, str>),
/// A [`DocumentId`].
Id(DocumentId),
/// A primary key value.
Key(Id),
}
impl<'a, Id> From<&'a str> for NamedReference<'a, Id> {
fn from(name: &'a str) -> Self {
Self::Name(Cow::Borrowed(name))
}
}
/// Conversion into a [`NamedReference`] that is allowed to fail.
pub trait Nameable<'a, Id> {
/// Consumes `self` and returns the corresponding [`NamedReference`].
fn name(self) -> Result<NamedReference<'a, Id>, crate::Error>;
}
/// A [`NamedReference`] is already its own name; this conversion never fails.
impl<'a, Id> Nameable<'a, Id> for NamedReference<'a, Id> {
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        Ok(self)
    }
}
/// A borrowed [`NamedReference`] is converted by cloning it; this
/// conversion never fails.
impl<'a, Id> Nameable<'a, Id> for &'a NamedReference<'a, Id>
where
    Id: Clone,
{
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        // The derived `Clone` clones whichever variant is active.
        Ok(self.clone())
    }
}
/// A string slice names a document by name; this conversion never fails.
impl<'a, Id> Nameable<'a, Id> for &'a str {
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        Ok(self.into())
    }
}
impl<'a, Id> From<&'a String> for NamedReference<'a, Id> {
fn from(name: &'a String) -> Self {
Self::Name(Cow::Borrowed(name.as_str()))
}
}
/// A borrowed `String` names a document by name; this conversion never fails.
impl<'a, Id> Nameable<'a, Id> for &'a String {
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        Ok(self.into())
    }
}
impl<'a, 'b, Id> From<&'b BorrowedDocument<'b>> for NamedReference<'a, Id> {
fn from(doc: &'b BorrowedDocument<'b>) -> Self {
Self::Id(doc.header.id.clone())
}
}
/// A borrowed document names itself by its document id; this conversion
/// never fails.
impl<'a, 'b, Id> Nameable<'a, Id> for &'a BorrowedDocument<'b> {
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        let reference = NamedReference::from(self);
        Ok(reference)
    }
}
/// References a collection document by a [`DocumentId`] built from its
/// header's primary key.
impl<'a, 'c, C> TryFrom<&'c CollectionDocument<C>> for NamedReference<'a, C::PrimaryKey>
where
    C: SerializedCollection,
{
    type Error = crate::Error;

    /// # Errors
    ///
    /// Propagates the error from [`DocumentId::new`].
    fn try_from(doc: &'c CollectionDocument<C>) -> Result<Self, crate::Error> {
        let id = DocumentId::new(&doc.header.id)?;
        Ok(Self::Id(id))
    }
}
/// A collection document names itself through the fallible
/// `TryFrom<&CollectionDocument<C>>` conversion.
impl<'a, C> Nameable<'a, C::PrimaryKey> for &'a CollectionDocument<C>
where
    C: SerializedCollection,
{
    fn name(self) -> Result<NamedReference<'a, C::PrimaryKey>, crate::Error> {
        self.try_into()
    }
}
impl<'a, Id> From<String> for NamedReference<'a, Id> {
fn from(name: String) -> Self {
Self::Name(Cow::Owned(name))
}
}
/// An owned `String` names a document by name; this conversion never fails.
impl<'a, Id> Nameable<'a, Id> for String {
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        Ok(self.into())
    }
}
impl<'a, Id> From<DocumentId> for NamedReference<'a, Id> {
fn from(id: DocumentId) -> Self {
Self::Id(id)
}
}
/// A [`DocumentId`] names a document by id; this conversion never fails.
impl<'a, Id> Nameable<'a, Id> for DocumentId {
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
        Ok(self.into())
    }
}
/// A `u64` names a document in a `u64`-keyed collection by primary key;
/// this conversion never fails.
impl<'a> Nameable<'a, Self> for u64 {
    fn name(self) -> Result<NamedReference<'a, Self>, crate::Error> {
        let reference = NamedReference::Key(self);
        Ok(reference)
    }
}
impl<'a, Id> NamedReference<'a, Id>
where
    Id: for<'k> Key<'k>,
{
    /// Converts this reference into one that borrows nothing, copying a
    /// borrowed name into an owned `String` if necessary.
    pub fn into_owned(self) -> NamedReference<'static, Id> {
        match self {
            Self::Name(name) => NamedReference::Name(Cow::Owned(name.into_owned())),
            Self::Id(id) => NamedReference::Id(id),
            Self::Key(key) => NamedReference::Key(key),
        }
    }

    /// Resolves this reference to a primary key of `Col`.
    ///
    /// A name is looked up through `Col::ByNameView` and yields the source
    /// id of the first mapping, or `None` when there is no mapping. A
    /// document id is deserialized, and a key is cloned.
    ///
    /// # Errors
    ///
    /// Propagates errors from the view query and from deserializing the
    /// document id.
    pub fn id<Col: NamedCollection<PrimaryKey = Id>, Cn: Connection>(
        &self,
        connection: &Cn,
    ) -> Result<Option<Col::PrimaryKey>, Error> {
        match self {
            Self::Name(name) => {
                let mappings = connection
                    .view::<Col::ByNameView>()
                    .with_key(name.as_ref())
                    .query()?;
                Ok(mappings.into_iter().next().map(|mapping| mapping.source.id))
            }
            Self::Id(id) => Ok(Some(id.deserialize()?)),
            Self::Key(key) => Ok(Some(key.clone())),
        }
    }

    /// Asynchronously resolves this reference to a primary key of `Col`.
    ///
    /// A name is looked up through `Col::ByNameView` and yields the source
    /// id of the first mapping, or `None` when there is no mapping. A
    /// document id is deserialized, and a key is cloned.
    ///
    /// # Errors
    ///
    /// Propagates errors from the view query and from deserializing the
    /// document id.
    pub async fn id_async<Col: NamedCollection<PrimaryKey = Id>, Cn: AsyncConnection>(
        &self,
        connection: &Cn,
    ) -> Result<Option<Col::PrimaryKey>, Error> {
        match self {
            Self::Name(name) => {
                let mappings = connection
                    .view::<Col::ByNameView>()
                    .with_key(name.as_ref())
                    .query()
                    .await?;
                Ok(mappings.into_iter().next().map(|mapping| mapping.source.id))
            }
            Self::Id(id) => Ok(Some(id.deserialize()?)),
            Self::Key(key) => Ok(Some(key.clone())),
        }
    }
}
/// A builder for a load-then-update-or-insert operation on a named
/// collection over a blocking connection. Nothing happens until `execute`
/// is called.
#[must_use]
pub struct Entry<'a, 'name, Connection, Col, EI, EU>
where
Col: NamedCollection + SerializedCollection,
EI: EntryInsert<Col>,
EU: EntryUpdate<Col>,
{
/// The reference identifying the document to load.
name: NamedReference<'name, Col::PrimaryKey>,
/// The connection used for all operations.
connection: &'a Connection,
/// Callback producing the contents to push when no document is found.
insert: Option<EI>,
/// Callback applied to the contents of a document that was found.
update: Option<EU>,
/// How many times a conflicting update is retried after reloading.
retry_limit: usize,
_collection: PhantomData<Col>,
}
impl<'a, 'name, Connection, Col, EI, EU> Entry<'a, 'name, Connection, Col, EI, EU>
where
Col: NamedCollection + SerializedCollection + 'static + Unpin,
Connection: crate::connection::Connection,
EI: EntryInsert<Col> + 'a + Unpin,
EU: EntryUpdate<Col> + 'a + Unpin,
'name: 'a,
{
pub fn execute(self) -> Result<Option<CollectionDocument<Col>>, Error> {
let Self {
name,
connection,
insert,
update,
mut retry_limit,
..
} = self;
if let Some(mut existing) = Col::load(name, connection)? {
if let Some(update) = update {
loop {
update.call(&mut existing.contents);
match existing.update(connection) {
Ok(()) => return Ok(Some(existing)),
Err(Error::DocumentConflict(collection, header)) => {
if retry_limit > 0 {
retry_limit -= 1;
existing = match Col::load(header.id, connection)? {
Some(doc) => doc,
None => break Ok(None),
}
} else {
break Err(Error::DocumentConflict(collection, header));
}
}
Err(other) => break Err(other),
}
}
} else {
Ok(Some(existing))
}
} else if let Some(insert) = insert {
let new_document = insert.call();
Ok(Some(Col::push(new_document, connection)?))
} else {
Ok(None)
}
}
#[allow(clippy::missing_const_for_fn)] pub fn or_insert_with<F: EntryInsert<Col> + 'a + Unpin>(
self,
cb: F,
) -> Entry<'a, 'name, Connection, Col, F, EU> {
Entry {
name: self.name,
connection: self.connection,
insert: Some(cb),
update: self.update,
retry_limit: self.retry_limit,
_collection: PhantomData,
}
}
#[allow(clippy::missing_const_for_fn)] pub fn update_with<F: EntryUpdate<Col> + 'a + Unpin>(
self,
cb: F,
) -> Entry<'a, 'name, Connection, Col, EI, F> {
Entry {
name: self.name,
connection: self.connection,
update: Some(cb),
insert: self.insert,
retry_limit: self.retry_limit,
_collection: PhantomData,
}
}
pub const fn retry_limit(mut self, attempts: usize) -> Self {
self.retry_limit = attempts;
self
}
}
/// A builder for a load-then-update-or-insert operation on a named
/// collection over an async connection. It implements `Future`; the
/// operation starts on first poll.
#[must_use]
pub struct AsyncEntry<'a, 'name, Connection, Col, EI, EU>
where
Col: NamedCollection + SerializedCollection,
EI: EntryInsert<Col>,
EU: EntryUpdate<Col>,
{
/// Either the not-yet-started builder or the running future.
state: EntryState<'a, 'name, Connection, Col, EI, EU>,
}
/// The configuration of an `AsyncEntry` that has not started executing yet.
struct EntryBuilder<
'a,
'name,
Connection,
Col,
EI: EntryInsert<Col> + 'a,
EU: EntryUpdate<Col> + 'a,
> where
Col: SerializedCollection,
{
/// The reference identifying the document to load.
name: NamedReference<'name, Col::PrimaryKey>,
/// The connection used for all operations.
connection: &'a Connection,
/// Callback producing the contents to push when no document is found.
insert: Option<EI>,
/// Callback applied to the contents of a document that was found.
update: Option<EU>,
/// How many times a conflicting update is retried after reloading.
retry_limit: usize,
_collection: PhantomData<Col>,
}
impl<'a, 'name, Connection, Col, EI, EU> AsyncEntry<'a, 'name, Connection, Col, EI, EU>
where
Col: NamedCollection + SerializedCollection + 'static + Unpin,
Connection: crate::connection::AsyncConnection,
EI: EntryInsert<Col> + 'a + Unpin,
EU: EntryUpdate<Col> + 'a + Unpin,
'name: 'a,
{
async fn execute(
name: NamedReference<'name, Col::PrimaryKey>,
connection: &'a Connection,
insert: Option<EI>,
update: Option<EU>,
mut retry_limit: usize,
) -> Result<Option<CollectionDocument<Col>>, Error> {
if let Some(mut existing) = Col::load_async(name, connection).await? {
if let Some(update) = update {
loop {
update.call(&mut existing.contents);
match existing.update_async(connection).await {
Ok(()) => return Ok(Some(existing)),
Err(Error::DocumentConflict(collection, header)) => {
if retry_limit > 0 {
retry_limit -= 1;
existing = match Col::load_async(header.id, connection).await? {
Some(doc) => doc,
None => break Ok(None),
}
} else {
break Err(Error::DocumentConflict(collection, header));
}
}
Err(other) => break Err(other),
}
}
} else {
Ok(Some(existing))
}
} else if let Some(insert) = insert {
let new_document = insert.call();
Ok(Some(Col::push_async(new_document, connection).await?))
} else {
Ok(None)
}
}
fn pending(&mut self) -> &mut EntryBuilder<'a, 'name, Connection, Col, EI, EU> {
match &mut self.state {
EntryState::Pending(pending) => pending.as_mut().unwrap(),
EntryState::Executing(_) => unreachable!(),
}
}
pub fn or_insert_with<F: EntryInsert<Col> + 'a + Unpin>(
self,
cb: F,
) -> AsyncEntry<'a, 'name, Connection, Col, F, EU> {
AsyncEntry {
state: match self.state {
EntryState::Pending(Some(EntryBuilder {
name,
connection,
update,
retry_limit,
..
})) => EntryState::Pending(Some(EntryBuilder {
name,
connection,
insert: Some(cb),
update,
retry_limit,
_collection: PhantomData,
})),
_ => {
unreachable!("attempting to modify an already executing future")
}
},
}
}
pub fn update_with<F: EntryUpdate<Col> + 'a + Unpin>(
self,
cb: F,
) -> AsyncEntry<'a, 'name, Connection, Col, EI, F> {
AsyncEntry {
state: match self.state {
EntryState::Pending(Some(EntryBuilder {
name,
connection,
insert,
retry_limit,
..
})) => EntryState::Pending(Some(EntryBuilder {
name,
connection,
insert,
update: Some(cb),
retry_limit,
_collection: PhantomData,
})),
_ => {
unreachable!("attempting to modify an already executing future")
}
},
}
}
pub fn retry_limit(mut self, attempts: usize) -> Self {
self.pending().retry_limit = attempts;
self
}
}
/// A one-shot producer of new contents for an entry operation, invoked when
/// the requested document does not exist.
pub trait EntryInsert<Col: SerializedCollection>: Send + Unpin {
/// Consumes the producer and returns the contents to push.
fn call(self) -> Col::Contents;
}
/// Any sendable `FnOnce() -> Col::Contents` closure can act as an insert
/// callback.
impl<F, Col> EntryInsert<Col> for F
where
    F: FnOnce() -> Col::Contents + Send + Unpin,
    Col: SerializedCollection,
{
    fn call(self) -> Col::Contents {
        (self)()
    }
}
/// Placeholder used as the callback type while no insert callback is set.
/// The entry builders only store `None` for it, so `call` is not expected
/// to run.
impl<Col> EntryInsert<Col> for ()
where
    Col: SerializedCollection,
{
    /// # Panics
    ///
    /// Always panics.
    fn call(self) -> Col::Contents {
        unreachable!()
    }
}
/// A reusable mutator of existing contents for an entry operation. The
/// entry `execute` functions invoke it once per update attempt.
pub trait EntryUpdate<Col>: Send + Unpin
where
Col: SerializedCollection,
{
/// Applies the update to `doc` in place.
fn call(&self, doc: &mut Col::Contents);
}
/// Any sendable `Fn(&mut Col::Contents)` closure can act as an update
/// callback.
impl<F, Col> EntryUpdate<Col> for F
where
    F: Fn(&mut Col::Contents) + Send + Unpin,
    Col: NamedCollection + SerializedCollection,
{
    fn call(&self, doc: &mut Col::Contents) {
        (self)(doc);
    }
}
/// Placeholder used as the callback type while no update callback is set.
/// The entry builders only store `None` for it, so `call` is not expected
/// to run.
impl<Col> EntryUpdate<Col> for ()
where
    Col: SerializedCollection,
{
    /// # Panics
    ///
    /// Always panics.
    fn call(&self, _doc: &mut Col::Contents) {
        unreachable!();
    }
}
/// Polling an [`AsyncEntry`] starts the operation on the first poll and
/// drives the resulting boxed future afterwards.
impl<'a, 'name, Conn, Col, EI, EU> Future for AsyncEntry<'a, 'name, Conn, Col, EI, EU>
where
    Col: NamedCollection + SerializedCollection + 'static,
    <Col as Collection>::PrimaryKey: Unpin,
    Conn: AsyncConnection,
    EI: EntryInsert<Col> + 'a,
    EU: EntryUpdate<Col> + 'a,
    'name: 'a,
{
    type Output = Result<Option<CollectionDocument<Col>>, Error>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        // Take the builder out if we have not started executing yet.
        let not_started = match &mut self.state {
            EntryState::Pending(builder) => builder.take(),
            EntryState::Executing(_) => None,
        };
        if let Some(EntryBuilder {
            name,
            connection,
            insert,
            update,
            retry_limit,
            ..
        }) = not_started
        {
            let future = Self::execute(name, connection, insert, update, retry_limit).boxed();
            self.state = EntryState::Executing(future);
        }

        match &mut self.state {
            EntryState::Executing(future) => future.as_mut().poll(cx),
            EntryState::Pending(_) => unreachable!(),
        }
    }
}
/// The two phases of an `AsyncEntry`.
enum EntryState<'a, 'name, Connection, Col, EI, EU>
where
Col: NamedCollection + SerializedCollection,
EI: EntryInsert<Col>,
EU: EntryUpdate<Col>,
{
/// Not started yet; holds the builder until the first poll takes it.
Pending(Option<EntryBuilder<'a, 'name, Connection, Col, EI, EU>>),
/// Started; holds the boxed future that performs the operation.
Executing(BoxFuture<'a, Result<Option<CollectionDocument<Col>>, Error>>),
}
/// A list query over a collection on a blocking connection. It wraps
/// `connection::List`; its `query` method returns `CollectionDocument`s.
#[must_use]
pub struct List<'a, Cn, Cl, PrimaryKey>(connection::List<'a, Cn, Cl, PrimaryKey>)
where
Cl: Collection,
PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + ?Sized,
Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>;
impl<'a, Cn, Cl, PrimaryKey> List<'a, Cn, Cl, PrimaryKey>
where
    Cl: SerializedCollection,
    Cn: Connection,
    PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + ?Sized + 'a,
    Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
{
    /// Forwards to the wrapped query's `ascending()`.
    // NOTE(review): lint allowance kept from the original; presumably a
    // false positive for builders that consume `self`.
    #[allow(clippy::missing_const_for_fn)]
    pub fn ascending(self) -> Self {
        Self(self.0.ascending())
    }

    /// Forwards to the wrapped query's `descending()`.
    // NOTE(review): lint allowance kept from the original; see `ascending`.
    #[allow(clippy::missing_const_for_fn)]
    pub fn descending(self) -> Self {
        Self(self.0.descending())
    }

    /// Forwards `maximum_results` to the wrapped query's `limit()`.
    // NOTE(review): lint allowance kept from the original; see `ascending`.
    #[allow(clippy::missing_const_for_fn)]
    pub fn limit(self, maximum_results: u32) -> Self {
        Self(self.0.limit(maximum_results))
    }

    /// Runs the query and returns only the matching documents' headers.
    ///
    /// # Errors
    ///
    /// Propagates the error from the wrapped query.
    pub fn headers(self) -> Result<Vec<Header>, Error> {
        self.0.headers()
    }

    /// Runs the query and returns the number of matching documents.
    ///
    /// # Errors
    ///
    /// Propagates the error from the wrapped query.
    pub fn count(self) -> Result<u64, Error> {
        self.0.count()
    }

    /// Runs the query and converts the matching documents into
    /// [`CollectionDocument`]s.
    ///
    /// # Errors
    ///
    /// Propagates errors from the wrapped query and from the conversion.
    pub fn query(self) -> Result<Vec<CollectionDocument<Cl>>, Error> {
        let documents = self.0.query()?;
        documents.collection_documents()
    }
}
/// A list query over a collection on an async connection. It wraps
/// `connection::AsyncList`; awaiting it yields `CollectionDocument`s.
#[must_use]
pub struct AsyncList<'a, Cn, Cl, PrimaryKey>(connection::AsyncList<'a, Cn, Cl, PrimaryKey>)
where
Cl: Collection,
PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + ?Sized,
Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>;
impl<'a, Cn, Cl, PrimaryKey> AsyncList<'a, Cn, Cl, PrimaryKey>
where
    Cl: Collection,
    Cn: AsyncConnection,
    PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + ?Sized,
    Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
{
    /// Forwards to the wrapped query's `ascending()`.
    pub fn ascending(self) -> Self {
        Self(self.0.ascending())
    }

    /// Forwards to the wrapped query's `descending()`.
    pub fn descending(self) -> Self {
        Self(self.0.descending())
    }

    /// Forwards `maximum_results` to the wrapped query's `limit()`.
    pub fn limit(self, maximum_results: u32) -> Self {
        Self(self.0.limit(maximum_results))
    }

    /// Runs the query and returns the number of matching documents.
    ///
    /// # Errors
    ///
    /// Propagates the error from the wrapped query.
    pub async fn count(self) -> Result<u64, Error> {
        let inner = self.0;
        inner.count().await
    }

    /// Runs the query and returns only the matching documents' headers.
    ///
    /// # Errors
    ///
    /// Propagates the error from the wrapped query.
    pub async fn headers(self) -> Result<Vec<Header>, Error> {
        let inner = self.0;
        inner.headers().await
    }
}
/// Awaiting an [`AsyncList`] runs the wrapped query and converts the
/// resulting documents into [`CollectionDocument`]s.
// NOTE(review): lint allowance kept from the original; the bounds below
// mention `Cl::PrimaryKey` and `PrimaryKey` more than once.
#[allow(clippy::type_repetition_in_bounds)]
impl<'a, Cn, Cl, PrimaryKey> Future for AsyncList<'a, Cn, Cl, PrimaryKey>
where
    Cl: SerializedCollection + Unpin,
    Cn: AsyncConnection,
    PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + Unpin + ?Sized + 'a,
    Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey> + Unpin,
{
    type Output = Result<Vec<CollectionDocument<Cl>>, Error>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        // `ready!` returns `Poll::Pending` early while the inner query is
        // still running.
        let outcome = ready!(self.0.poll_unpin(cx));
        let documents = match outcome {
            Ok(docs) => docs.collection_documents(),
            Err(err) => Err(err),
        };
        Poll::Ready(documents)
    }
}