use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use anyhow::Result;
use dashmap::DashMap;
use serde_json::Value;
use parking_lot::RwLock;
use crate::store::{Dek, Node, ObjectStore};
use crate::index::{IdIndex, OrderedValue, SortedIndexes};
use crate::graph::GraphStore;
use crate::migrate;
/// JSON contents of `<root>/MANIFEST`, used for warm starts.
///
/// Field names and order are the on-disk format: `flush_manifest` serializes
/// this struct and `startup_rebuild` deserializes it.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
/// Sequence counter restored into `Db::seq` on a warm start.
/// `flush_manifest` writes the current value of `Db::seq`.
seq: u64,
/// Hex head digest restored into `Db::head` on a warm start.
/// A head shorter than 8 bytes is rejected and triggers a cold scan.
head: String,
}
/// Database handle tying together the object store, the current-version id
/// index, in-memory sorted indexes and the causal graph.
pub struct Db {
/// Stores every written node version; `write` returns the hash used to read it back.
pub objects: ObjectStore,
/// Maps `(coll, id)` to the hash of the current version of that document.
pub id_index: IdIndex,
/// Ordered per-`(coll, field)` indexes used by `order_by_*`.
/// They live in memory only and are not part of the MANIFEST.
pub sorted_indexes: SortedIndexes,
/// Holds `caused_by` / `caused_by_rev` edges between node hashes.
pub graph: GraphStore,
/// Database directory, or the `":memory:"` sentinel for `Db::in_memory`.
pub root: PathBuf,
/// Set by `update_head`, cleared by `flush_manifest_if_dirty`.
manifest_dirty: Arc<AtomicBool>,
/// Next sequence number to allocate (writers use `fetch_add`).
pub seq: AtomicU64,
/// Running hex digest chained over every `(seq, hash)` passed to `update_head`.
head: RwLock<String>,
/// True once a warm start or the background cold scan has completed.
pub startup_ready: Arc<AtomicBool>,
/// `seq -> hash`, filled by writes in this process and by the cold scan.
/// It is not restored on a warm start.
seq_index: Arc<DashMap<u64, String>>,
}
impl Db {
    /// Creates a database backed by the `in_memory` variant of every store.
    ///
    /// `root` is the `":memory:"` sentinel, which turns the manifest flush
    /// methods into no-ops. `startup_ready` starts out `true` because there is
    /// nothing to scan.
    pub fn in_memory() -> Self {
        Self {
            objects: ObjectStore::in_memory(),
            id_index: IdIndex::in_memory(),
            sorted_indexes: SortedIndexes::new(),
            graph: GraphStore::in_memory(),
            root: std::path::PathBuf::from(":memory:"),
            seq: AtomicU64::new(0),
            head: RwLock::new(String::new()),
            startup_ready: Arc::new(AtomicBool::new(true)),
            manifest_dirty: Arc::new(AtomicBool::new(false)),
            seq_index: Arc::new(DashMap::new()),
        }
    }

