1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22 seq: u64,
23 head: String,
24}
25
26pub struct Db {
27 pub objects: ObjectStore,
28 pub id_index: IdIndex,
29 pub sorted_indexes: SortedIndexes,
30 pub graph: GraphStore,
31 pub root: PathBuf,
32 manifest_dirty: Arc<AtomicBool>,
36 pub seq: AtomicU64,
37 head: RwLock<String>,
39 pub startup_ready: Arc<AtomicBool>,
44 seq_index: Arc<DashMap<u64, String>>,
48}
49
50impl Db {
51 pub fn in_memory() -> Self {
55 Self {
56 objects: ObjectStore::in_memory(),
57 id_index: IdIndex::in_memory(),
58 sorted_indexes: SortedIndexes::new(),
59 graph: GraphStore::in_memory(),
60 root: std::path::PathBuf::from(":memory:"),
61 seq: AtomicU64::new(0),
62 head: RwLock::new(String::new()),
63 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
65 seq_index: Arc::new(DashMap::new()),
66 }
67 }
68
69 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
71 std::fs::create_dir_all(db_root)?;
72
73 let objects = ObjectStore::new(db_root, dek.clone())?;
74 let id_index = IdIndex::new(db_root)?;
75 let sorted_indexes = SortedIndexes::new();
76 let graph = GraphStore::new(db_root)?;
77
78 let mut db = Self {
79 objects,
80 id_index,
81 sorted_indexes,
82 graph,
83 root: db_root.to_path_buf(),
84 seq: AtomicU64::new(0),
85 head: RwLock::new(String::new()),
86 startup_ready: Arc::new(AtomicBool::new(false)),
87 manifest_dirty: Arc::new(AtomicBool::new(false)),
88 seq_index: Arc::new(DashMap::new()),
89 };
90
91 migrate::migrate_if_needed(
93 db_root,
94 &db.objects,
95 &db.id_index,
96 &db.sorted_indexes,
97 &db.graph,
98 dek.as_ref(),
99 )?;
100
101 db.startup_rebuild()?;
104
105 Ok(db)
106 }
107
108 fn startup_rebuild(&mut self) -> Result<()> {
113 let manifest_path = self.root.join("MANIFEST");
114 let needs_index_rebuild = !self.sorted_indexes.is_empty();
115
116 if manifest_path.exists() && !needs_index_rebuild {
118 if let Some(m) = fs::read_to_string(&manifest_path)
119 .ok()
120 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
121 {
122 if m.head.len() < 8 {
125 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
126 } else {
127 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
129 self.startup_ready.store(true, Ordering::SeqCst);
130 println!(" [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
131 return Ok(());
132 }
133 } else {
134 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
135 }
136 }
137
138 println!(" [nedbd] cold start — background scan will start after heap allocation");
144 Ok(())
145 }
146
147 pub fn start_cold_scan(self_arc: Arc<Self>) {
151 if self_arc.startup_ready.load(Ordering::SeqCst) {
152 return; }
154 if self_arc.objects.all_hashes().next().is_none() {
157 self_arc.startup_ready.store(true, Ordering::SeqCst);
158 return;
159 }
160 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
161 std::thread::spawn(move || {
162 let db = self_arc;
163 cold_scan_background_arc(db);
164 });
165 }
166
167 pub fn put(
169 &self,
170 coll: &str,
171 id: &str,
172 data: Value,
173 caused_by: Vec<String>,
174 valid_from: Option<String>,
175 valid_to: Option<String>,
176 ) -> Result<Node> {
177 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
178 let prev = self.id_index.get(coll, id);
179
180 if let Some(old_hash) = &prev {
182 if let Ok(old_node) = self.objects.read(old_hash) {
183 if let Value::Object(ref obj) = old_node.data {
184 for (field, value) in obj {
185 self.sorted_indexes.remove(coll, field, value, old_hash);
186 }
187 }
188 }
189 }
190
191 let mut node = Node {
192 id: id.to_string(),
193 coll: coll.to_string(),
194 seq,
195 data: data.clone(),
196 prev,
197 caused_by: caused_by.clone(),
198 ts: now(),
199 valid_from,
200 valid_to,
201 hash: String::new(),
202 };
203
204 let hash = self.objects.write(&mut node)?;
206 self.seq_index.insert(seq, hash.clone());
207
208 self.id_index.set(coll, id, &hash)?;
210
211 if let Value::Object(ref obj) = data {
213 for (field, value) in obj {
214 if self.sorted_indexes.has(coll, field) {
215 self.sorted_indexes.insert(coll, field, value, &hash);
216 }
217 }
218 }
219
220 for cause in &caused_by {
222 self.graph.add_edge(&hash, "caused_by", cause)?;
223 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
224 }
225
226 self.update_head(seq, &hash);
229
230 Ok(node)
231 }
232
233 pub fn put_batch(
238 &self,
239 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
240 ) -> Result<Vec<Node>> {
242 use rayon::prelude::*;
243
244 if ops.is_empty() { return Ok(vec![]); }
245 let n = ops.len() as u64;
246
247 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
249 let ts = now();
250
251 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
253 let prev = self.id_index.get(&coll, &id);
254 Node {
255 id, coll, seq: base_seq + i as u64,
256 data, prev, caused_by,
257 ts, valid_from, valid_to,
258 hash: String::new(),
259 }
260 }).collect();
261
262 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
264 .filter_map(|node| self.objects.write(node).err())
265 .collect();
266 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
267
268 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
270 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
271 .collect();
272 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
273
274 for node in &nodes {
276 self.seq_index.insert(node.seq, node.hash.clone());
277 if let Value::Object(ref obj) = node.data {
278 for (field, value) in obj {
279 if self.sorted_indexes.has(&node.coll, field) {
280 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
281 }
282 }
283 }
284 for cause in &node.caused_by {
285 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
286 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
287 }
288 }
289
290 for node in &nodes {
292 self.update_head(node.seq, &node.hash);
293 }
294
295 Ok(nodes)
296 }
297
298 fn update_head(&self, seq: u64, new_hash: &str) {
302 use blake2::{Blake2b512, Digest};
303 let prev = self.head.read().clone();
304 let mut h = Blake2b512::new();
305 h.update(prev.as_bytes());
306 h.update(seq.to_le_bytes());
307 h.update(new_hash.as_bytes());
308 *self.head.write() = hex::encode(&h.finalize()[..32]);
309 self.manifest_dirty.store(true, Ordering::Release);
311 }
312
313 pub fn flush_all(&self) {
315 self.id_index.flush_write_buf();
316 if let Err(e) = self.objects.sync() {
319 eprintln!("nedb: segment sync failed: {}", e);
320 }
321 self.flush_manifest();
322 }
323
324 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
333 self.flush_all();
334 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
335 for coll in self.id_index.collections() {
336 for id in self.id_index.list_ids(&coll) {
337 if let Some(h) = self.id_index.get(&coll, &id) {
338 live.insert(h);
339 }
340 }
341 }
342 self.objects.compact(&live)
343 }
344
345 pub fn flush_manifest_if_dirty(&self) {
347 if self.root == std::path::PathBuf::from(":memory:") { return; }
348 if self.manifest_dirty.compare_exchange(
349 true, false, Ordering::AcqRel, Ordering::Relaxed
350 ).is_ok() {
351 self.flush_manifest();
352 }
353 }
354
355 pub fn flush_manifest(&self) {
357 if self.root == std::path::PathBuf::from(":memory:") { return; }
358 let seq = self.seq.load(Ordering::SeqCst);
359 let head = self.head.read().clone();
360 let m = Manifest { seq, head };
361 if let Ok(json) = serde_json::to_string(&m) {
362 let path = self.root.join("MANIFEST");
363 let tmp = self.root.join("MANIFEST.tmp");
364 let _ = fs::write(&tmp, &json);
365 let _ = fs::rename(&tmp, &path);
366 }
367 }
368
369 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
373 let db = self_arc;
374 std::thread::spawn(move || {
375 loop {
376 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
377 db.id_index.flush_write_buf();
379 db.flush_manifest_if_dirty();
381 }
382 });
383 }
384
385 pub fn head(&self) -> String {
387 self.head.read().clone()
388 }
389
390 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
393 let prev = match self.id_index.get(coll, id) {
394 None => return Ok(false), Some(h) => h,
396 };
397 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
398 let mut tombstone = Node {
399 id: format!("_del_{}", id),
400 coll: coll.to_string(),
401 seq,
402 data: serde_json::json!({"_deleted": id, "_prev": prev}),
403 prev: Some(prev),
404 caused_by: vec![],
405 ts: now(),
406 valid_from: None,
407 valid_to: None,
408 hash: String::new(),
409 };
410 let hash = self.objects.write(&mut tombstone)?;
411 self.update_head(seq, &hash);
412 self.id_index.remove(coll, id)?;
414 Ok(true)
415 }
416
417 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
419 let hash = self.id_index.get(coll, id)?;
420 self.objects.read(&hash).ok()
421 }
422
423 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
425 self.objects.read(hash).ok()
426 }
427
428 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
431 let hash = self.id_index.get(coll, id)?;
432 let mut current = self.objects.read(&hash).ok()?;
433 loop {
434 if current.seq <= target_seq {
435 return Some(current);
436 }
437 let prev_hash = current.prev.as_deref()?;
438 current = self.objects.read(prev_hash).ok()?;
439 }
440 }
441
442 pub fn list(&self, coll: &str) -> Vec<Node> {
444 self.id_index
445 .list_ids(coll)
446 .into_iter()
447 .filter_map(|id| self.get(coll, &id))
448 .collect()
449 }
450
451 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
453 if self.sorted_indexes.has(coll, field) {
454 self.sorted_indexes
455 .top_k_asc(coll, field, limit)
456 .into_iter()
457 .filter_map(|h| self.objects.read(&h).ok())
458 .collect()
459 } else {
460 let mut docs = self.list(coll);
461 docs.sort_by(|a, b| {
462 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
463 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
464 av.cmp(&bv)
465 });
466 docs.truncate(limit);
467 docs
468 }
469 }
470
471 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
473 if self.sorted_indexes.has(coll, field) {
474 self.sorted_indexes
475 .top_k_desc(coll, field, limit)
476 .into_iter()
477 .filter_map(|h| self.objects.read(&h).ok())
478 .collect()
479 } else {
480 let mut docs = self.list(coll);
481 docs.sort_by(|a, b| {
482 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
483 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
484 bv.cmp(&av)
485 });
486 docs.truncate(limit);
487 docs
488 }
489 }
490
491 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
493 self.graph
494 .trace(hash, "caused_by", reverse, limit)
495 .into_iter()
496 .filter_map(|h| self.objects.read(&h).ok())
497 .collect()
498 }
499
500 pub fn verify(&self) -> (usize, Vec<String>) {
502 self.objects.verify_all()
503 }
504
505 pub fn create_sorted_index(&self, coll: &str, field: &str) {
507 self.sorted_indexes.ensure(coll, field);
508 for id in self.id_index.list_ids(coll) {
510 if let Some(node) = self.get(coll, &id) {
511 if let Value::Object(ref obj) = node.data {
512 if let Some(value) = obj.get(field) {
513 self.sorted_indexes.insert(coll, field, value, &node.hash);
514 }
515 }
516 }
517 }
518 }
519
520 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
523 self.seq_index.get(&seq).map(|r| r.clone())
524 }
525
526 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
531 let (frm_coll, frm_id) = frm.split_once(':')
532 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
533 let (to_coll, to_id) = to.split_once(':')
534 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
535 if self.id_index.get(frm_coll, frm_id).is_none() {
536 anyhow::bail!("link: frm not found: {}", frm);
537 }
538 if self.id_index.get(to_coll, to_id).is_none() {
539 anyhow::bail!("link: to not found: {}", to);
540 }
541 let link_id = format!("{}|{}|{}", frm, rel, to);
542 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
543 self.put("__links__", &link_id, doc, vec![], None, None)?;
544 Ok(())
545 }
546
547 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
549 let link_id = format!("{}|{}|{}", frm, rel, to);
550 self.delete("__links__", &link_id)
551 }
552
553 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
556 self.id_index
557 .list_ids("__links__")
558 .into_iter()
559 .filter_map(|id| self.get("__links__", &id))
560 .filter(|node| {
561 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
562 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
563 })
564 .filter_map(|node| {
565 let to = node.data.get("_to")?.as_str()?;
566 let (to_coll, to_id) = to.split_once(':')?;
567 self.get(to_coll, to_id)
568 })
569 .collect()
570 }
571}
572
573fn cold_scan_background_arc(db: Arc<Db>) {
575 use rayon::prelude::*;
576 use blake2::{Blake2b512, Digest};
577
578 let objects = &db.objects;
579 let head = &db.head;
580 let seq_atomic = &db.seq;
581 let sorted_indexes = &db.sorted_indexes;
582 let root = db.root.clone();
583 let ready_flag = Arc::clone(&db.startup_ready);
584
585 let hashes: Vec<String> = objects.all_hashes().collect();
586 let total = hashes.len();
587
588 if total == 0 {
589 ready_flag.store(true, Ordering::SeqCst);
590 return;
591 }
592
593 println!(" [nedbd] background scan — {} objects...", total);
594 let t0 = std::time::Instant::now();
595 let step = (total / 10).max(1000);
596
597 let nodes: Vec<Node> = hashes.par_iter()
598 .enumerate()
599 .filter_map(|(i, h)| {
600 if i > 0 && i % step == 0 {
601 let pct = i * 100 / total;
602 let elapsed = t0.elapsed().as_secs_f32();
603 let rate = i as f32 / elapsed;
604 let eta = (total - i) as f32 / rate;
605 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
606 pct, i, total, rate, eta);
607 }
608 objects.read(h).ok()
609 })
610 .collect();
611
612 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
613 total, total, t0.elapsed().as_secs_f32());
614
615 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
616 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
617
618 for node in &nodes {
619 db.seq_index.insert(node.seq, node.hash.clone());
620 if let Value::Object(ref obj) = node.data {
621 for (field, value) in obj {
622 if sorted_indexes.has(&node.coll, field) {
623 sorted_indexes.insert(&node.coll, field, value, &node.hash);
624 }
625 }
626 }
627 }
628
629 let mut sorted_hashes = hashes;
631 sorted_hashes.sort();
632 let mut h = Blake2b512::new();
633 h.update(max_seq.to_le_bytes());
634 for hash_str in &sorted_hashes {
635 h.update(hash_str.as_bytes());
636 }
637 let new_head = hex::encode(&h.finalize()[..32]);
638 *head.write() = new_head.clone();
639
640 let m = Manifest { seq: max_seq, head: new_head };
642 let json = serde_json::to_string(&m).unwrap_or_default();
643 let path = root.join("MANIFEST");
644 let tmp = root.join("MANIFEST.tmp");
645 let _ = fs::write(&tmp, &json);
646 let _ = fs::rename(&tmp, &path);
647
648 ready_flag.store(true, Ordering::SeqCst);
650 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
651}
652
653fn now() -> f64 {
654 std::time::SystemTime::now()
655 .duration_since(std::time::UNIX_EPOCH)
656 .map(|d| d.as_secs_f64())
657 .unwrap_or(0.0)
658}
659
660#[cfg(test)]
661mod tests {
662 use super::*;
663 use tempfile::tempdir;
664
665 #[test]
666 fn put_and_get() {
667 let dir = tempdir().unwrap();
668 let db = Db::open(dir.path(), None).unwrap();
669 db.put(
670 "blocks", "618000",
671 serde_json::json!({"height": 618000, "hash": "0000abc"}),
672 vec![], None, None,
673 ).unwrap();
674 let node = db.get("blocks", "618000").unwrap();
675 assert_eq!(node.id, "618000");
676 assert_eq!(node.data["height"], 618000);
677 }
678
679 #[test]
680 fn order_by_with_sorted_index() {
681 let dir = tempdir().unwrap();
682 let db = Db::open(dir.path(), None).unwrap();
683 db.create_sorted_index("blocks", "height");
684 for h in [3u64, 1, 5, 2, 4] {
685 db.put("blocks", &h.to_string(),
686 serde_json::json!({"height": h}),
687 vec![], None, None).unwrap();
688 }
689 let asc = db.order_by_asc("blocks", "height", 3);
690 let heights: Vec<u64> = asc.iter()
691 .filter_map(|n| n.data["height"].as_u64())
692 .collect();
693 assert_eq!(heights, vec![1, 2, 3]);
694 }
695
696 #[test]
697 fn causal_trace() {
698 let dir = tempdir().unwrap();
699 let db = Db::open(dir.path(), None).unwrap();
700 let a = db.put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None).unwrap();
701 let b = db.put("ops", "b", serde_json::json!({"op": "transfer"}), vec![a.hash.clone()], None, None).unwrap();
702 let c = db.put("ops", "c", serde_json::json!({"op": "burn"}), vec![b.hash.clone()], None, None).unwrap();
703
704 let trace = db.trace(&c.hash, false, 10);
705 assert_eq!(trace.len(), 3); }
707
708 #[test]
709 fn as_of() {
710 let dir = tempdir().unwrap();
711 let db = Db::open(dir.path(), None).unwrap();
712 let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
713 let _v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
714
715 let at_v1 = db.get_as_of("docs", "x", v1.seq).unwrap();
716 assert_eq!(at_v1.data["v"], 1);
717 let current = db.get("docs", "x").unwrap();
718 assert_eq!(current.data["v"], 2);
719 }
720}
721
722#[cfg(test)]
723mod tests_v2 {
724 use super::*;
725 use tempfile::tempdir;
726
727 #[test]
728 fn seq_index_populated_on_put() {
729 let db = Db::in_memory();
730 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
731 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
732 assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
733 assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
734 assert_eq!(db.get_hash_by_seq(9999), None);
735 }
736
737 #[test]
738 fn seq_index_survives_batch() {
739 let db = Db::in_memory();
740 let nodes = db.put_batch(vec![
741 ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
742 ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
743 ]).unwrap();
744 for node in &nodes {
745 assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
746 }
747 }
748
749 #[test]
750 fn link_and_neighbors() {
751 let db = Db::in_memory();
752 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
753 db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
754 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
755 db.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
756
757 db.link("driver:d1", "handles", "trip:t1").unwrap();
758 db.link("driver:d1", "handles", "trip:t2").unwrap();
759 db.link("driver:d2", "handles", "trip:t1").unwrap();
760
761 let d1_trips = db.neighbors("driver:d1", "handles");
762 assert_eq!(d1_trips.len(), 2);
763 let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
764 assert!(ids.contains("t1") && ids.contains("t2"));
765
766 let d2_trips = db.neighbors("driver:d2", "handles");
767 assert_eq!(d2_trips.len(), 1);
768 assert_eq!(d2_trips[0].id, "t1");
769 }
770
771 #[test]
772 fn link_stored_in_links_collection() {
773 let db = Db::in_memory();
776 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
777 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
778 db.link("driver:d1", "handles", "trip:t1").unwrap();
779 let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
781 assert!(link_doc.is_some(), "__links__ doc should exist");
782 let doc = link_doc.unwrap();
783 assert_eq!(doc.data["_from"], "driver:d1");
784 assert_eq!(doc.data["_rel"], "handles");
785 assert_eq!(doc.data["_to"], "trip:t1");
786 let nb = db.neighbors("driver:d1", "handles");
788 assert_eq!(nb.len(), 1);
789 assert_eq!(nb[0].id, "t1");
790 }
791
792 #[test]
793 fn link_missing_node_errors() {
794 let db = Db::in_memory();
795 db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
796 assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
797 }
798
799 #[test]
800 fn link_durable_survives_reopen() {
801 let dir = tempdir().unwrap();
802 {
803 let db = Db::open(dir.path(), None).unwrap();
804 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
805 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
806 db.link("driver:d1", "handles", "trip:t1").unwrap();
807 }
808 let db2 = Db::open(dir.path(), None).unwrap();
809 db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
810 let trips = db2.neighbors("driver:d1", "handles");
811 assert_eq!(trips.len(), 1);
812 assert_eq!(trips[0].id, "t1");
813 }
814}