Skip to main content

ubiquisync_sql/
reducer.rs

1//! Op → SQL translation: the [`Reducer`] trait a data domain implements to turn
2//! each of its ops into the backend writes that materialize it.
3//!
4//! Kept as its own module so the [ingestion processor](crate::processor) can
5//! drive any reducer generically. See [`Reducer`] for the three-phase
6//! prepare/apply/post_apply contract that lets one op map onto every backend.
7
8use ubiquisync_core::hlc::Timestamp;
9
10use crate::db::{Db, DbBatch, DbStatementResult};
11
12/// Translates a single op into the SQL writes that materialize it, in three
13/// phases so the work maps onto every backend — including ones with no
14/// interactive transaction (e.g. D1's `batch()`):
15///
16/// 1. [`prepare`](Reducer::prepare) runs *before* the batch. It is the only
17///    phase allowed to read or issue DDL, and it returns the
18///    [`ReadState`](Reducer::ReadState) `apply` needs — so every read is hoisted
19///    out of the batch.
20/// 2. [`apply`](Reducer::apply) emits the op's mutation statements into the open
21///    batch. It is pure and read-free (it consumes the `ReadState`), so the
22///    batch stays a flat, declarative statement list.
23/// 3. [`post_apply`](Reducer::post_apply) runs *after* the batch commits, when
24///    `RETURNING` rows finally exist, and turns the results into zero or more
25///    events — empty when the op changed nothing observable.
26/// # Idempotency
27///
28/// Whether a reducer must apply idempotently is not its own choice — it depends
29/// on the [`LogTracker`](crate::tracker::LogTracker) it is paired with. Behind a
30/// tracker that rejects a repeated `(peer_id, entry_idx)` (e.g.
31/// [`LogIndexTracker`](crate::tracker::LogIndexTracker)) a redundant apply rolls
32/// back, so the reducer need not be idempotent; behind one that does not reject,
33/// it must be. See the tracker's
34/// [duplicate-rejection contract](crate::tracker::LogTracker#duplicate-rejection).
35#[async_trait::async_trait]
36pub trait Reducer: Send {
37    /// The op vocabulary this reducer materializes (e.g. the table op enum).
38    type Op: Send + Sync;
39    /// Data read in [`prepare`](Reducer::prepare) and consumed by
40    /// [`apply`](Reducer::apply): e.g. a card's prior FSRS state, or its full
41    /// review history when an out-of-order op forces a recompute. `()` when
42    /// `apply` needs nothing read.
43    type ReadState;
44    /// Carried from [`apply`](Reducer::apply) to
45    /// [`post_apply`](Reducer::post_apply): the `StmtId`s of the emitted
46    /// statements plus any op-derived data needed to build the event.
47    type ApplyState: Send;
48    /// The change event produced for an applied op, for downstream observers.
49    type Event: Send + Clone;
50    /// Error surfaced from any phase.
51    type Error;
52
53    /// Reconcile the schema needed by `op` (create/alter tables, refresh any
54    /// cache) and read whatever `apply` will need, returning it as a
55    /// [`ReadState`](Reducer::ReadState). Runs outside the batch — DDL is
56    /// additive and safe to commit on its own, and hoisting reads here is what
57    /// keeps `apply` pure.
58    async fn prepare(&mut self, db: &dyn Db, op: &Self::Op)
59    -> Result<Self::ReadState, Self::Error>;
60
61    /// Emit the statements that materialize `op` at `timestamp` into `batch`,
62    /// using only `op`, the cached schema, and `read`. Read-free, so it stays
63    /// expressible as a declarative batch. The returned
64    /// [`ApplyState`](Reducer::ApplyState) is provisional until `batch` commits.
65    fn apply(
66        &self,
67        batch: &mut dyn DbBatch,
68        timestamp: Timestamp,
69        op: &Self::Op,
70        read: Self::ReadState,
71    ) -> Result<Self::ApplyState, Self::Error>;
72
73    /// Build the events once the batch has committed — empty when the op changed
74    /// nothing observable (e.g. an upsert that lost every column's LWW, or a
75    /// write to a table with no observer-facing name), usually one, or several
76    /// when a single op has multiple logical effects. `batch_result` holds the
77    /// whole batch's per-statement results in add order; locate this op's
78    /// `RETURNING` rows via the `StmtId`s stored in `apply_state`.
79    fn post_apply(
80        &self,
81        apply_state: Self::ApplyState,
82        batch_result: &[DbStatementResult],
83    ) -> Result<Vec<Self::Event>, Self::Error>;
84}