Skip to main content

nookdb_core/
migrate.rs

1//! Stable migration-runner API SEAM (extension seam §6b).
2//!
3//! This is the stable public surface + a `_meta` schema-version ledger
4//! ONLY. The migration DSL / up-down / backfill is the deferred full
5//! framework (spec §8). No Pro code. The advanced migration tooling
6//! attaches to THIS API without modifying the crate.
7
8use crate::database::Database;
9use crate::error::NookError;
10
// Name of the RESERVED internal collection that holds the migration ledger.
// It lives in the same single `entries` keyspace (M1 composite key) as user
// collections, so a user collection that is also called "_meta" would alias
// this ledger. Rejecting that name is the job of schema::ir compile-time
// validation (owner: schema DSL task / M3), not of this module.
const META_COLLECTION: &str = "_meta";
// Key inside `META_COLLECTION` under which the applied-version list is stored.
const APPLIED_KEY: &[u8] = b"applied_versions";
17
/// Snapshot of the current migration-ledger state.
///
/// The common comparison/diagnostic traits are derived so callers can log a
/// status with `{:?}` and compare snapshots in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationStatus {
    /// The highest version number that has been applied, or 0 if none.
    ///
    /// Because 0 doubles as the "nothing applied" value, check
    /// `applied_count == 0` to tell an empty ledger apart from a ledger
    /// whose highest recorded version is 0.
    pub current_version: u32,
    /// The number of migration versions recorded in the ledger.
    pub applied_count: usize,
}
25
26/// Stable migration-runner API seam (extension seam §6b).
27///
28/// Maintains a JSON ledger of applied version numbers in the `_meta`
29/// collection. The full migration DSL (up/down/backfill) is deferred
30/// to spec §8; this runner provides the version-tracking surface that
31/// the future framework will attach to.
32///
33/// # Examples
34///
35/// ```
36/// use nookdb_core::Database;
37/// use nookdb_core::migrate::Runner;
38/// # let dir = tempfile::tempdir().unwrap();
39/// let db = Database::open(dir.path().join("m.db")).unwrap();
40/// let r = Runner::new(&db);
41/// assert_eq!(r.status().unwrap().current_version, 0);
42/// assert_eq!(r.list_pending(&[1, 2]).unwrap(), vec![1, 2]);
43/// r.run(&[1, 2]).unwrap();
44/// assert_eq!(r.list_applied().unwrap(), vec![1, 2]);
45/// assert_eq!(r.status().unwrap().current_version, 2);
46/// ```
47pub struct Runner<'a> {
48    db: &'a Database,
49}
50
51impl<'a> Runner<'a> {
52    /// Creates a new `Runner` bound to the given database.
53    #[must_use]
54    pub const fn new(db: &'a Database) -> Self {
55        Self { db }
56    }
57
58    fn applied(&self) -> Result<Vec<u32>, NookError> {
59        let raw = self.db.read(|tx| tx.get(META_COLLECTION, APPLIED_KEY))?;
60        raw.map_or_else(
61            || Ok(vec![]),
62            |b| {
63                serde_json::from_slice(&b).map_err(|e| NookError::Migration {
64                    msg: format!("corrupt ledger: {e}"),
65                })
66            },
67        )
68    }
69
70    /// Returns the list of applied migration version numbers in the order
71    /// they were applied.
72    ///
73    /// # Errors
74    ///
75    /// Returns `NookError::Migration` if the ledger is corrupt, or a
76    /// storage error if the underlying read fails.
77    pub fn list_applied(&self) -> Result<Vec<u32>, NookError> {
78        self.applied()
79    }
80
81    /// Returns a snapshot of the current migration state.
82    ///
83    /// # Errors
84    ///
85    /// Returns `NookError::Migration` if the ledger is corrupt, or a
86    /// storage error if the underlying read fails.
87    pub fn status(&self) -> Result<MigrationStatus, NookError> {
88        let a = self.applied()?;
89        Ok(MigrationStatus {
90            current_version: a.iter().copied().max().unwrap_or(0),
91            applied_count: a.len(),
92        })
93    }
94
95    /// Returns the versions in `all` that have not yet been applied,
96    /// in the same order as given.
97    ///
98    /// # Errors
99    ///
100    /// Returns `NookError::Migration` if the ledger is corrupt, or a
101    /// storage error if the underlying read fails.
102    pub fn list_pending(&self, all: &[u32]) -> Result<Vec<u32>, NookError> {
103        let a = self.applied()?;
104        Ok(all.iter().copied().filter(|v| !a.contains(v)).collect())
105    }
106
107    /// Records the pending subset of `all` as applied (idempotent). M2 has
108    /// no real migration-step type yet — this is the no-op-capable runner
109    /// that maintains the version ledger.
110    ///
111    /// Already-applied versions in `all` are silently skipped; the call
112    /// is safe to repeat with the same arguments.
113    ///
114    /// Hardened in M5a: read-merge-write happens in a single write
115    /// transaction so concurrent `run`s cannot lost-update the ledger.
116    /// See `tests/migrate_concurrent.rs`.
117    ///
118    /// # Errors
119    ///
120    /// Returns `NookError::Migration` if the ledger is corrupt or cannot be
121    /// serialised, or a storage error if the underlying write fails.
122    pub fn run(&self, all: &[u32]) -> Result<(), NookError> {
123        self.db.write(|tx| {
124            let raw = tx.get(META_COLLECTION, APPLIED_KEY)?;
125            let mut a: Vec<u32> = match raw {
126                None => Vec::new(),
127                Some(b) => serde_json::from_slice(&b).map_err(|e| NookError::Migration {
128                    msg: format!("corrupt ledger: {e}"),
129                })?,
130            };
131            let initial_len = a.len();
132            for v in all {
133                if !a.contains(v) {
134                    a.push(*v);
135                }
136            }
137            if a.len() == initial_len {
138                return Ok(());
139            }
140            a.sort_unstable();
141            a.dedup();
142            let bytes =
143                serde_json::to_vec(&a).map_err(|e| NookError::Migration { msg: e.to_string() })?;
144            tx.put(META_COLLECTION, APPLIED_KEY, &bytes)
145        })
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;

    /// Opens a fresh database file inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the database so the caller
    /// keeps the directory alive for the duration of the test.
    fn db() -> (tempfile::TempDir, Database) {
        let d = tempfile::tempdir().unwrap();
        let db = Database::open(d.path().join("t.db")).unwrap();
        (d, db)
    }

    #[test]
    fn status_starts_at_zero_and_run_advances_version() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        assert_eq!(r.status().unwrap().current_version, 0);
        assert_eq!(r.list_applied().unwrap(), Vec::<u32>::new());
        assert_eq!(r.list_pending(&[1, 2]).unwrap(), vec![1, 2]);
        r.run(&[1, 2]).unwrap();
        assert_eq!(r.status().unwrap().current_version, 2);
        assert_eq!(r.list_applied().unwrap(), vec![1, 2]);
        assert_eq!(r.list_pending(&[1, 2, 3]).unwrap(), vec![3]);
    }

    #[test]
    fn run_is_idempotent_for_already_applied_versions() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        r.run(&[1]).unwrap();
        r.run(&[1]).unwrap(); // no-op, no error
        assert_eq!(r.status().unwrap().current_version, 1);
    }

    #[test]
    fn ledger_is_sorted_and_deduplicated_regardless_of_run_order() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        // Out-of-order input with a repeated version, then a later run that
        // records a version lower than the current maximum.
        r.run(&[3, 1, 3]).unwrap();
        r.run(&[2]).unwrap();
        assert_eq!(r.list_applied().unwrap(), vec![1, 2, 3]);
        let status = r.status().unwrap();
        assert_eq!(status.current_version, 3);
        assert_eq!(status.applied_count, 3);
    }

    #[test]
    fn list_pending_preserves_caller_order() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        r.run(&[2]).unwrap();
        assert_eq!(r.list_pending(&[3, 2, 1]).unwrap(), vec![3, 1]);
    }

    #[test]
    fn run_with_no_versions_leaves_ledger_empty() {
        let (_d, db) = db();
        let r = Runner::new(&db);
        r.run(&[]).unwrap();
        assert_eq!(r.list_applied().unwrap(), Vec::<u32>::new());
        assert_eq!(r.status().unwrap().applied_count, 0);
    }
}