1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// Snapshot of the database position, serialized as JSON into the `MANIFEST` file
/// under the database root.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Sequence counter recorded with the snapshot; a warm start restores it into `Db::seq`.
    seq: u64,
    /// Hex head digest at the time of the snapshot.
    head: String,
    /// Hash of the tip node. Defaults to the empty string when the field is missing,
    /// which startup treats as a manifest that needs a cold scan.
    #[serde(default)]
    tip_hash: String,
}
30
/// Page size used by `Db::since` when the caller passes a limit of `0`.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
35
/// One page of nodes returned by `Db::since`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SinceBatch {
    /// Nodes found in the scanned range, in ascending sequence order.
    pub nodes: Vec<Node>,
    /// The `after_seq` cursor the caller supplied.
    pub from_seq: u64,
    /// Sequence number of the last node in `nodes`; equals `from_seq` when the page is empty.
    pub to_seq: u64,
    /// The sequence counter minus one (saturating at 0) when the page was built.
    pub head_seq: u64,
    /// `true` when the scan stopped because the page limit was reached.
    pub has_more: bool,
}
54
/// Diagnostic view of startup progress and of the in-memory seq → hash index,
/// as reported by `Db::scan_status`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ScanStatus {
    /// Current value of the `startup_ready` flag.
    pub scan_complete: bool,
    /// The sequence counter minus one (saturating at 0).
    pub tip_seq: u64,
    /// Smallest sequence number present in the seq index (0 when the index is empty).
    pub indexed_seq_min: u64,
    /// Largest sequence number present in the seq index (0 when the index is empty).
    pub indexed_seq_max: u64,
    /// Number of entries in the seq index.
    pub indexed_count: usize,
}
74
/// Database handle: ties together the object store, the id index, the sorted indexes
/// and the causal graph, plus the sequence/head bookkeeping persisted via `MANIFEST`.
pub struct Db {
    /// Node storage; nodes are written and read back by hash.
    pub objects: ObjectStore,
    /// Maps `(collection, id)` to the hash of the document's current node.
    pub id_index: IdIndex,
    /// Ordered indexes per `(collection, field)` used by the `order_by_*` queries.
    pub sorted_indexes: SortedIndexes,
    /// Edge store holding the `caused_by` / `caused_by_rev` links between node hashes.
    pub graph: GraphStore,
    /// Database directory; the literal path `:memory:` marks an in-memory instance.
    pub root: PathBuf,
    /// Set whenever the head changes; cleared by `flush_manifest_if_dirty` before it writes.
    manifest_dirty: Arc<AtomicBool>,
    /// Next sequence number to hand out (writers claim values with `fetch_add`).
    pub seq: AtomicU64,
    /// Hex head digest; advanced by `update_head` on every write.
    head: RwLock<String>,
    /// Hash of the node most recently passed to `update_head`; empty until then.
    tip_hash: RwLock<String>,
    /// `true` once startup has finished (warm start, empty store, or completed cold scan).
    pub startup_ready: Arc<AtomicBool>,
    /// In-memory map from sequence number to node hash; filled by writes and by the cold scan.
    seq_index: Arc<DashMap<u64, String>>,
}
102
103impl Db {
104 pub fn in_memory() -> Self {
108 Self {
109 objects: ObjectStore::in_memory(),
110 id_index: IdIndex::in_memory(),
111 sorted_indexes: SortedIndexes::new(),
112 graph: GraphStore::in_memory(),
113 root: std::path::PathBuf::from(":memory:"),
114 seq: AtomicU64::new(0),
115 head: RwLock::new(String::new()),
116 tip_hash: RwLock::new(String::new()),
117 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
119 seq_index: Arc::new(DashMap::new()),
120 }
121 }
122
123 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
125 std::fs::create_dir_all(db_root)?;
126
127 let objects = ObjectStore::new(db_root, dek.clone())?;
128 let id_index = IdIndex::new(db_root)?;
129 let sorted_indexes = SortedIndexes::new();
130 let graph = GraphStore::new(db_root)?;
131
132 let mut db = Self {
133 objects,
134 id_index,
135 sorted_indexes,
136 graph,
137 root: db_root.to_path_buf(),
138 seq: AtomicU64::new(0),
139 head: RwLock::new(String::new()),
140 tip_hash: RwLock::new(String::new()),
141 startup_ready: Arc::new(AtomicBool::new(false)),
142 manifest_dirty: Arc::new(AtomicBool::new(false)),
143 seq_index: Arc::new(DashMap::new()),
144 };
145
146 migrate::migrate_if_needed(
148 db_root,
149 &db.objects,
150 &db.id_index,
151 &db.sorted_indexes,
152 &db.graph,
153 dek.as_ref(),
154 )?;
155
156 db.startup_rebuild()?;
159
160 Ok(db)
161 }
162
163 fn startup_rebuild(&mut self) -> Result<()> {
168 let manifest_path = self.root.join("MANIFEST");
169 let needs_index_rebuild = !self.sorted_indexes.is_empty();
170
171 if manifest_path.exists() && !needs_index_rebuild {
173 if let Some(m) = fs::read_to_string(&manifest_path)
174 .ok()
175 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
176 {
177 if m.head.len() < 8 {
180 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
181 } else if m.tip_hash.is_empty() {
182 eprintln!(" [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
186 } else {
187 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
189 *self.tip_hash.write() = m.tip_hash.clone();
190 self.startup_ready.store(true, Ordering::SeqCst);
191 println!(" [nedbd] warm start — seq={} head={}... tip={}...",
192 m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
193 return Ok(());
194 }
195 } else {
196 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
197 }
198 }
199
200 println!(" [nedbd] cold start — background scan will start after heap allocation");
206 Ok(())
207 }
208
209 pub fn start_cold_scan(self_arc: Arc<Self>) {
213 if self_arc.startup_ready.load(Ordering::SeqCst) {
214 return; }
216 if self_arc.objects.all_hashes().next().is_none() {
219 self_arc.startup_ready.store(true, Ordering::SeqCst);
220 return;
221 }
222 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
223 std::thread::spawn(move || {
224 let db = self_arc;
225 cold_scan_background_arc(db);
226 });
227 }
228
229 pub fn put(
231 &self,
232 coll: &str,
233 id: &str,
234 data: Value,
235 caused_by: Vec<String>,
236 valid_from: Option<String>,
237 valid_to: Option<String>,
238 ) -> Result<Node> {
239 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
240 let prev = self.id_index.get(coll, id);
241
242 if let Some(old_hash) = &prev {
244 if let Ok(old_node) = self.objects.read(old_hash) {
245 if let Value::Object(ref obj) = old_node.data {
246 for (field, value) in obj {
247 self.sorted_indexes.remove(coll, field, value, old_hash);
248 }
249 }
250 }
251 }
252
253 let mut node = Node {
254 id: id.to_string(),
255 coll: coll.to_string(),
256 seq,
257 data: data.clone(),
258 prev,
259 caused_by: caused_by.clone(),
260 ts: now(),
261 valid_from,
262 valid_to,
263 hash: String::new(),
264 };
265
266 let hash = self.objects.write(&mut node)?;
268 self.seq_index.insert(seq, hash.clone());
269
270 self.id_index.set(coll, id, &hash)?;
272
273 if let Value::Object(ref obj) = data {
275 for (field, value) in obj {
276 if self.sorted_indexes.has(coll, field) {
277 self.sorted_indexes.insert(coll, field, value, &hash);
278 }
279 }
280 }
281
282 for cause in &caused_by {
284 self.graph.add_edge(&hash, "caused_by", cause)?;
285 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
286 }
287
288 self.update_head(seq, &hash);
291
292 Ok(node)
293 }
294
295 pub fn put_batch(
300 &self,
301 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
302 ) -> Result<Vec<Node>> {
304 use rayon::prelude::*;
305
306 if ops.is_empty() { return Ok(vec![]); }
307 let n = ops.len() as u64;
308
309 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
311 let ts = now();
312
313 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
315 let prev = self.id_index.get(&coll, &id);
316 Node {
317 id, coll, seq: base_seq + i as u64,
318 data, prev, caused_by,
319 ts, valid_from, valid_to,
320 hash: String::new(),
321 }
322 }).collect();
323
324 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
326 .filter_map(|node| self.objects.write(node).err())
327 .collect();
328 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
329
330 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
332 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
333 .collect();
334 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
335
336 for node in &nodes {
338 self.seq_index.insert(node.seq, node.hash.clone());
339 if let Value::Object(ref obj) = node.data {
340 for (field, value) in obj {
341 if self.sorted_indexes.has(&node.coll, field) {
342 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
343 }
344 }
345 }
346 for cause in &node.caused_by {
347 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
348 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
349 }
350 }
351
352 for node in &nodes {
354 self.update_head(node.seq, &node.hash);
355 }
356
357 Ok(nodes)
358 }
359
360 fn update_head(&self, seq: u64, new_hash: &str) {
364 use blake2::{Blake2b512, Digest};
365 let prev = self.head.read().clone();
366 let mut h = Blake2b512::new();
367 h.update(prev.as_bytes());
368 h.update(seq.to_le_bytes());
369 h.update(new_hash.as_bytes());
370 *self.head.write() = hex::encode(&h.finalize()[..32]);
371 *self.tip_hash.write() = new_hash.to_string();
375 self.manifest_dirty.store(true, Ordering::Release);
377 }
378
379 pub fn flush_all(&self) {
381 self.id_index.flush_write_buf();
382 if let Err(e) = self.objects.sync() {
385 eprintln!("nedb: segment sync failed: {}", e);
386 }
387 self.flush_manifest();
388 }
389
390 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
399 self.flush_all();
400 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
401 for coll in self.id_index.collections() {
402 for id in self.id_index.list_ids(&coll) {
403 if let Some(h) = self.id_index.get(&coll, &id) {
404 live.insert(h);
405 }
406 }
407 }
408 self.objects.compact(&live)
409 }
410
411 pub fn flush_manifest_if_dirty(&self) {
413 if self.root == std::path::PathBuf::from(":memory:") { return; }
414 if self.manifest_dirty.compare_exchange(
415 true, false, Ordering::AcqRel, Ordering::Relaxed
416 ).is_ok() {
417 self.flush_manifest();
418 }
419 }
420
421 pub fn flush_manifest(&self) {
423 if self.root == std::path::PathBuf::from(":memory:") { return; }
424 let seq = self.seq.load(Ordering::SeqCst);
425 let head = self.head.read().clone();
426 let tip_hash = self.tip_hash.read().clone();
427 let m = Manifest { seq, head, tip_hash };
428 if let Ok(json) = serde_json::to_string(&m) {
429 let path = self.root.join("MANIFEST");
430 let tmp = self.root.join("MANIFEST.tmp");
431 let _ = fs::write(&tmp, &json);
432 let _ = fs::rename(&tmp, &path);
433 }
434 }
435
436 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
440 let db = self_arc;
441 std::thread::spawn(move || {
442 loop {
443 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
444 db.id_index.flush_write_buf();
446 db.flush_manifest_if_dirty();
448 }
449 });
450 }
451
452 pub fn head(&self) -> String {
454 self.head.read().clone()
455 }
456
457 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
460 let prev = match self.id_index.get(coll, id) {
461 None => return Ok(false), Some(h) => h,
463 };
464 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
465 let mut tombstone = Node {
466 id: format!("_del_{}", id),
467 coll: coll.to_string(),
468 seq,
469 data: serde_json::json!({"_deleted": id, "_prev": prev}),
470 prev: Some(prev),
471 caused_by: vec![],
472 ts: now(),
473 valid_from: None,
474 valid_to: None,
475 hash: String::new(),
476 };
477 let hash = self.objects.write(&mut tombstone)?;
478 self.update_head(seq, &hash);
479 self.id_index.remove(coll, id)?;
481 Ok(true)
482 }
483
484 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
486 let hash = self.id_index.get(coll, id)?;
487 self.objects.read(&hash).ok()
488 }
489
490 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
492 self.objects.read(hash).ok()
493 }
494
495 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
498 let hash = self.id_index.get(coll, id)?;
499 let mut current = self.objects.read(&hash).ok()?;
500 loop {
501 if current.seq <= target_seq {
502 return Some(current);
503 }
504 let prev_hash = current.prev.as_deref()?;
505 current = self.objects.read(prev_hash).ok()?;
506 }
507 }
508
509 pub fn list(&self, coll: &str) -> Vec<Node> {
511 self.id_index
512 .list_ids(coll)
513 .into_iter()
514 .filter_map(|id| self.get(coll, &id))
515 .collect()
516 }
517
518 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
520 if self.sorted_indexes.has(coll, field) {
521 self.sorted_indexes
522 .top_k_asc(coll, field, limit)
523 .into_iter()
524 .filter_map(|h| self.objects.read(&h).ok())
525 .collect()
526 } else {
527 let mut docs = self.list(coll);
528 docs.sort_by(|a, b| {
529 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
530 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
531 av.cmp(&bv)
532 });
533 docs.truncate(limit);
534 docs
535 }
536 }
537
538 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
540 if self.sorted_indexes.has(coll, field) {
541 self.sorted_indexes
542 .top_k_desc(coll, field, limit)
543 .into_iter()
544 .filter_map(|h| self.objects.read(&h).ok())
545 .collect()
546 } else {
547 let mut docs = self.list(coll);
548 docs.sort_by(|a, b| {
549 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
550 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
551 bv.cmp(&av)
552 });
553 docs.truncate(limit);
554 docs
555 }
556 }
557
558 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
560 self.graph
561 .trace(hash, "caused_by", reverse, limit)
562 .into_iter()
563 .filter_map(|h| self.objects.read(&h).ok())
564 .collect()
565 }
566
    /// Runs the object store's `verify_all` and returns its report unchanged.
    ///
    /// NOTE(review): presumably `(objects checked, hashes that failed verification)` —
    /// confirm against `ObjectStore::verify_all`.
    pub fn verify(&self) -> (usize, Vec<String>) {
        self.objects.verify_all()
    }
