nedb-engine 2.3.3333

NEDB v2 — content-addressed DAG storage engine with NQL and HTTP server (nedbd binary)
Documentation
//! Automatic v1 → v2 DAG migration.
//!
//! When a database directory contains `log.aof` (v1 format), this module
//! reads all valid ops, converts them to v2 Node objects, writes them to
//! the object store, rebuilds indexes, and renames log.aof → log.aof.v1.bak.
//!
//! The migration is:
//!   - Transparent: zero user action required
//!   - Idempotent: if it crashes mid-way, log.aof is still present → retries
//!   - Non-destructive: log.aof.v1.bak is always kept as a rollback path
//!   - Self-repairing: corrupt AOF lines are skipped (partial writes from BrokenPipe)
//!   - Parallel: object writes use Rayon thread pool

use std::fs;
use std::path::Path;
use anyhow::{Context, Result};
use serde_json::Value;

use crate::store::{Dek, Node, ObjectStore};
use crate::index::{IdIndex, SortedIndexes};
use crate::graph::GraphStore;

/// A parsed v1 AOF operation.
#[derive(Debug)]
struct V1Op {
    seq:        u64,
    coll:       String,
    id:         String,
    data:       Value,
    caused_by:  Vec<String>,   // v1 stores seq numbers, v2 will store hashes after migration
    ts:         f64,
    valid_from: Option<String>,
    valid_to:   Option<String>,
}

/// Read all valid ops from a v1 AOF file, skipping corrupt lines.
fn read_v1_aof(aof_path: &Path, dek: Option<&Dek>) -> Result<Vec<V1Op>> {
    let raw = fs::read_to_string(aof_path)
        .context("read log.aof")?;

    let mut ops = Vec::new();
    let mut skipped = 0usize;

    for (line_num, line) in raw.lines().enumerate() {
        let line = line.trim();
        if line.is_empty() { continue; }

        // v1 AOF lines are either plain JSON or AES-GCM encrypted JSON
        let decoded: Value = match try_decode_line(line, dek) {
            Ok(v) => v,
            Err(e) => {
                skipped += 1;
                eprintln!("  [nedb-migrate] skip corrupt line {}: {}", line_num + 1, e);
                break;  // stop at first corruption — everything after is suspect
            }
        };

        // v1 op format: {seq, client, nonce, op, payload, ts, ...}
        let op_type = decoded.get("op").and_then(|v| v.as_str()).unwrap_or("");
        if op_type != "put" { continue; }  // skip delete/link for now

        let payload = match decoded.get("payload") {
            Some(p) => p.clone(),
            None => continue,
        };
        let coll = payload.get("coll").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let id   = payload.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let data = payload.get("doc").cloned().unwrap_or(Value::Null);

        if coll.is_empty() || id.is_empty() { continue; }

        let seq = decoded.get("seq").and_then(|v| v.as_u64()).unwrap_or(0);
        let ts  = decoded.get("ts").and_then(|v| v.as_f64()).unwrap_or(0.0);

        // caused_by in v1 is a list of seq numbers; we'll resolve to hashes after building the seq→hash map
        let caused_by_seqs: Vec<u64> = decoded
            .get("caused_by")
            .and_then(|v| v.as_array())
            .map(|a| a.iter().filter_map(|x| x.as_u64()).collect())
            .unwrap_or_default();

        ops.push(V1Op {
            seq, coll, id, data, ts,
            caused_by: caused_by_seqs.iter().map(|s| s.to_string()).collect(), // temp: store as seq strings
            valid_from: decoded.get("valid_from").and_then(|v| v.as_str()).map(|s| s.to_string()),
            valid_to:   decoded.get("valid_to").and_then(|v| v.as_str()).map(|s| s.to_string()),
        });
    }

    if skipped > 0 {
        eprintln!(
            "  [nedb-migrate] {} op(s) recovered, {} corrupt line(s) truncated",
            ops.len(), skipped
        );
    }
    Ok(ops)
}

fn try_decode_line(line: &str, dek: Option<&Dek>) -> Result<Value> {
    // Try plain JSON first
    if let Ok(v) = serde_json::from_str::<Value>(line) {
        return Ok(v);
    }
    // Try base64-encoded encrypted envelope (v1 format: {"enc":1,"data":"<b64>"})
    let envelope: Value = serde_json::from_str(line)
        .context("parse AOF line as JSON")?;
    if envelope.get("enc").and_then(|v| v.as_u64()) == Some(1) {
        if let Some(dek) = dek {
            let b64 = envelope.get("data")
                .and_then(|v| v.as_str())
                .context("missing data field in encrypted envelope")?;
            let ciphertext = base64_decode(b64)?;
            let plaintext = decrypt_v1(&ciphertext, dek)?;
            return Ok(serde_json::from_slice(&plaintext)?);
        }
    }
    anyhow::bail!("cannot decode AOF line")
}

