Skip to main content

nedb_engine/
migrate.rs

1//! Automatic v1 → v2 DAG migration.
2//!
3//! When a database directory contains `log.aof` (v1 format), this module
4//! reads all valid ops, converts them to v2 Node objects, writes them to
5//! the object store, rebuilds indexes, and renames log.aof → log.aof.v1.bak.
6//!
7//! The migration is:
8//!   - Transparent: zero user action required
9//!   - Idempotent: if it crashes mid-way, log.aof is still present → retries
10//!   - Non-destructive: log.aof.v1.bak is always kept as a rollback path
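11//!   - Self-repairing: reading stops at the first corrupt AOF line (e.g. a partial write from BrokenPipe); the ops before it are still migrated
12//!   - Sequential: nodes are written one at a time in log order so caused_by seq numbers can be resolved to hashes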
13
14use std::fs;
15use std::path::Path;
16use anyhow::{Context, Result};
17use serde_json::Value;
18
19use crate::store::{Dek, Node, ObjectStore};
20use crate::index::{IdIndex, SortedIndexes};
21use crate::graph::GraphStore;
22
23/// A parsed v1 AOF operation.
24#[derive(Debug)]
25struct V1Op {
26    seq:        u64,
27    coll:       String,
28    id:         String,
29    data:       Value,
30    caused_by:  Vec<String>,   // v1 stores seq numbers, v2 will store hashes after migration
31    ts:         f64,
32    valid_from: Option<String>,
33    valid_to:   Option<String>,
34}
35
36/// Read all valid ops from a v1 AOF file, skipping corrupt lines.
37fn read_v1_aof(aof_path: &Path, dek: Option<&Dek>) -> Result<Vec<V1Op>> {
38    let raw = fs::read_to_string(aof_path)
39        .context("read log.aof")?;
40
41    let mut ops = Vec::new();
42    let mut skipped = 0usize;
43
44    for (line_num, line) in raw.lines().enumerate() {
45        let line = line.trim();
46        if line.is_empty() { continue; }
47
48        // v1 AOF lines are either plain JSON or AES-GCM encrypted JSON
49        let decoded: Value = match try_decode_line(line, dek) {
50            Ok(v) => v,
51            Err(e) => {
52                skipped += 1;
53                eprintln!("  [nedb-migrate] skip corrupt line {}: {}", line_num + 1, e);
54                break;  // stop at first corruption — everything after is suspect
55            }
56        };
57
58        // v1 op format: {seq, client, nonce, op, payload, ts, ...}
59        let op_type = decoded.get("op").and_then(|v| v.as_str()).unwrap_or("");
60        if op_type != "put" { continue; }  // skip delete/link for now
61
62        let payload = match decoded.get("payload") {
63            Some(p) => p.clone(),
64            None => continue,
65        };
66        let coll = payload.get("coll").and_then(|v| v.as_str()).unwrap_or("").to_string();
67        let id   = payload.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string();
68        let data = payload.get("doc").cloned().unwrap_or(Value::Null);
69
70        if coll.is_empty() || id.is_empty() { continue; }
71
72        let seq = decoded.get("seq").and_then(|v| v.as_u64()).unwrap_or(0);
73        let ts  = decoded.get("ts").and_then(|v| v.as_f64()).unwrap_or(0.0);
74
75        // caused_by in v1 is a list of seq numbers; we'll resolve to hashes after building the seq→hash map
76        let caused_by_seqs: Vec<u64> = decoded
77            .get("caused_by")
78            .and_then(|v| v.as_array())
79            .map(|a| a.iter().filter_map(|x| x.as_u64()).collect())
80            .unwrap_or_default();
81
82        ops.push(V1Op {
83            seq, coll, id, data, ts,
84            caused_by: caused_by_seqs.iter().map(|s| s.to_string()).collect(), // temp: store as seq strings
85            valid_from: decoded.get("valid_from").and_then(|v| v.as_str()).map(|s| s.to_string()),
86            valid_to:   decoded.get("valid_to").and_then(|v| v.as_str()).map(|s| s.to_string()),
87        });
88    }
89
90    if skipped > 0 {
91        eprintln!(
92            "  [nedb-migrate] {} op(s) recovered, {} corrupt line(s) truncated",
93            ops.len(), skipped
94        );
95    }
96    Ok(ops)
97}
98
99fn try_decode_line(line: &str, dek: Option<&Dek>) -> Result<Value> {
100    // Try plain JSON first
101    if let Ok(v) = serde_json::from_str::<Value>(line) {
102        return Ok(v);
103    }
104    // Try base64-encoded encrypted envelope (v1 format: {"enc":1,"data":"<b64>"})
105    let envelope: Value = serde_json::from_str(line)
106        .context("parse AOF line as JSON")?;
107    if envelope.get("enc").and_then(|v| v.as_u64()) == Some(1) {
108        if let Some(dek) = dek {
109            let b64 = envelope.get("data")
110                .and_then(|v| v.as_str())
111                .context("missing data field in encrypted envelope")?;
112            let ciphertext = base64_decode(b64)?;
113            let plaintext = decrypt_v1(&ciphertext, dek)?;
114            return Ok(serde_json::from_slice(&plaintext)?);
115        }
116    }
117    anyhow::bail!("cannot decode AOF line")
118}
119
120fn base64_decode(s: &str) -> Result<Vec<u8>> {
121    base64_simple::decode(s).map_err(|e| anyhow::anyhow!("{}", e))
122}
123
124fn decrypt_v1(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
125    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
126    if data.len() < 12 { anyhow::bail!("ciphertext too short"); }
127    let (nonce_bytes, ciphertext) = data.split_at(12);
128    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
129    let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
130    cipher.decrypt(nonce, ciphertext)
131        .map_err(|e| anyhow::anyhow!("decrypt: {:?}", e))
132}
133
134/// Run the full v1 → v2 migration for one database directory.
135pub fn migrate_if_needed(
136    db_root: &Path,
137    object_store: &ObjectStore,
138    id_index: &IdIndex,
139    sorted_indexes: &SortedIndexes,
140    graph: &GraphStore,
141    dek: Option<&Dek>,
142) -> Result<bool> {
143    let aof_path = db_root.join("log.aof");
144    if !aof_path.exists() {
145        return Ok(false);  // already v2 or empty
146    }
147    let bak_path = db_root.join("log.aof.v1.bak");
148    if bak_path.exists() {
149        // Migration was interrupted — retry from the original aof (bak exists = aof was not yet renamed)
150        // This shouldn't normally happen but handle it gracefully
151    }
152
153    println!("  [nedb] Detected v1 log.aof — running automatic migration to v2 DAG...");
154
155    let ops = read_v1_aof(&aof_path, dek)?;
156    let total = ops.len();
157    println!("  [nedb] {} op(s) to migrate", total);
158
159    // Build seq → hash map as we write nodes (to resolve caused_by seq → hash)
160    let mut seq_to_hash: std::collections::HashMap<u64, String> = std::collections::HashMap::new();
161
162    // Write nodes in seq order (sequential to build the seq→hash map)
163    for op in &ops {
164        // Resolve caused_by seq numbers to hashes
165        let caused_by_hashes: Vec<String> = op.caused_by
166            .iter()
167            .filter_map(|s| s.parse::<u64>().ok())
168            .filter_map(|seq| seq_to_hash.get(&seq).cloned())
169            .collect();
170
171        // Get previous version hash for this doc
172        let prev = id_index.get(&op.coll, &op.id);
173
174        let mut node = Node {
175            id:         op.id.clone(),
176            coll:       op.coll.clone(),
177            seq:        op.seq,
178            data:       op.data.clone(),
179            prev,
180            caused_by:  caused_by_hashes.clone(),
181            ts:         op.ts,
182            valid_from: op.valid_from.clone(),
183            valid_to:   op.valid_to.clone(),
184            hash:       String::new(),
185        };
186
187        let hash = object_store.write(&mut node)?;
188        id_index.set(&op.coll, &op.id, &hash)?;
189        seq_to_hash.insert(op.seq, hash.clone());
190
191        // Write causal edges
192        for cause_hash in &caused_by_hashes {
193            graph.add_edge(&hash, "caused_by", cause_hash)?;
194            graph.add_edge(cause_hash, "caused_by_rev", &hash)?;
195        }
196
197        // Update sorted indexes for all numeric/string fields
198        if let serde_json::Value::Object(ref obj) = op.data {
199            for (field, value) in obj {
200                if sorted_indexes.has(&op.coll, field) {
201                    sorted_indexes.insert(&op.coll, field, value, &hash);
202                }
203            }
204        }
205    }
206
207    // Migration complete — rename log.aof to .v1.bak
208    fs::rename(&aof_path, &bak_path)
209        .context("rename log.aof to log.aof.v1.bak")?;
210
211    println!(
212        "  [nedb] Migration complete: {} op(s) → v2 DAG. Backup: {}",
213        total,
214        bak_path.display()
215    );
216
217    Ok(true)
218}
219
// Minimal base64 decoder (stdlib only — no extra deps needed for migration)
mod base64_simple {
    /// Decode standard-alphabet base64 (`A-Z a-z 0-9 + /`).
    ///
    /// Surrounding whitespace is trimmed and every `=` is ignored wherever
    /// it appears. Leftover bits that do not fill a whole byte are dropped.
    /// Any other byte yields `Err("invalid base64 char: <c>")`.
    pub fn decode(s: &str) -> Result<Vec<u8>, String> {
        let trimmed = s.trim();
        let mut decoded = Vec::with_capacity(trimmed.len() * 3 / 4);

        // `acc` holds the bits not yet emitted; `pending` counts them.
        let mut acc = 0u32;
        let mut pending = 0;

        for byte in trimmed.bytes() {
            if byte == b'=' {
                continue;
            }
            let sextet: u8 = match byte {
                b'A'..=b'Z' => byte - b'A',
                b'a'..=b'z' => byte - b'a' + 26,
                b'0'..=b'9' => byte - b'0' + 52,
                b'+' => 62,
                b'/' => 63,
                _ => return Err(format!("invalid base64 char: {}", byte as char)),
            };

            acc = (acc << 6) | u32::from(sextet);
            pending += 6;
            if pending >= 8 {
                pending -= 8;
                decoded.push((acc >> pending) as u8);
                // Keep only the bits that have not been emitted yet.
                acc &= (1 << pending) - 1;
            }
        }
        Ok(decoded)
    }
}