571
572 pub fn create_sorted_index(&self, coll: &str, field: &str) {
574 self.sorted_indexes.ensure(coll, field);
575 for id in self.id_index.list_ids(coll) {
577 if let Some(node) = self.get(coll, &id) {
578 if let Value::Object(ref obj) = node.data {
579 if let Some(value) = obj.get(field) {
580 self.sorted_indexes.insert(coll, field, value, &node.hash);
581 }
582 }
583 }
584 }
585 }
586
587 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
590 self.seq_index.get(&seq).map(|r| r.clone())
591 }
592
593 pub fn tip(&self) -> Option<Node> {
601 let next = self.seq.load(Ordering::SeqCst);
602 if next == 0 {
603 return None; }
605 if let Some(hash) = self.get_hash_by_seq(next - 1) {
608 return self.get_by_hash(&hash);
609 }
610 let th = self.tip_hash.read().clone();
614 if !th.is_empty() {
615 return self.get_by_hash(&th);
616 }
617 None
618 }
619
620 pub fn tip_collection(&self, coll: &str) -> Option<Node> {
630 let mut s = self.seq.load(Ordering::SeqCst); while s > 0 {
632 s -= 1;
633 if let Some(hash) = self.get_hash_by_seq(s) {
634 if let Some(node) = self.get_by_hash(&hash) {
635 if node.coll.as_str() == coll {
636 return Some(node);
637 }
638 }
639 }
640 }
641 None
642 }
643
644 pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
655 let next = self.seq.load(Ordering::SeqCst); let head_seq = next.saturating_sub(1);
657 let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
658 let mut nodes: Vec<Node> = Vec::new();
659 let mut to_seq = after_seq;
660 let mut hit_limit = false;
661 let mut s = after_seq.saturating_add(1);
662 while s < next {
663 if nodes.len() >= cap { hit_limit = true; break; }
664 if let Some(hash) = self.get_hash_by_seq(s) {
665 if let Some(node) = self.get_by_hash(&hash) {
666 to_seq = node.seq;
667 nodes.push(node);
668 }
669 }
670 s += 1;
671 }
672 SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
673 }
674
675 pub fn scan_status(&self) -> ScanStatus {
682 let next = self.seq.load(Ordering::SeqCst);
683 let mut min = u64::MAX;
684 let mut max = 0u64;
685 let mut count = 0usize;
686 for kv in self.seq_index.iter() {
687 let s = *kv.key();
688 if s < min { min = s; }
689 if s > max { max = s; }
690 count += 1;
691 }
692 if count == 0 { min = 0; }
693 ScanStatus {
694 scan_complete: self.startup_ready.load(Ordering::SeqCst),
695 tip_seq: next.saturating_sub(1),
696 indexed_seq_min: min,
697 indexed_seq_max: max,
698 indexed_count: count,
699 }
700 }
701
702 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
707 let (frm_coll, frm_id) = frm.split_once(':')
708 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
709 let (to_coll, to_id) = to.split_once(':')
710 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
711 if self.id_index.get(frm_coll, frm_id).is_none() {
712 anyhow::bail!("link: frm not found: {}", frm);
713 }
714 if self.id_index.get(to_coll, to_id).is_none() {
715 anyhow::bail!("link: to not found: {}", to);
716 }
717 let link_id = format!("{}|{}|{}", frm, rel, to);
718 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
719 self.put("__links__", &link_id, doc, vec![], None, None)?;
720 Ok(())
721 }
722
723 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
725 let link_id = format!("{}|{}|{}", frm, rel, to);
726 self.delete("__links__", &link_id)
727 }
728
729 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
732 self.id_index
733 .list_ids("__links__")
734 .into_iter()
735 .filter_map(|id| self.get("__links__", &id))
736 .filter(|node| {
737 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
738 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
739 })
740 .filter_map(|node| {
741 let to = node.data.get("_to")?.as_str()?;
742 let (to_coll, to_id) = to.split_once(':')?;
743 self.get(to_coll, to_id)
744 })
745 .collect()
746 }
747}
748
749impl Drop for Db {
750 fn drop(&mut self) {
766 self.flush_all();
767 }
768}
769
770fn cold_scan_background_arc(db: Arc<Db>) {
772 use rayon::prelude::*;
773 use blake2::{Blake2b512, Digest};
774
775 let objects = &db.objects;
776 let head = &db.head;
777 let seq_atomic = &db.seq;
778 let sorted_indexes = &db.sorted_indexes;
779 let root = db.root.clone();
780 let ready_flag = Arc::clone(&db.startup_ready);
781
782 let hashes: Vec<String> = objects.all_hashes().collect();
783 let total = hashes.len();
784
785 if total == 0 {
786 ready_flag.store(true, Ordering::SeqCst);
787 return;
788 }
789
790 println!(" [nedbd] background scan — {} objects...", total);
791 let t0 = std::time::Instant::now();
792 let step = (total / 10).max(1000);
793
794 let nodes: Vec<Node> = hashes.par_iter()
795 .enumerate()
796 .filter_map(|(i, h)| {
797 if i > 0 && i % step == 0 {
798 let pct = i * 100 / total;
799 let elapsed = t0.elapsed().as_secs_f32();
800 let rate = i as f32 / elapsed;
801 let eta = (total - i) as f32 / rate;
802 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
803 pct, i, total, rate, eta);
804 }
805 objects.read(h).ok()
806 })
807 .collect();
808
809 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
810 total, total, t0.elapsed().as_secs_f32());
811
812 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
813 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
814
815 for node in &nodes {
816 db.seq_index.insert(node.seq, node.hash.clone());
817 if let Value::Object(ref obj) = node.data {
818 for (field, value) in obj {
819 if sorted_indexes.has(&node.coll, field) {
820 sorted_indexes.insert(&node.coll, field, value, &node.hash);
821 }
822 }
823 }
824 }
825
826 let mut sorted_hashes = hashes;
828 sorted_hashes.sort();
829 let mut h = Blake2b512::new();
830 h.update(max_seq.to_le_bytes());
831 for hash_str in &sorted_hashes {
832 h.update(hash_str.as_bytes());
833 }
834 let new_head = hex::encode(&h.finalize()[..32]);
835 *head.write() = new_head.clone();
836
837 let tip_hash = db.seq_index.iter()
840 .max_by_key(|kv| *kv.key())
841 .map(|kv| kv.value().clone())
842 .unwrap_or_default();
843 *db.tip_hash.write() = tip_hash.clone();
844
845 let m = Manifest { seq: max_seq, head: new_head, tip_hash };
847 let json = serde_json::to_string(&m).unwrap_or_default();
848 let path = root.join("MANIFEST");
849 let tmp = root.join("MANIFEST.tmp");
850 let _ = fs::write(&tmp, &json);
851 let _ = fs::rename(&tmp, &path);
852
853 ready_flag.store(true, Ordering::SeqCst);
855 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
856}
857
/// Current wall-clock time as fractional seconds since the Unix epoch.
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
864
// Basic end-to-end tests against an on-disk database in a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // A document written with `put` can be read back with `get`.
    #[test]
    fn put_and_get() {
        let dir = tempdir().unwrap();
        let db = Db::open(dir.path(), None).unwrap();
        db.put(
            "blocks", "618000",
            serde_json::json!({"height": 618000, "hash": "0000abc"}),
            vec![], None, None,
        ).unwrap();
        let node = db.get("blocks", "618000").unwrap();
        assert_eq!(node.id, "618000");
        assert_eq!(node.data["height"], 618000);
    }

    // With a sorted index in place, `order_by_asc` returns the smallest values first.
    #[test]
    fn order_by_with_sorted_index() {
        let dir = tempdir().unwrap();
        let db = Db::open(dir.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");
        for h in [3u64, 1, 5, 2, 4] {
            db.put("blocks", &h.to_string(),
                serde_json::json!({"height": h}),
                vec![], None, None).unwrap();
        }
        let asc = db.order_by_asc("blocks", "height", 3);
        let heights: Vec<u64> = asc.iter()
            .filter_map(|n| n.data["height"].as_u64())
            .collect();
        assert_eq!(heights, vec![1, 2, 3]);
    }

    // Tracing from `c` along `caused_by` yields three nodes for the chain a <- b <- c.
    #[test]
    fn causal_trace() {
        let dir = tempdir().unwrap();
        let db = Db::open(dir.path(), None).unwrap();
        let a = db.put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None).unwrap();
        let b = db.put("ops", "b", serde_json::json!({"op": "transfer"}), vec![a.hash.clone()], None, None).unwrap();
        let c = db.put("ops", "c", serde_json::json!({"op": "burn"}), vec![b.hash.clone()], None, None).unwrap();

        let trace = db.trace(&c.hash, false, 10);
        assert_eq!(trace.len(), 3);
    }

    // `get_as_of` returns the version current at an earlier seq; `get` returns the latest.
    #[test]
    fn as_of() {
        let dir = tempdir().unwrap();
        let db = Db::open(dir.path(), None).unwrap();
        let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let at_v1 = db.get_as_of("docs", "x", v1.seq).unwrap();
        assert_eq!(at_v1.data["v"], 1);
        let current = db.get("docs", "x").unwrap();
        assert_eq!(current.data["v"], 2);
    }
}
926
// Tests for the seq index, tip/since paging, links, and restart behavior.
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    // Every `put` records seq -> hash; unknown seqs resolve to `None`.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
        assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
        assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    // `tip` tracks the latest write; `since` pages forward from a cursor.
    #[test]
    fn tip_and_since() {
        let db = Db::in_memory();
        // Empty database: no tip, nothing to stream.
        assert!(db.tip().is_none());
        assert!(db.since(0, 0).nodes.is_empty());

        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        let t = db.tip().expect("tip after writes");
        assert_eq!(t.seq, b.seq);
        assert_eq!(t.id, "b");
        assert_eq!(t.hash, b.hash);

        // Everything strictly after `a` is just `b`.
        let after_a = db.since(a.seq, 0);
        assert_eq!(after_a.nodes.len(), 1);
        assert_eq!(after_a.nodes[0].id, "b");
        assert_eq!(after_a.from_seq, a.seq);
        assert_eq!(after_a.to_seq, b.seq);
        assert_eq!(after_a.head_seq, b.seq);
        assert!(!after_a.has_more);

        // Nothing exists after the tip.
        assert!(db.since(b.seq, 0).nodes.is_empty());

        // Paging with limit 1: the first page stops at `b` and reports more to come.
        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
        let page = db.since(a.seq, 1);
        assert_eq!(page.nodes.len(), 1);
        assert_eq!(page.nodes[0].id, "b");
        assert_eq!(page.to_seq, b.seq);
        assert!(page.has_more);
        // The second page resumes from `to_seq` and reaches the end.
        let page2 = db.since(page.to_seq, 1);
        assert_eq!(page2.nodes.len(), 1);
        assert_eq!(page2.nodes[0].id, "c");
        assert_eq!(page2.to_seq, c.seq);
        assert!(!page2.has_more);
    }

    // `tip_collection` returns the newest node per collection, independent of the global tip.
    #[test]
    fn tip_collection_per_chain() {
        let db = Db::in_memory();
        assert!(db.tip_collection("blocks").is_none());

        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        db.put("tx", "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
        let t1 = db.put("tx", "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        // The global tip is the last write overall.
        assert_eq!(db.tip().unwrap().id, "t1");
        let bt = db.tip_collection("blocks").expect("blocks tip");
        assert_eq!(bt.id, "b1");
        assert_eq!(bt.seq, b1.seq);
        assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
        assert!(db.tip_collection("absent").is_none());
    }

    // `put_batch` records every node of the batch in the seq index.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let nodes = db.put_batch(vec![
            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
        ]).unwrap();
        for node in &nodes {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    // `neighbors` resolves link targets per source and relation.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();

        db.link("driver:d1", "handles", "trip:t1").unwrap();
        db.link("driver:d1", "handles", "trip:t2").unwrap();
        db.link("driver:d2", "handles", "trip:t1").unwrap();

        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(ids.contains("t1") && ids.contains("t2"));

        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    // A link is an ordinary document in `__links__` with id `frm|rel|to`.
    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();
        let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(link_doc.is_some(), "__links__ doc should exist");
        let doc = link_doc.unwrap();
        assert_eq!(doc.data["_from"], "driver:d1");
        assert_eq!(doc.data["_rel"], "handles");
        assert_eq!(doc.data["_to"], "trip:t1");
        let nb = db.neighbors("driver:d1", "handles");
        assert_eq!(nb.len(), 1);
        assert_eq!(nb[0].id, "t1");
    }

    // Linking to a document that does not exist is an error.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
    }

    // Links written before a close are still resolvable after reopening the directory.
    #[test]
    fn link_durable_survives_reopen() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
            db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            db.link("driver:d1", "handles", "trip:t1").unwrap();
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        // Force the ready flag so the test does not depend on a background scan.
        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let trips = db2.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }

    // After a warm restart the seq index is empty, yet `tip` still resolves via the
    // tip hash stored in the manifest.
    #[test]
    fn tip_survives_warm_restart() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            db.flush_all();
            assert_eq!(db.tip().expect("tip in-session").id, "b2");
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        assert!(db2.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");
        let tip = db2.tip().expect("tip() must survive a warm restart");
        assert_eq!(tip.id, "b2");
        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
    }
}