1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// On-disk snapshot of the in-memory head/tip state, stored as JSON in `<root>/MANIFEST`.
///
/// Written by `Db::flush_manifest` (via `MANIFEST.tmp` + rename) and read back by
/// `Db::startup_rebuild` so a restart can skip the full object scan.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Value of `Db::seq` at flush time, i.e. the next sequence number to hand out.
    seq: u64,
    /// Running head digest (hex). A warm start requires at least 8 bytes here.
    head: String,
    /// Hash of the highest-seq node. `serde(default)` lets MANIFEST files written before
    /// this field existed still parse; it is then empty and heals on the next flush.
    #[serde(default)]
    tip_hash: String,
    /// Latest node hash per collection name. Also defaults to empty for older MANIFEST files.
    #[serde(default)]
    coll_tips: std::collections::HashMap<String, String>,
}
37
/// Page size used by [`Db::since`] when the caller passes `limit == 0`.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
42
/// One page of the change feed returned by [`Db::since`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct SinceBatch {
    /// Nodes whose seq is strictly greater than `from_seq`, in ascending seq order.
    pub nodes: Vec<Node>,
    /// The exclusive lower bound the caller passed as `after_seq`.
    pub from_seq: u64,
    /// Seq of the last node in `nodes`, or `from_seq` when `nodes` is empty.
    /// Pass it back as `after_seq` to fetch the next page.
    pub to_seq: u64,
    /// Highest allocated seq at call time (`Db::seq - 1`, saturating at 0).
    pub head_seq: u64,
    /// `true` when the page filled up before the scan reached the current head.
    pub has_more: bool,
}
61
/// Snapshot of startup / cold-scan progress returned by [`Db::scan_status`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct ScanStatus {
    /// Mirrors `Db::startup_ready`. It is set for in-memory databases, after a warm start,
    /// when the store is empty, and when a cold scan finishes.
    pub scan_complete: bool,
    /// Highest allocated seq (`Db::seq - 1`, saturating at 0).
    pub tip_seq: u64,
    /// Smallest seq present in the seq index (0 when the index is empty).
    pub indexed_seq_min: u64,
    /// Largest seq present in the seq index (0 when the index is empty).
    pub indexed_seq_max: u64,
    /// Number of entries in the seq index.
    pub indexed_count: usize,
}
81
/// The database handle: content-addressed object store plus its secondary indexes.
///
/// NOTE: fields drop in declaration order, so keep the order stable.
pub struct Db {
    /// Persistent node storage; nodes are addressed by their hash.
    pub objects: ObjectStore,
    /// `(collection, id) -> current node hash`.
    pub id_index: IdIndex,
    /// Opt-in per-(collection, field) ordered indexes used by `order_by_*`.
    pub sorted_indexes: SortedIndexes,
    /// Causal edges (`caused_by` / `caused_by_rev`) between node hashes.
    pub graph: GraphStore,
    /// Database directory, or the sentinel `":memory:"` for `Db::in_memory`.
    pub root: PathBuf,
    /// Set by every head update; the manifest ticker clears it when it flushes MANIFEST.
    manifest_dirty: Arc<AtomicBool>,
    /// Next sequence number to hand out (writers `fetch_add` it).
    pub seq: AtomicU64,
    /// Running head digest, chained over every write by `update_head`.
    head: RwLock<String>,
    /// `(seq, hash)` of the highest-seq write seen; persisted as the MANIFEST tip.
    tip_hash: RwLock<(u64, String)>,
    /// Per-collection `(seq, hash)` of the highest-seq write to that collection.
    coll_tip_hash: Arc<DashMap<String, (u64, String)>>,
    /// `true` once startup state is usable (warm start, empty store, or finished cold scan).
    pub startup_ready: Arc<AtomicBool>,
    /// `seq -> node hash`, filled by writes and by the cold scan; not rebuilt on a warm start.
    seq_index: Arc<DashMap<u64, String>>,
}
118
119impl Db {
120 pub fn in_memory() -> Self {
124 Self {
125 objects: ObjectStore::in_memory(),
126 id_index: IdIndex::in_memory(),
127 sorted_indexes: SortedIndexes::new(),
128 graph: GraphStore::in_memory(),
129 root: std::path::PathBuf::from(":memory:"),
130 seq: AtomicU64::new(0),
131 head: RwLock::new(String::new()),
132 tip_hash: RwLock::new((0, String::new())),
133 coll_tip_hash: Arc::new(DashMap::new()),
134 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
136 seq_index: Arc::new(DashMap::new()),
137 }
138 }
139
140 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
142 std::fs::create_dir_all(db_root)?;
143
144 let objects = ObjectStore::new(db_root, dek.clone())?;
145 let id_index = IdIndex::new(db_root)?;
146 let sorted_indexes = SortedIndexes::new();
147 let graph = GraphStore::new(db_root)?;
148
149 let mut db = Self {
150 objects,
151 id_index,
152 sorted_indexes,
153 graph,
154 root: db_root.to_path_buf(),
155 seq: AtomicU64::new(0),
156 head: RwLock::new(String::new()),
157 tip_hash: RwLock::new((0, String::new())),
158 coll_tip_hash: Arc::new(DashMap::new()),
159 startup_ready: Arc::new(AtomicBool::new(false)),
160 manifest_dirty: Arc::new(AtomicBool::new(false)),
161 seq_index: Arc::new(DashMap::new()),
162 };
163
164 migrate::migrate_if_needed(
166 db_root,
167 &db.objects,
168 &db.id_index,
169 &db.sorted_indexes,
170 &db.graph,
171 dek.as_ref(),
172 )?;
173
174 db.startup_rebuild()?;
177
178 Ok(db)
179 }
180
181 fn startup_rebuild(&mut self) -> Result<()> {
186 let manifest_path = self.root.join("MANIFEST");
187 let needs_index_rebuild = !self.sorted_indexes.is_empty();
188
189 if manifest_path.exists() && !needs_index_rebuild {
191 if let Some(m) = fs::read_to_string(&manifest_path)
192 .ok()
193 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
194 {
195 if m.head.len() < 8 {
198 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
199 } else {
200 if m.tip_hash.is_empty() {
218 eprintln!(" [nedbd] MANIFEST predates durable tip() — warm boot; tip()/tip_collection() heal on first flush (no forced scan)");
219 }
220 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
222 *self.tip_hash.write() = (m.seq.saturating_sub(1), m.tip_hash.clone());
224 for (coll, hash) in &m.coll_tips {
225 self.coll_tip_hash.insert(coll.clone(), (0, hash.clone()));
230 }
231 self.startup_ready.store(true, Ordering::SeqCst);
232 println!(" [nedbd] warm start — seq={} head={}... tip={}...",
233 m.seq, &m.head[..8],
234 if m.tip_hash.is_empty() { "(pre-2.5.43, heals on flush)" }
235 else { &m.tip_hash[..8.min(m.tip_hash.len())] });
236 return Ok(());
237 }
238 } else {
239 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
240 }
241 }
242
243 println!(" [nedbd] cold start — background scan will start after heap allocation");
249 Ok(())
250 }
251
252 pub fn start_cold_scan(self_arc: Arc<Self>) {
256 if self_arc.startup_ready.load(Ordering::SeqCst) {
257 return; }
259 if self_arc.objects.all_hashes().next().is_none() {
262 self_arc.startup_ready.store(true, Ordering::SeqCst);
263 return;
264 }
265 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
266 std::thread::spawn(move || {
267 let db = self_arc;
268 cold_scan_background_arc(db);
269 });
270 }
271
272 pub fn put(
274 &self,
275 coll: &str,
276 id: &str,
277 data: Value,
278 caused_by: Vec<String>,
279 valid_from: Option<String>,
280 valid_to: Option<String>,
281 ) -> Result<Node> {
282 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
283 let prev = self.id_index.get(coll, id);
284
285 if !self.sorted_indexes.is_empty() {
291 if let Some(old_hash) = &prev {
292 if let Ok(old_node) = self.objects.read(old_hash) {
293 if let Value::Object(ref obj) = old_node.data {
294 for (field, value) in obj {
295 self.sorted_indexes.remove(coll, field, value, old_hash);
296 }
297 }
298 }
299 }
300 }
301
302 let mut node = Node {
303 id: id.to_string(),
304 coll: coll.to_string(),
305 seq,
306 data: data.clone(),
307 prev,
308 caused_by: caused_by.clone(),
309 ts: now(),
310 valid_from,
311 valid_to,
312 hash: String::new(),
313 };
314
315 let hash = self.objects.write(&mut node)?;
317 self.seq_index.insert(seq, hash.clone());
318
319 self.id_index.set(coll, id, &hash)?;
321
322 if let Value::Object(ref obj) = data {
324 for (field, value) in obj {
325 if self.sorted_indexes.has(coll, field) {
326 self.sorted_indexes.insert(coll, field, value, &hash);
327 }
328 }
329 }
330
331 for cause in &caused_by {
333 self.graph.add_edge(&hash, "caused_by", cause)?;
334 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
335 }
336
337 self.update_head(coll, seq, &hash);
340
341 Ok(node)
342 }
343
344 pub fn put_batch(
349 &self,
350 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
351 ) -> Result<Vec<Node>> {
353 use rayon::prelude::*;
354
355 if ops.is_empty() { return Ok(vec![]); }
356 let n = ops.len() as u64;
357
358 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
360 let ts = now();
361
362 let index_live = !self.sorted_indexes.is_empty();
364 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
365 let prev = self.id_index.get(&coll, &id);
366 if index_live {
372 if let Some(old_hash) = &prev {
373 if let Ok(old_node) = self.objects.read(old_hash) {
374 if let Value::Object(ref obj) = old_node.data {
375 for (field, value) in obj {
376 self.sorted_indexes.remove(&coll, field, value, old_hash);
377 }
378 }
379 }
380 }
381 }
382 Node {
383 id, coll, seq: base_seq + i as u64,
384 data, prev, caused_by,
385 ts, valid_from, valid_to,
386 hash: String::new(),
387 }
388 }).collect();
389
390 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
392 .filter_map(|node| self.objects.write(node).err())
393 .collect();
394 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
395
396 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
398 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
399 .collect();
400 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
401
402 for node in &nodes {
404 self.seq_index.insert(node.seq, node.hash.clone());
405 if let Value::Object(ref obj) = node.data {
406 for (field, value) in obj {
407 if self.sorted_indexes.has(&node.coll, field) {
408 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
409 }
410 }
411 }
412 for cause in &node.caused_by {
413 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
414 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
415 }
416 }
417
418 for node in &nodes {
420 self.update_head(&node.coll, node.seq, &node.hash);
421 }
422
423 Ok(nodes)
424 }
425
426 fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
442 use blake2::{Blake2b512, Digest};
443 {
444 let mut head = self.head.write();
445 let mut h = Blake2b512::new();
446 h.update(head.as_bytes());
447 h.update(seq.to_le_bytes());
448 h.update(new_hash.as_bytes());
449 *head = hex::encode(&h.finalize()[..32]);
450 }
451 {
452 let mut tip = self.tip_hash.write();
453 if seq >= tip.0 {
454 *tip = (seq, new_hash.to_string());
455 }
456 }
457 self.coll_tip_hash
458 .entry(coll.to_string())
459 .and_modify(|t| {
460 if seq >= t.0 {
461 *t = (seq, new_hash.to_string());
462 }
463 })
464 .or_insert_with(|| (seq, new_hash.to_string()));
465 self.manifest_dirty.store(true, Ordering::Release);
467 }
468
469 pub fn flush_all(&self) {
471 self.id_index.flush_write_buf();
472 if let Err(e) = self.objects.sync() {
475 eprintln!("nedb: segment sync failed: {}", e);
476 }
477 self.flush_manifest();
478 }
479
480 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
489 self.flush_all();
490 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
491 for coll in self.id_index.collections() {
492 for id in self.id_index.list_ids(&coll) {
493 if let Some(h) = self.id_index.get(&coll, &id) {
494 live.insert(h);
495 }
496 }
497 }
498 self.objects.compact(&live)
499 }
500
501 pub fn flush_manifest_if_dirty(&self) {
503 if self.root == std::path::PathBuf::from(":memory:") { return; }
504 if self.manifest_dirty.compare_exchange(
505 true, false, Ordering::AcqRel, Ordering::Relaxed
506 ).is_ok() {
507 self.flush_manifest();
508 }
509 }
510
511 pub fn flush_manifest(&self) {
513 if self.root == std::path::PathBuf::from(":memory:") { return; }
514 let seq = self.seq.load(Ordering::SeqCst);
515 let head = self.head.read().clone();
516 let tip_hash = self.tip_hash.read().1.clone();
517 let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
518 .iter()
519 .map(|kv| (kv.key().clone(), kv.value().1.clone()))
520 .collect();
521 let m = Manifest { seq, head, tip_hash, coll_tips };
522 if let Ok(json) = serde_json::to_string(&m) {
523 let path = self.root.join("MANIFEST");
524 let tmp = self.root.join("MANIFEST.tmp");
525 let wrote = (|| -> std::io::Result<()> {
532 use std::io::Write;
533 let mut f = fs::File::create(&tmp)?;
534 f.write_all(json.as_bytes())?;
535 f.sync_all()
536 })();
537 if wrote.is_ok() && fs::rename(&tmp, &path).is_ok() {
538 #[cfg(unix)]
542 if let Ok(dir) = fs::File::open(&self.root) {
543 let _ = dir.sync_all();
544 }
545 }
546 }
547 }
548
549 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
553 let db = self_arc;
554 std::thread::spawn(move || {
555 loop {
556 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
557 db.id_index.flush_write_buf();
559 if db.manifest_dirty.load(Ordering::Acquire) {
569 if let Err(e) = db.objects.sync() {
570 eprintln!("nedb: segment sync failed: {}", e);
571 }
572 db.flush_manifest_if_dirty();
573 }
574 }
575 });
576 }
577
578 pub fn head(&self) -> String {
580 self.head.read().clone()
581 }
582
583 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
586 let prev = match self.id_index.get(coll, id) {
587 None => return Ok(false), Some(h) => h,
589 };
590 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
591 let mut tombstone = Node {
592 id: format!("_del_{}", id),
593 coll: coll.to_string(),
594 seq,
595 data: serde_json::json!({"_deleted": id, "_prev": prev}),
596 prev: Some(prev),
597 caused_by: vec![],
598 ts: now(),
599 valid_from: None,
600 valid_to: None,
601 hash: String::new(),
602 };
603 let hash = self.objects.write(&mut tombstone)?;
604 self.update_head(coll, seq, &hash);
605 self.id_index.remove(coll, id)?;
607 Ok(true)
608 }
609
610 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
612 let hash = self.id_index.get(coll, id)?;
613 self.objects.read(&hash).ok()
614 }
615
616 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
618 self.objects.read(hash).ok()
619 }
620
621 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
624 let hash = self.id_index.get(coll, id)?;
625 let mut current = self.objects.read(&hash).ok()?;
626 loop {
627 if current.seq <= target_seq {
628 return Some(current);
629 }
630 let prev_hash = current.prev.as_deref()?;
631 current = self.objects.read(prev_hash).ok()?;
632 }
633 }
634
635 pub fn list(&self, coll: &str) -> Vec<Node> {
637 self.id_index
638 .list_ids(coll)
639 .into_iter()
640 .filter_map(|id| self.get(coll, &id))
641 .collect()
642 }
643
644 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
646 if self.sorted_indexes.has(coll, field) {
647 self.sorted_indexes
648 .top_k_asc(coll, field, limit)
649 .into_iter()
650 .filter_map(|h| self.objects.read(&h).ok())
651 .collect()
652 } else {
653 let mut docs = self.list(coll);
654 docs.sort_by(|a, b| {
655 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
656 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
657 av.cmp(&bv)
658 });
659 docs.truncate(limit);
660 docs
661 }
662 }
663
664 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
666 if self.sorted_indexes.has(coll, field) {
667 self.sorted_indexes
668 .top_k_desc(coll, field, limit)
669 .into_iter()
670 .filter_map(|h| self.objects.read(&h).ok())
671 .collect()
672 } else {
673 let mut docs = self.list(coll);
674 docs.sort_by(|a, b| {
675 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
676 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
677 bv.cmp(&av)
678 });
679 docs.truncate(limit);
680 docs
681 }
682 }
683
684 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
686 self.graph
687 .trace(hash, "caused_by", reverse, limit)
688 .into_iter()
689 .filter_map(|h| self.objects.read(&h).ok())
690 .collect()
691 }
692
693 pub fn verify(&self) -> (usize, Vec<String>) {
695 self.objects.verify_all()
696 }
697
698 pub fn create_sorted_index(&self, coll: &str, field: &str) {
700 self.sorted_indexes.ensure(coll, field);
701 for id in self.id_index.list_ids(coll) {
703 if let Some(node) = self.get(coll, &id) {
704 if let Value::Object(ref obj) = node.data {
705 if let Some(value) = obj.get(field) {
706 self.sorted_indexes.insert(coll, field, value, &node.hash);
707 }
708 }
709 }
710 }
711 }
712
713 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
716 self.seq_index.get(&seq).map(|r| r.clone())
717 }
718
719 pub fn tip(&self) -> Option<Node> {
727 let next = self.seq.load(Ordering::SeqCst);
728 if next == 0 {
729 return None; }
731 if let Some(hash) = self.get_hash_by_seq(next - 1) {
734 return self.get_by_hash(&hash);
735 }
736 let th = self.tip_hash.read().1.clone();
740 if !th.is_empty() {
741 return self.get_by_hash(&th);
742 }
743 None
744 }
745
746 pub fn tip_collection(&self, coll: &str) -> Option<Node> {
757 let hash = self.coll_tip_hash.get(coll)?.1.clone();
758 self.get_by_hash(&hash)
759 }
760
761 pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
772 let next = self.seq.load(Ordering::SeqCst); let head_seq = next.saturating_sub(1);
774 let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
775 let mut nodes: Vec<Node> = Vec::new();
776 let mut to_seq = after_seq;
777 let mut hit_limit = false;
778 let mut s = after_seq.saturating_add(1);
779 while s < next {
780 if nodes.len() >= cap { hit_limit = true; break; }
781 if let Some(hash) = self.get_hash_by_seq(s) {
782 if let Some(node) = self.get_by_hash(&hash) {
783 to_seq = node.seq;
784 nodes.push(node);
785 }
786 }
787 s += 1;
788 }
789 SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
790 }
791
792 pub fn scan_status(&self) -> ScanStatus {
799 let next = self.seq.load(Ordering::SeqCst);
800 let mut min = u64::MAX;
801 let mut max = 0u64;
802 let mut count = 0usize;
803 for kv in self.seq_index.iter() {
804 let s = *kv.key();
805 if s < min { min = s; }
806 if s > max { max = s; }
807 count += 1;
808 }
809 if count == 0 { min = 0; }
810 ScanStatus {
811 scan_complete: self.startup_ready.load(Ordering::SeqCst),
812 tip_seq: next.saturating_sub(1),
813 indexed_seq_min: min,
814 indexed_seq_max: max,
815 indexed_count: count,
816 }
817 }
818
819 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
824 let (frm_coll, frm_id) = frm.split_once(':')
825 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
826 let (to_coll, to_id) = to.split_once(':')
827 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
828 if self.id_index.get(frm_coll, frm_id).is_none() {
829 anyhow::bail!("link: frm not found: {}", frm);
830 }
831 if self.id_index.get(to_coll, to_id).is_none() {
832 anyhow::bail!("link: to not found: {}", to);
833 }
834 let link_id = format!("{}|{}|{}", frm, rel, to);
835 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
836 self.put("__links__", &link_id, doc, vec![], None, None)?;
837 Ok(())
838 }
839
840 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
842 let link_id = format!("{}|{}|{}", frm, rel, to);
843 self.delete("__links__", &link_id)
844 }
845
846 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
849 self.id_index
850 .list_ids("__links__")
851 .into_iter()
852 .filter_map(|id| self.get("__links__", &id))
853 .filter(|node| {
854 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
855 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
856 })
857 .filter_map(|node| {
858 let to = node.data.get("_to")?.as_str()?;
859 let (to_coll, to_id) = to.split_once(':')?;
860 self.get(to_coll, to_id)
861 })
862 .collect()
863 }
864}
865
866impl Drop for Db {
867 fn drop(&mut self) {
883 self.flush_all();
884 }
885}
886
/// Rebuilds all in-memory startup state by reading every stored object.
///
/// The rebuilt state is:
/// - the seq index;
/// - the `seq` counter;
/// - the per-collection tips and the global tip;
/// - the sorted-index entries;
/// - a head digest.
///
/// Afterwards it writes MANIFEST and sets `startup_ready`. This runs on the thread spawned
/// by `Db::start_cold_scan`.
///
/// NOTE(review): `seq` is overwritten with `store`, and tips are overwritten rather than
/// compared. This assumes no writes are accepted until `startup_ready` is set; confirm in
/// the server.
fn cold_scan_background_arc(db: Arc<Db>) {
    use rayon::prelude::*;
    use blake2::{Blake2b512, Digest};

    let objects = &db.objects;
    let head = &db.head;
    let seq_atomic = &db.seq;
    let sorted_indexes = &db.sorted_indexes;
    let seq_index = &db.seq_index;
    let ready_flag = Arc::clone(&db.startup_ready);

    let hashes: Vec<String> = objects.all_hashes().collect();
    let total = hashes.len();

    if total == 0 {
        ready_flag.store(true, Ordering::SeqCst);
        return;
    }

    println!(" [nedbd] background scan — {} objects...", total);
    let t0 = std::time::Instant::now();
    // Progress is printed roughly every 10%, but no more often than every 1000 objects.
    let step = (total / 10).max(1000);

    // Parallel read of every object. `i` is the position in `hashes`, not a count of
    // completed reads, so the progress line is approximate. Unreadable objects are dropped
    // silently, and each readable node is recorded in the seq index as it is read.
    let nodes: Vec<Node> = hashes.par_iter()
        .enumerate()
        .filter_map(|(i, h)| {
            if i > 0 && i % step == 0 {
                let pct = i * 100 / total;
                let elapsed = t0.elapsed().as_secs_f32();
                let rate = i as f32 / elapsed;
                let eta = (total - i) as f32 / rate;
                eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
                    pct, i, total, rate, eta);
            }
            let node = objects.read(h).ok()?;
            seq_index.insert(node.seq, node.hash.clone());
            Some(node)
        })
        .collect();

    eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
        total, total, t0.elapsed().as_secs_f32());

    // Next seq to hand out. If no object was readable, max_seq falls back to 0, so the
    // counter becomes 1.
    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
    seq_atomic.store(max_seq + 1, Ordering::SeqCst);

    // Highest-seq (seq, hash) per collection.
    let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();

    for node in &nodes {
        coll_max.entry(node.coll.clone())
            .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
            .or_insert_with(|| (node.seq, node.hash.clone()));
        // Only fields that already have a sorted index (per `has`) are populated.
        if let Value::Object(ref obj) = node.data {
            for (field, value) in obj {
                if sorted_indexes.has(&node.coll, field) {
                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
                }
            }
        }
    }

    for (coll, (seq, hash)) in coll_max {
        db.coll_tip_hash.insert(coll, (seq, hash));
    }

    // The cold-scan head is Blake2b512(max_seq ‖ all hashes in sorted order), truncated to
    // 32 bytes. It is recomputed from scratch, unlike the incrementally chained head that
    // `update_head` maintains.
    let mut sorted_hashes = hashes;
    sorted_hashes.sort();
    let mut h = Blake2b512::new();
    h.update(max_seq.to_le_bytes());
    for hash_str in &sorted_hashes {
        h.update(hash_str.as_bytes());
    }
    let new_head = hex::encode(&h.finalize()[..32]);
    *head.write() = new_head;

    // The global tip is the seq-index entry with the largest seq.
    let tip_hash = db.seq_index.iter()
        .max_by_key(|kv| *kv.key())
        .map(|kv| kv.value().clone())
        .unwrap_or_default();
    *db.tip_hash.write() = (max_seq, tip_hash);

    // Persist before announcing readiness, so the next restart can warm-start.
    db.flush_manifest();

    ready_flag.store(true, Ordering::SeqCst);
    println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
}
998
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reads earlier than the epoch.
fn now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
1005
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A single put can be read back by (collection, id).
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();
        let payload = serde_json::json!({"height": 618000, "hash": "0000abc"});
        store.put("blocks", "618000", payload, vec![], None, None).unwrap();

        let fetched = store.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// With a sorted index in place, `order_by_asc` returns the smallest values first.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();
        store.create_sorted_index("blocks", "height");
        for height in [3u64, 1, 5, 2, 4] {
            let key = height.to_string();
            store
                .put("blocks", &key, serde_json::json!({"height": height}), vec![], None, None)
                .unwrap();
        }

        let lowest_three: Vec<u64> = store
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|n| n.data["height"].as_u64())
            .collect();
        assert_eq!(lowest_three, vec![1, 2, 3]);
    }

    /// `trace` follows the `caused_by` chain c -> b -> a.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();
        let create = store
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = store
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = store
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        assert_eq!(store.trace(&burn.hash, false, 10).len(), 3);
    }

    /// `get_as_of` returns the older version while `get` returns the latest.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();
        let first = store.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        store.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = store.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);
        let latest = store.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
