reddb_server/storage/wal/rmgr.rs
1//! WAL resource manager dispatch — Post-MVP credibility item.
2//!
3//! Mirrors PG's `xlogrecord.h::RmgrId` + `xlog.h::RmgrTable[]`
4//! pattern. Each subsystem (heap, btree, vector, graph,
5//! timeseries, queue) registers a *resource manager* that owns
6//! redo / undo / desc logic for its WAL records. The recovery
7//! loop dispatches by `RmgrId` instead of a giant
8//! `match record_type` arm.
9//!
10//! ## Why
11//!
12//! reddb's `wal/recovery.rs` currently has one match per record
13//! type. Adding a new subsystem (e.g. probabilistic data
14//! structures) means editing recovery.rs and touching every
15//! adjacent arm. With rmgr dispatch, a new subsystem just adds
16//! a `ResourceManager` impl and registers it at startup.
17//!
18//! ## Design
19//!
//! - `RmgrId` is a single-byte tag stored at the start of every
//!   WAL record body (after the common header).
//! - `ResourceManager` is a trait with three record-handling
//!   methods, plus `name()` for logging:
//!   - `redo(record)` — apply during forward recovery.
//!   - `undo(record)` — apply during transaction abort.
//!   - `desc(record)` — produce a human-readable string for
//!     diagnostics / pg_waldump-style tooling.
//! - `RmgrRegistry` is a table indexed by the `RmgrId` byte that
//!   the recovery loop consults to dispatch.
29//!
30//! ## Wiring
31//!
//! Phase post-MVP wiring:
//! 1. Each subsystem (heap, btree, vector, …) implements
//!    `ResourceManager` and is added to the registry at startup via
//!    `registry.register(RmgrId::Heap, Box::new(HeapRmgr))`; the
//!    finished registry is then sealed with `rmgr::install(registry)`.
//! 2. `wal/recovery.rs::redo_loop` calls `rmgr::dispatch_redo(record)`
//!    instead of pattern-matching on the record kind.
//! 3. Each existing match arm becomes the body of a `redo` impl.
39//!
40//! Today this module is the framework only — actual subsystem
41//! impls live in their respective modules.
42
43use std::sync::OnceLock;
44
/// One-byte tag identifying which resource manager owns a WAL
/// record. The tag space is 8 bits wide, the same width as PG's
/// RmgrId, so the vocabulary carries over should reddb ever ship
/// a pg-compat WAL reader.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum RmgrId {
    /// Table heap: row inserts, updates and deletes.
    Heap = 10,
    /// B-tree index records.
    Btree = 11,
    /// Vector index updates (HNSW / IVF / flat).
    Vector = 12,
    /// Graph subsystem: nodes, edges and their properties.
    Graph = 13,
    /// Timeseries chunks and downsample state.
    Timeseries = 14,
    /// Queue subsystem: push, pop, ack.
    Queue = 15,
    /// Document store: put, delete, schema-less updates.
    Document = 16,
    /// KV store: set, delete, ttl.
    Kv = 17,
    /// Probabilistic data structures (HLL, CMS, Cuckoo, Bloom).
    Probabilistic = 18,
    /// Transaction commit / abort markers.
    Transaction = 50,
    /// Checkpoint metadata.
    Checkpoint = 51,
    /// Cross-cutting CDC / replication markers.
    Replication = 52,
    /// Placeholder for subsystems that do not exist yet. Records
    /// tagged with it are intended to be skipped during recovery
    /// rather than failing it.
    Reserved = 255,
}

impl RmgrId {
    /// Decode a wire byte into an `RmgrId`.
    ///
    /// Yields `None` for any byte that is not one of the declared
    /// discriminants, leaving the skip-or-fail decision to the
    /// caller.
    pub fn from_u8(b: u8) -> Option<Self> {
        // Keep these literals in lock-step with the enum's
        // discriminants above.
        let id = match b {
            10 => Self::Heap,
            11 => Self::Btree,
            12 => Self::Vector,
            13 => Self::Graph,
            14 => Self::Timeseries,
            15 => Self::Queue,
            16 => Self::Document,
            17 => Self::Kv,
            18 => Self::Probabilistic,
            50 => Self::Transaction,
            51 => Self::Checkpoint,
            52 => Self::Replication,
            255 => Self::Reserved,
            _ => return None,
        };
        Some(id)
    }

    /// Encode as the single wire byte (the `repr(u8)` discriminant).
    pub fn to_u8(self) -> u8 {
        self as u8
    }