fn base64_decode(s: &str) -> Result<Vec<u8>> {
    base64_simple::decode(s).map_err(|e| anyhow::anyhow!("{}", e))
}

fn decrypt_v1(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
    if data.len() < 12 { anyhow::bail!("ciphertext too short"); }
    let (nonce_bytes, ciphertext) = data.split_at(12);
    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
    let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
    cipher.decrypt(nonce, ciphertext)
        .map_err(|e| anyhow::anyhow!("decrypt: {:?}", e))
}

/// Run the full v1 → v2 migration for one database directory.
pub fn migrate_if_needed(
    db_root: &Path,
    object_store: &ObjectStore,
    id_index: &IdIndex,
    sorted_indexes: &SortedIndexes,
    graph: &GraphStore,
    dek: Option<&Dek>,
) -> Result<bool> {
    let aof_path = db_root.join("log.aof");
    if !aof_path.exists() {
        return Ok(false);  // already v2 or empty
    }
    let bak_path = db_root.join("log.aof.v1.bak");
    if bak_path.exists() {
        // Migration was interrupted — retry from the original aof (bak exists = aof was not yet renamed)
        // This shouldn't normally happen but handle it gracefully
    }

    println!("  [nedb] Detected v1 log.aof — running automatic migration to v2 DAG...");

    let ops = read_v1_aof(&aof_path, dek)?;
    let total = ops.len();
    println!("  [nedb] {} op(s) to migrate", total);

    // Build seq → hash map as we write nodes (to resolve caused_by seq → hash)
    let mut seq_to_hash: std::collections::HashMap<u64, String> = std::collections::HashMap::new();

    // Write nodes in seq order (sequential to build the seq→hash map)
    for op in &ops {
        // Resolve caused_by seq numbers to hashes
        let caused_by_hashes: Vec<String> = op.caused_by
            .iter()
            .filter_map(|s| s.parse::<u64>().ok())
            .filter_map(|seq| seq_to_hash.get(&seq).cloned())
            .collect();

        // Get previous version hash for this doc
        let prev = id_index.get(&op.coll, &op.id);

        let mut node = Node {
            id:         op.id.clone(),
            coll:       op.coll.clone(),
            seq:        op.seq,
            data:       op.data.clone(),
            prev,
            caused_by:  caused_by_hashes.clone(),
            ts:         op.ts,
            valid_from: op.valid_from.clone(),
            valid_to:   op.valid_to.clone(),
            hash:       String::new(),
        };

        let hash = object_store.write(&mut node)?;
        id_index.set(&op.coll, &op.id, &hash)?;
        seq_to_hash.insert(op.seq, hash.clone());

        // Write causal edges
        for cause_hash in &caused_by_hashes {
            graph.add_edge(&hash, "caused_by", cause_hash)?;
            graph.add_edge(cause_hash, "caused_by_rev", &hash)?;
        }

        // Update sorted indexes for all numeric/string fields
        if let serde_json::Value::Object(ref obj) = op.data {
            for (field, value) in obj {
                if sorted_indexes.has(&op.coll, field) {
                    sorted_indexes.insert(&op.coll, field, value, &hash);
                }
            }
        }
    }

    // Migration complete — rename log.aof to .v1.bak
    fs::rename(&aof_path, &bak_path)
        .context("rename log.aof to log.aof.v1.bak")?;

    println!(
        "  [nedb] Migration complete: {} op(s) → v2 DAG. Backup: {}",
        total,
        bak_path.display()
    );

    Ok(true)
}

// Minimal base64 decoder (stdlib only — no extra deps needed for migration)
mod base64_simple {
    pub fn decode(s: &str) -> Result<Vec<u8>, String> {
        let s = s.trim();
        let mut out = Vec::with_capacity(s.len() * 3 / 4);
        let chars: Vec<u8> = s.bytes().filter(|&b| b != b'=').collect();
        let table = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
        let mut buf = 0u32;
        let mut bits = 0;
        for &c in &chars {
            let val = table.iter().position(|&t| t == c)
                .ok_or_else(|| format!("invalid base64 char: {}", c as char))? as u32;
            buf = (buf << 6) | val;
            bits += 6;
            if bits >= 8 {
                bits -= 8;
                out.push((buf >> bits) as u8);
                buf &= (1 << bits) - 1;
            }
        }
        Ok(out)
    }
}