1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22 seq: u64,
23 head: String,
24 #[serde(default)]
28 tip_hash: String,
29 #[serde(default)]
35 coll_tips: std::collections::HashMap<String, String>,
36}
37
/// Page size used by `Db::since` when the caller passes `limit == 0`.
pub const DEFAULT_SINCE_LIMIT: usize = 10 * 1000;
42
43#[derive(Debug, Clone, serde::Serialize)]
49pub struct SinceBatch {
50 pub nodes: Vec<Node>,
52 pub from_seq: u64,
54 pub to_seq: u64,
56 pub head_seq: u64,
58 pub has_more: bool,
60}
61
62#[derive(Debug, Clone, serde::Serialize)]
69pub struct ScanStatus {
70 pub scan_complete: bool,
72 pub tip_seq: u64,
74 pub indexed_seq_min: u64,
76 pub indexed_seq_max: u64,
78 pub indexed_count: usize,
80}
81
82pub struct Db {
83 pub objects: ObjectStore,
84 pub id_index: IdIndex,
85 pub sorted_indexes: SortedIndexes,
86 pub graph: GraphStore,
87 pub root: PathBuf,
88 manifest_dirty: Arc<AtomicBool>,
92 pub seq: AtomicU64,
93 head: RwLock<String>,
95 tip_hash: RwLock<(u64, String)>,
102 coll_tip_hash: Arc<DashMap<String, (u64, String)>>,
108 pub startup_ready: Arc<AtomicBool>,
113 seq_index: Arc<DashMap<u64, String>>,
117}
118
119impl Db {
120 pub fn in_memory() -> Self {
124 Self {
125 objects: ObjectStore::in_memory(),
126 id_index: IdIndex::in_memory(),
127 sorted_indexes: SortedIndexes::new(),
128 graph: GraphStore::in_memory(),
129 root: std::path::PathBuf::from(":memory:"),
130 seq: AtomicU64::new(0),
131 head: RwLock::new(String::new()),
132 tip_hash: RwLock::new((0, String::new())),
133 coll_tip_hash: Arc::new(DashMap::new()),
134 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
136 seq_index: Arc::new(DashMap::new()),
137 }
138 }
139
140 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
142 std::fs::create_dir_all(db_root)?;
143
144 let objects = ObjectStore::new(db_root, dek.clone())?;
145 let id_index = IdIndex::new(db_root)?;
146 let sorted_indexes = SortedIndexes::new();
147 let graph = GraphStore::new(db_root)?;
148
149 let mut db = Self {
150 objects,
151 id_index,
152 sorted_indexes,
153 graph,
154 root: db_root.to_path_buf(),
155 seq: AtomicU64::new(0),
156 head: RwLock::new(String::new()),
157 tip_hash: RwLock::new((0, String::new())),
158 coll_tip_hash: Arc::new(DashMap::new()),
159 startup_ready: Arc::new(AtomicBool::new(false)),
160 manifest_dirty: Arc::new(AtomicBool::new(false)),
161 seq_index: Arc::new(DashMap::new()),
162 };
163
164 migrate::migrate_if_needed(
166 db_root,
167 &db.objects,
168 &db.id_index,
169 &db.sorted_indexes,
170 &db.graph,
171 dek.as_ref(),
172 )?;
173
174 db.startup_rebuild()?;
177
178 Ok(db)
179 }
180
181 fn startup_rebuild(&mut self) -> Result<()> {
186 let manifest_path = self.root.join("MANIFEST");
187 let needs_index_rebuild = !self.sorted_indexes.is_empty();
188
189 if manifest_path.exists() && !needs_index_rebuild {
191 if let Some(m) = fs::read_to_string(&manifest_path)
192 .ok()
193 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
194 {
195 if m.head.len() < 8 {
198 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
199 } else if m.tip_hash.is_empty() {
200 eprintln!(" [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
204 } else {
205 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
207 *self.tip_hash.write() = (m.seq.saturating_sub(1), m.tip_hash.clone());
209 for (coll, hash) in &m.coll_tips {
210 self.coll_tip_hash.insert(coll.clone(), (0, hash.clone()));
215 }
216 self.startup_ready.store(true, Ordering::SeqCst);
217 println!(" [nedbd] warm start — seq={} head={}... tip={}...",
218 m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
219 return Ok(());
220 }
221 } else {
222 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
223 }
224 }
225
226 println!(" [nedbd] cold start — background scan will start after heap allocation");
232 Ok(())
233 }
234
235 pub fn start_cold_scan(self_arc: Arc<Self>) {
239 if self_arc.startup_ready.load(Ordering::SeqCst) {
240 return; }
242 if self_arc.objects.all_hashes().next().is_none() {
245 self_arc.startup_ready.store(true, Ordering::SeqCst);
246 return;
247 }
248 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
249 std::thread::spawn(move || {
250 let db = self_arc;
251 cold_scan_background_arc(db);
252 });
253 }
254
255 pub fn put(
257 &self,
258 coll: &str,
259 id: &str,
260 data: Value,
261 caused_by: Vec<String>,
262 valid_from: Option<String>,
263 valid_to: Option<String>,
264 ) -> Result<Node> {
265 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
266 let prev = self.id_index.get(coll, id);
267
268 if !self.sorted_indexes.is_empty() {
274 if let Some(old_hash) = &prev {
275 if let Ok(old_node) = self.objects.read(old_hash) {
276 if let Value::Object(ref obj) = old_node.data {
277 for (field, value) in obj {
278 self.sorted_indexes.remove(coll, field, value, old_hash);
279 }
280 }
281 }
282 }
283 }
284
285 let mut node = Node {
286 id: id.to_string(),
287 coll: coll.to_string(),
288 seq,
289 data: data.clone(),
290 prev,
291 caused_by: caused_by.clone(),
292 ts: now(),
293 valid_from,
294 valid_to,
295 hash: String::new(),
296 };
297
298 let hash = self.objects.write(&mut node)?;
300 self.seq_index.insert(seq, hash.clone());
301
302 self.id_index.set(coll, id, &hash)?;
304
305 if let Value::Object(ref obj) = data {
307 for (field, value) in obj {
308 if self.sorted_indexes.has(coll, field) {
309 self.sorted_indexes.insert(coll, field, value, &hash);
310 }
311 }
312 }
313
314 for cause in &caused_by {
316 self.graph.add_edge(&hash, "caused_by", cause)?;
317 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
318 }
319
320 self.update_head(coll, seq, &hash);
323
324 Ok(node)
325 }
326
327 pub fn put_batch(
332 &self,
333 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
334 ) -> Result<Vec<Node>> {
336 use rayon::prelude::*;
337
338 if ops.is_empty() { return Ok(vec![]); }
339 let n = ops.len() as u64;
340
341 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
343 let ts = now();
344
345 let index_live = !self.sorted_indexes.is_empty();
347 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
348 let prev = self.id_index.get(&coll, &id);
349 if index_live {
355 if let Some(old_hash) = &prev {
356 if let Ok(old_node) = self.objects.read(old_hash) {
357 if let Value::Object(ref obj) = old_node.data {
358 for (field, value) in obj {
359 self.sorted_indexes.remove(&coll, field, value, old_hash);
360 }
361 }
362 }
363 }
364 }
365 Node {
366 id, coll, seq: base_seq + i as u64,
367 data, prev, caused_by,
368 ts, valid_from, valid_to,
369 hash: String::new(),
370 }
371 }).collect();
372
373 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
375 .filter_map(|node| self.objects.write(node).err())
376 .collect();
377 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
378
379 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
381 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
382 .collect();
383 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
384
385 for node in &nodes {
387 self.seq_index.insert(node.seq, node.hash.clone());
388 if let Value::Object(ref obj) = node.data {
389 for (field, value) in obj {
390 if self.sorted_indexes.has(&node.coll, field) {
391 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
392 }
393 }
394 }
395 for cause in &node.caused_by {
396 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
397 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
398 }
399 }
400
401 for node in &nodes {
403 self.update_head(&node.coll, node.seq, &node.hash);
404 }
405
406 Ok(nodes)
407 }
408
409 fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
425 use blake2::{Blake2b512, Digest};
426 {
427 let mut head = self.head.write();
428 let mut h = Blake2b512::new();
429 h.update(head.as_bytes());
430 h.update(seq.to_le_bytes());
431 h.update(new_hash.as_bytes());
432 *head = hex::encode(&h.finalize()[..32]);
433 }
434 {
435 let mut tip = self.tip_hash.write();
436 if seq >= tip.0 {
437 *tip = (seq, new_hash.to_string());
438 }
439 }
440 self.coll_tip_hash
441 .entry(coll.to_string())
442 .and_modify(|t| {
443 if seq >= t.0 {
444 *t = (seq, new_hash.to_string());
445 }
446 })
447 .or_insert_with(|| (seq, new_hash.to_string()));
448 self.manifest_dirty.store(true, Ordering::Release);
450 }
451
452 pub fn flush_all(&self) {
454 self.id_index.flush_write_buf();
455 if let Err(e) = self.objects.sync() {
458 eprintln!("nedb: segment sync failed: {}", e);
459 }
460 self.flush_manifest();
461 }
462
463 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
472 self.flush_all();
473 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
474 for coll in self.id_index.collections() {
475 for id in self.id_index.list_ids(&coll) {
476 if let Some(h) = self.id_index.get(&coll, &id) {
477 live.insert(h);
478 }
479 }
480 }
481 self.objects.compact(&live)
482 }
483
484 pub fn flush_manifest_if_dirty(&self) {
486 if self.root == std::path::PathBuf::from(":memory:") { return; }
487 if self.manifest_dirty.compare_exchange(
488 true, false, Ordering::AcqRel, Ordering::Relaxed
489 ).is_ok() {
490 self.flush_manifest();
491 }
492 }
493
494 pub fn flush_manifest(&self) {
496 if self.root == std::path::PathBuf::from(":memory:") { return; }
497 let seq = self.seq.load(Ordering::SeqCst);
498 let head = self.head.read().clone();
499 let tip_hash = self.tip_hash.read().1.clone();
500 let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
501 .iter()
502 .map(|kv| (kv.key().clone(), kv.value().1.clone()))
503 .collect();
504 let m = Manifest { seq, head, tip_hash, coll_tips };
505 if let Ok(json) = serde_json::to_string(&m) {
506 let path = self.root.join("MANIFEST");
507 let tmp = self.root.join("MANIFEST.tmp");
508 let wrote = (|| -> std::io::Result<()> {
515 use std::io::Write;
516 let mut f = fs::File::create(&tmp)?;
517 f.write_all(json.as_bytes())?;
518 f.sync_all()
519 })();
520 if wrote.is_ok() && fs::rename(&tmp, &path).is_ok() {
521 #[cfg(unix)]
525 if let Ok(dir) = fs::File::open(&self.root) {
526 let _ = dir.sync_all();
527 }
528 }
529 }
530 }
531
532 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
536 let db = self_arc;
537 std::thread::spawn(move || {
538 loop {
539 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
540 db.id_index.flush_write_buf();
542 if db.manifest_dirty.load(Ordering::Acquire) {
552 if let Err(e) = db.objects.sync() {
553 eprintln!("nedb: segment sync failed: {}", e);
554 }
555 db.flush_manifest_if_dirty();
556 }
557 }
558 });
559 }
560
561 pub fn head(&self) -> String {
563 self.head.read().clone()
564 }
565
566 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
569 let prev = match self.id_index.get(coll, id) {
570 None => return Ok(false), Some(h) => h,
572 };
573 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
574 let mut tombstone = Node {
575 id: format!("_del_{}", id),
576 coll: coll.to_string(),
577 seq,
578 data: serde_json::json!({"_deleted": id, "_prev": prev}),
579 prev: Some(prev),
580 caused_by: vec![],
581 ts: now(),
582 valid_from: None,
583 valid_to: None,
584 hash: String::new(),
585 };
586 let hash = self.objects.write(&mut tombstone)?;
587 self.update_head(coll, seq, &hash);
588 self.id_index.remove(coll, id)?;
590 Ok(true)
591 }
592
593 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
595 let hash = self.id_index.get(coll, id)?;
596 self.objects.read(&hash).ok()
597 }
598
599 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
601 self.objects.read(hash).ok()
602 }
603
604 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
607 let hash = self.id_index.get(coll, id)?;
608 let mut current = self.objects.read(&hash).ok()?;
609 loop {
610 if current.seq <= target_seq {
611 return Some(current);
612 }
613 let prev_hash = current.prev.as_deref()?;
614 current = self.objects.read(prev_hash).ok()?;
615 }
616 }
617
618 pub fn list(&self, coll: &str) -> Vec<Node> {
620 self.id_index
621 .list_ids(coll)
622 .into_iter()
623 .filter_map(|id| self.get(coll, &id))
624 .collect()
625 }
626
627 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
629 if self.sorted_indexes.has(coll, field) {
630 self.sorted_indexes
631 .top_k_asc(coll, field, limit)
632 .into_iter()
633 .filter_map(|h| self.objects.read(&h).ok())
634 .collect()
635 } else {
636 let mut docs = self.list(coll);
637 docs.sort_by(|a, b| {
638 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
639 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
640 av.cmp(&bv)
641 });
642 docs.truncate(limit);
643 docs
644 }
645 }
646
647 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
649 if self.sorted_indexes.has(coll, field) {
650 self.sorted_indexes
651 .top_k_desc(coll, field, limit)
652 .into_iter()
653 .filter_map(|h| self.objects.read(&h).ok())
654 .collect()
655 } else {
656 let mut docs = self.list(coll);
657 docs.sort_by(|a, b| {
658 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
659 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
660 bv.cmp(&av)
661 });
662 docs.truncate(limit);
663 docs
664 }
665 }
666
667 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
669 self.graph
670 .trace(hash, "caused_by", reverse, limit)
671 .into_iter()
672 .filter_map(|h| self.objects.read(&h).ok())
673 .collect()
674 }
675
676 pub fn verify(&self) -> (usize, Vec<String>) {
678 self.objects.verify_all()
679 }
680
681 pub fn create_sorted_index(&self, coll: &str, field: &str) {
683 self.sorted_indexes.ensure(coll, field);
684 for id in self.id_index.list_ids(coll) {
686 if let Some(node) = self.get(coll, &id) {
687 if let Value::Object(ref obj) = node.data {
688 if let Some(value) = obj.get(field) {
689 self.sorted_indexes.insert(coll, field, value, &node.hash);
690 }
691 }
692 }
693 }
694 }
695
696 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
699 self.seq_index.get(&seq).map(|r| r.clone())
700 }
701
702 pub fn tip(&self) -> Option<Node> {
710 let next = self.seq.load(Ordering::SeqCst);
711 if next == 0 {
712 return None; }
714 if let Some(hash) = self.get_hash_by_seq(next - 1) {
717 return self.get_by_hash(&hash);
718 }
719 let th = self.tip_hash.read().1.clone();
723 if !th.is_empty() {
724 return self.get_by_hash(&th);
725 }
726 None
727 }
728
729 pub fn tip_collection(&self, coll: &str) -> Option<Node> {
740 let hash = self.coll_tip_hash.get(coll)?.1.clone();
741 self.get_by_hash(&hash)
742 }
743
744 pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
755 let next = self.seq.load(Ordering::SeqCst); let head_seq = next.saturating_sub(1);
757 let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
758 let mut nodes: Vec<Node> = Vec::new();
759 let mut to_seq = after_seq;
760 let mut hit_limit = false;
761 let mut s = after_seq.saturating_add(1);
762 while s < next {
763 if nodes.len() >= cap { hit_limit = true; break; }
764 if let Some(hash) = self.get_hash_by_seq(s) {
765 if let Some(node) = self.get_by_hash(&hash) {
766 to_seq = node.seq;
767 nodes.push(node);
768 }
769 }
770 s += 1;
771 }
772 SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
773 }
774
775 pub fn scan_status(&self) -> ScanStatus {
782 let next = self.seq.load(Ordering::SeqCst);
783 let mut min = u64::MAX;
784 let mut max = 0u64;
785 let mut count = 0usize;
786 for kv in self.seq_index.iter() {
787 let s = *kv.key();
788 if s < min { min = s; }
789 if s > max { max = s; }
790 count += 1;
791 }
792 if count == 0 { min = 0; }
793 ScanStatus {
794 scan_complete: self.startup_ready.load(Ordering::SeqCst),
795 tip_seq: next.saturating_sub(1),
796 indexed_seq_min: min,
797 indexed_seq_max: max,
798 indexed_count: count,
799 }
800 }
801
802 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
807 let (frm_coll, frm_id) = frm.split_once(':')
808 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
809 let (to_coll, to_id) = to.split_once(':')
810 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
811 if self.id_index.get(frm_coll, frm_id).is_none() {
812 anyhow::bail!("link: frm not found: {}", frm);
813 }
814 if self.id_index.get(to_coll, to_id).is_none() {
815 anyhow::bail!("link: to not found: {}", to);
816 }
817 let link_id = format!("{}|{}|{}", frm, rel, to);
818 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
819 self.put("__links__", &link_id, doc, vec![], None, None)?;
820 Ok(())
821 }
822
823 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
825 let link_id = format!("{}|{}|{}", frm, rel, to);
826 self.delete("__links__", &link_id)
827 }
828
829 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
832 self.id_index
833 .list_ids("__links__")
834 .into_iter()
835 .filter_map(|id| self.get("__links__", &id))
836 .filter(|node| {
837 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
838 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
839 })
840 .filter_map(|node| {
841 let to = node.data.get("_to")?.as_str()?;
842 let (to_coll, to_id) = to.split_once(':')?;
843 self.get(to_coll, to_id)
844 })
845 .collect()
846 }
847}
848
849impl Drop for Db {
850 fn drop(&mut self) {
866 self.flush_all();
867 }
868}
869
870fn cold_scan_background_arc(db: Arc<Db>) {
872 use rayon::prelude::*;
873 use blake2::{Blake2b512, Digest};
874
875 let objects = &db.objects;
876 let head = &db.head;
877 let seq_atomic = &db.seq;
878 let sorted_indexes = &db.sorted_indexes;
879 let seq_index = &db.seq_index;
880 let ready_flag = Arc::clone(&db.startup_ready);
881
882 let hashes: Vec<String> = objects.all_hashes().collect();
883 let total = hashes.len();
884
885 if total == 0 {
886 ready_flag.store(true, Ordering::SeqCst);
887 return;
888 }
889
890 println!(" [nedbd] background scan — {} objects...", total);
891 let t0 = std::time::Instant::now();
892 let step = (total / 10).max(1000);
893
894 let nodes: Vec<Node> = hashes.par_iter()
904 .enumerate()
905 .filter_map(|(i, h)| {
906 if i > 0 && i % step == 0 {
907 let pct = i * 100 / total;
908 let elapsed = t0.elapsed().as_secs_f32();
909 let rate = i as f32 / elapsed;
910 let eta = (total - i) as f32 / rate;
911 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
912 pct, i, total, rate, eta);
913 }
914 let node = objects.read(h).ok()?;
915 seq_index.insert(node.seq, node.hash.clone());
916 Some(node)
917 })
918 .collect();
919
920 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
921 total, total, t0.elapsed().as_secs_f32());
922
923 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
924 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
925
926 let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();
931
932 for node in &nodes {
933 coll_max.entry(node.coll.clone())
935 .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
936 .or_insert_with(|| (node.seq, node.hash.clone()));
937 if let Value::Object(ref obj) = node.data {
938 for (field, value) in obj {
939 if sorted_indexes.has(&node.coll, field) {
940 sorted_indexes.insert(&node.coll, field, value, &node.hash);
941 }
942 }
943 }
944 }
945
946 for (coll, (seq, hash)) in coll_max {
947 db.coll_tip_hash.insert(coll, (seq, hash));
948 }
949
950 let mut sorted_hashes = hashes;
952 sorted_hashes.sort();
953 let mut h = Blake2b512::new();
954 h.update(max_seq.to_le_bytes());
955 for hash_str in &sorted_hashes {
956 h.update(hash_str.as_bytes());
957 }
958 let new_head = hex::encode(&h.finalize()[..32]);
959 *head.write() = new_head;
960
961 let tip_hash = db.seq_index.iter()
964 .max_by_key(|kv| *kv.key())
965 .map(|kv| kv.value().clone())
966 .unwrap_or_default();
967 *db.tip_hash.write() = (max_seq, tip_hash);
968
969 db.flush_manifest();
976
977 ready_flag.store(true, Ordering::SeqCst);
979 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
980}
981
/// Wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reads earlier than the epoch.
fn now() -> f64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
988
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A put is immediately readable by `(collection, id)`.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let payload = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", payload, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// With a sorted index in place, ascending order honours `limit`.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");

        let inserted: [u64; 5] = [3, 1, 5, 2, 4];
        for height in inserted.iter().copied() {
            let key = height.to_string();
            db.put("blocks", &key, serde_json::json!({"height": height}), vec![], None, None)
                .unwrap();
        }

        let lowest_three: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|node| node.data["height"].as_u64())
            .collect();
        assert_eq!(lowest_three, vec![1, 2, 3]);
    }

    /// Tracing from the end of an a <- b <- c causal chain yields three nodes.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let root = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let middle = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![root.hash.clone()], None, None)
            .unwrap();
        let leaf = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![middle.hash.clone()], None, None)
            .unwrap();

        let chain = db.trace(&leaf.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    /// `get_as_of` returns the historical version while `get` returns the latest.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);
        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
1050
// Tests for the seq index, tip()/since() paging, per-collection tips, batch
// writes, link documents, warm/cold restart behaviour and seq monotonicity.
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    // Every put records its seq -> hash mapping; unknown seqs map to None.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
        assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
        assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    // tip() tracks the newest write; since() pages by exclusive seq with
    // limit and has_more semantics.
    #[test]
    fn tip_and_since() {
        let db = Db::in_memory();
        assert!(db.tip().is_none());
        assert!(db.since(0, 0).nodes.is_empty());

        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        let t = db.tip().expect("tip after writes");
        assert_eq!(t.seq, b.seq);
        assert_eq!(t.id, "b");
        assert_eq!(t.hash, b.hash);

        // after_seq is exclusive: asking after `a` yields only `b`.
        let after_a = db.since(a.seq, 0);
        assert_eq!(after_a.nodes.len(), 1);
        assert_eq!(after_a.nodes[0].id, "b");
        assert_eq!(after_a.from_seq, a.seq);
        assert_eq!(after_a.to_seq, b.seq);
        assert_eq!(after_a.head_seq, b.seq);
        assert!(!after_a.has_more);

        assert!(db.since(b.seq, 0).nodes.is_empty());

        // Page size 1: the first page signals has_more, the second drains the log.
        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
        let page = db.since(a.seq, 1); assert_eq!(page.nodes.len(), 1);
        assert_eq!(page.nodes[0].id, "b");
        assert_eq!(page.to_seq, b.seq);
        assert!(page.has_more);
        let page2 = db.since(page.to_seq, 1); assert_eq!(page2.nodes.len(), 1);
        assert_eq!(page2.nodes[0].id, "c");
        assert_eq!(page2.to_seq, c.seq);
        assert!(!page2.has_more);
    }

    // Interleaved writes to two collections: each collection keeps its own tip.
    #[test]
    fn tip_collection_per_chain() {
        let db = Db::in_memory();
        assert!(db.tip_collection("blocks").is_none());

        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        db.put("tx", "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
        let t1 = db.put("tx", "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        assert_eq!(db.tip().unwrap().id, "t1");
        let bt = db.tip_collection("blocks").expect("blocks tip");
        assert_eq!(bt.id, "b1");
        assert_eq!(bt.seq, b1.seq);
        assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
        assert!(db.tip_collection("absent").is_none());
    }

    // put_batch records every node in the seq index, like single puts do.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let nodes = db.put_batch(vec![
            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
        ]).unwrap();
        for node in &nodes {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    // Overwriting a key through put_batch must drop the old version's sorted-index entry.
    #[test]
    fn put_batch_removes_superseded_sorted_index_entries() {
        let db = Db::in_memory();
        db.create_sorted_index("blocks", "height");
        db.put("blocks", "x", serde_json::json!({"height": 1}), vec![], None, None).unwrap();
        db.put_batch(vec![
            ("blocks".into(), "x".into(), serde_json::json!({"height": 99}), vec![], None, None),
        ]).unwrap();

        let asc = db.order_by_asc("blocks", "height", 10);
        assert_eq!(asc.len(), 1, "stale index entry for the superseded version must be gone");
        assert_eq!(asc[0].data["height"], 99);
        assert_eq!(asc[0].id, "x");
    }

    // Without any sorted index, updates must still link `prev` to the previous version.
    #[test]
    fn update_without_indexes_preserves_chain() {
        let db = Db::in_memory();
        let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
        assert_eq!(v2.prev.as_deref(), Some(v1.hash.as_str()), "prev chain must survive the fast path");
        assert_eq!(db.get("docs", "x").unwrap().data["v"], 2);
        assert_eq!(db.get_as_of("docs", "x", v1.seq).unwrap().data["v"], 1);
    }

    // neighbors() returns only targets linked from the given source with the given relation.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();

        db.link("driver:d1", "handles", "trip:t1").unwrap();
        db.link("driver:d1", "handles", "trip:t2").unwrap();
        db.link("driver:d2", "handles", "trip:t1").unwrap();

        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(ids.contains("t1") && ids.contains("t2"));

        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    // A link is an ordinary document in `__links__`, keyed "frm|rel|to".
    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();
        let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(link_doc.is_some(), "__links__ doc should exist");
        let doc = link_doc.unwrap();
        assert_eq!(doc.data["_from"], "driver:d1");
        assert_eq!(doc.data["_rel"], "handles");
        assert_eq!(doc.data["_to"], "trip:t1");
        let nb = db.neighbors("driver:d1", "handles");
        assert_eq!(nb.len(), 1);
        assert_eq!(nb[0].id, "t1");
    }

    // Linking to a non-existent endpoint is an error.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
    }

    // Links are persisted: after dropping and reopening, neighbors() still resolves them.
    #[test]
    fn link_durable_survives_reopen() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
            db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            db.link("driver:d1", "handles", "trip:t1").unwrap();
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let trips = db2.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }

    // After a warm restart the seq index is empty, so tip() must come from the manifest.
    #[test]
    fn tip_survives_warm_restart() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            db.flush_all(); assert_eq!(db.tip().expect("tip in-session").id, "b2");
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        assert!(db2.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");
        let tip = db2.tip().expect("tip() must survive a warm restart");
        assert_eq!(tip.id, "b2");
        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
    }

    // Per-collection tips are also restored from the manifest on a warm restart.
    #[test]
    fn tip_collection_survives_warm_restart() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            db.put("tx", "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
            let b2 = db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            db.flush_all(); assert_eq!(db.tip_collection("blocks").unwrap().id, "b2");
            assert_eq!(db.tip_collection("blocks").unwrap().seq, b2.seq);
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        assert!(db2.get_hash_by_seq(0).is_none(), "seq_index is cold on a warm boot");
        let blocks_tip = db2.tip_collection("blocks").expect("tip_collection must survive a warm restart");
        assert_eq!(blocks_tip.id, "b2");
        assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
        let tx_tip = db2.tip_collection("tx").expect("tx tip must also survive");
        assert_eq!(tx_tip.id, "t1");
        assert!(db2.tip_collection("absent").is_none());
    }

    // Deleting MANIFEST forces a cold start. The background scan must index
    // every object, restore tips, and then report completion (polled with a
    // 10 s deadline).
    #[test]
    fn cold_scan_indexes_every_object_and_reports_completion() {
        let dir = tempdir().unwrap();
        let n = 25u64;
        {
            let db = Db::open(dir.path(), None).unwrap();
            for i in 0..n {
                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
            }
            db.flush_all();
        }
        std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();

        let db = Db::open(dir.path(), None).unwrap();
        assert!(!db.scan_status().scan_complete, "should be cold immediately after open");
        let db = std::sync::Arc::new(db);
        Db::start_cold_scan(std::sync::Arc::clone(&db));

        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
        while !db.scan_status().scan_complete {
            assert!(std::time::Instant::now() < deadline, "cold scan did not complete in time");
            std::thread::sleep(std::time::Duration::from_millis(5));
        }

        let status = db.scan_status();
        assert_eq!(status.indexed_count, n as usize, "every written object must be indexed");
        assert!(status.scan_complete);

        let tip = db.tip().expect("tip resolves after cold scan");
        assert_eq!(tip.data.get("i").and_then(|v| v.as_u64()), Some(n - 1));
        let coll_tip = db.tip_collection("things").expect("tip_collection resolves after cold scan");
        assert_eq!(coll_tip.id, tip.id);
    }

    // 4 threads x 25 puts. The highest-seq write must be the tip both in
    // session and after a warm restart.
    #[test]
    fn concurrent_puts_tip_resolves_to_highest_seq_after_warm_restart() {
        let dir = tempdir().unwrap();
        let total: u64 = 100;
        {
            let db = std::sync::Arc::new(Db::open(dir.path(), None).unwrap());
            let mut handles = vec![];
            for t in 0..4u64 {
                let db2 = std::sync::Arc::clone(&db);
                handles.push(std::thread::spawn(move || {
                    for i in 0..25u64 {
                        db2.put("c", &format!("{}-{}", t, i),
                            serde_json::json!({"t": t, "i": i}),
                            vec![], None, None).unwrap();
                    }
                }));
            }
            for h in handles { h.join().unwrap(); }
            let expected = db.seq.load(std::sync::atomic::Ordering::SeqCst) - 1;
            assert_eq!(expected, total - 1, "exactly {} writes expected", total);
            assert_eq!(db.tip().expect("in-session tip").seq, expected);
            db.flush_all(); }
        let db2 = Db::open(dir.path(), None).unwrap();
        let tip = db2.tip().expect("tip must survive warm restart after concurrent writes");
        assert_eq!(tip.seq, total - 1, "warm-boot tip must be the highest-seq write");
        let ct = db2.tip_collection("c").expect("coll tip survives");
        assert_eq!(ct.seq, total - 1);
    }

    // The manifest written by a cold scan must restore the seq counter past
    // the old tip, so the next write does not reuse a seq.
    #[test]
    fn manifest_after_cold_scan_does_not_reuse_tip_seq() {
        let dir = tempdir().unwrap();
        let old_tip_seq;
        {
            let db = Db::open(dir.path(), None).unwrap();
            for i in 0..5u64 {
                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
            }
            db.flush_all();
            old_tip_seq = db.tip().unwrap().seq;
        }
        std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();
        {
            let db = std::sync::Arc::new(Db::open(dir.path(), None).unwrap());
            Db::start_cold_scan(std::sync::Arc::clone(&db));
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
            while !db.scan_status().scan_complete {
                assert!(std::time::Instant::now() < deadline, "cold scan did not complete");
                std::thread::sleep(std::time::Duration::from_millis(5));
            }
        }
        let db3 = Db::open(dir.path(), None).unwrap();
        let tip_before = db3.tip().expect("tip survives scan-written MANIFEST");
        assert_eq!(tip_before.seq, old_tip_seq, "tip identity preserved across the scan");
        let new_node = db3.put("things", "next", serde_json::json!({"fresh": true}),
            vec![], None, None).unwrap();
        assert!(new_node.seq > old_tip_seq,
            "new write reused seq {} (tip was {}) — duplicate seq in the log",
            new_node.seq, old_tip_seq);
    }
}