1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// JSON document persisted as the `MANIFEST` file in the database root.
///
/// It is read back during startup to decide whether a warm start is possible.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Sequence number recorded when the manifest was written.
    seq: u64,
    /// Head hash string recorded when the manifest was written.
    head: String,
}
25
/// Database facade tying together the object store, the id index, the sorted
/// indexes and the causal graph.
pub struct Db {
    /// Store that nodes are written to and read back from by hash.
    pub objects: ObjectStore,
    /// Index from `(coll, id)` to the hash of the current version.
    pub id_index: IdIndex,
    /// Sorted indexes per `(coll, field)`, consulted by the `order_by_*` queries.
    pub sorted_indexes: SortedIndexes,
    /// Edge store holding the `caused_by` / `caused_by_rev` relations.
    pub graph: GraphStore,
    /// Database root directory, or the sentinel `:memory:` for in-memory instances.
    pub root: PathBuf,
    /// Set by `update_head`; cleared by `flush_manifest_if_dirty` before it flushes.
    manifest_dirty: Arc<AtomicBool>,
    /// Sequence counter; writers allocate numbers from it with `fetch_add`.
    pub seq: AtomicU64,
    /// Rolling head hash, recomputed on every write.
    head: RwLock<String>,
    /// True for in-memory instances, after a warm start, or once the cold scan finishes.
    pub startup_ready: Arc<AtomicBool>,
    /// In-memory map from sequence number to node hash.
    seq_index: Arc<DashMap<u64, String>>,
}
49
50impl Db {
51 pub fn in_memory() -> Self {
55 Self {
56 objects: ObjectStore::in_memory(),
57 id_index: IdIndex::in_memory(),
58 sorted_indexes: SortedIndexes::new(),
59 graph: GraphStore::in_memory(),
60 root: std::path::PathBuf::from(":memory:"),
61 seq: AtomicU64::new(0),
62 head: RwLock::new(String::new()),
63 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
65 seq_index: Arc::new(DashMap::new()),
66 }
67 }
68
69 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
71 std::fs::create_dir_all(db_root)?;
72
73 let objects = ObjectStore::new(db_root, dek.clone())?;
74 let id_index = IdIndex::new(db_root)?;
75 let sorted_indexes = SortedIndexes::new();
76 let graph = GraphStore::new(db_root)?;
77
78 let mut db = Self {
79 objects,
80 id_index,
81 sorted_indexes,
82 graph,
83 root: db_root.to_path_buf(),
84 seq: AtomicU64::new(0),
85 head: RwLock::new(String::new()),
86 startup_ready: Arc::new(AtomicBool::new(false)),
87 manifest_dirty: Arc::new(AtomicBool::new(false)),
88 seq_index: Arc::new(DashMap::new()),
89 };
90
91 migrate::migrate_if_needed(
93 db_root,
94 &db.objects,
95 &db.id_index,
96 &db.sorted_indexes,
97 &db.graph,
98 dek.as_ref(),
99 )?;
100
101 db.startup_rebuild()?;
104
105 Ok(db)
106 }
107
108 fn startup_rebuild(&mut self) -> Result<()> {
113 let manifest_path = self.root.join("MANIFEST");
114 let needs_index_rebuild = !self.sorted_indexes.is_empty();
115
116 if manifest_path.exists() && !needs_index_rebuild {
118 if let Some(m) = fs::read_to_string(&manifest_path)
119 .ok()
120 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
121 {
122 if m.head.len() < 8 {
125 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
126 } else {
127 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
129 self.startup_ready.store(true, Ordering::SeqCst);
130 println!(" [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
131 return Ok(());
132 }
133 } else {
134 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
135 }
136 }
137
138 println!(" [nedbd] cold start — background scan will start after heap allocation");
144 Ok(())
145 }
146
147 pub fn start_cold_scan(self_arc: Arc<Self>) {
151 if self_arc.startup_ready.load(Ordering::SeqCst) {
152 return; }
154 if self_arc.objects.all_hashes().next().is_none() {
157 self_arc.startup_ready.store(true, Ordering::SeqCst);
158 return;
159 }
160 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
161 std::thread::spawn(move || {
162 let db = self_arc;
163 cold_scan_background_arc(db);
164 });
165 }
166
167 pub fn put(
169 &self,
170 coll: &str,
171 id: &str,
172 data: Value,
173 caused_by: Vec<String>,
174 valid_from: Option<String>,
175 valid_to: Option<String>,
176 ) -> Result<Node> {
177 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
178 let prev = self.id_index.get(coll, id);
179
180 if let Some(old_hash) = &prev {
182 if let Ok(old_node) = self.objects.read(old_hash) {
183 if let Value::Object(ref obj) = old_node.data {
184 for (field, value) in obj {
185 self.sorted_indexes.remove(coll, field, value, old_hash);
186 }
187 }
188 }
189 }
190
191 let mut node = Node {
192 id: id.to_string(),
193 coll: coll.to_string(),
194 seq,
195 data: data.clone(),
196 prev,
197 caused_by: caused_by.clone(),
198 ts: now(),
199 valid_from,
200 valid_to,
201 hash: String::new(),
202 };
203
204 let hash = self.objects.write(&mut node)?;
206 self.seq_index.insert(seq, hash.clone());
207
208 self.id_index.set(coll, id, &hash)?;
210
211 if let Value::Object(ref obj) = data {
213 for (field, value) in obj {
214 if self.sorted_indexes.has(coll, field) {
215 self.sorted_indexes.insert(coll, field, value, &hash);
216 }
217 }
218 }
219
220 for cause in &caused_by {
222 self.graph.add_edge(&hash, "caused_by", cause)?;
223 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
224 }
225
226 self.update_head(seq, &hash);
229
230 Ok(node)
231 }
232
233 pub fn put_batch(
238 &self,
239 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
240 ) -> Result<Vec<Node>> {
242 use rayon::prelude::*;
243
244 if ops.is_empty() { return Ok(vec![]); }
245 let n = ops.len() as u64;
246
247 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
249 let ts = now();
250
251 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
253 let prev = self.id_index.get(&coll, &id);
254 Node {
255 id, coll, seq: base_seq + i as u64,
256 data, prev, caused_by,
257 ts, valid_from, valid_to,
258 hash: String::new(),
259 }
260 }).collect();
261
262 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
264 .filter_map(|node| self.objects.write(node).err())
265 .collect();
266 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
267
268 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
270 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
271 .collect();
272 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
273
274 for node in &nodes {
276 self.seq_index.insert(node.seq, node.hash.clone());
277 if let Value::Object(ref obj) = node.data {
278 for (field, value) in obj {
279 if self.sorted_indexes.has(&node.coll, field) {
280 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
281 }
282 }
283 }
284 for cause in &node.caused_by {
285 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
286 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
287 }
288 }
289
290 for node in &nodes {
292 self.update_head(node.seq, &node.hash);
293 }
294
295 Ok(nodes)
296 }
297
298 fn update_head(&self, seq: u64, new_hash: &str) {
302 use blake2::{Blake2b512, Digest};
303 let prev = self.head.read().clone();
304 let mut h = Blake2b512::new();
305 h.update(prev.as_bytes());
306 h.update(seq.to_le_bytes());
307 h.update(new_hash.as_bytes());
308 *self.head.write() = hex::encode(&h.finalize()[..32]);
309 self.manifest_dirty.store(true, Ordering::Release);
311 }
312
313 pub fn flush_all(&self) {
315 self.id_index.flush_write_buf();
316 self.flush_manifest();
317 }
318
319 pub fn flush_manifest_if_dirty(&self) {
321 if self.root == std::path::PathBuf::from(":memory:") { return; }
322 if self.manifest_dirty.compare_exchange(
323 true, false, Ordering::AcqRel, Ordering::Relaxed
324 ).is_ok() {
325 self.flush_manifest();
326 }
327 }
328
329 pub fn flush_manifest(&self) {
331 if self.root == std::path::PathBuf::from(":memory:") { return; }
332 let seq = self.seq.load(Ordering::SeqCst);
333 let head = self.head.read().clone();
334 let m = Manifest { seq, head };
335 if let Ok(json) = serde_json::to_string(&m) {
336 let path = self.root.join("MANIFEST");
337 let tmp = self.root.join("MANIFEST.tmp");
338 let _ = fs::write(&tmp, &json);
339 let _ = fs::rename(&tmp, &path);
340 }
341 }
342
343 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
347 let db = self_arc;
348 std::thread::spawn(move || {
349 loop {
350 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
351 db.id_index.flush_write_buf();
353 db.flush_manifest_if_dirty();
355 }
356 });
357 }
358
359 pub fn head(&self) -> String {
361 self.head.read().clone()
362 }
363
364 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
367 let prev = match self.id_index.get(coll, id) {
368 None => return Ok(false), Some(h) => h,
370 };
371 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
372 let mut tombstone = Node {
373 id: format!("_del_{}", id),
374 coll: coll.to_string(),
375 seq,
376 data: serde_json::json!({"_deleted": id, "_prev": prev}),
377 prev: Some(prev),
378 caused_by: vec![],
379 ts: now(),
380 valid_from: None,
381 valid_to: None,
382 hash: String::new(),
383 };
384 let hash = self.objects.write(&mut tombstone)?;
385 self.update_head(seq, &hash);
386 self.id_index.remove(coll, id)?;
388 Ok(true)
389 }
390
391 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
393 let hash = self.id_index.get(coll, id)?;
394 self.objects.read(&hash).ok()
395 }
396
397 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
399 self.objects.read(hash).ok()
400 }
401
402 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
405 let hash = self.id_index.get(coll, id)?;
406 let mut current = self.objects.read(&hash).ok()?;
407 loop {
408 if current.seq <= target_seq {
409 return Some(current);
410 }
411 let prev_hash = current.prev.as_deref()?;
412 current = self.objects.read(prev_hash).ok()?;
413 }
414 }
415
416 pub fn list(&self, coll: &str) -> Vec<Node> {
418 self.id_index
419 .list_ids(coll)
420 .into_iter()
421 .filter_map(|id| self.get(coll, &id))
422 .collect()
423 }
424
425 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
427 if self.sorted_indexes.has(coll, field) {
428 self.sorted_indexes
429 .top_k_asc(coll, field, limit)
430 .into_iter()
431 .filter_map(|h| self.objects.read(&h).ok())
432 .collect()
433 } else {
434 let mut docs = self.list(coll);
435 docs.sort_by(|a, b| {
436 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
437 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
438 av.cmp(&bv)
439 });
440 docs.truncate(limit);
441 docs
442 }
443 }
444
445 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
447 if self.sorted_indexes.has(coll, field) {
448 self.sorted_indexes
449 .top_k_desc(coll, field, limit)
450 .into_iter()
451 .filter_map(|h| self.objects.read(&h).ok())
452 .collect()
453 } else {
454 let mut docs = self.list(coll);
455 docs.sort_by(|a, b| {
456 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
457 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
458 bv.cmp(&av)
459 });
460 docs.truncate(limit);
461 docs
462 }
463 }
464
465 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
467 self.graph
468 .trace(hash, "caused_by", reverse, limit)
469 .into_iter()
470 .filter_map(|h| self.objects.read(&h).ok())
471 .collect()
472 }
473
474 pub fn verify(&self) -> (usize, Vec<String>) {
476 self.objects.verify_all()
477 }
478
479 pub fn create_sorted_index(&self, coll: &str, field: &str) {
481 self.sorted_indexes.ensure(coll, field);
482 for id in self.id_index.list_ids(coll) {
484 if let Some(node) = self.get(coll, &id) {
485 if let Value::Object(ref obj) = node.data {
486 if let Some(value) = obj.get(field) {
487 self.sorted_indexes.insert(coll, field, value, &node.hash);
488 }
489 }
490 }
491 }
492 }
493
494 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
497 self.seq_index.get(&seq).map(|r| r.clone())
498 }
499
500 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
505 let (frm_coll, frm_id) = frm.split_once(':')
506 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
507 let (to_coll, to_id) = to.split_once(':')
508 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
509 if self.id_index.get(frm_coll, frm_id).is_none() {
510 anyhow::bail!("link: frm not found: {}", frm);
511 }
512 if self.id_index.get(to_coll, to_id).is_none() {
513 anyhow::bail!("link: to not found: {}", to);
514 }
515 let link_id = format!("{}|{}|{}", frm, rel, to);
516 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
517 self.put("__links__", &link_id, doc, vec![], None, None)?;
518 Ok(())
519 }
520
521 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
523 let link_id = format!("{}|{}|{}", frm, rel, to);
524 self.delete("__links__", &link_id)
525 }
526
527 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
530 self.id_index
531 .list_ids("__links__")
532 .into_iter()
533 .filter_map(|id| self.get("__links__", &id))
534 .filter(|node| {
535 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
536 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
537 })
538 .filter_map(|node| {
539 let to = node.data.get("_to")?.as_str()?;
540 let (to_coll, to_id) = to.split_once(':')?;
541 self.get(to_coll, to_id)
542 })
543 .collect()
544 }
545}
546
547fn cold_scan_background_arc(db: Arc<Db>) {
549 use rayon::prelude::*;
550 use blake2::{Blake2b512, Digest};
551
552 let objects = &db.objects;
553 let head = &db.head;
554 let seq_atomic = &db.seq;
555 let sorted_indexes = &db.sorted_indexes;
556 let root = db.root.clone();
557 let ready_flag = Arc::clone(&db.startup_ready);
558
559 let hashes: Vec<String> = objects.all_hashes().collect();
560 let total = hashes.len();
561
562 if total == 0 {
563 ready_flag.store(true, Ordering::SeqCst);
564 return;
565 }
566
567 println!(" [nedbd] background scan — {} objects...", total);
568 let t0 = std::time::Instant::now();
569 let step = (total / 10).max(1000);
570
571 let nodes: Vec<Node> = hashes.par_iter()
572 .enumerate()
573 .filter_map(|(i, h)| {
574 if i > 0 && i % step == 0 {
575 let pct = i * 100 / total;
576 let elapsed = t0.elapsed().as_secs_f32();
577 let rate = i as f32 / elapsed;
578 let eta = (total - i) as f32 / rate;
579 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
580 pct, i, total, rate, eta);
581 }
582 objects.read(h).ok()
583 })
584 .collect();
585
586 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
587 total, total, t0.elapsed().as_secs_f32());
588
589 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
590 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
591
592 for node in &nodes {
593 db.seq_index.insert(node.seq, node.hash.clone());
594 if let Value::Object(ref obj) = node.data {
595 for (field, value) in obj {
596 if sorted_indexes.has(&node.coll, field) {
597 sorted_indexes.insert(&node.coll, field, value, &node.hash);
598 }
599 }
600 }
601 }
602
603 let mut sorted_hashes = hashes;
605 sorted_hashes.sort();
606 let mut h = Blake2b512::new();
607 h.update(max_seq.to_le_bytes());
608 for hash_str in &sorted_hashes {
609 h.update(hash_str.as_bytes());
610 }
611 let new_head = hex::encode(&h.finalize()[..32]);
612 *head.write() = new_head.clone();
613
614 let m = Manifest { seq: max_seq, head: new_head };
616 let json = serde_json::to_string(&m).unwrap_or_default();
617 let path = root.join("MANIFEST");
618 let tmp = root.join("MANIFEST.tmp");
619 let _ = fs::write(&tmp, &json);
620 let _ = fs::rename(&tmp, &path);
621
622 ready_flag.store(true, Ordering::SeqCst);
624 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
625}
626
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
633
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a fresh on-disk database in a new temporary directory.
    /// The directory guard is returned so it outlives the database.
    fn open_temp() -> (tempfile::TempDir, Db) {
        let dir = tempdir().unwrap();
        let db = Db::open(dir.path(), None).unwrap();
        (dir, db)
    }

    /// A document written with `put` can be read back with `get`.
    #[test]
    fn put_and_get() {
        let (_dir, db) = open_temp();
        let payload = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", payload, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// With a sorted index in place, `order_by_asc` returns the smallest values first.
    #[test]
    fn order_by_with_sorted_index() {
        let (_dir, db) = open_temp();
        db.create_sorted_index("blocks", "height");
        for height in [3u64, 1, 5, 2, 4] {
            let doc = serde_json::json!({"height": height});
            db.put("blocks", &height.to_string(), doc, vec![], None, None).unwrap();
        }

        let heights: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|n| n.data["height"].as_u64())
            .collect();
        assert_eq!(heights, vec![1, 2, 3]);
    }

    /// Tracing from the end of a three-step causal chain yields three nodes.
    #[test]
    fn causal_trace() {
        let (_dir, db) = open_temp();
        let created = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transferred = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![created.hash.clone()], None, None)
            .unwrap();
        let burned = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transferred.hash.clone()], None, None)
            .unwrap();

        let chain = db.trace(&burned.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    /// `get_as_of` returns the version that was current at the given sequence number.
    #[test]
    fn as_of() {
        let (_dir, db) = open_temp();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
695
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    /// Writes a document with no causes and no validity window, panicking on failure.
    fn put_doc(db: &Db, coll: &str, id: &str, data: serde_json::Value) -> crate::store::Node {
        db.put(coll, id, data, vec![], None, None).unwrap()
    }

    /// Every `put` records its hash under its sequence number.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let first = put_doc(&db, "item", "a", serde_json::json!({"x": 1}));
        let second = put_doc(&db, "item", "b", serde_json::json!({"x": 2}));

        assert_eq!(db.get_hash_by_seq(first.seq), Some(first.hash.clone()));
        assert_eq!(db.get_hash_by_seq(second.seq), Some(second.hash.clone()));
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    /// Every node written by `put_batch` is reachable through the seq index.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let written = db
            .put_batch(vec![
                ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
                ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
            ])
            .unwrap();

        for node in &written {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    /// `neighbors` returns exactly the targets linked from the given source.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        put_doc(&db, "driver", "d1", serde_json::json!({"name": "Bob"}));
        put_doc(&db, "driver", "d2", serde_json::json!({"name": "Carol"}));
        put_doc(&db, "trip", "t1", serde_json::json!({"status": "req"}));
        put_doc(&db, "trip", "t2", serde_json::json!({"status": "req"}));

        db.link("driver:d1", "handles", "trip:t1").unwrap();
        db.link("driver:d1", "handles", "trip:t2").unwrap();
        db.link("driver:d2", "handles", "trip:t1").unwrap();

        let bob_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(bob_trips.len(), 2);
        let bob_ids: std::collections::HashSet<&str> =
            bob_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(bob_ids.contains("t1") && bob_ids.contains("t2"));

        let carol_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(carol_trips.len(), 1);
        assert_eq!(carol_trips[0].id, "t1");
    }

    /// A link is stored as a `__links__` document with `_from`, `_rel` and `_to`.
    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        put_doc(&db, "driver", "d1", serde_json::json!({"name": "Bob"}));
        put_doc(&db, "trip", "t1", serde_json::json!({"status": "req"}));
        db.link("driver:d1", "handles", "trip:t1").unwrap();

        let stored = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(stored.is_some(), "__links__ doc should exist");
        let stored = stored.unwrap();
        assert_eq!(stored.data["_from"], "driver:d1");
        assert_eq!(stored.data["_rel"], "handles");
        assert_eq!(stored.data["_to"], "trip:t1");

        let reachable = db.neighbors("driver:d1", "handles");
        assert_eq!(reachable.len(), 1);
        assert_eq!(reachable[0].id, "t1");
    }

    /// Linking to a document that does not exist is an error.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        put_doc(&db, "driver", "d1", serde_json::json!({}));
        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
    }

    /// Links written to disk are still resolvable after reopening the database.
    #[test]
    fn link_durable_survives_reopen() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            put_doc(&db, "driver", "d1", serde_json::json!({"name": "Bob"}));
            put_doc(&db, "trip", "t1", serde_json::json!({"status": "req"}));
            db.link("driver:d1", "handles", "trip:t1").unwrap();
        }

        let reopened = Db::open(dir.path(), None).unwrap();
        reopened
            .startup_ready
            .store(true, std::sync::atomic::Ordering::SeqCst);
        let trips = reopened.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }
}