1use std::sync::Mutex;
13
14use async_trait::async_trait;
15use futures::channel::mpsc;
16use futures::lock::Mutex as AsyncMutex;
17use ubiquisync_core::{
18 codec::DecodedEntry,
19 event::{EventBus, EventHandler, Publisher, RoutableEvent, Subscription},
20 hlc::{HlcError, HlcService, Timestamp, wall_ms},
21 log_entry::LogEntry,
22 sync::{
23 Applied, CursorStream, CursorsEvent, HasCursors, LogProcessor, LogSource, PeerCursors,
24 SyncError,
25 },
26 uuid::Uuid,
27};
28
29use crate::{
30 db::{Db, DbError, DbRow, DbValue},
31 hlc_storage::SqlHlcStorage,
32 reducer::Reducer,
33 store::SqlStore,
34 tracker::{HistoryTracker, LogTracker, LogTrackerError},
35};
36
37pub struct Processor<R: Reducer, D: Db, T, E: EventHandler<R::Event>> {
43 self_id: Uuid,
44 reducer: AsyncMutex<R>,
47 db: D,
48 hlc: HlcService<SqlHlcStorage>,
49 tracker: T,
50 cursors: Mutex<PeerCursors>,
53 watchers: Mutex<Vec<mpsc::UnboundedSender<CursorsEvent>>>,
54 event_publish: E::Publish,
55 event_handler: E,
56}
57
58#[allow(dead_code)]
59impl<R: Reducer, D: Db, T: LogTracker<R::Op>, E: EventHandler<R::Event>> Processor<R, D, T, E> {
60 pub async fn open(
64 reducer: R,
65 db: D,
66 prefix: &str,
67 self_id: Uuid,
68 ) -> Result<Self, ProcessorError<R::Error>> {
69 let hlc = HlcService::open(SqlHlcStorage::open(&db, prefix).await?)?;
70 let tracker = T::init(&db, prefix).await?;
71 let cursors = tracker.all_cursors(&db).await?;
72 let (event_publish, event_handler) = E::init();
73 Ok(Self {
74 reducer: AsyncMutex::new(reducer),
75 self_id,
76 db,
77 hlc,
78 tracker,
79 event_publish,
80 event_handler,
81 cursors: Mutex::new(cursors),
82 watchers: Mutex::new(Vec::new()),
83 })
84 }
85
86 pub async fn exec(
89 &self,
90 server_user_id: Option<Uuid>,
91 op: R::Op,
92 ) -> Result<(), ProcessorError<R::Error>> {
93 let mut reducer = self.reducer.lock().await;
94 let entry_idx = self.cached_cursor(&self.self_id);
95 let events = self
96 .ingest_entry_or_local(
97 &mut reducer,
98 &self.self_id,
99 entry_idx,
100 None,
101 server_user_id,
102 &op,
103 )
104 .await?;
105 drop(reducer);
107 for event in events {
108 self.event_publish.publish(event);
109 }
110 Ok(())
111 }
112
113 pub fn event_handler(&self) -> &E {
115 &self.event_handler
116 }
117
118 pub(crate) fn db(&self) -> &D {
120 &self.db
121 }
122
123 async fn ingest_entry(
128 &self,
129 reducer: &mut R,
130 peer_id: &Uuid,
131 entry_idx: u64,
132 entry: &LogEntry<R::Op>,
133 ) -> Result<Vec<R::Event>, ProcessorError<R::Error>> {
134 self.ingest_entry_or_local(
135 reducer,
136 peer_id,
137 entry_idx,
138 Some(entry.timestamp),
139 entry.server_user_id,
140 &entry.op,
141 )
142 .await
143 }
144
145 async fn ingest_entry_or_local(
146 &self,
147 reducer: &mut R,
148 peer_id: &Uuid,
149 entry_idx: u64,
150 timestamp: Option<Timestamp>,
151 server_user_id: Option<Uuid>,
152 op: &R::Op,
153 ) -> Result<Vec<R::Event>, ProcessorError<R::Error>> {
154 let prepare_state = reducer
155 .prepare(&self.db, op)
156 .await
157 .map_err(ProcessorError::Reducer)?;
158 let mut batch = self.db.new_batch();
159 let timestamp = if let Some(timestamp) = timestamp {
160 self.hlc.observe(timestamp, wall_ms(), batch.as_mut())?;
161 timestamp
162 } else {
163 self.hlc.now(batch.as_mut())?
164 };
165 self.tracker.track_one(
166 peer_id,
167 entry_idx,
168 timestamp,
169 server_user_id,
170 op,
171 batch.as_mut(),
172 )?;
173 let apply_state = reducer
174 .apply(batch.as_mut(), timestamp, op, prepare_state)
175 .map_err(ProcessorError::Reducer)?;
176 let batch_result = batch.commit().await?;
177 let event = reducer
178 .post_apply(apply_state, &batch_result)
179 .map_err(ProcessorError::Reducer)?;
180 self.advance_cursor(peer_id, entry_idx + 1);
184 Ok(event)
185 }
186 async fn ingest_expunged(
190 &self,
191 peer_id: &Uuid,
192 entry_idx: u64,
193 hash: &blake3::Hash,
194 ) -> Result<(), ProcessorError<R::Error>> {
195 let mut batch = self.db.new_batch();
196 self.tracker
197 .track_expunged(peer_id, entry_idx, hash, batch.as_mut())?;
198 batch.commit().await?;
199 Ok(())
200 }
201
202 #[cfg(any(test, feature = "test-support"))]
206 pub(crate) async fn process_one(
207 &self,
208 peer_id: &Uuid,
209 entry_idx: u64,
210 entry: &LogEntry<R::Op>,
211 ) -> Result<(), ProcessorError<R::Error>> {
212 let mut reducer = self.reducer.lock().await;
213 let events = self
214 .ingest_entry(&mut reducer, peer_id, entry_idx, entry)
215 .await?;
216 drop(reducer);
217 for event in events {
218 self.event_publish.publish(event);
219 }
220 Ok(())
221 }
222}
223
224#[allow(dead_code)]
225impl<R: Reducer, D: Db, T, E: EventHandler<R::Event>> Processor<R, D, T, E> {
226 fn cached_cursor(&self, peer: &Uuid) -> u64 {
227 lock(&self.cursors).get(peer).copied().unwrap_or(0)
228 }
229
230 fn advance_cursor(&self, peer: &Uuid, next: u64) {
232 let advanced = {
233 let mut cursors = lock(&self.cursors);
234 let slot = cursors.entry(*peer).or_insert(0);
235 if next > *slot {
236 *slot = next;
237 true
238 } else {
239 false
240 }
241 };
242 if advanced {
243 let mut delta = PeerCursors::new();
244 delta.insert(*peer, next);
245 lock(&self.watchers).retain(|tx| {
246 tx.unbounded_send(CursorsEvent::Advanced(delta.clone()))
247 .is_ok()
248 });
249 }
250 }
251}
252
253fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
257 m.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
258}
259
260#[async_trait]
261impl<R: Reducer, D: Db, T: Send + Sync, E: EventHandler<R::Event> + Send + Sync> HasCursors
262 for Processor<R, D, T, E>
263where
264 E::Publish: Send + Sync,
265{
266 async fn cursors(&self) -> Result<PeerCursors, SyncError> {
267 Ok(lock(&self.cursors).clone())
268 }
269
270 fn watch_cursors(&self) -> CursorStream {
271 let (tx, rx) = mpsc::unbounded();
272 let cursors = lock(&self.cursors);
277 let mut watchers = lock(&self.watchers);
278 let _ = tx.unbounded_send(CursorsEvent::Snapshot(cursors.clone()));
279 watchers.retain(|w| !w.is_closed()); watchers.push(tx);
281 Box::pin(rx)
282 }
283}
284
285#[async_trait]
286impl<R: Reducer, D: Db, T: LogTracker<R::Op>, E: EventHandler<R::Event> + Send + Sync>
287 LogProcessor<R::Op> for Processor<R, D, T, E>
288where
289 R::Error: std::error::Error + Send + Sync + 'static,
290 E::Publish: Send + Sync,
291{
292 async fn apply(
293 &self,
294 peer: Uuid,
295 index: u64,
296 entry: DecodedEntry<R::Op>,
297 ) -> Result<Applied, SyncError> {
298 if index < self.cached_cursor(&peer) {
300 return Ok(Applied { new: false });
301 }
302 let mut reducer = self.reducer.lock().await;
303 let cursor = self.cached_cursor(&peer);
309 if index < cursor {
310 return Ok(Applied { new: false });
311 }
312 if index > cursor {
313 return Err(SyncError::CursorMismatch {
314 expected_idx: cursor,
315 actual_idx: index,
316 });
317 }
318 let outcome: Result<Vec<R::Event>, ProcessorError<R::Error>> = match entry {
321 DecodedEntry::LogEntry(e) => self.ingest_entry(&mut reducer, &peer, index, &e).await,
322 DecodedEntry::Expunged(hash) => {
323 let outcome = self.ingest_expunged(&peer, index, &hash).await;
324 if outcome.is_ok() {
325 self.advance_cursor(&peer, index + 1);
326 }
327 outcome.map(|()| Vec::new())
328 }
329 };
330 drop(reducer);
332 match outcome {
333 Ok(events) => {
334 for event in events {
335 self.event_publish.publish(event);
336 }
337 Ok(Applied { new: true })
338 }
339 Err(ProcessorError::Db(DbError::UniqueViolation)) => Ok(Applied { new: false }),
342 Err(e) => Err(SyncError::Backend(Box::new(e))),
343 }
344 }
345}
346
347#[async_trait]
348impl<R: Reducer, D: Db, T: HistoryTracker<R::Op>, E: EventHandler<R::Event> + Send + Sync>
349 LogSource<R::Op> for Processor<R, D, T, E>
350where
351 R::Error: std::error::Error + Send + Sync + 'static,
352 E::Publish: Send + Sync,
353{
354 async fn read_since(
355 &self,
356 peer: Uuid,
357 from: u64,
358 ) -> Result<Vec<(u64, DecodedEntry<R::Op>)>, SyncError> {
359 const PAGE: u64 = 256;
361 self.tracker
362 .read_entries(&self.db, &peer, from, PAGE)
363 .await
364 .map_err(|e| SyncError::Backend(Box::new(e)))
365 }
366}
367
368#[async_trait]
372impl<R: Reducer, D: Db, T: LogTracker<R::Op>>
373 ubiquisync_core::store::Store<R::Op, ProcessorError<BoxError>, R::Event>
374 for Processor<R, D, T, EventBus<R::Event>>
375where
376 R::Event: RoutableEvent,
377 R::Error: std::error::Error + Send + Sync + 'static,
378 <R::Event as RoutableEvent>::Target: Send + Sync,
379{
380 async fn exec(
381 &self,
382 server_user_id: Option<Uuid>,
383 op: R::Op,
384 ) -> Result<(), ProcessorError<BoxError>> {
385 Processor::exec(self, server_user_id, op)
389 .await
390 .map_err(|e| match e {
391 ProcessorError::Reducer(r) => ProcessorError::Reducer(Box::new(r) as BoxError),
392 ProcessorError::Hlc(x) => ProcessorError::Hlc(x),
393 ProcessorError::Tracker(x) => ProcessorError::Tracker(x),
394 ProcessorError::Db(x) => ProcessorError::Db(x),
395 ProcessorError::Sync(x) => ProcessorError::Sync(x),
396 })
397 }
398
399 fn watch(&self, target: <R::Event as RoutableEvent>::Target) -> Subscription<R::Event> {
400 self.event_handler().subscribe(target)
401 }
402}
403
404#[async_trait]
405impl<R: Reducer, D: Db, T: LogTracker<R::Op>> SqlStore<R::Op, R::Event>
406 for Processor<R, D, T, EventBus<R::Event>>
407where
408 R::Event: RoutableEvent,
409 R::Error: std::error::Error + Send + Sync + 'static,
410 <R::Event as RoutableEvent>::Target: Send + Sync,
411{
412 async fn query(&self, sql: &str, params: &[DbValue]) -> Result<Vec<DbRow>, DbError> {
413 self.db().query(sql, params).await
414 }
415
416 fn dialect(&self) -> crate::dialect::SqlDialect {
417 self.db().dialect()
418 }
419}
420
421pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
424
425#[derive(Debug, thiserror::Error)]
427pub enum ProcessorError<E> {
428 #[error("reducer error: {0}")]
432 Reducer(E),
433 #[error("hlc error: {0}")]
435 Hlc(#[from] HlcError<DbError>),
436 #[error("tracker error: {0}")]
438 Tracker(#[from] LogTrackerError),
439 #[error("db error: {0}")]
441 Db(#[from] DbError),
442 #[error("sync error: {0}")]
444 Sync(#[from] SyncError),
445}