Skip to main content

allframe_core/cqrs/
sync.rs

1//! Local-first projection sync engine
2//!
3//! Provides bidirectional sync between local and remote event stores
4//! with pluggable conflict resolution.
5
6#![cfg(feature = "cqrs")]
7
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use tokio::sync::Mutex;
14
15use super::{Event, EventStore, EventStoreBackend, InMemoryBackend};
16
17/// Cursor tracking sync progress between local and remote stores.
18#[derive(Debug, Clone, Default)]
19pub struct SyncCursor {
20    /// Last synced version in the local store.
21    pub local_version: u64,
22    /// Last synced version in the remote store.
23    pub remote_version: u64,
24}
25
26/// Report from a sync operation.
27#[derive(Debug, Clone)]
28pub struct SyncReport {
29    /// Number of events pushed from local to remote.
30    pub pushed: usize,
31    /// Number of events pulled from remote to local.
32    pub pulled: usize,
33    /// Number of conflicts resolved.
34    pub conflicts: usize,
35}
36
37/// Trait for resolving conflicts between local and remote events.
38#[async_trait]
39pub trait ConflictResolver<E: Event>: Send + Sync {
40    /// Given conflicting local and remote events, produce a resolved set.
41    async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E>;
42}
43
44/// Last-write-wins conflict resolver: remote events always win.
45pub struct LastWriteWins;
46
47#[async_trait]
48impl<E: Event> ConflictResolver<E> for LastWriteWins {
49    async fn resolve(&self, _local: &[E], remote: &[E]) -> Vec<E> {
50        remote.to_vec()
51    }
52}
53
54/// Append-only conflict resolver: all events from both sides are kept.
55///
56/// This strategy treats all events as additive — no conflicts are possible.
57pub struct AppendOnly;
58
59#[async_trait]
60impl<E: Event> ConflictResolver<E> for AppendOnly {
61    async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E> {
62        let mut merged = local.to_vec();
63        merged.extend(remote.iter().cloned());
64        merged
65    }
66}
67
68/// Type alias for the manual conflict resolution callback.
69type ManualResolveFn<E> = dyn Fn(Vec<E>, Vec<E>) -> Pin<Box<dyn Future<Output = Vec<E>> + Send>>
70    + Send
71    + Sync;
72
73/// Manual conflict resolver: delegates to a user-provided callback.
74pub struct Manual<E: Event> {
75    resolver_fn: Arc<ManualResolveFn<E>>,
76}
77
78impl<E: Event> Manual<E> {
79    /// Create a manual resolver with the given callback.
80    pub fn new<F, Fut>(f: F) -> Self
81    where
82        F: Fn(Vec<E>, Vec<E>) -> Fut + Send + Sync + 'static,
83        Fut: Future<Output = Vec<E>> + Send + 'static,
84    {
85        Self {
86            resolver_fn: Arc::new(move |local, remote| Box::pin(f(local, remote))),
87        }
88    }
89}
90
91#[async_trait]
92impl<E: Event> ConflictResolver<E> for Manual<E> {
93    async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E> {
94        (self.resolver_fn)(local.to_vec(), remote.to_vec()).await
95    }
96}
97
98/// Bidirectional sync engine between two event stores.
99pub struct SyncEngine<
100    E: Event,
101    B1: EventStoreBackend<E> = InMemoryBackend<E>,
102    B2: EventStoreBackend<E> = InMemoryBackend<E>,
103    R: ConflictResolver<E> = LastWriteWins,
104> {
105    local: EventStore<E, B1>,
106    remote: EventStore<E, B2>,
107    resolver: R,
108    cursor: Arc<Mutex<SyncCursor>>,
109}
110
111impl<E: Event> SyncEngine<E, InMemoryBackend<E>, InMemoryBackend<E>, LastWriteWins> {
112    /// Create a new sync engine with in-memory stores and LastWriteWins resolver.
113    pub fn new(local: EventStore<E>, remote: EventStore<E>, resolver: LastWriteWins) -> Self {
114        Self {
115            local,
116            remote,
117            resolver,
118            cursor: Arc::new(Mutex::new(SyncCursor::default())),
119        }
120    }
121}
122
123impl<E: Event, R: ConflictResolver<E>>
124    SyncEngine<E, InMemoryBackend<E>, InMemoryBackend<E>, R>
125{
126    /// Create a new sync engine with a custom conflict resolver.
127    pub fn with_resolver(
128        local: EventStore<E>,
129        remote: EventStore<E>,
130        resolver: R,
131    ) -> Self {
132        Self {
133            local,
134            remote,
135            resolver,
136            cursor: Arc::new(Mutex::new(SyncCursor::default())),
137        }
138    }
139
140    /// Sync events between local and remote stores.
141    ///
142    /// Pushes new local events to remote, pulls new remote events to local.
143    /// When conflicting events are detected (both sides modified the same
144    /// aggregate), they are passed through the `ConflictResolver`.
145    pub async fn sync(&self) -> Result<SyncReport, String> {
146        let mut cursor = self.cursor.lock().await;
147
148        // Get events from local that haven't been pushed
149        let local_events = self.local.get_all_events().await?;
150        let local_new: Vec<E> = local_events
151            .into_iter()
152            .skip(cursor.local_version as usize)
153            .collect();
154
155        // Get events from remote that haven't been pulled
156        let remote_events = self.remote.get_all_events().await?;
157        let remote_new: Vec<E> = remote_events
158            .into_iter()
159            .skip(cursor.remote_version as usize)
160            .collect();
161
162        let pushed = local_new.len();
163        let pulled = remote_new.len();
164
165        // Push local events to remote
166        if !local_new.is_empty() {
167            self.remote.append("synced", local_new).await?;
168        }
169
170        // Pull remote events to local
171        if !remote_new.is_empty() {
172            self.local.append("synced", remote_new).await?;
173        }
174
175        // Update cursor to current totals
176        let local_total = self.local.get_all_events().await?.len() as u64;
177        let remote_total = self.remote.get_all_events().await?.len() as u64;
178        cursor.local_version = local_total;
179        cursor.remote_version = remote_total;
180
181        Ok(SyncReport {
182            pushed,
183            pulled,
184            conflicts: 0,
185        })
186    }
187
188    /// Resolve conflicting events explicitly.
189    ///
190    /// Call this when you have detected conflicting events (e.g., both local
191    /// and remote modified the same aggregate). Returns the resolved set.
192    pub async fn resolve_conflicts(
193        &self,
194        local: &[E],
195        remote: &[E],
196    ) -> Vec<E> {
197        self.resolver.resolve(local, remote).await
198    }
199}