Skip to main content

ubiquisync_sql/
processor.rs

//! The ingestion driver: applies one log entry atomically, and exposes the
//! stored log back as a [`Replica`](ubiquisync_core::sync::Replica).
//!
//! A `Processor` pairs a [`Reducer`](crate::reducer) with an
//! [`HlcService`](ubiquisync_core::hlc) clock and a
//! [`LogTracker`](crate::tracker): ingesting an entry advances the clock, records
//! it via the tracker, and applies the reducer's writes in one all-or-nothing
//! batch. It always implements [`HasCursors`] and [`LogProcessor`]; when its
//! tracker keeps full history ([`HistoryTracker`]) it also implements
//! [`LogSource`], and so is a `Replica`.
11
12use std::sync::Mutex;
13
14use async_trait::async_trait;
15use futures::channel::mpsc;
16use futures::lock::Mutex as AsyncMutex;
17use ubiquisync_core::{
18    codec::DecodedEntry,
19    event::{EventBus, EventHandler, Publisher, RoutableEvent, Subscription},
20    hlc::{HlcError, HlcService, Timestamp, wall_ms},
21    log_entry::LogEntry,
22    sync::{
23        Applied, CursorStream, CursorsEvent, HasCursors, LogProcessor, LogSource, PeerCursors,
24        SyncError,
25    },
26    uuid::Uuid,
27};
28
29use crate::{
30    db::{Db, DbError, DbRow, DbValue},
31    hlc_storage::SqlHlcStorage,
32    reducer::Reducer,
33    store::SqlStore,
34    tracker::{HistoryTracker, LogTracker, LogTrackerError},
35};
36
37/// Drives a [`Reducer`] over a [`Db`]: applies local writes ([`exec`](Self::exec))
38/// and ingests remote log entries, advancing the HLC and per-peer cursors and
39/// emitting change events through its [`EventHandler`]. This is the concrete
40/// engine behind the [`Store`](ubiquisync_core::store::Store)/[`SqlStore`] surface
41/// (open one with [`open`](Self::open)); it also implements the sync traits.
42pub struct Processor<R: Reducer, D: Db, T, E: EventHandler<R::Event>> {
43    self_id: Uuid,
44    // Behind an async mutex: it hands out `&mut` for the reducer's `prepare`, and
45    // holding it across an apply serializes writes (single-writer log store).
46    reducer: AsyncMutex<R>,
47    db: D,
48    hlc: HlcService<SqlHlcStorage>,
49    tracker: T,
50    // In-memory version vector, seeded at open and advanced on each apply. Backs
51    // the idempotent-drop fast path and the `watch_cursors` broadcast.
52    cursors: Mutex<PeerCursors>,
53    watchers: Mutex<Vec<mpsc::UnboundedSender<CursorsEvent>>>,
54    event_publish: E::Publish,
55    event_handler: E,
56}
57
58#[allow(dead_code)]
59impl<R: Reducer, D: Db, T: LogTracker<R::Op>, E: EventHandler<R::Event>> Processor<R, D, T, E> {
60    /// Open a processor against `db`: set up HLC storage and the tracker (both
61    /// namespaced by `prefix`), seed the clock and cursor view from persisted
62    /// state, and take ownership of `reducer`.
63    pub async fn open(
64        reducer: R,
65        db: D,
66        prefix: &str,
67        self_id: Uuid,
68    ) -> Result<Self, ProcessorError<R::Error>> {
69        let hlc = HlcService::open(SqlHlcStorage::open(&db, prefix).await?)?;
70        let tracker = T::init(&db, prefix).await?;
71        let cursors = tracker.all_cursors(&db).await?;
72        let (event_publish, event_handler) = E::init();
73        Ok(Self {
74            reducer: AsyncMutex::new(reducer),
75            self_id,
76            db,
77            hlc,
78            tracker,
79            event_publish,
80            event_handler,
81            cursors: Mutex::new(cursors),
82            watchers: Mutex::new(Vec::new()),
83        })
84    }
85
86    /// Apply a local write: mint a fresh entry under `self_id` and ingest it,
87    /// advancing self's cursor.
88    pub async fn exec(
89        &self,
90        server_user_id: Option<Uuid>,
91        op: R::Op,
92    ) -> Result<(), ProcessorError<R::Error>> {
93        let mut reducer = self.reducer.lock().await;
94        let entry_idx = self.cached_cursor(&self.self_id);
95        let events = self
96            .ingest_entry_or_local(
97                &mut reducer,
98                &self.self_id,
99                entry_idx,
100                None,
101                server_user_id,
102                &op,
103            )
104            .await?;
105        // Emit outside the reducer lock (see `ingest_entry_or_local`).
106        drop(reducer);
107        for event in events {
108            self.event_publish.publish(event);
109        }
110        Ok(())
111    }
112
113    /// The handler this processor emits into — subscribe off it.
114    pub fn event_handler(&self) -> &E {
115        &self.event_handler
116    }
117
118    /// The backend this processor writes through — for tests and diagnostics.
119    pub(crate) fn db(&self) -> &D {
120        &self.db
121    }
122
123    /// Ingest one entry into the open write section — caller holds the reducer
124    /// lock. Advances the HLC, records via the tracker, and applies the reducer's
125    /// writes in one all-or-nothing batch. No dedup: a repeated
126    /// `(peer_id, entry_idx)` fails the tracker's unique key and rolls back.
127    async fn ingest_entry(
128        &self,
129        reducer: &mut R,
130        peer_id: &Uuid,
131        entry_idx: u64,
132        entry: &LogEntry<R::Op>,
133    ) -> Result<Vec<R::Event>, ProcessorError<R::Error>> {
134        self.ingest_entry_or_local(
135            reducer,
136            peer_id,
137            entry_idx,
138            Some(entry.timestamp),
139            entry.server_user_id,
140            &entry.op,
141        )
142        .await
143    }
144
145    async fn ingest_entry_or_local(
146        &self,
147        reducer: &mut R,
148        peer_id: &Uuid,
149        entry_idx: u64,
150        timestamp: Option<Timestamp>,
151        server_user_id: Option<Uuid>,
152        op: &R::Op,
153    ) -> Result<Vec<R::Event>, ProcessorError<R::Error>> {
154        let prepare_state = reducer
155            .prepare(&self.db, op)
156            .await
157            .map_err(ProcessorError::Reducer)?;
158        let mut batch = self.db.new_batch();
159        let timestamp = if let Some(timestamp) = timestamp {
160            self.hlc.observe(timestamp, wall_ms(), batch.as_mut())?;
161            timestamp
162        } else {
163            self.hlc.now(batch.as_mut())?
164        };
165        self.tracker.track_one(
166            peer_id,
167            entry_idx,
168            timestamp,
169            server_user_id,
170            op,
171            batch.as_mut(),
172        )?;
173        let apply_state = reducer
174            .apply(batch.as_mut(), timestamp, op, prepare_state)
175            .map_err(ProcessorError::Reducer)?;
176        let batch_result = batch.commit().await?;
177        let event = reducer
178            .post_apply(apply_state, &batch_result)
179            .map_err(ProcessorError::Reducer)?;
180        // Advance under the reducer lock so the cursor reflects this commit before
181        // the next apply checks it. The caller emits `event` *after* releasing the
182        // lock — user-provided `publish` must not run in the write critical section.
183        self.advance_cursor(peer_id, entry_idx + 1);
184        Ok(event)
185    }
186    /// Record the expunged marker at `(peer_id, entry_idx)` — caller holds the
187    /// reducer lock. No clock tick and no reducer work: just the tracker row that
188    /// occupies the stream index.
189    async fn ingest_expunged(
190        &self,
191        peer_id: &Uuid,
192        entry_idx: u64,
193        hash: &blake3::Hash,
194    ) -> Result<(), ProcessorError<R::Error>> {
195        let mut batch = self.db.new_batch();
196        self.tracker
197            .track_expunged(peer_id, entry_idx, hash, batch.as_mut())?;
198        batch.commit().await?;
199        Ok(())
200    }
201
202    /// Raw ingest at a caller-given index — no dedup gate
203    /// ([`apply`](LogProcessor::apply)'s job). Advances the cursor like any
204    /// ingest. Test-only; local writes go through `exec`.
205    #[cfg(any(test, feature = "test-support"))]
206    pub(crate) async fn process_one(
207        &self,
208        peer_id: &Uuid,
209        entry_idx: u64,
210        entry: &LogEntry<R::Op>,
211    ) -> Result<(), ProcessorError<R::Error>> {
212        let mut reducer = self.reducer.lock().await;
213        let events = self
214            .ingest_entry(&mut reducer, peer_id, entry_idx, entry)
215            .await?;
216        drop(reducer);
217        for event in events {
218            self.event_publish.publish(event);
219        }
220        Ok(())
221    }
222}
223
224#[allow(dead_code)]
225impl<R: Reducer, D: Db, T, E: EventHandler<R::Event>> Processor<R, D, T, E> {
226    fn cached_cursor(&self, peer: &Uuid) -> u64 {
227        lock(&self.cursors).get(peer).copied().unwrap_or(0)
228    }
229
230    /// Raise `peer`'s cached cursor and broadcast the advance to watchers.
231    fn advance_cursor(&self, peer: &Uuid, next: u64) {
232        let advanced = {
233            let mut cursors = lock(&self.cursors);
234            let slot = cursors.entry(*peer).or_insert(0);
235            if next > *slot {
236                *slot = next;
237                true
238            } else {
239                false
240            }
241        };
242        if advanced {
243            let mut delta = PeerCursors::new();
244            delta.insert(*peer, next);
245            lock(&self.watchers).retain(|tx| {
246                tx.unbounded_send(CursorsEvent::Advanced(delta.clone()))
247                    .is_ok()
248            });
249        }
250    }
251}
252
/// Acquire a sync mutex, taking the guard even when the mutex is poisoned.
///
/// A panic in an earlier holder can't leave the protected cursor/watcher
/// state corrupt, so recovering the guard is preferable to spreading the
/// poison to every subsystem that shares the processor.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
259
260#[async_trait]
261impl<R: Reducer, D: Db, T: Send + Sync, E: EventHandler<R::Event> + Send + Sync> HasCursors
262    for Processor<R, D, T, E>
263where
264    E::Publish: Send + Sync,
265{
266    async fn cursors(&self) -> Result<PeerCursors, SyncError> {
267        Ok(lock(&self.cursors).clone())
268    }
269
270    fn watch_cursors(&self) -> CursorStream {
271        let (tx, rx) = mpsc::unbounded();
272        // Hold both locks across snapshot + registration (cursors before
273        // watchers, the order advance_cursor also acquires them) so a concurrent
274        // advance can't slip its mutation and broadcast between the two and leave
275        // this subscriber stale. Neither section awaits.
276        let cursors = lock(&self.cursors);
277        let mut watchers = lock(&self.watchers);
278        let _ = tx.unbounded_send(CursorsEvent::Snapshot(cursors.clone()));
279        watchers.retain(|w| !w.is_closed()); // drop subscribers that went away
280        watchers.push(tx);
281        Box::pin(rx)
282    }
283}
284
285#[async_trait]
286impl<R: Reducer, D: Db, T: LogTracker<R::Op>, E: EventHandler<R::Event> + Send + Sync>
287    LogProcessor<R::Op> for Processor<R, D, T, E>
288where
289    R::Error: std::error::Error + Send + Sync + 'static,
290    E::Publish: Send + Sync,
291{
292    async fn apply(
293        &self,
294        peer: Uuid,
295        index: u64,
296        entry: DecodedEntry<R::Op>,
297    ) -> Result<Applied, SyncError> {
298        // Fast path: drop an already-applied re-delivery without taking the lock.
299        if index < self.cached_cursor(&peer) {
300            return Ok(Applied { new: false });
301        }
302        let mut reducer = self.reducer.lock().await;
303        // Re-check under the lock and enforce contiguity: a concurrent apply may
304        // have advanced past `index`. `< cursor` is a re-delivery (drop);
305        // `> cursor` is a gap the caller must not create — the scalar cursor
306        // can't hold a hole, so reject rather than jump past the missing entries.
307        // `== cursor` is the only-once gate, so the reducer need not be idempotent.
308        let cursor = self.cached_cursor(&peer);
309        if index < cursor {
310            return Ok(Applied { new: false });
311        }
312        if index > cursor {
313            return Err(SyncError::CursorMismatch {
314                expected_idx: cursor,
315                actual_idx: index,
316            });
317        }
318        // `ingest_entry_or_local` advances the cursor after its commit; the
319        // expunged path does no reducer work, so it advances here instead.
320        let outcome: Result<Vec<R::Event>, ProcessorError<R::Error>> = match entry {
321            DecodedEntry::LogEntry(e) => self.ingest_entry(&mut reducer, &peer, index, &e).await,
322            DecodedEntry::Expunged(hash) => {
323                let outcome = self.ingest_expunged(&peer, index, &hash).await;
324                if outcome.is_ok() {
325                    self.advance_cursor(&peer, index + 1);
326                }
327                outcome.map(|()| Vec::new())
328            }
329        };
330        // Emit outside the reducer lock (see `ingest_entry_or_local`).
331        drop(reducer);
332        match outcome {
333            Ok(events) => {
334                for event in events {
335                    self.event_publish.publish(event);
336                }
337                Ok(Applied { new: true })
338            }
339            // Backstop if the gate is bypassed (a direct process_one, or an
340            // unseeded cache): the batch rolled back, so it's a no-op.
341            Err(ProcessorError::Db(DbError::UniqueViolation)) => Ok(Applied { new: false }),
342            Err(e) => Err(SyncError::Backend(Box::new(e))),
343        }
344    }
345}
346
347#[async_trait]
348impl<R: Reducer, D: Db, T: HistoryTracker<R::Op>, E: EventHandler<R::Event> + Send + Sync>
349    LogSource<R::Op> for Processor<R, D, T, E>
350where
351    R::Error: std::error::Error + Send + Sync + 'static,
352    E::Publish: Send + Sync,
353{
354    async fn read_since(
355        &self,
356        peer: Uuid,
357        from: u64,
358    ) -> Result<Vec<(u64, DecodedEntry<R::Op>)>, SyncError> {
359        // One oplog page per call; the caller loops with an advancing `from`.
360        const PAGE: u64 = 256;
361        self.tracker
362            .read_entries(&self.db, &peer, from, PAGE)
363            .await
364            .map_err(|e| SyncError::Backend(Box::new(e)))
365    }
366}
367
368// The engine implements `Store` over its *own* op/event types — no projection.
369// A domain wrapper (e.g. a `-tables` store) layers `Into`/`TryFrom` on top to
370// expose an app slice; that stays out of here.
371#[async_trait]
372impl<R: Reducer, D: Db, T: LogTracker<R::Op>>
373    ubiquisync_core::store::Store<R::Op, ProcessorError<BoxError>, R::Event>
374    for Processor<R, D, T, EventBus<R::Event>>
375where
376    R::Event: RoutableEvent,
377    R::Error: std::error::Error + Send + Sync + 'static,
378    <R::Event as RoutableEvent>::Target: Send + Sync,
379{
380    async fn exec(
381        &self,
382        server_user_id: Option<Uuid>,
383        op: R::Op,
384    ) -> Result<(), ProcessorError<BoxError>> {
385        // Inherent `Processor::exec` (shadows this trait method), then erase the
386        // reducer error behind `BoxError` so the Store surface is one uniform type
387        // across reducers while keeping the `ProcessorError` variants matchable.
388        Processor::exec(self, server_user_id, op)
389            .await
390            .map_err(|e| match e {
391                ProcessorError::Reducer(r) => ProcessorError::Reducer(Box::new(r) as BoxError),
392                ProcessorError::Hlc(x) => ProcessorError::Hlc(x),
393                ProcessorError::Tracker(x) => ProcessorError::Tracker(x),
394                ProcessorError::Db(x) => ProcessorError::Db(x),
395                ProcessorError::Sync(x) => ProcessorError::Sync(x),
396            })
397    }
398
399    fn watch(&self, target: <R::Event as RoutableEvent>::Target) -> Subscription<R::Event> {
400        self.event_handler().subscribe(target)
401    }
402}
403
404#[async_trait]
405impl<R: Reducer, D: Db, T: LogTracker<R::Op>> SqlStore<R::Op, R::Event>
406    for Processor<R, D, T, EventBus<R::Event>>
407where
408    R::Event: RoutableEvent,
409    R::Error: std::error::Error + Send + Sync + 'static,
410    <R::Event as RoutableEvent>::Target: Send + Sync,
411{
412    async fn query(&self, sql: &str, params: &[DbValue]) -> Result<Vec<DbRow>, DbError> {
413        self.db().query(sql, params).await
414    }
415
416    fn dialect(&self) -> crate::dialect::SqlDialect {
417        self.db().dialect()
418    }
419}
420
/// A reducer error erased to a thread-safe trait object — the uniform
/// reducer-error type that the [`Store`](ubiquisync_core::store::Store)
/// surface exposes, whichever reducer is underneath.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
424
425/// A failure while ingesting an entry, tagged by the stage that produced it.
426#[derive(Debug, thiserror::Error)]
427pub enum ProcessorError<E> {
428    /// The reducer failed; `E` is its own error type.
429    // No `#[from]`: a blanket `From<E>` would clash with `From<DbError>` when a
430    // reducer sets `Error = DbError`. Mapped explicitly at the call sites.
431    #[error("reducer error: {0}")]
432    Reducer(E),
433    /// Advancing or persisting the HLC failed.
434    #[error("hlc error: {0}")]
435    Hlc(#[from] HlcError<DbError>),
436    /// The tracker failed to record the entry.
437    #[error("tracker error: {0}")]
438    Tracker(#[from] LogTrackerError),
439    /// A backend operation failed — e.g. the batch commit was rejected.
440    #[error("db error: {0}")]
441    Db(#[from] DbError),
442    /// A [`SyncError`] surfaced from the sync layer.
443    #[error("sync error: {0}")]
444    Sync(#[from] SyncError),
445}