#![cfg(feature = "cqrs")]
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use super::{Event, EventStore, EventStoreBackend, InMemoryBackend};
/// Positions in the local and remote stores recorded by the last completed sync.
///
/// Both fields start at zero via `Default`. `SyncEngine::sync` uses each value as
/// the number of leading events to skip when looking for new events, and
/// overwrites each with the corresponding store's total event count once a sync
/// finishes without error.
///
/// `PartialEq`/`Eq` are derived so cursors can be compared directly (for example
/// with `assert_eq!`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncCursor {
    /// Number of events in the local store already covered by a previous sync.
    pub local_version: u64,
    /// Number of events in the remote store already covered by a previous sync.
    pub remote_version: u64,
}
/// Summary of what a single `SyncEngine::sync` call did.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare reports directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncReport {
    /// Number of local events that were appended to the remote store.
    pub pushed: usize,
    /// Number of remote events that were appended to the local store.
    pub pulled: usize,
    /// Number of conflicts encountered. `SyncEngine::sync` currently always
    /// reports zero here.
    pub conflicts: usize,
}
/// Strategy for combining a slice of local events with a slice of remote events
/// into a single owned list.
///
/// Implementors must be `Send + Sync`. The method is asynchronous (via
/// `async_trait`), so an implementation may await other work while resolving.
#[async_trait]
pub trait ConflictResolver<E: Event>: Send + Sync {
/// Produces the resolved event list from the borrowed `local` and `remote`
/// inputs. What "resolved" means is left to each implementation.
async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E>;
}
/// Resolver that always sides with the remote events.
///
/// Despite the name, no timestamps or versions are compared: the local input is
/// ignored and the result is simply a copy of the remote input.
pub struct LastWriteWins;

#[async_trait]
impl<E: Event> ConflictResolver<E> for LastWriteWins {
    /// Returns a clone of every event in `remote`, in the same order.
    /// `_local` is intentionally unused.
    async fn resolve(&self, _local: &[E], remote: &[E]) -> Vec<E> {
        remote.iter().cloned().collect::<Vec<E>>()
    }
}
/// Resolver that keeps everything from both sides.
///
/// No deduplication or reordering is performed: the result is all local events
/// followed by all remote events.
pub struct AppendOnly;

#[async_trait]
impl<E: Event> ConflictResolver<E> for AppendOnly {
    /// Returns clones of the `local` events followed by clones of the `remote`
    /// events, each group in its original order.
    async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E> {
        local
            .iter()
            .chain(remote.iter())
            .cloned()
            .collect::<Vec<E>>()
    }
}
/// Type-erased callback shape stored by `Manual`: takes the owned local event
/// list and the owned remote event list (in that order) and returns a boxed,
/// `Send` future that yields the resolved list. The callback itself must be
/// `Send + Sync`.
type ManualResolveFn<E> = dyn Fn(Vec<E>, Vec<E>) -> Pin<Box<dyn Future<Output = Vec<E>> + Send>>
+ Send
+ Sync;
/// Conflict resolver that delegates the decision to a caller-supplied async
/// closure.
pub struct Manual<E: Event> {
/// Reference-counted, type-erased closure that is invoked with owned copies of
/// the local and remote events.
resolver_fn: Arc<ManualResolveFn<E>>,
}
impl<E: Event> Manual<E> {
pub fn new<F, Fut>(f: F) -> Self
where
F: Fn(Vec<E>, Vec<E>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Vec<E>> + Send + 'static,
{
Self {
resolver_fn: Arc::new(move |local, remote| Box::pin(f(local, remote))),
}
}
}
#[async_trait]
impl<E: Event> ConflictResolver<E> for Manual<E> {
    /// Clones both inputs into owned `Vec`s, hands them to the stored closure
    /// (local first, remote second), and awaits the future it returns.
    async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E> {
        let owned_local = local.to_vec();
        let owned_remote = remote.to_vec();
        let pending = (self.resolver_fn)(owned_local, owned_remote);
        pending.await
    }
}
/// Holds a local and a remote `EventStore`, a `ConflictResolver`, and a shared
/// `SyncCursor` guarded by an async mutex.
///
/// `B1` and `B2` are the backends of the local and remote stores respectively
/// (both default to `InMemoryBackend<E>`); `R` is the resolver type (defaults to
/// `LastWriteWins`).
pub struct SyncEngine<
E: Event,
B1: EventStoreBackend<E> = InMemoryBackend<E>,
B2: EventStoreBackend<E> = InMemoryBackend<E>,
R: ConflictResolver<E> = LastWriteWins,
> {
/// Store whose not-yet-synced events `sync` appends to `remote`.
local: EventStore<E, B1>,
/// Store whose not-yet-synced events `sync` appends to `local`.
remote: EventStore<E, B2>,
/// Strategy invoked by `resolve_conflicts`.
resolver: R,
/// Positions recorded by the last completed `sync`; locked for the duration of
/// each `sync` call.
cursor: Arc<Mutex<SyncCursor>>,
}
impl<E: Event> SyncEngine<E, InMemoryBackend<E>, InMemoryBackend<E>, LastWriteWins> {
    /// Builds an engine over two default-backend stores using the
    /// [`LastWriteWins`] resolver.
    ///
    /// The cursor starts at its default value (both versions zero), so the first
    /// `sync` treats every existing event on either side as new.
    pub fn new(local: EventStore<E>, remote: EventStore<E>, resolver: LastWriteWins) -> Self {
        let cursor = Arc::new(Mutex::new(SyncCursor::default()));
        Self {
            local,
            remote,
            resolver,
            cursor,
        }
    }
}
impl<E: Event, R: ConflictResolver<E>>
SyncEngine<E, InMemoryBackend<E>, InMemoryBackend<E>, R>
{
pub fn with_resolver(
local: EventStore<E>,
remote: EventStore<E>,
resolver: R,
) -> Self {
Self {
local,
remote,
resolver,
cursor: Arc::new(Mutex::new(SyncCursor::default())),
}
}
pub async fn sync(&self) -> Result<SyncReport, String> {
let mut cursor = self.cursor.lock().await;
let local_events = self.local.get_all_events().await?;
let local_new: Vec<E> = local_events
.into_iter()
.skip(cursor.local_version as usize)
.collect();
let remote_events = self.remote.get_all_events().await?;
let remote_new: Vec<E> = remote_events
.into_iter()
.skip(cursor.remote_version as usize)
.collect();
let pushed = local_new.len();
let pulled = remote_new.len();
if !local_new.is_empty() {
self.remote.append("synced", local_new).await?;
}
if !remote_new.is_empty() {
self.local.append("synced", remote_new).await?;
}
let local_total = self.local.get_all_events().await?.len() as u64;
let remote_total = self.remote.get_all_events().await?.len() as u64;
cursor.local_version = local_total;
cursor.remote_version = remote_total;
Ok(SyncReport {
pushed,
pulled,
conflicts: 0,
})
}
pub async fn resolve_conflicts(
&self,
local: &[E],
remote: &[E],
) -> Vec<E> {
self.resolver.resolve(local, remote).await
}
}