nookdb_core/migrate.rs
1//! Stable migration-runner API SEAM (extension seam §6b).
2//!
3//! This is the stable public surface + a `_meta` schema-version ledger
4//! ONLY. The migration DSL / up-down / backfill is the deferred full
5//! framework (spec §8). No Pro code. The advanced migration tooling
6//! attaches to THIS API without modifying the crate.
7
8use crate::database::Database;
9use crate::error::NookError;
10
// RESERVED internal collection. Shares the single `entries` keyspace
// (M1 composite key) with user collections, so a user collection named
// "_meta" would alias this ledger. A reserved-name guard belongs in
// schema::ir compile-time validation (owner: schema DSL task / M3).
const META_COLLECTION: &str = "_meta";
// Key inside `META_COLLECTION` under which the applied-version ledger is
// stored. The value is a JSON array of `u32` version numbers.
const APPLIED_KEY: &[u8] = b"applied_versions";
17
/// Snapshot of the current migration-ledger state.
///
/// Returned by `Runner::status`. It is a plain value type: cheap to copy,
/// printable with `{:?}` and comparable with `==`, so callers can log it or
/// assert on it directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MigrationStatus {
    /// The highest version number that has been applied, or 0 if none.
    pub current_version: u32,
    /// The number of migration versions that have been applied.
    pub applied_count: usize,
}
25
/// Stable migration-runner API seam (extension seam §6b).
///
/// Maintains a JSON ledger of applied version numbers in the `_meta`
/// collection. The full migration DSL (up/down/backfill) is deferred
/// to spec §8; this runner provides the version-tracking surface that
/// the future framework will attach to.
///
/// The runner caches nothing: every method re-reads the ledger from the
/// database it was constructed with.
///
/// # Examples
///
/// ```
/// use nookdb_core::Database;
/// use nookdb_core::migrate::Runner;
/// # let dir = tempfile::tempdir().unwrap();
/// let db = Database::open(dir.path().join("m.db")).unwrap();
/// let r = Runner::new(&db);
/// assert_eq!(r.status().unwrap().current_version, 0);
/// assert_eq!(r.list_pending(&[1, 2]).unwrap(), vec![1, 2]);
/// r.run(&[1, 2]).unwrap();
/// assert_eq!(r.list_applied().unwrap(), vec![1, 2]);
/// assert_eq!(r.status().unwrap().current_version, 2);
/// ```
pub struct Runner<'a> {
    /// Database whose `_meta` ledger this runner reads and updates. It is
    /// only borrowed for `'a`; the runner never owns the database.
    db: &'a Database,
}
50
51impl<'a> Runner<'a> {
52 /// Creates a new `Runner` bound to the given database.
53 #[must_use]
54 pub const fn new(db: &'a Database) -> Self {
55 Self { db }
56 }
57
58 fn applied(&self) -> Result<Vec<u32>, NookError> {
59 let raw = self.db.read(|tx| tx.get(META_COLLECTION, APPLIED_KEY))?;
60 raw.map_or_else(
61 || Ok(vec![]),
62 |b| {
63 serde_json::from_slice(&b).map_err(|e| NookError::Migration {
64 msg: format!("corrupt ledger: {e}"),
65 })
66 },
67 )
68 }
69
70 /// Returns the list of applied migration version numbers in the order
71 /// they were applied.
72 ///
73 /// # Errors
74 ///
75 /// Returns `NookError::Migration` if the ledger is corrupt, or a
76 /// storage error if the underlying read fails.
77 pub fn list_applied(&self) -> Result<Vec<u32>, NookError> {
78 self.applied()
79 }
80
81 /// Returns a snapshot of the current migration state.
82 ///
83 /// # Errors
84 ///
85 /// Returns `NookError::Migration` if the ledger is corrupt, or a
86 /// storage error if the underlying read fails.
87 pub fn status(&self) -> Result<MigrationStatus, NookError> {
88 let a = self.applied()?;
89 Ok(MigrationStatus {
90 current_version: a.iter().copied().max().unwrap_or(0),
91 applied_count: a.len(),
92 })
93 }
94
95 /// Returns the versions in `all` that have not yet been applied,
96 /// in the same order as given.
97 ///
98 /// # Errors
99 ///
100 /// Returns `NookError::Migration` if the ledger is corrupt, or a
101 /// storage error if the underlying read fails.
102 pub fn list_pending(&self, all: &[u32]) -> Result<Vec<u32>, NookError> {
103 let a = self.applied()?;
104 Ok(all.iter().copied().filter(|v| !a.contains(v)).collect())
105 }
106
107 /// Records the pending subset of `all` as applied (idempotent). M2 has
108 /// no real migration-step type yet — this is the no-op-capable runner
109 /// that maintains the version ledger.
110 ///
111 /// Already-applied versions in `all` are silently skipped; the call
112 /// is safe to repeat with the same arguments.
113 ///
114 /// Hardened in M5a: read-merge-write happens in a single write
115 /// transaction so concurrent `run`s cannot lost-update the ledger.
116 /// See `tests/migrate_concurrent.rs`.
117 ///
118 /// # Errors
119 ///
120 /// Returns `NookError::Migration` if the ledger is corrupt or cannot be
121 /// serialised, or a storage error if the underlying write fails.
122 pub fn run(&self, all: &[u32]) -> Result<(), NookError> {
123 self.db.write(|tx| {
124 let raw = tx.get(META_COLLECTION, APPLIED_KEY)?;
125 let mut a: Vec<u32> = match raw {
126 None => Vec::new(),
127 Some(b) => serde_json::from_slice(&b).map_err(|e| NookError::Migration {
128 msg: format!("corrupt ledger: {e}"),
129 })?,
130 };
131 let initial_len = a.len();
132 for v in all {
133 if !a.contains(v) {
134 a.push(*v);
135 }
136 }
137 if a.len() == initial_len {
138 return Ok(());
139 }
140 a.sort_unstable();
141 a.dedup();
142 let bytes =
143 serde_json::to_vec(&a).map_err(|e| NookError::Migration { msg: e.to_string() })?;
144 tx.put(META_COLLECTION, APPLIED_KEY, &bytes)
145 })
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;

    /// Opens a fresh database inside a new temporary directory. The
    /// `TempDir` guard is returned so the directory outlives the test body.
    fn db() -> (tempfile::TempDir, Database) {
        let d = tempfile::tempdir().unwrap();
        let db = Database::open(d.path().join("t.db")).unwrap();
        (d, db)
    }

    #[test]
    fn status_starts_at_zero_and_run_advances_version() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        assert_eq!(r.status().unwrap().current_version, 0);
        assert_eq!(r.list_applied().unwrap(), Vec::<u32>::new());
        assert_eq!(r.list_pending(&[1, 2]).unwrap(), vec![1, 2]);
        r.run(&[1, 2]).unwrap();
        assert_eq!(r.status().unwrap().current_version, 2);
        assert_eq!(r.list_applied().unwrap(), vec![1, 2]);
        assert_eq!(r.list_pending(&[1, 2, 3]).unwrap(), vec![3]);
    }

    #[test]
    fn run_is_idempotent_for_already_applied_versions() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        r.run(&[1]).unwrap();
        r.run(&[1]).unwrap(); // no-op, no error
        assert_eq!(r.status().unwrap().current_version, 1);
        // The version must be recorded once, not once per call.
        assert_eq!(r.list_applied().unwrap(), vec![1]);
        assert_eq!(r.status().unwrap().applied_count, 1);
    }

    #[test]
    fn run_stores_versions_sorted_and_deduplicated() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        // Out-of-order input with a duplicate.
        r.run(&[3, 3, 1]).unwrap();
        assert_eq!(r.list_applied().unwrap(), vec![1, 3]);
        // A later, lower version is merged into sorted position.
        r.run(&[2]).unwrap();
        assert_eq!(r.list_applied().unwrap(), vec![1, 2, 3]);
        let status = r.status().unwrap();
        assert_eq!(status.current_version, 3);
        assert_eq!(status.applied_count, 3);
    }

    #[test]
    fn list_pending_preserves_input_order() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        assert_eq!(r.list_pending(&[3, 1, 2]).unwrap(), vec![3, 1, 2]);
        r.run(&[1]).unwrap();
        assert_eq!(r.list_pending(&[3, 1, 2]).unwrap(), vec![3, 2]);
    }

    #[test]
    fn run_with_no_versions_leaves_ledger_empty() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        r.run(&[]).unwrap();
        assert_eq!(r.list_applied().unwrap(), Vec::<u32>::new());
        assert_eq!(r.status().unwrap().current_version, 0);
    }
}