Skip to main content

reddb_server/storage/wal/
rmgr.rs

1//! WAL resource manager dispatch — Post-MVP credibility item.
2//!
3//! Mirrors PG's `xlogrecord.h::RmgrId` + `xlog.h::RmgrTable[]`
4//! pattern. Each subsystem (heap, btree, vector, graph,
5//! timeseries, queue) registers a *resource manager* that owns
6//! redo / undo / desc logic for its WAL records. The recovery
7//! loop dispatches by `RmgrId` instead of a giant
8//! `match record_type` arm.
9//!
10//! ## Why
11//!
12//! reddb's `wal/recovery.rs` currently has one match per record
13//! type. Adding a new subsystem (e.g. probabilistic data
14//! structures) means editing recovery.rs and touching every
15//! adjacent arm. With rmgr dispatch, a new subsystem just adds
16//! a `ResourceManager` impl and registers it at startup.
17//!
18//! ## Design
19//!
20//! - `RmgrId` is a single-byte tag stored at the start of every
21//!   WAL record body (after the common header).
//! - `ResourceManager` is a trait with three record-handling
//!   methods (plus a required `name()` used for logging):
//!   - `redo(record)` — apply during forward recovery.
//!   - `undo(record)` — apply during transaction abort.
//!   - `desc(record)` — produce a human-readable string for
//!     diagnostics / pg_waldump-style tooling.
//! - `RmgrRegistry` is a table (a `Vec` of optional managers)
//!   indexed by the `RmgrId` byte that the recovery loop
//!   consults to dispatch.
29//!
30//! ## Wiring
31//!
//! Post-MVP wiring plan:
//! 1. Each subsystem (heap, btree, vector, …) implements
//!    `ResourceManager`; startup code adds it to the table with
//!    `registry.register(RmgrId::Heap, Box::new(HeapRmgr))` and
//!    publishes the finished table via `rmgr::install(registry)`.
//! 2. `wal/recovery.rs::redo_loop` calls
//!    `rmgr::dispatch_redo(record)` instead of pattern-matching
//!    on the record kind.
//! 3. Each existing match arm becomes the body of a `redo` impl.
39//!
40//! Today this module is the framework only — actual subsystem
41//! impls live in their respective modules.
42
43use std::sync::OnceLock;
44
/// Identifies which resource manager owns a WAL record.
///
/// The tag is exactly one byte wide (`#[repr(u8)]`), the same
/// width as PG's RmgrId space, so reddb shares the wire format
/// vocabulary if we ever ship a pg-compat WAL reader. The
/// numeric values are part of the on-disk format.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum RmgrId {
    /// Table data: heap row inserts / updates / deletes.
    Heap = 10,
    /// Records belonging to B-tree indexes.
    Btree = 11,
    /// Updates to vector indexes (HNSW / IVF / flat).
    Vector = 12,
    /// Graph subsystem: nodes, edges, properties.
    Graph = 13,
    /// Timeseries chunks and downsample state.
    Timeseries = 14,
    /// Queue subsystem: push, pop, ack.
    Queue = 15,
    /// Document store: put, delete, schema-less updates.
    Document = 16,
    /// Key-value store: set, delete, ttl.
    Kv = 17,
    /// Probabilistic data structures (HLL, CMS, Cuckoo, Bloom).
    Probabilistic = 18,
    /// Markers for transaction commit / abort.
    Transaction = 50,
    /// Metadata written by checkpoints.
    Checkpoint = 51,
    /// Cross-cutting CDC / replication markers.
    Replication = 52,
    /// Placeholder tag for subsystems that do not exist yet.
    /// Recovery is meant to skip records carrying this tag
    /// rather than fail on them.
    Reserved = 255,
}
79
80impl RmgrId {
81    /// Try to convert a raw byte to an `RmgrId`. Returns `None`
82    /// for unknown tags so recovery can decide whether to skip
83    /// or hard-fail.
84    pub fn from_u8(b: u8) -> Option<Self> {
85        match b {
86            10 => Some(Self::Heap),
87            11 => Some(Self::Btree),
88            12 => Some(Self::Vector),
89            13 => Some(Self::Graph),
90            14 => Some(Self::Timeseries),
91            15 => Some(Self::Queue),
92            16 => Some(Self::Document),
93            17 => Some(Self::Kv),
94            18 => Some(Self::Probabilistic),
95            50 => Some(Self::Transaction),
96            51 => Some(Self::Checkpoint),
97            52 => Some(Self::Replication),
98            255 => Some(Self::Reserved),
99            _ => None,
100        }
101    }
102
103    /// Single-byte wire encoding.
104    pub fn to_u8(self) -> u8 {
105        self as u8
106    }
107
108    /// Stable human-readable name for diagnostics.
109    pub fn name(self) -> &'static str {
110        match self {
111            Self::Heap => "heap",
112            Self::Btree => "btree",
113            Self::Vector => "vector",
114            Self::Graph => "graph",
115            Self::Timeseries => "timeseries",
116            Self::Queue => "queue",
117            Self::Document => "document",
118            Self::Kv => "kv",
119            Self::Probabilistic => "probabilistic",
120            Self::Transaction => "transaction",
121            Self::Checkpoint => "checkpoint",
122            Self::Replication => "replication",
123            Self::Reserved => "reserved",
124        }
125    }
126}
127
128/// Errors raised by resource manager dispatch.
129#[derive(Debug)]
130pub enum RmgrError {
131    /// No resource manager registered for this id.
132    Unregistered(RmgrId),
133    /// Subsystem-specific failure during redo / undo.
134    SubsystemFailure { rmgr: RmgrId, message: String },
135}
136
137impl std::fmt::Display for RmgrError {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        match self {
140            Self::Unregistered(id) => write!(f, "no resource manager for {}", id.name()),
141            Self::SubsystemFailure { rmgr, message } => {
142                write!(f, "{} rmgr failed: {message}", rmgr.name())
143            }
144        }
145    }
146}
147
148impl std::error::Error for RmgrError {}
149
150/// Trait implemented by each subsystem that ships WAL records.
151/// Every method receives the raw record bytes (including the
152/// 1-byte rmgr id prefix) so the impl can decode in its
153/// preferred format.
154pub trait ResourceManager: Send + Sync {
155    /// Apply `record` to the live state during forward recovery.
156    /// Called by `wal/recovery.rs::redo_loop` for every record
157    /// in the WAL whose LSN is past the last checkpoint.
158    fn redo(&self, record: &[u8]) -> Result<(), RmgrError>;
159
160    /// Apply `record`'s inverse to the live state during
161    /// transaction abort. Most subsystems can leave this as
162    /// the default no-op when their state is rebuilt
163    /// idempotently from the WAL during recovery — only
164    /// subsystems that carry side-effects across transactions
165    /// (queue ack, replication ship) need to override.
166    fn undo(&self, _record: &[u8]) -> Result<(), RmgrError> {
167        Ok(())
168    }
169
170    /// Format a single record for diagnostic display
171    /// (`reddb-waldump`-style tooling). Default returns a
172    /// generic placeholder when the impl doesn't bother.
173    fn desc(&self, record: &[u8]) -> String {
174        format!("({} bytes opaque)", record.len())
175    }
176
177    /// Stable identifier for logging. Returns the rmgr's own
178    /// `RmgrId::name()`. Trait method so trait objects can
179    /// answer without an extra `dyn Any` cast.
180    fn name(&self) -> &'static str;
181}
182
183/// Process-wide resource manager registry. Indexed by
184/// `RmgrId.to_u8()` so dispatch is a single array lookup.
185/// `OnceLock` ensures registration happens once at startup
186/// and is then read-only — no runtime locking on the hot path.
187static REGISTRY: OnceLock<RmgrRegistry> = OnceLock::new();
188
189/// Read-only table of resource managers indexed by RmgrId.
190pub struct RmgrRegistry {
191    table: Vec<Option<Box<dyn ResourceManager>>>,
192}
193
194impl RmgrRegistry {
195    /// Build a registry of `capacity` slots. 256 is the natural
196    /// choice since `RmgrId` is u8.
197    pub fn with_capacity(capacity: usize) -> Self {
198        let mut table = Vec::with_capacity(capacity);
199        for _ in 0..capacity {
200            table.push(None);
201        }
202        Self { table }
203    }
204
205    /// Insert a resource manager at the given id. Replaces any
206    /// existing entry. Builders typically chain `.register()`
207    /// calls for each subsystem before sealing into a static.
208    pub fn register(mut self, id: RmgrId, rmgr: Box<dyn ResourceManager>) -> Self {
209        let idx = id.to_u8() as usize;
210        if idx >= self.table.len() {
211            self.table.resize_with(idx + 1, || None);
212        }
213        self.table[idx] = Some(rmgr);
214        self
215    }
216
217    /// Look up a resource manager by id.
218    pub fn get(&self, id: RmgrId) -> Option<&dyn ResourceManager> {
219        self.table
220            .get(id.to_u8() as usize)
221            .and_then(|slot| slot.as_deref())
222    }
223
224    /// Dispatch a record to its registered manager's `redo`.
225    /// Reads the first byte of `record` as the `RmgrId` tag.
226    pub fn dispatch_redo(&self, record: &[u8]) -> Result<(), RmgrError> {
227        let id_byte = record.first().copied().unwrap_or(0);
228        let id = RmgrId::from_u8(id_byte).ok_or(RmgrError::Unregistered(RmgrId::Reserved))?;
229        match self.get(id) {
230            Some(rmgr) => rmgr.redo(record),
231            None => Err(RmgrError::Unregistered(id)),
232        }
233    }
234}
235
236/// Install the process-wide registry. Call once during
237/// `Database::open` after every subsystem has had a chance to
238/// register its rmgr. Subsequent calls are no-ops (OnceLock).
239pub fn install(registry: RmgrRegistry) -> Result<(), RmgrError> {
240    REGISTRY
241        .set(registry)
242        .map_err(|_| RmgrError::SubsystemFailure {
243            rmgr: RmgrId::Reserved,
244            message: "rmgr registry already installed".to_string(),
245        })
246}
247
248/// Look up the installed registry. Returns `None` until
249/// `install()` is called — recovery code should panic in that
250/// case because no recovery is possible without rmgrs.
251pub fn registry() -> Option<&'static RmgrRegistry> {
252    REGISTRY.get()
253}
254
255/// Dispatch a record to its registered manager. Convenience
256/// wrapper used by the recovery loop.
257pub fn dispatch_redo(record: &[u8]) -> Result<(), RmgrError> {
258    match registry() {
259        Some(reg) => reg.dispatch_redo(record),
260        None => Err(RmgrError::SubsystemFailure {
261            rmgr: RmgrId::Reserved,
262            message: "rmgr registry not installed".to_string(),
263        }),
264    }
265}