1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22 seq: u64,
23 head: String,
24}
25
26pub struct Db {
27 pub objects: ObjectStore,
28 pub id_index: IdIndex,
29 pub sorted_indexes: SortedIndexes,
30 pub graph: GraphStore,
31 pub root: PathBuf,
32 manifest_dirty: Arc<AtomicBool>,
36 pub seq: AtomicU64,
37 head: RwLock<String>,
39 pub startup_ready: Arc<AtomicBool>,
44 seq_index: Arc<DashMap<u64, String>>,
48}
49
50impl Db {
51 pub fn in_memory() -> Self {
55 Self {
56 objects: ObjectStore::in_memory(),
57 id_index: IdIndex::in_memory(),
58 sorted_indexes: SortedIndexes::new(),
59 graph: GraphStore::in_memory(),
60 root: std::path::PathBuf::from(":memory:"),
61 seq: AtomicU64::new(0),
62 head: RwLock::new(String::new()),
63 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
65 seq_index: Arc::new(DashMap::new()),
66 }
67 }
68
69 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
71 std::fs::create_dir_all(db_root)?;
72
73 let objects = ObjectStore::new(db_root, dek.clone())?;
74 let id_index = IdIndex::new(db_root)?;
75 let sorted_indexes = SortedIndexes::new();
76 let graph = GraphStore::new(db_root)?;
77
78 let mut db = Self {
79 objects,
80 id_index,
81 sorted_indexes,
82 graph,
83 root: db_root.to_path_buf(),
84 seq: AtomicU64::new(0),
85 head: RwLock::new(String::new()),
86 startup_ready: Arc::new(AtomicBool::new(false)),
87 manifest_dirty: Arc::new(AtomicBool::new(false)),
88 seq_index: Arc::new(DashMap::new()),
89 };
90
91 migrate::migrate_if_needed(
93 db_root,
94 &db.objects,
95 &db.id_index,
96 &db.sorted_indexes,
97 &db.graph,
98 dek.as_ref(),
99 )?;
100
101 db.startup_rebuild()?;
104
105 Ok(db)
106 }
107
108 fn startup_rebuild(&mut self) -> Result<()> {
113 let manifest_path = self.root.join("MANIFEST");
114 let needs_index_rebuild = !self.sorted_indexes.is_empty();
115
116 if manifest_path.exists() && !needs_index_rebuild {
118 if let Some(m) = fs::read_to_string(&manifest_path)
119 .ok()
120 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
121 {
122 if m.head.len() < 8 {
125 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
126 } else {
127 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
129 self.startup_ready.store(true, Ordering::SeqCst);
130 println!(" [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
131 return Ok(());
132 }
133 } else {
134 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
135 }
136 }
137
138 println!(" [nedbd] cold start — background scan will start after heap allocation");
144 Ok(())
145 }
146
147 pub fn start_cold_scan(self_arc: Arc<Self>) {
151 if self_arc.startup_ready.load(Ordering::SeqCst) {
152 return; }
154 if self_arc.objects.all_hashes().next().is_none() {
157 self_arc.startup_ready.store(true, Ordering::SeqCst);
158 return;
159 }
160 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
161 std::thread::spawn(move || {
162 let db = self_arc;
163 cold_scan_background_arc(db);
164 });
165 }
166
167 pub fn put(
169 &self,
170 coll: &str,
171 id: &str,
172 data: Value,
173 caused_by: Vec<String>,
174 valid_from: Option<String>,
175 valid_to: Option<String>,
176 ) -> Result<Node> {
177 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
178 let prev = self.id_index.get(coll, id);
179
180 if let Some(old_hash) = &prev {
182 if let Ok(old_node) = self.objects.read(old_hash) {
183 if let Value::Object(ref obj) = old_node.data {
184 for (field, value) in obj {
185 self.sorted_indexes.remove(coll, field, value, old_hash);
186 }
187 }
188 }
189 }
190
191 let mut node = Node {
192 id: id.to_string(),
193 coll: coll.to_string(),
194 seq,
195 data: data.clone(),
196 prev,
197 caused_by: caused_by.clone(),
198 ts: now(),
199 valid_from,
200 valid_to,
201 hash: String::new(),
202 };
203
204 let hash = self.objects.write(&mut node)?;
206 self.seq_index.insert(seq, hash.clone());
207
208 self.id_index.set(coll, id, &hash)?;
210
211 if let Value::Object(ref obj) = data {
213 for (field, value) in obj {
214 if self.sorted_indexes.has(coll, field) {
215 self.sorted_indexes.insert(coll, field, value, &hash);
216 }
217 }
218 }
219
220 for cause in &caused_by {
222 self.graph.add_edge(&hash, "caused_by", cause)?;
223 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
224 }
225
226 self.update_head(seq, &hash);
229
230 Ok(node)
231 }
232
233 pub fn put_batch(
238 &self,
239 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
240 ) -> Result<Vec<Node>> {
242 use rayon::prelude::*;
243
244 if ops.is_empty() { return Ok(vec![]); }
245 let n = ops.len() as u64;
246
247 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
249 let ts = now();
250
251 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
253 let prev = self.id_index.get(&coll, &id);
254 Node {
255 id, coll, seq: base_seq + i as u64,
256 data, prev, caused_by,
257 ts, valid_from, valid_to,
258 hash: String::new(),
259 }
260 }).collect();
261
262 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
264 .filter_map(|node| self.objects.write(node).err())
265 .collect();
266 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
267
268 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
270 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
271 .collect();
272 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
273
274 for node in &nodes {
276 self.seq_index.insert(node.seq, node.hash.clone());
277 if let Value::Object(ref obj) = node.data {
278 for (field, value) in obj {
279 if self.sorted_indexes.has(&node.coll, field) {
280 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
281 }
282 }
283 }
284 for cause in &node.caused_by {
285 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
286 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
287 }
288 }
289
290 for node in &nodes {
292 self.update_head(node.seq, &node.hash);
293 }
294
295 Ok(nodes)
296 }
297
298 fn update_head(&self, seq: u64, new_hash: &str) {
302 use blake2::{Blake2b512, Digest};
303 let prev = self.head.read().clone();
304 let mut h = Blake2b512::new();
305 h.update(prev.as_bytes());
306 h.update(seq.to_le_bytes());
307 h.update(new_hash.as_bytes());
308 *self.head.write() = hex::encode(&h.finalize()[..32]);
309 self.manifest_dirty.store(true, Ordering::Release);
311 }
312
313 pub fn flush_all(&self) {
315 self.id_index.flush_write_buf();
316 if let Err(e) = self.objects.sync() {
319 eprintln!("nedb: segment sync failed: {}", e);
320 }
321 self.flush_manifest();
322 }
323
324 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
333 self.flush_all();
334 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
335 for coll in self.id_index.collections() {
336 for id in self.id_index.list_ids(&coll) {
337 if let Some(h) = self.id_index.get(&coll, &id) {
338 live.insert(h);
339 }
340 }
341 }
342 self.objects.compact(&live)
343 }
344
345 pub fn flush_manifest_if_dirty(&self) {
347 if self.root == std::path::PathBuf::from(":memory:") { return; }
348 if self.manifest_dirty.compare_exchange(
349 true, false, Ordering::AcqRel, Ordering::Relaxed
350 ).is_ok() {
351 self.flush_manifest();
352 }
353 }
354
355 pub fn flush_manifest(&self) {
357 if self.root == std::path::PathBuf::from(":memory:") { return; }
358 let seq = self.seq.load(Ordering::SeqCst);
359 let head = self.head.read().clone();
360 let m = Manifest { seq, head };
361 if let Ok(json) = serde_json::to_string(&m) {
362 let path = self.root.join("MANIFEST");
363 let tmp = self.root.join("MANIFEST.tmp");
364 let _ = fs::write(&tmp, &json);
365 let _ = fs::rename(&tmp, &path);
366 }
367 }
368
369 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
373 let db = self_arc;
374 std::thread::spawn(move || {
375 loop {
376 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
377 db.id_index.flush_write_buf();
379 db.flush_manifest_if_dirty();
381 }
382 });
383 }
384
385 pub fn head(&self) -> String {
387 self.head.read().clone()
388 }
389
390 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
393 let prev = match self.id_index.get(coll, id) {
394 None => return Ok(false), Some(h) => h,
396 };
397 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
398 let mut tombstone = Node {
399 id: format!("_del_{}", id),
400 coll: coll.to_string(),
401 seq,
402 data: serde_json::json!({"_deleted": id, "_prev": prev}),
403 prev: Some(prev),
404 caused_by: vec![],
405 ts: now(),
406 valid_from: None,
407 valid_to: None,
408 hash: String::new(),
409 };
410 let hash = self.objects.write(&mut tombstone)?;
411 self.update_head(seq, &hash);
412 self.id_index.remove(coll, id)?;
414 Ok(true)
415 }
416
417 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
419 let hash = self.id_index.get(coll, id)?;
420 self.objects.read(&hash).ok()
421 }
422
423 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
425 self.objects.read(hash).ok()
426 }
427
428 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
431 let hash = self.id_index.get(coll, id)?;
432 let mut current = self.objects.read(&hash).ok()?;
433 loop {
434 if current.seq <= target_seq {
435 return Some(current);
436 }
437 let prev_hash = current.prev.as_deref()?;
438 current = self.objects.read(prev_hash).ok()?;
439 }
440 }
441
442 pub fn list(&self, coll: &str) -> Vec<Node> {
444 self.id_index
445 .list_ids(coll)
446 .into_iter()
447 .filter_map(|id| self.get(coll, &id))
448 .collect()
449 }
450
451 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
453 if self.sorted_indexes.has(coll, field) {
454 self.sorted_indexes
455 .top_k_asc(coll, field, limit)
456 .into_iter()
457 .filter_map(|h| self.objects.read(&h).ok())
458 .collect()
459 } else {
460 let mut docs = self.list(coll);
461 docs.sort_by(|a, b| {
462 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
463 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
464 av.cmp(&bv)
465 });
466 docs.truncate(limit);
467 docs
468 }
469 }
470
471 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
473 if self.sorted_indexes.has(coll, field) {
474 self.sorted_indexes
475 .top_k_desc(coll, field, limit)
476 .into_iter()
477 .filter_map(|h| self.objects.read(&h).ok())
478 .collect()
479 } else {
480 let mut docs = self.list(coll);
481 docs.sort_by(|a, b| {
482 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
483 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
484 bv.cmp(&av)
485 });
486 docs.truncate(limit);
487 docs
488 }
489 }
490
491 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
493 self.graph
494 .trace(hash, "caused_by", reverse, limit)
495 .into_iter()
496 .filter_map(|h| self.objects.read(&h).ok())
497 .collect()
498 }
499
500 pub fn verify(&self) -> (usize, Vec<String>) {
502 self.objects.verify_all()
503 }
504
505 pub fn create_sorted_index(&self, coll: &str, field: &str) {
507 self.sorted_indexes.ensure(coll, field);
508 for id in self.id_index.list_ids(coll) {
510 if let Some(node) = self.get(coll, &id) {
511 if let Value::Object(ref obj) = node.data {
512 if let Some(value) = obj.get(field) {
513 self.sorted_indexes.insert(coll, field, value, &node.hash);
514 }
515 }
516 }
517 }
518 }
519
520 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
523 self.seq_index.get(&seq).map(|r| r.clone())
524 }
525
526 pub fn tip(&self) -> Option<Node> {
534 let next = self.seq.load(Ordering::SeqCst);
535 if next == 0 {
536 return None; }
538 let hash = self.get_hash_by_seq(next - 1)?;
539 self.get_by_hash(&hash)
540 }
541
542 pub fn since(&self, after_seq: u64) -> Vec<Node> {
550 let next = self.seq.load(Ordering::SeqCst);
551 let mut out = Vec::new();
552 let mut s = after_seq.saturating_add(1);
553 while s < next {
554 if let Some(hash) = self.get_hash_by_seq(s) {
555 if let Some(node) = self.get_by_hash(&hash) {
556 out.push(node);
557 }
558 }
559 s += 1;
560 }
561 out
562 }
563
564 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
569 let (frm_coll, frm_id) = frm.split_once(':')
570 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
571 let (to_coll, to_id) = to.split_once(':')
572 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
573 if self.id_index.get(frm_coll, frm_id).is_none() {
574 anyhow::bail!("link: frm not found: {}", frm);
575 }
576 if self.id_index.get(to_coll, to_id).is_none() {
577 anyhow::bail!("link: to not found: {}", to);
578 }
579 let link_id = format!("{}|{}|{}", frm, rel, to);
580 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
581 self.put("__links__", &link_id, doc, vec![], None, None)?;
582 Ok(())
583 }
584
585 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
587 let link_id = format!("{}|{}|{}", frm, rel, to);
588 self.delete("__links__", &link_id)
589 }
590
591 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
594 self.id_index
595 .list_ids("__links__")
596 .into_iter()
597 .filter_map(|id| self.get("__links__", &id))
598 .filter(|node| {
599 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
600 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
601 })
602 .filter_map(|node| {
603 let to = node.data.get("_to")?.as_str()?;
604 let (to_coll, to_id) = to.split_once(':')?;
605 self.get(to_coll, to_id)
606 })
607 .collect()
608 }
609}
610
611impl Drop for Db {
612 fn drop(&mut self) {
628 self.flush_all();
629 }
630}
631
632fn cold_scan_background_arc(db: Arc<Db>) {
634 use rayon::prelude::*;
635 use blake2::{Blake2b512, Digest};
636
637 let objects = &db.objects;
638 let head = &db.head;
639 let seq_atomic = &db.seq;
640 let sorted_indexes = &db.sorted_indexes;
641 let root = db.root.clone();
642 let ready_flag = Arc::clone(&db.startup_ready);
643
644 let hashes: Vec<String> = objects.all_hashes().collect();
645 let total = hashes.len();
646
647 if total == 0 {
648 ready_flag.store(true, Ordering::SeqCst);
649 return;
650 }
651
652 println!(" [nedbd] background scan — {} objects...", total);
653 let t0 = std::time::Instant::now();
654 let step = (total / 10).max(1000);
655
656 let nodes: Vec<Node> = hashes.par_iter()
657 .enumerate()
658 .filter_map(|(i, h)| {
659 if i > 0 && i % step == 0 {
660 let pct = i * 100 / total;
661 let elapsed = t0.elapsed().as_secs_f32();
662 let rate = i as f32 / elapsed;
663 let eta = (total - i) as f32 / rate;
664 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
665 pct, i, total, rate, eta);
666 }
667 objects.read(h).ok()
668 })
669 .collect();
670
671 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
672 total, total, t0.elapsed().as_secs_f32());
673
674 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
675 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
676
677 for node in &nodes {
678 db.seq_index.insert(node.seq, node.hash.clone());
679 if let Value::Object(ref obj) = node.data {
680 for (field, value) in obj {
681 if sorted_indexes.has(&node.coll, field) {
682 sorted_indexes.insert(&node.coll, field, value, &node.hash);
683 }
684 }
685 }
686 }
687
688 let mut sorted_hashes = hashes;
690 sorted_hashes.sort();
691 let mut h = Blake2b512::new();
692 h.update(max_seq.to_le_bytes());
693 for hash_str in &sorted_hashes {
694 h.update(hash_str.as_bytes());
695 }
696 let new_head = hex::encode(&h.finalize()[..32]);
697 *head.write() = new_head.clone();
698
699 let m = Manifest { seq: max_seq, head: new_head };
701 let json = serde_json::to_string(&m).unwrap_or_default();
702 let path = root.join("MANIFEST");
703 let tmp = root.join("MANIFEST.tmp");
704 let _ = fs::write(&tmp, &json);
705 let _ = fs::rename(&tmp, &path);
706
707 ready_flag.store(true, Ordering::SeqCst);
709 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
710}
711
712fn now() -> f64 {
713 std::time::SystemTime::now()
714 .duration_since(std::time::UNIX_EPOCH)
715 .map(|d| d.as_secs_f64())
716 .unwrap_or(0.0)
717}
718
719#[cfg(test)]
720mod tests {
721 use super::*;
722 use tempfile::tempdir;
723
724 #[test]
725 fn put_and_get() {
726 let dir = tempdir().unwrap();
727 let db = Db::open(dir.path(), None).unwrap();
728 db.put(
729 "blocks", "618000",
730 serde_json::json!({"height": 618000, "hash": "0000abc"}),
731 vec![], None, None,
732 ).unwrap();
733 let node = db.get("blocks", "618000").unwrap();
734 assert_eq!(node.id, "618000");
735 assert_eq!(node.data["height"], 618000);
736 }
737
738 #[test]
739 fn order_by_with_sorted_index() {
740 let dir = tempdir().unwrap();
741 let db = Db::open(dir.path(), None).unwrap();
742 db.create_sorted_index("blocks", "height");
743 for h in [3u64, 1, 5, 2, 4] {
744 db.put("blocks", &h.to_string(),
745 serde_json::json!({"height": h}),
746 vec![], None, None).unwrap();
747 }
748 let asc = db.order_by_asc("blocks", "height", 3);
749 let heights: Vec<u64> = asc.iter()
750 .filter_map(|n| n.data["height"].as_u64())
751 .collect();
752 assert_eq!(heights, vec![1, 2, 3]);
753 }
754
755 #[test]
756 fn causal_trace() {
757 let dir = tempdir().unwrap();
758 let db = Db::open(dir.path(), None).unwrap();
759 let a = db.put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None).unwrap();
760 let b = db.put("ops", "b", serde_json::json!({"op": "transfer"}), vec![a.hash.clone()], None, None).unwrap();
761 let c = db.put("ops", "c", serde_json::json!({"op": "burn"}), vec![b.hash.clone()], None, None).unwrap();
762
763 let trace = db.trace(&c.hash, false, 10);
764 assert_eq!(trace.len(), 3); }
766
767 #[test]
768 fn as_of() {
769 let dir = tempdir().unwrap();
770 let db = Db::open(dir.path(), None).unwrap();
771 let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
772 let _v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
773
774 let at_v1 = db.get_as_of("docs", "x", v1.seq).unwrap();
775 assert_eq!(at_v1.data["v"], 1);
776 let current = db.get("docs", "x").unwrap();
777 assert_eq!(current.data["v"], 2);
778 }
779}
780
781#[cfg(test)]
782mod tests_v2 {
783 use super::*;
784 use tempfile::tempdir;
785
786 #[test]
787 fn seq_index_populated_on_put() {
788 let db = Db::in_memory();
789 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
790 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
791 assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
792 assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
793 assert_eq!(db.get_hash_by_seq(9999), None);
794 }
795
796 #[test]
797 fn tip_and_since() {
798 let db = Db::in_memory();
799 assert!(db.tip().is_none());
801 assert!(db.since(0).is_empty());
802
803 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
804 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
805
806 let t = db.tip().expect("tip after writes");
808 assert_eq!(t.seq, b.seq);
809 assert_eq!(t.id, "b");
810 assert_eq!(t.hash, b.hash);
811
812 let after_a = db.since(a.seq);
814 assert_eq!(after_a.len(), 1);
815 assert_eq!(after_a[0].id, "b");
816
817 assert!(db.since(b.seq).is_empty());
819 }
820
821 #[test]
822 fn seq_index_survives_batch() {
823 let db = Db::in_memory();
824 let nodes = db.put_batch(vec![
825 ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
826 ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
827 ]).unwrap();
828 for node in &nodes {
829 assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
830 }
831 }
832
833 #[test]
834 fn link_and_neighbors() {
835 let db = Db::in_memory();
836 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
837 db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
838 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
839 db.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
840
841 db.link("driver:d1", "handles", "trip:t1").unwrap();
842 db.link("driver:d1", "handles", "trip:t2").unwrap();
843 db.link("driver:d2", "handles", "trip:t1").unwrap();
844
845 let d1_trips = db.neighbors("driver:d1", "handles");
846 assert_eq!(d1_trips.len(), 2);
847 let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
848 assert!(ids.contains("t1") && ids.contains("t2"));
849
850 let d2_trips = db.neighbors("driver:d2", "handles");
851 assert_eq!(d2_trips.len(), 1);
852 assert_eq!(d2_trips[0].id, "t1");
853 }
854
855 #[test]
856 fn link_stored_in_links_collection() {
857 let db = Db::in_memory();
860 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
861 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
862 db.link("driver:d1", "handles", "trip:t1").unwrap();
863 let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
865 assert!(link_doc.is_some(), "__links__ doc should exist");
866 let doc = link_doc.unwrap();
867 assert_eq!(doc.data["_from"], "driver:d1");
868 assert_eq!(doc.data["_rel"], "handles");
869 assert_eq!(doc.data["_to"], "trip:t1");
870 let nb = db.neighbors("driver:d1", "handles");
872 assert_eq!(nb.len(), 1);
873 assert_eq!(nb[0].id, "t1");
874 }
875
876 #[test]
877 fn link_missing_node_errors() {
878 let db = Db::in_memory();
879 db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
880 assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
881 }
882
883 #[test]
884 fn link_durable_survives_reopen() {
885 let dir = tempdir().unwrap();
886 {
887 let db = Db::open(dir.path(), None).unwrap();
888 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
889 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
890 db.link("driver:d1", "handles", "trip:t1").unwrap();
891 }
892 let db2 = Db::open(dir.path(), None).unwrap();
893 db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
894 let trips = db2.neighbors("driver:d1", "handles");
895 assert_eq!(trips.len(), 1);
896 assert_eq!(trips[0].id, "t1");
897 }
898}