allframe_core/cqrs/
sync.rs1#![cfg(feature = "cqrs")]
7
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use tokio::sync::Mutex;
14
15use super::{Event, EventStore, EventStoreBackend, InMemoryBackend};
16
/// Bookmark recording how much of each event store has already been synchronised.
///
/// Both fields are event counts: `SyncEngine::sync` skips that many events from the
/// front of the corresponding store and treats the rest as new.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncCursor {
    /// Number of events in the local store already covered by a completed sync.
    pub local_version: u64,
    /// Number of events in the remote store already covered by a completed sync.
    pub remote_version: u64,
}
25
/// Summary of a single synchronisation pass.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncReport {
    /// Number of local events that were appended to the remote store.
    pub pushed: usize,
    /// Number of remote events that were appended to the local store.
    pub pulled: usize,
    /// Number of conflicts encountered during the pass.
    pub conflicts: usize,
}
36
37#[async_trait]
39pub trait ConflictResolver<E: Event>: Send + Sync {
40 async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E>;
42}
43
/// Resolver that keeps the remote batch and discards the local one.
///
/// Note that the implementation does not compare timestamps: the remote side always wins.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct LastWriteWins;
46
47#[async_trait]
48impl<E: Event> ConflictResolver<E> for LastWriteWins {
49 async fn resolve(&self, _local: &[E], remote: &[E]) -> Vec<E> {
50 remote.to_vec()
51 }
52}
53
/// Resolver that keeps every event from both sides: local events first, then remote.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AppendOnly;
58
59#[async_trait]
60impl<E: Event> ConflictResolver<E> for AppendOnly {
61 async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E> {
62 let mut merged = local.to_vec();
63 merged.extend(remote.iter().cloned());
64 merged
65 }
66}
67
/// Boxed, sendable future yielding the resolved event list.
type ResolveFuture<E> = Pin<Box<dyn Future<Output = Vec<E>> + Send>>;

/// Type-erased resolution callback: receives owned `(local, remote)` batches and
/// returns a future that yields the merged events.
type ManualResolveFn<E> = dyn Fn(Vec<E>, Vec<E>) -> ResolveFuture<E> + Send + Sync;
72
73pub struct Manual<E: Event> {
75 resolver_fn: Arc<ManualResolveFn<E>>,
76}
77
78impl<E: Event> Manual<E> {
79 pub fn new<F, Fut>(f: F) -> Self
81 where
82 F: Fn(Vec<E>, Vec<E>) -> Fut + Send + Sync + 'static,
83 Fut: Future<Output = Vec<E>> + Send + 'static,
84 {
85 Self {
86 resolver_fn: Arc::new(move |local, remote| Box::pin(f(local, remote))),
87 }
88 }
89}
90
91#[async_trait]
92impl<E: Event> ConflictResolver<E> for Manual<E> {
93 async fn resolve(&self, local: &[E], remote: &[E]) -> Vec<E> {
94 (self.resolver_fn)(local.to_vec(), remote.to_vec()).await
95 }
96}
97
98pub struct SyncEngine<
100 E: Event,
101 B1: EventStoreBackend<E> = InMemoryBackend<E>,
102 B2: EventStoreBackend<E> = InMemoryBackend<E>,
103 R: ConflictResolver<E> = LastWriteWins,
104> {
105 local: EventStore<E, B1>,
106 remote: EventStore<E, B2>,
107 resolver: R,
108 cursor: Arc<Mutex<SyncCursor>>,
109}
110
111impl<E: Event> SyncEngine<E, InMemoryBackend<E>, InMemoryBackend<E>, LastWriteWins> {
112 pub fn new(local: EventStore<E>, remote: EventStore<E>, resolver: LastWriteWins) -> Self {
114 Self {
115 local,
116 remote,
117 resolver,
118 cursor: Arc::new(Mutex::new(SyncCursor::default())),
119 }
120 }
121}
122
123impl<E: Event, R: ConflictResolver<E>>
124 SyncEngine<E, InMemoryBackend<E>, InMemoryBackend<E>, R>
125{
126 pub fn with_resolver(
128 local: EventStore<E>,
129 remote: EventStore<E>,
130 resolver: R,
131 ) -> Self {
132 Self {
133 local,
134 remote,
135 resolver,
136 cursor: Arc::new(Mutex::new(SyncCursor::default())),
137 }
138 }
139
140 pub async fn sync(&self) -> Result<SyncReport, String> {
146 let mut cursor = self.cursor.lock().await;
147
148 let local_events = self.local.get_all_events().await?;
150 let local_new: Vec<E> = local_events
151 .into_iter()
152 .skip(cursor.local_version as usize)
153 .collect();
154
155 let remote_events = self.remote.get_all_events().await?;
157 let remote_new: Vec<E> = remote_events
158 .into_iter()
159 .skip(cursor.remote_version as usize)
160 .collect();
161
162 let pushed = local_new.len();
163 let pulled = remote_new.len();
164
165 if !local_new.is_empty() {
167 self.remote.append("synced", local_new).await?;
168 }
169
170 if !remote_new.is_empty() {
172 self.local.append("synced", remote_new).await?;
173 }
174
175 let local_total = self.local.get_all_events().await?.len() as u64;
177 let remote_total = self.remote.get_all_events().await?.len() as u64;
178 cursor.local_version = local_total;
179 cursor.remote_version = remote_total;
180
181 Ok(SyncReport {
182 pushed,
183 pulled,
184 conflicts: 0,
185 })
186 }
187
188 pub async fn resolve_conflicts(
193 &self,
194 local: &[E],
195 remote: &[E],
196 ) -> Vec<E> {
197 self.resolver.resolve(local, remote).await
198 }
199}