    /// Opens the on-disk database rooted at `db_root`, creating the directory
    /// if needed.
    ///
    /// Runs any pending migration, then attempts a warm start from `MANIFEST`.
    /// If that is not possible, `startup_ready` is left `false` and
    /// [`Db::start_cold_scan`] performs the rebuild.
    ///
    /// # Errors
    /// Propagates failures from creating `db_root`, opening the object store,
    /// id index or graph store, and from the migration step.
    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
        std::fs::create_dir_all(db_root)?;
        let objects = ObjectStore::new(db_root, dek.clone())?;
        let id_index = IdIndex::new(db_root)?;
        let sorted_indexes = SortedIndexes::new();
        let graph = GraphStore::new(db_root)?;
        let db = Self {
            objects,
            id_index,
            sorted_indexes,
            graph,
            root: db_root.to_path_buf(),
            seq: AtomicU64::new(0),
            head: RwLock::new(String::new()),
            startup_ready: Arc::new(AtomicBool::new(false)),
            manifest_dirty: Arc::new(AtomicBool::new(false)),
            seq_index: Arc::new(DashMap::new()),
        };
        migrate::migrate_if_needed(
            db_root,
            &db.objects,
            &db.id_index,
            &db.sorted_indexes,
            &db.graph,
            dek.as_ref(),
        )?;
        db.startup_rebuild()?;
        Ok(db)
    }

    /// True for databases created by [`Db::in_memory`], which must never touch disk.
    fn is_in_memory(&self) -> bool {
        self.root == Path::new(":memory:")
    }

    /// Attempts a warm start from `<root>/MANIFEST`.
    ///
    /// On success it restores `seq` and `head` and sets `startup_ready`. The
    /// warm path does not repopulate `seq_index`, so [`Db::get_hash_by_seq`]
    /// only knows sequence numbers written afterwards.
    ///
    /// Every other outcome leaves the rebuild to [`Db::start_cold_scan`]:
    /// - the manifest is missing or unparsable;
    /// - the head is too short or cut mid-character;
    /// - the sorted indexes are already non-empty.
    ///
    /// It currently never fails.
    fn startup_rebuild(&self) -> Result<()> {
        let manifest_path = self.root.join("MANIFEST");
        // Sorted indexes are not stored in the manifest, so if any exist
        // (presumably created by the migration step) a full scan must fill them.
        let needs_index_rebuild = !self.sorted_indexes.is_empty();
        if manifest_path.exists() && !needs_index_rebuild {
            let manifest = fs::read_to_string(&manifest_path)
                .ok()
                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok());
            match manifest {
                // `str::get` returns None both for short heads and for a cut in
                // the middle of a multi-byte char, so a corrupted manifest
                // self-heals instead of panicking.
                Some(m) => match m.head.get(..8) {
                    Some(prefix) => {
                        self.seq.store(m.seq, Ordering::SeqCst);
                        *self.head.write() = m.head.clone();
                        self.startup_ready.store(true, Ordering::SeqCst);
                        println!(" [nedbd] warm start — seq={} head={}...", m.seq, prefix);
                        return Ok(());
                    }
                    None => eprintln!(
                        " [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan",
                        m.head.len()
                    ),
                },
                None => {
                    eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan")
                }
            }
        }
        println!(" [nedbd] cold start — background scan will start after heap allocation");
        Ok(())
    }

    /// Starts the background cold scan unless startup is already complete.
    ///
    /// It returns immediately if a warm start set `startup_ready`. If the
    /// object store is empty, it marks the database ready without spawning
    /// anything. Otherwise it spawns a thread running the full rebuild.
    pub fn start_cold_scan(self_arc: Arc<Self>) {
        if self_arc.startup_ready.load(Ordering::SeqCst) {
            return;
        }
        if self_arc.objects.all_hashes().next().is_none() {
            self_arc.startup_ready.store(true, Ordering::SeqCst);
            return;
        }
        println!(" [nedbd] cold start — background scan starting, server accepting reads now");
        std::thread::spawn(move || cold_scan_background_arc(self_arc));
    }

    /// Inserts every indexed field of `node` into the sorted indexes under
    /// `node.hash`. Fields without a sorted index are skipped.
    fn index_node(&self, node: &Node) {
        if let Value::Object(ref obj) = node.data {
            for (field, value) in obj {
                if self.sorted_indexes.has(&node.coll, field) {
                    self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
                }
            }
        }
    }

    /// Removes every field of `node` from the sorted indexes (entries keyed by
    /// `node.hash`).
    fn unindex_node(&self, node: &Node) {
        if let Value::Object(ref obj) = node.data {
            for (field, value) in obj {
                self.sorted_indexes.remove(&node.coll, field, value, &node.hash);
            }
        }
    }

    /// Drops the sorted-index entries of a superseded or deleted version.
    /// An unreadable version is skipped.
    fn retire_version(&self, old_hash: &str) {
        if let Ok(old_node) = self.objects.read(old_hash) {
            self.unindex_node(&old_node);
        }
    }

    /// Writes a new version of `coll/id` and makes it current.
    ///
    /// The new node's `prev` is the previously current hash, if any. For each
    /// hash in `caused_by`, a `caused_by` edge (new -> cause) and a
    /// `caused_by_rev` edge (cause -> new) are added.
    ///
    /// # Errors
    /// Returns errors from the object write, the id-index update, or adding
    /// graph edges. Steps already completed are not rolled back.
    pub fn put(
        &self,
        coll: &str,
        id: &str,
        data: Value,
        caused_by: Vec<String>,
        valid_from: Option<String>,
        valid_to: Option<String>,
    ) -> Result<Node> {
        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
        let mut node = Node {
            id: id.to_string(),
            coll: coll.to_string(),
            seq,
            data,
            prev: self.id_index.get(coll, id),
            caused_by,
            ts: now(),
            valid_from,
            valid_to,
            hash: String::new(),
        };
        let hash = self.objects.write(&mut node)?;
        self.seq_index.insert(seq, hash.clone());
        self.id_index.set(coll, id, &hash)?;
        // Swap sorted-index entries only once the new version is stored and
        // current, so a failed write never leaves the live doc unindexed.
        if let Some(old_hash) = node.prev.as_deref() {
            self.retire_version(old_hash);
        }
        self.index_node(&node);
        for cause in &node.caused_by {
            self.graph.add_edge(&hash, "caused_by", cause)?;
            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
        }
        self.update_head(seq, &hash);
        Ok(node)
    }

    /// Writes many documents at once and returns the stored nodes in input order.
    ///
    /// Each tuple is `(coll, id, data, caused_by, valid_from, valid_to)`, with
    /// the same meaning as the arguments of [`Db::put`]. Sequence numbers are
    /// reserved as one contiguous range, and all nodes share one timestamp.
    /// Object writes and id-index updates run in parallel (rayon).
    ///
    /// If the same `(coll, id)` appears more than once, every occurrence is
    /// written, but only the last one becomes current. Each occurrence's
    /// `prev` points at the version that was current before the batch.
    ///
    /// # Errors
    /// Returns an object-write or id-index error; the batch is not rolled back.
    /// Graph-edge failures are logged and do not fail the batch.
    pub fn put_batch(
        &self,
        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
    ) -> Result<Vec<Node>> {
        use rayon::prelude::*;
        use std::collections::HashSet;

        if ops.is_empty() {
            return Ok(vec![]);
        }
        let n = ops.len() as u64;
        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
        let ts = now();
        let mut nodes: Vec<Node> = ops
            .into_iter()
            .enumerate()
            .map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
                let prev = self.id_index.get(&coll, &id);
                Node {
                    id,
                    coll,
                    seq: base_seq + i as u64,
                    data,
                    prev,
                    caused_by,
                    ts,
                    valid_from,
                    valid_to,
                    hash: String::new(),
                }
            })
            .collect();

        let write_errors: Vec<anyhow::Error> = nodes
            .par_iter_mut()
            .filter_map(|node| self.objects.write(node).err())
            .collect();
        if let Some(e) = write_errors.into_iter().next() {
            return Err(e);
        }

        // is_current[i] is true iff nodes[i] is the last occurrence of its
        // (coll, id). This keeps the parallel id_index.set race-free when a
        // batch contains duplicates.
        let is_current: Vec<bool> = {
            let mut seen = HashSet::with_capacity(nodes.len());
            let mut flags: Vec<bool> = nodes
                .iter()
                .rev()
                .map(|node| seen.insert((node.coll.as_str(), node.id.as_str())))
                .collect();
            flags.reverse();
            flags
        };

        let index_errors: Vec<anyhow::Error> = nodes
            .par_iter()
            .enumerate()
            .filter(|(i, _)| is_current[*i])
            .filter_map(|(_, node)| self.id_index.set(&node.coll, &node.id, &node.hash).err())
            .collect();
        if let Some(e) = index_errors.into_iter().next() {
            return Err(e);
        }

        for (node, &current) in nodes.iter().zip(&is_current) {
            self.seq_index.insert(node.seq, node.hash.clone());
            if current {
                // Same bookkeeping as `put`: retire the pre-batch version's
                // entries and index the winner only.
                if let Some(old_hash) = node.prev.as_deref() {
                    self.retire_version(old_hash);
                }
                self.index_node(node);
            }
            for cause in &node.caused_by {
                if let Err(e) = self.graph.add_edge(&node.hash, "caused_by", cause) {
                    eprintln!(
                        " [nedbd] put_batch: caused_by edge {} -> {} failed: {}",
                        node.hash, cause, e
                    );
                }
                if let Err(e) = self.graph.add_edge(cause, "caused_by_rev", &node.hash) {
                    eprintln!(
                        " [nedbd] put_batch: caused_by_rev edge {} -> {} failed: {}",
                        cause, node.hash, e
                    );
                }
            }
        }
        for node in &nodes {
            self.update_head(node.seq, &node.hash);
        }
        Ok(nodes)
    }

    /// Folds `(seq, new_hash)` into the running head digest and marks the
    /// manifest dirty.
    ///
    /// The new head is `hex(blake2b512(prev_head || seq_le || new_hash)[..32])`.
    /// The write lock is held across the whole read-modify-write so concurrent
    /// writers cannot overwrite each other's contribution.
    fn update_head(&self, seq: u64, new_hash: &str) {
        use blake2::{Blake2b512, Digest};
        {
            let mut head = self.head.write();
            let mut h = Blake2b512::new();
            h.update(head.as_bytes());
            h.update(seq.to_le_bytes());
            h.update(new_hash.as_bytes());
            *head = hex::encode(&h.finalize()[..32]);
        }
        self.manifest_dirty.store(true, Ordering::Release);
    }

    /// Flushes the id index's write buffer, then writes the manifest unconditionally.
    pub fn flush_all(&self) {
        self.id_index.flush_write_buf();
        self.flush_manifest();
    }

    /// Writes the manifest only if `update_head` ran since the last such flush.
    /// Does nothing for in-memory databases.
    pub fn flush_manifest_if_dirty(&self) {
        if self.is_in_memory() {
            return;
        }
        if self
            .manifest_dirty
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.flush_manifest();
        }
    }

    /// Writes `{seq, head}` to `<root>/MANIFEST` through `MANIFEST.tmp` + rename.
    ///
    /// This is best-effort: failures are logged, never returned. The rename
    /// only happens after the temp file was written successfully. Does nothing
    /// for in-memory databases.
    pub fn flush_manifest(&self) {
        if self.is_in_memory() {
            return;
        }
        // Read `head` before `seq`. Writers allocate a seq before folding it
        // into `head`, so the recorded `seq` is always past every seq already
        // in `head`. That prevents a warm restart from reusing a seq.
        let head = self.head.read().clone();
        let seq = self.seq.load(Ordering::SeqCst);
        let m = Manifest { seq, head };
        let json = match serde_json::to_string(&m) {
            Ok(json) => json,
            Err(e) => {
                eprintln!(" [nedbd] failed to serialize MANIFEST: {}", e);
                return;
            }
        };
        let path = self.root.join("MANIFEST");
        let tmp = self.root.join("MANIFEST.tmp");
        if let Err(e) = fs::write(&tmp, &json).and_then(|()| fs::rename(&tmp, &path)) {
            eprintln!(" [nedbd] failed to write MANIFEST: {}", e);
        }
    }

    /// Spawns a thread that, every `interval_ms` milliseconds, flushes the id
    /// index's write buffer and calls [`Db::flush_manifest_if_dirty`].
    /// The thread loops forever.
    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
        let db = self_arc;
        std::thread::spawn(move || loop {
            std::thread::sleep(std::time::Duration::from_millis(interval_ms));
            db.id_index.flush_write_buf();
            db.flush_manifest_if_dirty();
        });
    }

    /// Returns a copy of the current head digest.
    pub fn head(&self) -> String {
        self.head.read().clone()
    }

    /// Deletes `coll/id` by writing a tombstone node and dropping the id from
    /// the id index.
    ///
    /// The tombstone has id `_del_<id>`, `prev` set to the deleted version, and
    /// data `{"_deleted": id, "_prev": hash}`. The deleted version's
    /// sorted-index entries are removed, so `order_by_*` stops returning it.
    /// Returns `Ok(false)` if the id is not currently present.
    ///
    /// # Errors
    /// Returns errors from writing the tombstone or updating the id index.
    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
        let prev = match self.id_index.get(coll, id) {
            Some(h) => h,
            None => return Ok(false),
        };
        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
        let mut tombstone = Node {
            id: format!("_del_{}", id),
            coll: coll.to_string(),
            seq,
            data: serde_json::json!({"_deleted": id, "_prev": prev}),
            prev: Some(prev.clone()),
            caused_by: vec![],
            ts: now(),
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        };
        let hash = self.objects.write(&mut tombstone)?;
        self.id_index.remove(coll, id)?;
        self.retire_version(&prev);
        // Record the tombstone in seq_index, as the cold scan does for every stored node.
        self.seq_index.insert(seq, hash.clone());
        self.update_head(seq, &hash);
        Ok(true)
    }

    /// Returns the current version of `coll/id`. Returns `None` if the id is
    /// absent or its object cannot be read.
    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
        let hash = self.id_index.get(coll, id)?;
        self.objects.read(&hash).ok()
    }

    /// Reads any stored version by hash. Returns `None` if it cannot be read.
    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
        self.objects.read(hash).ok()
    }

    /// Returns the version of `coll/id` that was current at `target_seq`.
    ///
    /// It walks the `prev` chain back from the current version until it finds
    /// a version with `seq <= target_seq`. It returns `None` when:
    /// - the id has no current version (this includes deleted ids);
    /// - the chain ends before `target_seq`;
    /// - some version on the chain cannot be read.
    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
        let hash = self.id_index.get(coll, id)?;
        let mut current = self.objects.read(&hash).ok()?;
        while current.seq > target_seq {
            let prev_hash = current.prev.as_deref()?;
            current = self.objects.read(prev_hash).ok()?;
        }
        Some(current)
    }

    /// Returns the current version of every id in `coll`, skipping ids whose
    /// object cannot be read.
    pub fn list(&self, coll: &str) -> Vec<Node> {
        self.id_index
            .list_ids(coll)
            .into_iter()
            .filter_map(|id| self.get(coll, &id))
            .collect()
    }

    /// Returns up to `limit` documents of `coll`, ascending by `field`.
    ///
    /// If a sorted index exists for `(coll, field)`, it is used; the index only
    /// holds documents that contain `field`. Otherwise the whole collection is
    /// scanned and sorted, with a missing field ordered as `OrderedValue::Null`.
    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
        if !self.sorted_indexes.has(coll, field) {
            return self.order_by_scan(coll, field, limit, false);
        }
        self.sorted_indexes
            .top_k_asc(coll, field, limit)
            .into_iter()
            .filter_map(|h| self.objects.read(&h).ok())
            .collect()
    }

    /// Descending counterpart of [`Db::order_by_asc`].
    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
        if !self.sorted_indexes.has(coll, field) {
            return self.order_by_scan(coll, field, limit, true);
        }
        self.sorted_indexes
            .top_k_desc(coll, field, limit)
            .into_iter()
            .filter_map(|h| self.objects.read(&h).ok())
            .collect()
    }

    /// Fallback for `order_by_*` when no sorted index exists.
    /// Performs a full scan plus a stable sort, then truncates to `limit`.
    fn order_by_scan(&self, coll: &str, field: &str, limit: usize, descending: bool) -> Vec<Node> {
        let mut docs = self.list(coll);
        docs.sort_by(|a, b| {
            let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
            let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
            if descending {
                bv.cmp(&av)
            } else {
                av.cmp(&bv)
            }
        });
        docs.truncate(limit);
        docs
    }

    /// Follows `caused_by` edges from `hash` through `GraphStore::trace` and
    /// reads each node, skipping unreadable ones. The meaning of `reverse` and
    /// `limit` is defined by `GraphStore::trace`.
    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
        self.graph
            .trace(hash, "caused_by", reverse, limit)
            .into_iter()
            .filter_map(|h| self.objects.read(&h).ok())
            .collect()
    }

    /// Delegates to `ObjectStore::verify_all`. The result is presumably
    /// (objects checked, failing hashes) — confirm against `ObjectStore`.
    pub fn verify(&self) -> (usize, Vec<String>) {
        self.objects.verify_all()
    }

    /// Registers a sorted index on `(coll, field)` and backfills it from the
    /// current version of every document that has `field`. Later writes keep
    /// it up to date.
    pub fn create_sorted_index(&self, coll: &str, field: &str) {
        self.sorted_indexes.ensure(coll, field);
        for id in self.id_index.list_ids(coll) {
            if let Some(node) = self.get(coll, &id) {
                if let Value::Object(ref obj) = node.data {
                    if let Some(value) = obj.get(field) {
                        self.sorted_indexes.insert(coll, field, value, &node.hash);
                    }
                }
            }
        }
    }

    /// Looks up the hash written at `seq`.
    ///
    /// Only sequence numbers written by this process, or found by a cold scan,
    /// are known; after a warm start, older ones return `None`.
    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
        self.seq_index.get(&seq).map(|r| r.clone())
    }

    /// Records a `frm -[rel]-> to` link as a document in `__links__`.
    ///
    /// Both endpoints must be `coll:id` strings naming existing documents.
    /// The link id is `frm|rel|to`, so linking the same triple twice writes a
    /// new version of the same link document.
    ///
    /// # Errors
    /// Fails on a malformed endpoint, a missing endpoint, or a failed `put`.
    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
        let (frm_coll, frm_id) = frm
            .split_once(':')
            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
        let (to_coll, to_id) = to
            .split_once(':')
            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
        if self.id_index.get(frm_coll, frm_id).is_none() {
            anyhow::bail!("link: frm not found: {}", frm);
        }
        if self.id_index.get(to_coll, to_id).is_none() {
            anyhow::bail!("link: to not found: {}", to);
        }
        let link_id = format!("{}|{}|{}", frm, rel, to);
        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
        self.put("__links__", &link_id, doc, vec![], None, None)?;
        Ok(())
    }

    /// Deletes the `frm|rel|to` link document. Returns `Ok(false)` if it did not exist.
    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
        let link_id = format!("{}|{}|{}", frm, rel, to);
        self.delete("__links__", &link_id)
    }

    /// Returns the current targets of every `frm -[rel]->` link.
    ///
    /// This scans every document in `__links__`, so the cost is linear in the
    /// number of links. Targets that no longer exist are skipped.
    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
        self.id_index
            .list_ids("__links__")
            .into_iter()
            .filter_map(|id| self.get("__links__", &id))
            .filter(|node| {
                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
            })
            .filter_map(|node| {
                let to = node.data.get("_to")?.as_str()?;
                let (to_coll, to_id) = to.split_once(':')?;
                self.get(to_coll, to_id)
            })
            .collect()
    }
}
/// Rebuilds in-memory state from every stored object.
/// [`Db::start_cold_scan`] runs it on a background thread.
///
/// It reads all objects in parallel (unreadable ones are skipped), then:
/// - advances `seq` past the highest stored sequence number;
/// - fills `seq_index` for every stored version, tombstones included;
/// - inserts into the sorted indexes only the version `id_index` currently
///   points to, so superseded versions and tombstones never show up in
///   `order_by_*`;
/// - recomputes `head` as a digest of the max seq plus all object hashes;
/// - writes `MANIFEST` via [`Db::flush_manifest`] and sets `startup_ready`.
fn cold_scan_background_arc(db: Arc<Db>) {
    use blake2::{Blake2b512, Digest};
    use rayon::prelude::*;

    let objects = &db.objects;
    let sorted_indexes = &db.sorted_indexes;
    let ready_flag = Arc::clone(&db.startup_ready);
    let hashes: Vec<String> = objects.all_hashes().collect();
    let total = hashes.len();
    if total == 0 {
        ready_flag.store(true, Ordering::SeqCst);
        return;
    }
    println!(" [nedbd] background scan — {} objects...", total);
    let t0 = std::time::Instant::now();
    // Print progress at every `step`-th object: about every 10%, but at most
    // once per 1000 objects.
    let step = (total / 10).max(1000);
    let nodes: Vec<Node> = hashes
        .par_iter()
        .enumerate()
        .filter_map(|(i, h)| {
            if i > 0 && i % step == 0 {
                let pct = i * 100 / total;
                let elapsed = t0.elapsed().as_secs_f32();
                let rate = i as f32 / elapsed;
                let eta = (total - i) as f32 / rate;
                eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
                    pct, i, total, rate, eta);
            }
            objects.read(h).ok()
        })
        .collect();
    eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
        total, total, t0.elapsed().as_secs_f32());

    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
    // `seq` holds the next sequence number to hand out. `fetch_max` ensures
    // that writes which raced with the scan are never rewound.
    db.seq.fetch_max(max_seq + 1, Ordering::SeqCst);

    for node in &nodes {
        db.seq_index.insert(node.seq, node.hash.clone());
        if let Value::Object(ref obj) = node.data {
            // The id-index lookup is skipped for nodes that touch no index.
            let touches_index = obj.keys().any(|field| sorted_indexes.has(&node.coll, field));
            if touches_index
                && db.id_index.get(&node.coll, &node.id).as_deref() == Some(node.hash.as_str())
            {
                for (field, value) in obj {
                    if sorted_indexes.has(&node.coll, field) {
                        sorted_indexes.insert(&node.coll, field, value, &node.hash);
                    }
                }
            }
        }
    }

    let mut sorted_hashes = hashes;
    sorted_hashes.sort_unstable();
    let mut h = Blake2b512::new();
    h.update(max_seq.to_le_bytes());
    for hash_str in &sorted_hashes {
        h.update(hash_str.as_bytes());
    }
    *db.head.write() = hex::encode(&h.finalize()[..32]);

    // `flush_manifest` records `seq` as the next sequence to allocate. That
    // matches what the warm-start path in `startup_rebuild` restores, so a
    // restart cannot hand out `max_seq` a second time.
    db.flush_manifest();
    ready_flag.store(true, Ordering::SeqCst);
    println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
}
/// Wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
#[cfg(test)]
mod tests {
    // Behavioural tests for `Db` backed by a temporary on-disk directory.
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let block = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", block, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");
        let heights_in_insert_order: [u64; 5] = [3, 1, 5, 2, 4];
        for &height in heights_in_insert_order.iter() {
            let block = serde_json::json!({ "height": height });
            db.put("blocks", &height.to_string(), block, vec![], None, None).unwrap();
        }

        let mut lowest = Vec::new();
        for node in db.order_by_asc("blocks", "height", 3) {
            if let Some(height) = node.data["height"].as_u64() {
                lowest.push(height);
            }
        }
        assert_eq!(lowest, vec![1, 2, 3]);
    }

    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        let chain = db.trace(&burn.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db
            .put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None)
            .unwrap();
        let _second = db
            .put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None)
            .unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);
        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
