1use std::fs;
15use std::path::Path;
16use anyhow::{Context, Result};
17use serde_json::Value;
18
19use crate::store::{Dek, Node, ObjectStore};
20use crate::index::{IdIndex, SortedIndexes};
21use crate::graph::GraphStore;
22
23#[derive(Debug)]
25struct V1Op {
26 seq: u64,
27 coll: String,
28 id: String,
29 data: Value,
30 caused_by: Vec<String>, ts: f64,
32 valid_from: Option<String>,
33 valid_to: Option<String>,
34}
35
36fn read_v1_aof(aof_path: &Path, dek: Option<&Dek>) -> Result<Vec<V1Op>> {
38 let raw = fs::read_to_string(aof_path)
39 .context("read log.aof")?;
40
41 let mut ops = Vec::new();
42 let mut skipped = 0usize;
43
44 for (line_num, line) in raw.lines().enumerate() {
45 let line = line.trim();
46 if line.is_empty() { continue; }
47
48 let decoded: Value = match try_decode_line(line, dek) {
50 Ok(v) => v,
51 Err(e) => {
52 skipped += 1;
53 eprintln!(" [nedb-migrate] skip corrupt line {}: {}", line_num + 1, e);
54 break; }
56 };
57
58 let op_type = decoded.get("op").and_then(|v| v.as_str()).unwrap_or("");
60 if op_type != "put" { continue; } let payload = match decoded.get("payload") {
63 Some(p) => p.clone(),
64 None => continue,
65 };
66 let coll = payload.get("coll").and_then(|v| v.as_str()).unwrap_or("").to_string();
67 let id = payload.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string();
68 let data = payload.get("doc").cloned().unwrap_or(Value::Null);
69
70 if coll.is_empty() || id.is_empty() { continue; }
71
72 let seq = decoded.get("seq").and_then(|v| v.as_u64()).unwrap_or(0);
73 let ts = decoded.get("ts").and_then(|v| v.as_f64()).unwrap_or(0.0);
74
75 let caused_by_seqs: Vec<u64> = decoded
77 .get("caused_by")
78 .and_then(|v| v.as_array())
79 .map(|a| a.iter().filter_map(|x| x.as_u64()).collect())
80 .unwrap_or_default();
81
82 ops.push(V1Op {
83 seq, coll, id, data, ts,
84 caused_by: caused_by_seqs.iter().map(|s| s.to_string()).collect(), valid_from: decoded.get("valid_from").and_then(|v| v.as_str()).map(|s| s.to_string()),
86 valid_to: decoded.get("valid_to").and_then(|v| v.as_str()).map(|s| s.to_string()),
87 });
88 }
89
90 if skipped > 0 {
91 eprintln!(
92 " [nedb-migrate] {} op(s) recovered, {} corrupt line(s) truncated",
93 ops.len(), skipped
94 );
95 }
96 Ok(ops)
97}
98
99fn try_decode_line(line: &str, dek: Option<&Dek>) -> Result<Value> {
100 if let Ok(v) = serde_json::from_str::<Value>(line) {
102 return Ok(v);
103 }
104 let envelope: Value = serde_json::from_str(line)
106 .context("parse AOF line as JSON")?;
107 if envelope.get("enc").and_then(|v| v.as_u64()) == Some(1) {
108 if let Some(dek) = dek {
109 let b64 = envelope.get("data")
110 .and_then(|v| v.as_str())
111 .context("missing data field in encrypted envelope")?;
112 let ciphertext = base64_decode(b64)?;
113 let plaintext = decrypt_v1(&ciphertext, dek)?;
114 return Ok(serde_json::from_slice(&plaintext)?);
115 }
116 }
117 anyhow::bail!("cannot decode AOF line")
118}
119
120fn base64_decode(s: &str) -> Result<Vec<u8>> {
121 base64_simple::decode(s).map_err(|e| anyhow::anyhow!("{}", e))
122}
123
124fn decrypt_v1(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
125 use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
126 if data.len() < 12 { anyhow::bail!("ciphertext too short"); }
127 let (nonce_bytes, ciphertext) = data.split_at(12);
128 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
129 let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
130 cipher.decrypt(nonce, ciphertext)
131 .map_err(|e| anyhow::anyhow!("decrypt: {:?}", e))
132}
133
134pub fn migrate_if_needed(
136 db_root: &Path,
137 object_store: &ObjectStore,
138 id_index: &IdIndex,
139 sorted_indexes: &SortedIndexes,
140 graph: &GraphStore,
141 dek: Option<&Dek>,
142) -> Result<bool> {
143 let aof_path = db_root.join("log.aof");
144 if !aof_path.exists() {
145 return Ok(false); }
147 let bak_path = db_root.join("log.aof.v1.bak");
148 if bak_path.exists() {
149 }
152
153 println!(" [nedb] Detected v1 log.aof — running automatic migration to v2 DAG...");
154
155 let ops = read_v1_aof(&aof_path, dek)?;
156 let total = ops.len();
157 println!(" [nedb] {} op(s) to migrate", total);
158
159 let mut seq_to_hash: std::collections::HashMap<u64, String> = std::collections::HashMap::new();
161
162 for op in &ops {
164 let caused_by_hashes: Vec<String> = op.caused_by
166 .iter()
167 .filter_map(|s| s.parse::<u64>().ok())
168 .filter_map(|seq| seq_to_hash.get(&seq).cloned())
169 .collect();
170
171 let prev = id_index.get(&op.coll, &op.id);
173
174 let mut node = Node {
175 id: op.id.clone(),
176 coll: op.coll.clone(),
177 seq: op.seq,
178 data: op.data.clone(),
179 prev,
180 caused_by: caused_by_hashes.clone(),
181 ts: op.ts,
182 valid_from: op.valid_from.clone(),
183 valid_to: op.valid_to.clone(),
184 hash: String::new(),
185 };
186
187 let hash = object_store.write(&mut node)?;
188 id_index.set(&op.coll, &op.id, &hash)?;
189 seq_to_hash.insert(op.seq, hash.clone());
190
191 for cause_hash in &caused_by_hashes {
193 graph.add_edge(&hash, "caused_by", cause_hash)?;
194 graph.add_edge(cause_hash, "caused_by_rev", &hash)?;
195 }
196
197 if let serde_json::Value::Object(ref obj) = op.data {
199 for (field, value) in obj {
200 if sorted_indexes.has(&op.coll, field) {
201 sorted_indexes.insert(&op.coll, field, value, &hash);
202 }
203 }
204 }
205 }
206
207 fs::rename(&aof_path, &bak_path)
209 .context("rename log.aof to log.aof.v1.bak")?;
210
211 println!(
212 " [nedb] Migration complete: {} op(s) → v2 DAG. Backup: {}",
213 total,
214 bak_path.display()
215 );
216
217 Ok(true)
218}
219
220mod base64_simple {
222 pub fn decode(s: &str) -> Result<Vec<u8>, String> {
223 let s = s.trim();
224 let mut out = Vec::with_capacity(s.len() * 3 / 4);
225 let chars: Vec<u8> = s.bytes().filter(|&b| b != b'=').collect();
226 let table = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
227 let mut buf = 0u32;
228 let mut bits = 0;
229 for &c in &chars {
230 let val = table.iter().position(|&t| t == c)
231 .ok_or_else(|| format!("invalid base64 char: {}", c as char))? as u32;
232 buf = (buf << 6) | val;
233 bits += 6;
234 if bits >= 8 {
235 bits -= 8;
236 out.push((buf >> bits) as u8);
237 buf &= (1 << bits) - 1;
238 }
239 }
240 Ok(out)
241 }
242}