use crate::prelude::*;
use crate::IdentityBy;
use async_trait::async_trait;
use frunk::hlist::{HCons, HList, HMappable, HNil};
use frunk::traits::IntoReverse;
pub use operation::*;
use std::error::Error;
use std::marker::PhantomData;
use std::sync::Arc;
#[cfg(test)]
mod tests;
pub mod operation {
use super::repository::Repository;
use crate::{Identity, IdentityBy, IdentityOf};
use async_trait::async_trait;
use frunk::HNil;
use std::error::Error;
use std::sync::Arc;
#[async_trait]
pub trait Operation
where
Self: Sized + 'static,
{
type Output: Send + Sync + 'static;
type Error: Send + Sync;
async fn perform(&mut self) -> Result<Self::Output, Self::Error>;
async fn rollback(&mut self) {}
}
#[async_trait]
impl Operation for () {
type Output = ();
type Error = Box<dyn Error + Send + Sync>;
async fn perform(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
impl Operation for HNil {
type Output = ();
type Error = Box<dyn Error + Send + Sync>;
async fn perform(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait Add<R: Repository<V>, V: Identity>: Operation {
fn new(r: Arc<R>, v: V) -> Self;
}
pub trait Get<R: Repository<V>, V: Identity>: Operation {
fn new(r: Arc<R>, k: IdentityOf<V>) -> Self;
}
pub trait List<R: Repository<V>, V: Identity>: Operation {
fn new(r: Arc<R>) -> Self;
}
pub trait Update<R: Repository<V>, V: Identity>: Operation {
fn new(r: Arc<R>, v: V) -> Self;
}
pub trait Remove<R: Repository<V>, V: Identity>: Operation {
fn new(r: Arc<R>, k: IdentityOf<V>) -> Self;
}
pub trait ListBy<R: Repository<V>, V: Identity + IdentityBy<K>, K>: Operation {
fn new(r: Arc<R>, k: K) -> Self;
}
pub trait GetBy<R: Repository<V>, V: Identity + IdentityBy<K>, K>: Operation {
fn new(r: Arc<R>, k: K) -> Self;
}
pub mod default {
use super::{Add, GetBy, Get, List, Operation, Remove, ListBy, Update};
use crate::transactional::Repository;
use crate::{Identity, IdentityBy, IdentityOf};
use async_trait::async_trait;
use std::marker::PhantomData;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct GetOperation<R, V>
where
V: Identity,
R: crate::Repository<V> + crate::Get<V>,
{
r: Arc<R>,
k: IdentityOf<V>,
}
impl<R, V> Get<R, V> for GetOperation<R, V>
where
V: Identity + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::Get<V> + Send + Sync + 'static,
R::Error: Send + Sync,
{
fn new(r: Arc<R>, k: IdentityOf<V>) -> Self {
Self { r, k }
}
}
#[async_trait]
impl<R, V> Operation for GetOperation<R, V>
where
V: Identity + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::Get<V> + Send + Sync + 'static,
R::Error: Send + Sync,
{
type Output = Option<V>;
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, Self::Error> {
self.r.get_one(&self.k).await
}
}
#[derive(Debug, Clone)]
pub struct AddOperation<R, V>
where
V: Identity,
R: crate::Repository<V> + crate::Add<V>,
{
r: Arc<R>,
v: V,
id: Option<IdentityOf<V>>,
}
#[async_trait]
impl<R, V> Operation for AddOperation<R, V>
where
V: Identity + Send + Sync + Clone + 'static,
V::Id: Send + Sync,
R: Repository<V>
+ crate::Repository<V>
+ crate::Add<V>
+ crate::Remove<V>
+ Send
+ Sync
+ 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
type Output = ();
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, R::Error> {
self.r.add(self.v.clone()).await
}
async fn rollback(&mut self) {
if let Some(ref id) = self.id {
if let Err(err) = self.r.remove(id).await {
log::error!("Rollback error:\n{err:?}");
}
}
}
}
impl<R, V> Add<R, V> for AddOperation<R, V>
where
V: Identity + Send + Sync + Clone + 'static,
V::Id: Send + Sync,
R: Repository<V>
+ crate::Repository<V>
+ crate::Add<V>
+ crate::Remove<V>
+ Send
+ Sync
+ 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
fn new(r: Arc<R>, v: V) -> Self {
Self { r, v, id: None }
}
}
#[derive(Debug, Clone)]
pub struct ListOperation<R, V>
where
V: Identity,
R: crate::Repository<V> + crate::List<V>,
{
r: Arc<R>,
v: PhantomData<V>,
}
#[async_trait]
impl<R, V> Operation for ListOperation<R, V>
where
V: Identity + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::List<V> + Send + Sync + 'static,
R::Error: Send + Sync,
{
type Output = Vec<V>;
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, Self::Error> {
self.r.list().await
}
}
impl<R, V> List<R, V> for ListOperation<R, V>
where
V: Identity + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::List<V> + Send + Sync + 'static,
R::Error: Send + Sync,
{
fn new(r: Arc<R>) -> Self {
Self { r, v: PhantomData }
}
}
#[derive(Debug, Clone)]
pub struct RemoveOperation<R, V>
where
V: Identity,
R: Repository<V>,
{
r: Arc<R>,
k: IdentityOf<V>,
res: Option<V>,
}
#[async_trait]
impl<R, V> Operation for RemoveOperation<R, V>
where
V: Identity + Clone + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V>
+ crate::Repository<V>
+ crate::Remove<V>
+ crate::Add<V>
+ Send
+ Sync
+ 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
type Output = Option<V>;
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, Self::Error> {
let res = self.r.remove(&self.k).await?;
self.res = res.clone();
Ok(res)
}
async fn rollback(&mut self) {
if let Some(ref v) = self.res {
if let Err(err) = self.r.add(v.clone()).await {
log::error!("Rollback error:\n{err:?}");
}
}
}
}
impl<R, V> Remove<R, V> for RemoveOperation<R, V>
where
V: Identity + Clone + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V>
+ crate::Repository<V>
+ crate::Remove<V>
+ crate::Add<V>
+ Send
+ Sync
+ 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
fn new(r: Arc<R>, k: IdentityOf<V>) -> Self {
Self { r, k, res: None }
}
}
#[derive(Clone, Debug)]
pub struct UpdateOperation<R: crate::Repository<V>, V: Identity> {
r: Arc<R>,
v: V,
initial: Option<V>,
}
#[async_trait]
impl<R, V> Operation for UpdateOperation<R, V>
where
V: Identity + Clone + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V>
+ crate::Repository<V>
+ crate::Update<V>
+ crate::Get<V>
+ Send
+ Sync
+ 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
type Output = ();
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, Self::Error> {
self.initial = self.r.get_one(self.v.id()).await?;
self.r.update(self.v.clone()).await
}
async fn rollback(&mut self) {
if let Some(ref initial) = self.initial {
if let Err(err) = self.r.update(initial.clone()).await {
log::error!("Rollback error:\n{err:?}");
}
}
}
}
impl<R, V> Update<R, V> for UpdateOperation<R, V>
where
V: Identity + Clone + Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V>
+ crate::Repository<V>
+ crate::Update<V>
+ crate::Get<V>
+ Send
+ Sync
+ 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
fn new(r: Arc<R>, v: V) -> Self {
Self {
r,
v,
initial: None,
}
}
}
#[derive(Debug, Clone)]
pub struct ListByOperation<R, V, K>
where
V: Identity + IdentityBy<K>,
R: crate::Repository<V>,
{
r: Arc<R>,
v: PhantomData<V>,
k: K,
}
#[async_trait]
impl<R, V, K> Operation for ListByOperation<R, V, K>
where
V: Identity + IdentityBy<K> + Clone + Send + Sync + 'static,
K: Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::ListBy<V, K> + Send + Sync + 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
type Output = Vec<V>;
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, Self::Error> {
self.r.list_by(&self.k).await
}
}
impl<R, V, K> ListBy<R, V, K> for ListByOperation<R, V, K>
where
V: Identity + IdentityBy<K> + Clone + Send + Sync + 'static,
K: Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::ListBy<V, K> + Send + Sync + 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
fn new(r: Arc<R>, k: K) -> Self {
Self {
r,
v: PhantomData,
k,
}
}
}
#[derive(Debug, Clone)]
pub struct GetByOperation<R, V, K>
where
V: Identity + IdentityBy<K>,
R: crate::Repository<V>,
{
r: Arc<R>,
v: PhantomData<V>,
k: K,
}
#[async_trait]
impl<R, V, K> Operation for GetByOperation<R, V, K>
where
V: Identity + IdentityBy<K> + Clone + Send + Sync + 'static,
K: Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::GetBy<V, K> + Send + Sync + 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
type Output = Option<V>;
type Error = R::Error;
async fn perform(&mut self) -> Result<Self::Output, Self::Error> {
self.r.get_by(&self.k).await
}
}
impl<R, V, K> GetBy<R, V, K> for GetByOperation<R, V, K>
where
V: Identity + IdentityBy<K> + Clone + Send + Sync + 'static,
K: Send + Sync + 'static,
V::Id: Send + Sync,
R: Repository<V> + crate::Repository<V> + crate::GetBy<V, K> + Send + Sync + 'static,
R::Error: std::fmt::Debug + Send + Sync,
{
fn new(r: Arc<R>, k: K) -> Self {
Self {
r,
v: PhantomData,
k,
}
}
}
}
}
pub struct RepositoryWrapper<H, T, V, R>
where
H: Operation,
V: Identity,
R: Repository<V>,
{
operations: HCons<H, T>,
_v: PhantomData<V>,
r: Arc<R>,
}
impl<H, T, V, R> RepositoryWrapper<H, T, V, R>
where
H: Operation,
V: Identity,
R: Repository<V>,
{
fn add_operation<RH: Operation>(self, o: RH) -> RepositoryWrapper<RH, HCons<H, T>, V, R> {
let operations = HCons {
head: o,
tail: self.operations,
};
let r = self.r;
RepositoryWrapper {
operations,
_v: PhantomData,
r,
}
}
pub fn change_type<NV, NR>(self, r: Arc<NR>) -> RepositoryWrapper<H, T, NV, NR>
where
NR: Repository<NV>,
NV: Identity,
{
let operations = self.operations;
RepositoryWrapper {
operations,
_v: PhantomData,
r,
}
}
}
impl<H, T, V, R> crate::Repository<V> for RepositoryWrapper<H, T, V, R>
where
V: Identity,
H: Operation,
R: crate::Repository<V> + Repository<V>,
{
type Error = <R as crate::Repository<V>>::Error;
}
#[async_trait]
impl<H, T, V, R> crate::Add<V> for RepositoryWrapper<H, T, V, R>
where
V: Identity + Sync,
H: Operation + Send + Sync,
T: Send + Sync,
R: crate::Repository<V> + Repository<V> + crate::Add<V> + Send + Sync,
{
async fn add(&self, v: V) -> Result<(), Self::Error> {
self.r.add(v).await
}
}
#[async_trait]
impl<H, T, V, R> crate::List<V> for RepositoryWrapper<H, T, V, R>
where
V: Identity + Sync,
H: Operation + Send + Sync,
T: Send + Sync,
R: crate::Repository<V> + Repository<V> + crate::List<V> + Send + Sync,
{
async fn list(&self) -> Result<Vec<V>, Self::Error> {
self.r.list().await
}
}
#[async_trait]
impl<H, T, V, R> crate::Get<V> for RepositoryWrapper<H, T, V, R>
where
V: Identity + Sync,
V::Id: Sync,
H: Operation + Send + Sync,
T: Send + Sync,
R: crate::Repository<V> + Repository<V> + crate::Get<V> + Send + Sync,
{
async fn get_one(&self, k: &IdentityOf<V>) -> Result<Option<V>, Self::Error> {
self.r.get_one(k).await
}
}
#[async_trait]
impl<H, T, V, R> crate::Remove<V> for RepositoryWrapper<H, T, V, R>
where
V: Identity + Sync,
V::Id: Sync,
H: Operation + Send + Sync,
T: Send + Sync,
R: crate::Repository<V> + Repository<V> + crate::Remove<V> + Send + Sync,
{
async fn remove(&self, k: &IdentityOf<V>) -> Result<Option<V>, Self::Error> {
self.r.remove(k).await
}
}
#[async_trait]
impl<H, T, V, R> crate::Update<V> for RepositoryWrapper<H, T, V, R>
where
V: Identity + Sync,
V::Id: Sync,
H: Operation + Send + Sync,
T: Send + Sync,
R: crate::Repository<V> + Repository<V> + crate::Update<V> + Send + Sync,
{
async fn update(&self, v: V) -> Result<(), Self::Error> {
self.r.update(v).await
}
}
impl<V, R> RepositoryWrapper<(), HNil, V, R>
where
V: Identity,
R: Repository<V>,
{
pub fn new(r: Arc<R>) -> Self {
Self {
operations: HCons {
head: (),
tail: HNil,
},
_v: PhantomData,
r,
}
}
}
#[async_trait]
impl<V, H, T, R> TransactionRunner<V, H, T> for RepositoryWrapper<H, T, V, R>
where
V: Identity,
T: HList + IntoReverse + Send + Sync,
H: Operation + Send + Sync,
H::Error: AsRef<dyn Error + Send + Sync>,
HCons<H, T>: HMappable<RollbackMapper> + IntoReverse,
<HCons<H, T> as IntoReverse>::Output: TryMap<PerformMapper, Error = H::Error>
+ Send
+ Sync
+ HMappable<RollbackMapper>,
<<HCons<H, T> as IntoReverse>::Output as TryMap<PerformMapper>>::Output: Send,
R: Repository<V> + Send + Sync,
{
type Error = H::Error;
type Output = <<HCons<H, T> as IntoReverse>::Output as TryMap<PerformMapper>>::Output;
async fn commit_transaction(self) -> Result<Self::Output, Self::Error> {
let mut operations = self.operations.into_reverse();
match operations.try_map(PerformMapper).await {
Ok(res) => Ok(res),
Err(err) => {
operations.map(RollbackMapper);
Err(err)
}
}
}
}
impl<V, H, T, R> RepositoryWrapper<H, T, V, R>
where
V: Identity,
H: Operation,
R: Repository<V>,
{
#[allow(clippy::should_implement_trait)]
pub fn add(self, v: V) -> RepositoryWrapper<R::Add, HCons<H, T>, V, R>
where
R: repository::Add<V>,
{
let op = R::Add::new(self.r.clone(), v);
self.add_operation(op)
}
pub fn get(self, k: IdentityOf<V>) -> RepositoryWrapper<R::Get, HCons<H, T>, V, R>
where
R: repository::Get<V>,
{
let op = R::Get::new(self.r.clone(), k);
self.add_operation(op)
}
pub fn list(self) -> RepositoryWrapper<R::List, HCons<H, T>, V, R>
where
R: repository::List<V>,
{
let op = R::List::new(self.r.clone());
self.add_operation(op)
}
pub fn update(self, v: V) -> RepositoryWrapper<R::Update, HCons<H, T>, V, R>
where
R: repository::Update<V>,
{
let op = R::Update::new(self.r.clone(), v);
self.add_operation(op)
}
pub fn remove(self, k: IdentityOf<V>) -> RepositoryWrapper<R::Remove, HCons<H, T>, V, R>
where
R: repository::Remove<V>,
{
let op = R::Remove::new(self.r.clone(), k);
self.add_operation(op)
}
pub fn select<K>(self, k: K) -> RepositoryWrapper<R::ListBy, HCons<H, T>, V, R>
where
V: IdentityBy<K>,
R: repository::ListBy<V, K>,
{
let op = R::ListBy::new(self.r.clone(), k);
self.add_operation(op)
}
pub fn find<K>(self, k: K) -> RepositoryWrapper<R::GetBy, HCons<H, T>, V, R>
where
V: IdentityBy<K>,
R: repository::GetBy<V, K>,
{
let op = R::GetBy::new(self.r.clone(), k);
self.add_operation(op)
}
}
pub mod mappers {
use super::Operation;
use async_trait::async_trait;
use frunk::hlist::{HCons, HMappable, HNil};
use std::error::Error;
#[async_trait]
pub trait TryMap<Mapper>
where
Self: Sized,
{
type Output;
type Error;
async fn try_map(&mut self, mapper: Mapper) -> Result<Self::Output, Self::Error>;
}
#[async_trait]
impl<H, T> TryMap<PerformMapper> for HCons<H, T>
where
H: Operation + Send + Sync,
T: TryMap<PerformMapper, Error = H::Error> + Send + Sync,
{
type Output = HCons<H::Output, T::Output>;
type Error = H::Error;
async fn try_map(&mut self, mapper: PerformMapper) -> Result<Self::Output, Self::Error> {
let HCons { ref mut head, tail } = self;
let head = match head.perform().await {
Ok(head) => head,
Err(err) => return Err(err),
};
Ok(HCons {
head,
tail: tail.try_map(mapper).await?,
})
}
}
#[async_trait]
impl TryMap<PerformMapper> for HNil {
type Output = HNil;
type Error = Box<dyn Error + Send + Sync + 'static>;
async fn try_map(&mut self, _: PerformMapper) -> Result<Self::Output, Self::Error> {
Ok(HNil)
}
}
#[derive(Debug)]
pub struct PerformMapper;
pub struct RollbackMapper;
impl<H, T> HMappable<RollbackMapper> for HCons<H, T>
where
T: HMappable<RollbackMapper>,
H: Operation + Send,
{
type Output = ();
fn map(self, m: RollbackMapper) -> Self::Output {
let HCons { mut head, tail } = self;
futures::executor::block_on(async { head.rollback().await });
tail.map(m);
}
}
}
use mappers::{PerformMapper, RollbackMapper, TryMap};
#[async_trait]
pub trait TransactionRunner<V, H, T>
where
V: Identity,
H: Operation,
{
type Error: AsRef<dyn Error + Send + Sync + 'static>;
type Output;
async fn commit_transaction(self) -> Result<Self::Output, Self::Error>;
}
pub use self::repository::Repository;
pub mod repository {
use super::operation;
use crate::{Identity, IdentityBy};
use async_trait::async_trait;
#[async_trait]
pub trait Repository<V: Identity>: crate::Repository<V>
where
Self: Sized,
{
async fn transaction_started(&mut self) {}
async fn rollback(&mut self) {}
}
pub trait Add<V: Identity>: Repository<V> {
type Add: operation::Add<Self, V>;
}
pub trait Get<V: Identity>: Repository<V> {
type Get: operation::Get<Self, V>;
}
pub trait List<V: Identity>: Repository<V> {
type List: operation::List<Self, V>;
}
pub trait Update<V: Identity>: Repository<V> {
type Update: operation::Update<Self, V>;
}
pub trait Remove<V: Identity>: Repository<V> {
type Remove: operation::Remove<Self, V>;
}
pub trait ListBy<V: Identity + IdentityBy<K>, K>: Repository<V> {
type ListBy: operation::ListBy<Self, V, K>;
}
pub trait GetBy<V: Identity + IdentityBy<K>, K>: Repository<V> {
type GetBy: operation::GetBy<Self, V, K>;
}
}