#[cfg(test)]
mod tests_v2 {
    // Tests for the seq index and the `__links__`-based link API.
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let first = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let second = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        for written in &[&first, &second] {
            assert_eq!(db.get_hash_by_seq(written.seq), Some(written.hash.clone()));
        }
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let batch = vec![
            ("item".to_string(), "x".to_string(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".to_string(), "y".to_string(), serde_json::json!({"v": 2}), vec![], None, None),
        ];
        let written = db.put_batch(batch).unwrap();

        for node in written.iter() {
            let indexed = db.get_hash_by_seq(node.seq);
            assert_eq!(indexed, Some(node.hash.clone()));
        }
    }

    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        for &(id, name) in &[("d1", "Bob"), ("d2", "Carol")] {
            db.put("driver", id, serde_json::json!({"name": name}), vec![], None, None).unwrap();
        }
        for &trip in &["t1", "t2"] {
            db.put("trip", trip, serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        }
        let links = [
            ("driver:d1", "trip:t1"),
            ("driver:d1", "trip:t2"),
            ("driver:d2", "trip:t1"),
        ];
        for &(from, to) in &links {
            db.link(from, "handles", to).unwrap();
        }

        let first_driver = db.neighbors("driver:d1", "handles");
        assert_eq!(first_driver.len(), 2);
        let trip_ids: std::collections::HashSet<&str> =
            first_driver.iter().map(|trip| trip.id.as_str()).collect();
        assert!(trip_ids.contains("t1") && trip_ids.contains("t2"));

        let second_driver = db.neighbors("driver:d2", "handles");
        assert_eq!(second_driver.len(), 1);
        assert_eq!(second_driver[0].id, "t1");
    }

    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();

        let stored = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(stored.is_some(), "__links__ doc should exist");
        let link = stored.unwrap();
        let expected = [("_from", "driver:d1"), ("_rel", "handles"), ("_to", "trip:t1")];
        for &(key, value) in &expected {
            assert_eq!(link.data[key], value);
        }

        let targets = db.neighbors("driver:d1", "handles");
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].id, "t1");
    }

    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        let outcome = db.link("driver:d1", "handles", "trip:ghost");
        assert!(outcome.is_err());
    }

    #[test]
    fn link_durable_survives_reopen() {
        let tmp = tempdir().unwrap();
        // The first handle is dropped at the end of this scope, before reopening.
        {
            let writer = Db::open(tmp.path(), None).unwrap();
            writer.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
            writer.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            writer.link("driver:d1", "handles", "trip:t1").unwrap();
        }

        let reopened = Db::open(tmp.path(), None).unwrap();
        reopened.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let targets = reopened.neighbors("driver:d1", "handles");
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].id, "t1");
    }
}