use std::fs;
use std::path::Path;
use anyhow::{Context, Result};
use serde_json::Value;
use crate::store::{Dek, Node, ObjectStore};
use crate::index::{IdIndex, SortedIndexes};
use crate::graph::GraphStore;
#[derive(Debug)]
struct V1Op {
seq: u64,
coll: String,
id: String,
data: Value,
caused_by: Vec<String>, ts: f64,
valid_from: Option<String>,
valid_to: Option<String>,
}
fn read_v1_aof(aof_path: &Path, dek: Option<&Dek>) -> Result<Vec<V1Op>> {
let raw = fs::read_to_string(aof_path)
.context("read log.aof")?;
let mut ops = Vec::new();
let mut skipped = 0usize;
for (line_num, line) in raw.lines().enumerate() {
let line = line.trim();
if line.is_empty() { continue; }
let decoded: Value = match try_decode_line(line, dek) {
Ok(v) => v,
Err(e) => {
skipped += 1;
eprintln!(" [nedb-migrate] skip corrupt line {}: {}", line_num + 1, e);
break; }
};
let op_type = decoded.get("op").and_then(|v| v.as_str()).unwrap_or("");
if op_type != "put" { continue; }
let payload = match decoded.get("payload") {
Some(p) => p.clone(),
None => continue,
};
let coll = payload.get("coll").and_then(|v| v.as_str()).unwrap_or("").to_string();
let id = payload.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string();
let data = payload.get("doc").cloned().unwrap_or(Value::Null);
if coll.is_empty() || id.is_empty() { continue; }
let seq = decoded.get("seq").and_then(|v| v.as_u64()).unwrap_or(0);
let ts = decoded.get("ts").and_then(|v| v.as_f64()).unwrap_or(0.0);
let caused_by_seqs: Vec<u64> = decoded
.get("caused_by")
.and_then(|v| v.as_array())
.map(|a| a.iter().filter_map(|x| x.as_u64()).collect())
.unwrap_or_default();
ops.push(V1Op {
seq, coll, id, data, ts,
caused_by: caused_by_seqs.iter().map(|s| s.to_string()).collect(), valid_from: decoded.get("valid_from").and_then(|v| v.as_str()).map(|s| s.to_string()),
valid_to: decoded.get("valid_to").and_then(|v| v.as_str()).map(|s| s.to_string()),
});
}
if skipped > 0 {
eprintln!(
" [nedb-migrate] {} op(s) recovered, {} corrupt line(s) truncated",
ops.len(), skipped
);
}
Ok(ops)
}
fn try_decode_line(line: &str, dek: Option<&Dek>) -> Result<Value> {
if let Ok(v) = serde_json::from_str::<Value>(line) {
return Ok(v);
}
let envelope: Value = serde_json::from_str(line)
.context("parse AOF line as JSON")?;
if envelope.get("enc").and_then(|v| v.as_u64()) == Some(1) {
if let Some(dek) = dek {
let b64 = envelope.get("data")
.and_then(|v| v.as_str())
.context("missing data field in encrypted envelope")?;
let ciphertext = base64_decode(b64)?;
let plaintext = decrypt_v1(&ciphertext, dek)?;
return Ok(serde_json::from_slice(&plaintext)?);
}
}
anyhow::bail!("cannot decode AOF line")
}
fn base64_decode(s: &str) -> Result<Vec<u8>> {
base64_simple::decode(s).map_err(|e| anyhow::anyhow!("{}", e))
}
fn decrypt_v1(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
if data.len() < 12 { anyhow::bail!("ciphertext too short"); }
let (nonce_bytes, ciphertext) = data.split_at(12);
let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
cipher.decrypt(nonce, ciphertext)
.map_err(|e| anyhow::anyhow!("decrypt: {:?}", e))
}
pub fn migrate_if_needed(
db_root: &Path,
object_store: &ObjectStore,
id_index: &IdIndex,
sorted_indexes: &SortedIndexes,
graph: &GraphStore,
dek: Option<&Dek>,
) -> Result<bool> {
let aof_path = db_root.join("log.aof");
if !aof_path.exists() {
return Ok(false); }
let bak_path = db_root.join("log.aof.v1.bak");
if bak_path.exists() {
}
println!(" [nedb] Detected v1 log.aof — running automatic migration to v2 DAG...");
let ops = read_v1_aof(&aof_path, dek)?;
let total = ops.len();
println!(" [nedb] {} op(s) to migrate", total);
let mut seq_to_hash: std::collections::HashMap<u64, String> = std::collections::HashMap::new();
for op in &ops {
let caused_by_hashes: Vec<String> = op.caused_by
.iter()
.filter_map(|s| s.parse::<u64>().ok())
.filter_map(|seq| seq_to_hash.get(&seq).cloned())
.collect();
let prev = id_index.get(&op.coll, &op.id);
let mut node = Node {
id: op.id.clone(),
coll: op.coll.clone(),
seq: op.seq,
data: op.data.clone(),
prev,
caused_by: caused_by_hashes.clone(),
ts: op.ts,
valid_from: op.valid_from.clone(),
valid_to: op.valid_to.clone(),
hash: String::new(),
};
let hash = object_store.write(&mut node)?;
id_index.set(&op.coll, &op.id, &hash)?;
seq_to_hash.insert(op.seq, hash.clone());
for cause_hash in &caused_by_hashes {
graph.add_edge(&hash, "caused_by", cause_hash)?;
graph.add_edge(cause_hash, "caused_by_rev", &hash)?;
}
if let serde_json::Value::Object(ref obj) = op.data {
for (field, value) in obj {
if sorted_indexes.has(&op.coll, field) {
sorted_indexes.insert(&op.coll, field, value, &hash);
}
}
}
}
fs::rename(&aof_path, &bak_path)
.context("rename log.aof to log.aof.v1.bak")?;
println!(
" [nedb] Migration complete: {} op(s) → v2 DAG. Backup: {}",
total,
bak_path.display()
);
Ok(true)
}
mod base64_simple {
pub fn decode(s: &str) -> Result<Vec<u8>, String> {
let s = s.trim();
let mut out = Vec::with_capacity(s.len() * 3 / 4);
let chars: Vec<u8> = s.bytes().filter(|&b| b != b'=').collect();
let table = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut buf = 0u32;
let mut bits = 0;
for &c in &chars {
let val = table.iter().position(|&t| t == c)
.ok_or_else(|| format!("invalid base64 char: {}", c as char))? as u32;
buf = (buf << 6) | val;
bits += 6;
if bits >= 8 {
bits -= 8;
out.push((buf >> bits) as u8);
buf &= (1 << bits) - 1;
}
}
Ok(out)
}
}