    /// Lowercase, stable label for logs and diagnostics.
    pub fn name(self) -> &'static str {
        match self {
            RmgrId::Heap => "heap",
            RmgrId::Btree => "btree",
            RmgrId::Vector => "vector",
            RmgrId::Graph => "graph",
            RmgrId::Timeseries => "timeseries",
            RmgrId::Queue => "queue",
            RmgrId::Document => "document",
            RmgrId::Kv => "kv",
            RmgrId::Probabilistic => "probabilistic",
            RmgrId::Transaction => "transaction",
            RmgrId::Checkpoint => "checkpoint",
            RmgrId::Replication => "replication",
            RmgrId::Reserved => "reserved",
        }
    }
}
127
128/// Errors raised by resource manager dispatch.
129#[derive(Debug)]
130pub enum RmgrError {
131 /// No resource manager registered for this id.
132 Unregistered(RmgrId),
133 /// Subsystem-specific failure during redo / undo.
134 SubsystemFailure { rmgr: RmgrId, message: String },
135}
136
137impl std::fmt::Display for RmgrError {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 match self {
140 Self::Unregistered(id) => write!(f, "no resource manager for {}", id.name()),
141 Self::SubsystemFailure { rmgr, message } => {
142 write!(f, "{} rmgr failed: {message}", rmgr.name())
143 }
144 }
145 }
146}
147
148impl std::error::Error for RmgrError {}
149
150/// Trait implemented by each subsystem that ships WAL records.
151/// Every method receives the raw record bytes (including the
152/// 1-byte rmgr id prefix) so the impl can decode in its
153/// preferred format.
154pub trait ResourceManager: Send + Sync {
155 /// Apply `record` to the live state during forward recovery.
156 /// Called by `wal/recovery.rs::redo_loop` for every record
157 /// in the WAL whose LSN is past the last checkpoint.
158 fn redo(&self, record: &[u8]) -> Result<(), RmgrError>;
159
160 /// Apply `record`'s inverse to the live state during
161 /// transaction abort. Most subsystems can leave this as
162 /// the default no-op when their state is rebuilt
163 /// idempotently from the WAL during recovery — only
164 /// subsystems that carry side-effects across transactions
165 /// (queue ack, replication ship) need to override.
166 fn undo(&self, _record: &[u8]) -> Result<(), RmgrError> {
167 Ok(())
168 }
169
170 /// Format a single record for diagnostic display
171 /// (`reddb-waldump`-style tooling). Default returns a
172 /// generic placeholder when the impl doesn't bother.
173 fn desc(&self, record: &[u8]) -> String {
174 format!("({} bytes opaque)", record.len())
175 }
176
177 /// Stable identifier for logging. Returns the rmgr's own
178 /// `RmgrId::name()`. Trait method so trait objects can
179 /// answer without an extra `dyn Any` cast.
180 fn name(&self) -> &'static str;
181}
182
183/// Process-wide resource manager registry. Indexed by
184/// `RmgrId.to_u8()` so dispatch is a single array lookup.
185/// `OnceLock` ensures registration happens once at startup
186/// and is then read-only — no runtime locking on the hot path.
187static REGISTRY: OnceLock<RmgrRegistry> = OnceLock::new();
188
189/// Read-only table of resource managers indexed by RmgrId.
190pub struct RmgrRegistry {
191 table: Vec<Option<Box<dyn ResourceManager>>>,
192}
193
194impl RmgrRegistry {
195 /// Build a registry of `capacity` slots. 256 is the natural
196 /// choice since `RmgrId` is u8.
197 pub fn with_capacity(capacity: usize) -> Self {
198 let mut table = Vec::with_capacity(capacity);
199 for _ in 0..capacity {
200 table.push(None);
201 }
202 Self { table }
203 }
204
205 /// Insert a resource manager at the given id. Replaces any
206 /// existing entry. Builders typically chain `.register()`
207 /// calls for each subsystem before sealing into a static.
208 pub fn register(mut self, id: RmgrId, rmgr: Box<dyn ResourceManager>) -> Self {
209 let idx = id.to_u8() as usize;
210 if idx >= self.table.len() {
211 self.table.resize_with(idx + 1, || None);
212 }
213 self.table[idx] = Some(rmgr);
214 self
215 }
216
217 /// Look up a resource manager by id.
218 pub fn get(&self, id: RmgrId) -> Option<&dyn ResourceManager> {
219 self.table
220 .get(id.to_u8() as usize)
221 .and_then(|slot| slot.as_deref())
222 }
223
224 /// Dispatch a record to its registered manager's `redo`.
225 /// Reads the first byte of `record` as the `RmgrId` tag.
226 pub fn dispatch_redo(&self, record: &[u8]) -> Result<(), RmgrError> {
227 let id_byte = record.first().copied().unwrap_or(0);
228 let id = RmgrId::from_u8(id_byte).ok_or(RmgrError::Unregistered(RmgrId::Reserved))?;
229 match self.get(id) {
230 Some(rmgr) => rmgr.redo(record),
231 None => Err(RmgrError::Unregistered(id)),
232 }
233 }
234}
235
236/// Install the process-wide registry. Call once during
237/// `Database::open` after every subsystem has had a chance to
238/// register its rmgr. Subsequent calls are no-ops (OnceLock).
239pub fn install(registry: RmgrRegistry) -> Result<(), RmgrError> {
240 REGISTRY
241 .set(registry)
242 .map_err(|_| RmgrError::SubsystemFailure {
243 rmgr: RmgrId::Reserved,
244 message: "rmgr registry already installed".to_string(),
245 })
246}
247
248/// Look up the installed registry. Returns `None` until
249/// `install()` is called — recovery code should panic in that
250/// case because no recovery is possible without rmgrs.
251pub fn registry() -> Option<&'static RmgrRegistry> {
252 REGISTRY.get()
253}
254
255/// Dispatch a record to its registered manager. Convenience
256/// wrapper used by the recovery loop.
257pub fn dispatch_redo(record: &[u8]) -> Result<(), RmgrError> {
258 match registry() {
259 Some(reg) => reg.dispatch_redo(record),
260 None => Err(RmgrError::SubsystemFailure {
261 rmgr: RmgrId::Reserved,
262 message: "rmgr registry not installed".to_string(),
263 }),
264 }
265}