ubiquisync_sql/reducer.rs
1//! Op → SQL translation: the [`Reducer`] trait a data domain implements to turn
2//! each of its ops into the backend writes that materialize it.
3//!
4//! Kept as its own module so the [ingestion processor](crate::processor) can
5//! drive any reducer generically. See [`Reducer`] for the three-phase
6//! prepare/apply/post_apply contract that lets one op map onto every backend.
7
8use ubiquisync_core::hlc::Timestamp;
9
10use crate::db::{Db, DbBatch, DbStatementResult};
11
12/// Translates a single op into the SQL writes that materialize it, in three
13/// phases so the work maps onto every backend — including ones with no
14/// interactive transaction (e.g. D1's `batch()`):
15///
16/// 1. [`prepare`](Reducer::prepare) runs *before* the batch. It is the only
17/// phase allowed to read or issue DDL, and it returns the
18/// [`ReadState`](Reducer::ReadState) `apply` needs — so every read is hoisted
19/// out of the batch.
20/// 2. [`apply`](Reducer::apply) emits the op's mutation statements into the open
21/// batch. It is pure and read-free (it consumes the `ReadState`), so the
22/// batch stays a flat, declarative statement list.
23/// 3. [`post_apply`](Reducer::post_apply) runs *after* the batch commits, when
24/// `RETURNING` rows finally exist, and turns the results into zero or more
25/// events — empty when the op changed nothing observable.
26/// # Idempotency
27///
28/// Whether a reducer must apply idempotently is not its own choice — it depends
29/// on the [`LogTracker`](crate::tracker::LogTracker) it is paired with. Behind a
30/// tracker that rejects a repeated `(peer_id, entry_idx)` (e.g.
31/// [`LogIndexTracker`](crate::tracker::LogIndexTracker)) a redundant apply rolls
32/// back, so the reducer need not be idempotent; behind one that does not reject,
33/// it must be. See the tracker's
34/// [duplicate-rejection contract](crate::tracker::LogTracker#duplicate-rejection).
35#[async_trait::async_trait]
36pub trait Reducer: Send {
37 /// The op vocabulary this reducer materializes (e.g. the table op enum).
38 type Op: Send + Sync;
39 /// Data read in [`prepare`](Reducer::prepare) and consumed by
40 /// [`apply`](Reducer::apply): e.g. a card's prior FSRS state, or its full
41 /// review history when an out-of-order op forces a recompute. `()` when
42 /// `apply` needs nothing read.
43 type ReadState;
44 /// Carried from [`apply`](Reducer::apply) to
45 /// [`post_apply`](Reducer::post_apply): the `StmtId`s of the emitted
46 /// statements plus any op-derived data needed to build the event.
47 type ApplyState: Send;
48 /// The change event produced for an applied op, for downstream observers.
49 type Event: Send + Clone;
50 /// Error surfaced from any phase.
51 type Error;
52
53 /// Reconcile the schema needed by `op` (create/alter tables, refresh any
54 /// cache) and read whatever `apply` will need, returning it as a
55 /// [`ReadState`](Reducer::ReadState). Runs outside the batch — DDL is
56 /// additive and safe to commit on its own, and hoisting reads here is what
57 /// keeps `apply` pure.
58 async fn prepare(&mut self, db: &dyn Db, op: &Self::Op)
59 -> Result<Self::ReadState, Self::Error>;
60
61 /// Emit the statements that materialize `op` at `timestamp` into `batch`,
62 /// using only `op`, the cached schema, and `read`. Read-free, so it stays
63 /// expressible as a declarative batch. The returned
64 /// [`ApplyState`](Reducer::ApplyState) is provisional until `batch` commits.
65 fn apply(
66 &self,
67 batch: &mut dyn DbBatch,
68 timestamp: Timestamp,
69 op: &Self::Op,
70 read: Self::ReadState,
71 ) -> Result<Self::ApplyState, Self::Error>;
72
73 /// Build the events once the batch has committed — empty when the op changed
74 /// nothing observable (e.g. an upsert that lost every column's LWW, or a
75 /// write to a table with no observer-facing name), usually one, or several
76 /// when a single op has multiple logical effects. `batch_result` holds the
77 /// whole batch's per-statement results in add order; locate this op's
78 /// `RETURNING` rows via the `StmtId`s stored in `apply_state`.
79 fn post_apply(
80 &self,
81 apply_state: Self::ApplyState,
82 batch_result: &[DbStatementResult],
83 ) -> Result<Vec<Self::Event>, Self::Error>;
84}