1067
1068#[cfg(test)]
1069mod tests_v2 {
1070 use super::*;
1071 use tempfile::tempdir;
1072
1073 #[test]
1074 fn seq_index_populated_on_put() {
1075 let db = Db::in_memory();
1076 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
1077 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
1078 assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
1079 assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
1080 assert_eq!(db.get_hash_by_seq(9999), None);
1081 }
1082
1083 #[test]
1084 fn tip_and_since() {
1085 let db = Db::in_memory();
1086 assert!(db.tip().is_none());
1088 assert!(db.since(0, 0).nodes.is_empty());
1089
1090 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
1091 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
1092
1093 let t = db.tip().expect("tip after writes");
1095 assert_eq!(t.seq, b.seq);
1096 assert_eq!(t.id, "b");
1097 assert_eq!(t.hash, b.hash);
1098
1099 let after_a = db.since(a.seq, 0);
1101 assert_eq!(after_a.nodes.len(), 1);
1102 assert_eq!(after_a.nodes[0].id, "b");
1103 assert_eq!(after_a.from_seq, a.seq);
1104 assert_eq!(after_a.to_seq, b.seq);
1105 assert_eq!(after_a.head_seq, b.seq);
1106 assert!(!after_a.has_more);
1107
1108 assert!(db.since(b.seq, 0).nodes.is_empty());
1110
1111 let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
1113 let page = db.since(a.seq, 1); assert_eq!(page.nodes.len(), 1);
1115 assert_eq!(page.nodes[0].id, "b");
1116 assert_eq!(page.to_seq, b.seq);
1117 assert!(page.has_more);
1118 let page2 = db.since(page.to_seq, 1); assert_eq!(page2.nodes.len(), 1);
1120 assert_eq!(page2.nodes[0].id, "c");
1121 assert_eq!(page2.to_seq, c.seq);
1122 assert!(!page2.has_more);
1123 }
1124
1125 #[test]
1126 fn tip_collection_per_chain() {
1127 let db = Db::in_memory();
1130 assert!(db.tip_collection("blocks").is_none());
1131
1132 db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
1133 db.put("tx", "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1134 let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1135 let t1 = db.put("tx", "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
1136
1137 assert_eq!(db.tip().unwrap().id, "t1");
1139 let bt = db.tip_collection("blocks").expect("blocks tip");
1141 assert_eq!(bt.id, "b1");
1142 assert_eq!(bt.seq, b1.seq);
1143 assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
1144 assert!(db.tip_collection("absent").is_none());
1145 }
1146
1147 #[test]
1148 fn seq_index_survives_batch() {
1149 let db = Db::in_memory();
1150 let nodes = db.put_batch(vec![
1151 ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
1152 ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
1153 ]).unwrap();
1154 for node in &nodes {
1155 assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
1156 }
1157 }
1158
1159 #[test]
1165 fn put_batch_removes_superseded_sorted_index_entries() {
1166 let db = Db::in_memory();
1167 db.create_sorted_index("blocks", "height");
1168 db.put("blocks", "x", serde_json::json!({"height": 1}), vec![], None, None).unwrap();
1169 db.put_batch(vec![
1170 ("blocks".into(), "x".into(), serde_json::json!({"height": 99}), vec![], None, None),
1171 ]).unwrap();
1172
1173 let asc = db.order_by_asc("blocks", "height", 10);
1174 assert_eq!(asc.len(), 1, "stale index entry for the superseded version must be gone");
1175 assert_eq!(asc[0].data["height"], 99);
1176 assert_eq!(asc[0].id, "x");
1177 }
1178
1179 #[test]
1182 fn update_without_indexes_preserves_chain() {
1183 let db = Db::in_memory();
1184 let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1185 let v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
1186 assert_eq!(v2.prev.as_deref(), Some(v1.hash.as_str()), "prev chain must survive the fast path");
1187 assert_eq!(db.get("docs", "x").unwrap().data["v"], 2);
1188 assert_eq!(db.get_as_of("docs", "x", v1.seq).unwrap().data["v"], 1);
1189 }
1190
1191 #[test]
1192 fn link_and_neighbors() {
1193 let db = Db::in_memory();
1194 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
1195 db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
1196 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1197 db.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1198
1199 db.link("driver:d1", "handles", "trip:t1").unwrap();
1200 db.link("driver:d1", "handles", "trip:t2").unwrap();
1201 db.link("driver:d2", "handles", "trip:t1").unwrap();
1202
1203 let d1_trips = db.neighbors("driver:d1", "handles");
1204 assert_eq!(d1_trips.len(), 2);
1205 let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
1206 assert!(ids.contains("t1") && ids.contains("t2"));
1207
1208 let d2_trips = db.neighbors("driver:d2", "handles");
1209 assert_eq!(d2_trips.len(), 1);
1210 assert_eq!(d2_trips[0].id, "t1");
1211 }
1212
1213 #[test]
1214 fn link_stored_in_links_collection() {
1215 let db = Db::in_memory();
1218 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
1219 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1220 db.link("driver:d1", "handles", "trip:t1").unwrap();
1221 let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
1223 assert!(link_doc.is_some(), "__links__ doc should exist");
1224 let doc = link_doc.unwrap();
1225 assert_eq!(doc.data["_from"], "driver:d1");
1226 assert_eq!(doc.data["_rel"], "handles");
1227 assert_eq!(doc.data["_to"], "trip:t1");
1228 let nb = db.neighbors("driver:d1", "handles");
1230 assert_eq!(nb.len(), 1);
1231 assert_eq!(nb[0].id, "t1");
1232 }
1233
1234 #[test]
1235 fn link_missing_node_errors() {
1236 let db = Db::in_memory();
1237 db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
1238 assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
1239 }
1240
1241 #[test]
1242 fn link_durable_survives_reopen() {
1243 let dir = tempdir().unwrap();
1244 {
1245 let db = Db::open(dir.path(), None).unwrap();
1246 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
1247 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1248 db.link("driver:d1", "handles", "trip:t1").unwrap();
1249 }
1250 let db2 = Db::open(dir.path(), None).unwrap();
1251 db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
1252 let trips = db2.neighbors("driver:d1", "handles");
1253 assert_eq!(trips.len(), 1);
1254 assert_eq!(trips[0].id, "t1");
1255 }
1256
1257 #[test]
1258 fn tip_survives_warm_restart() {
1259 let dir = tempdir().unwrap();
1263 {
1264 let db = Db::open(dir.path(), None).unwrap();
1265 db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1266 db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
1267 db.flush_all(); assert_eq!(db.tip().expect("tip in-session").id, "b2");
1269 }
1270 let db2 = Db::open(dir.path(), None).unwrap();
1272 assert!(db2.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");
1273 let tip = db2.tip().expect("tip() must survive a warm restart");
1274 assert_eq!(tip.id, "b2");
1275 assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
1276 }
1277
1278 #[test]
1279 fn tip_collection_survives_warm_restart() {
1280 let dir = tempdir().unwrap();
1284 {
1285 let db = Db::open(dir.path(), None).unwrap();
1286 db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1287 db.put("tx", "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1288 let b2 = db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
1289 db.flush_all(); assert_eq!(db.tip_collection("blocks").unwrap().id, "b2");
1291 assert_eq!(db.tip_collection("blocks").unwrap().seq, b2.seq);
1292 }
1293 let db2 = Db::open(dir.path(), None).unwrap();
1295 assert!(db2.get_hash_by_seq(0).is_none(), "seq_index is cold on a warm boot");
1296 let blocks_tip = db2.tip_collection("blocks").expect("tip_collection must survive a warm restart");
1297 assert_eq!(blocks_tip.id, "b2");
1298 assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
1299 let tx_tip = db2.tip_collection("tx").expect("tx tip must also survive");
1300 assert_eq!(tx_tip.id, "t1");
1301 assert!(db2.tip_collection("absent").is_none());
1302 }
1303
#[test]
fn cold_scan_indexes_every_object_and_reports_completion() {
    use std::sync::Arc;

    /// Polls `scan_status()` every 5ms until the scan reports completion,
    /// failing the test if that takes longer than 10 seconds.
    fn await_scan(db: &Db) {
        use std::time::{Duration, Instant};
        let deadline = Instant::now() + Duration::from_secs(10);
        while !db.scan_status().scan_complete {
            assert!(Instant::now() < deadline, "cold scan did not complete in time");
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    let dir = tempdir().unwrap();
    let n = 25u64;

    // Seed `n` objects and flush, then delete MANIFEST so the next open
    // has nothing to warm-boot from.
    let seeded = Db::open(dir.path(), None).unwrap();
    (0..n).for_each(|i| {
        seeded
            .put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None)
            .unwrap();
    });
    seeded.flush_all();
    drop(seeded);
    std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();

    let db = Arc::new(Db::open(dir.path(), None).unwrap());
    assert!(!db.scan_status().scan_complete, "should be cold immediately after open");

    Db::start_cold_scan(Arc::clone(&db));
    await_scan(&db);

    let status = db.scan_status();
    assert_eq!(status.indexed_count, n as usize, "every written object must be indexed");
    assert!(status.scan_complete);

    // After the scan, both the global and the per-collection tip point at the
    // last object written (i == n - 1).
    let tip = db.tip().expect("tip resolves after cold scan");
    assert_eq!(tip.data.get("i").and_then(|v| v.as_u64()), Some(n - 1));
    let coll_tip = db.tip_collection("things").expect("tip_collection resolves after cold scan");
    assert_eq!(coll_tip.id, tip.id);
}
1347
#[test]
fn concurrent_puts_tip_resolves_to_highest_seq_after_warm_restart() {
    use std::sync::atomic::Ordering;
    use std::sync::Arc;

    const THREADS: u64 = 4;
    const PER_THREAD: u64 = 25;

    let dir = tempdir().unwrap();
    let total: u64 = THREADS * PER_THREAD;

    // Session 1: THREADS writers race PER_THREAD puts each into collection "c".
    let db = Arc::new(Db::open(dir.path(), None).unwrap());
    let writers: Vec<_> = (0..THREADS)
        .map(|t| {
            let db = Arc::clone(&db);
            std::thread::spawn(move || {
                for i in 0..PER_THREAD {
                    let id = format!("{}-{}", t, i);
                    db.put("c", &id, serde_json::json!({"t": t, "i": i}), vec![], None, None)
                        .unwrap();
                }
            })
        })
        .collect();
    for writer in writers {
        writer.join().unwrap();
    }

    // `seq` holds the next value to hand out, so the last assigned seq is one less.
    let expected = db.seq.load(Ordering::SeqCst) - 1;
    assert_eq!(expected, total - 1, "exactly {} writes expected", total);
    assert_eq!(db.tip().expect("in-session tip").seq, expected);
    db.flush_all();
    drop(db);

    // Session 2: after reopening, the tip must be the highest-seq write,
    // not whichever thread happened to finish last.
    let reopened = Db::open(dir.path(), None).unwrap();
    let tip = reopened.tip().expect("tip must survive warm restart after concurrent writes");
    assert_eq!(tip.seq, total - 1, "warm-boot tip must be the highest-seq write");
    let ct = reopened.tip_collection("c").expect("coll tip survives");
    assert_eq!(ct.seq, total - 1);
}
1386
#[test]
fn pre_durable_tip_manifest_warm_boots_and_heals_lazily() {
    use std::sync::atomic::Ordering;

    let dir = tempdir().unwrap();
    let manifest_path = dir.path().join("MANIFEST");

    // Seed five objects and flush a current-format MANIFEST.
    {
        let db = Db::open(dir.path(), None).unwrap();
        for i in 0..5u64 {
            db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None)
                .unwrap();
        }
        db.flush_all();
    }

    // Downgrade MANIFEST to the legacy shape: keep only `seq` and `head`,
    // dropping the tip fields.
    let raw = std::fs::read_to_string(&manifest_path).unwrap();
    let current: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let legacy = serde_json::json!({ "seq": current["seq"], "head": current["head"] });
    std::fs::write(&manifest_path, serde_json::to_string(&legacy).unwrap()).unwrap();

    // The legacy MANIFEST still warm-boots, but with no tip recorded yet.
    let upgraded = Db::open(dir.path(), None).unwrap();
    assert!(upgraded.startup_ready.load(Ordering::SeqCst),
        "pre-2.5.43 MANIFEST must warm-boot, not fall to a cold scan");
    assert!(upgraded.tip().is_none(), "tip() is None until the manifest heals");

    // The next write reuses the persisted next-to-assign seq; flushing it
    // records a tip in MANIFEST.
    let fresh = upgraded
        .put("things", "next", serde_json::json!({"fresh": true}), vec![], None, None)
        .unwrap();
    assert_eq!(fresh.seq, current["seq"].as_u64().unwrap(), "next write takes the persisted next-to-assign seq");
    upgraded.flush_all();
    drop(upgraded);

    // After the write has been flushed, a reopen resolves tip() to that write.
    let healed = Db::open(dir.path(), None).unwrap();
    assert!(healed.startup_ready.load(Ordering::SeqCst));
    let tip = healed.tip().expect("tip() must resolve after the organic upgrade");
    assert_eq!(tip.id, "next");
}
1428
#[test]
fn manifest_after_cold_scan_does_not_reuse_tip_seq() {
    use std::sync::Arc;

    /// Polls `scan_status()` every 5ms until the scan reports completion,
    /// failing the test if that takes longer than 10 seconds.
    fn wait_for_scan(db: &Db) {
        use std::time::{Duration, Instant};
        let deadline = Instant::now() + Duration::from_secs(10);
        while !db.scan_status().scan_complete {
            assert!(Instant::now() < deadline, "cold scan did not complete");
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    let dir = tempdir().unwrap();

    // Phase 1: write five objects and remember the seq the tip landed on.
    let old_tip_seq = {
        let db = Db::open(dir.path(), None).unwrap();
        for i in 0..5u64 {
            db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None)
                .unwrap();
        }
        db.flush_all();
        let tip = db.tip().unwrap();
        tip.seq
    };

    // Phase 2: remove MANIFEST and run a cold scan to completion.
    std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();
    {
        let db = Arc::new(Db::open(dir.path(), None).unwrap());
        // NOTE(review): start_cold_scan receives its own Arc clone, so this Db
        // may outlive the scope if the scan thread is still running when
        // scan_complete flips. Confirm the MANIFEST is written before the flag is set.
        Db::start_cold_scan(Arc::clone(&db));
        wait_for_scan(&db);
    }

    // Phase 3: boot from the scan-produced MANIFEST. The tip must be the same
    // object, and the next write must get a seq strictly above it.
    let db3 = Db::open(dir.path(), None).unwrap();
    let tip_before = db3.tip().expect("tip survives scan-written MANIFEST");
    assert_eq!(tip_before.seq, old_tip_seq, "tip identity preserved across the scan");

    let new_node = db3
        .put("things", "next", serde_json::json!({"fresh": true}), vec![], None, None)
        .unwrap();
    let assigned = new_node.seq;
    assert!(assigned > old_tip_seq,
        "new write reused seq {} (tip was {}) — duplicate seq in the log",
        assigned, old_tip_seq);
}
1472}