1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22 seq: u64,
23 head: String,
24}
25
26pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
30
31#[derive(Debug, Clone, serde::Serialize)]
37pub struct SinceBatch {
38 pub nodes: Vec<Node>,
40 pub from_seq: u64,
42 pub to_seq: u64,
44 pub head_seq: u64,
46 pub has_more: bool,
48}
49
50#[derive(Debug, Clone, serde::Serialize)]
57pub struct ScanStatus {
58 pub scan_complete: bool,
60 pub tip_seq: u64,
62 pub indexed_seq_min: u64,
64 pub indexed_seq_max: u64,
66 pub indexed_count: usize,
68}
69
70pub struct Db {
71 pub objects: ObjectStore,
72 pub id_index: IdIndex,
73 pub sorted_indexes: SortedIndexes,
74 pub graph: GraphStore,
75 pub root: PathBuf,
76 manifest_dirty: Arc<AtomicBool>,
80 pub seq: AtomicU64,
81 head: RwLock<String>,
83 pub startup_ready: Arc<AtomicBool>,
88 seq_index: Arc<DashMap<u64, String>>,
92}
93
94impl Db {
95 pub fn in_memory() -> Self {
99 Self {
100 objects: ObjectStore::in_memory(),
101 id_index: IdIndex::in_memory(),
102 sorted_indexes: SortedIndexes::new(),
103 graph: GraphStore::in_memory(),
104 root: std::path::PathBuf::from(":memory:"),
105 seq: AtomicU64::new(0),
106 head: RwLock::new(String::new()),
107 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
109 seq_index: Arc::new(DashMap::new()),
110 }
111 }
112
113 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
115 std::fs::create_dir_all(db_root)?;
116
117 let objects = ObjectStore::new(db_root, dek.clone())?;
118 let id_index = IdIndex::new(db_root)?;
119 let sorted_indexes = SortedIndexes::new();
120 let graph = GraphStore::new(db_root)?;
121
122 let mut db = Self {
123 objects,
124 id_index,
125 sorted_indexes,
126 graph,
127 root: db_root.to_path_buf(),
128 seq: AtomicU64::new(0),
129 head: RwLock::new(String::new()),
130 startup_ready: Arc::new(AtomicBool::new(false)),
131 manifest_dirty: Arc::new(AtomicBool::new(false)),
132 seq_index: Arc::new(DashMap::new()),
133 };
134
135 migrate::migrate_if_needed(
137 db_root,
138 &db.objects,
139 &db.id_index,
140 &db.sorted_indexes,
141 &db.graph,
142 dek.as_ref(),
143 )?;
144
145 db.startup_rebuild()?;
148
149 Ok(db)
150 }
151
152 fn startup_rebuild(&mut self) -> Result<()> {
157 let manifest_path = self.root.join("MANIFEST");
158 let needs_index_rebuild = !self.sorted_indexes.is_empty();
159
160 if manifest_path.exists() && !needs_index_rebuild {
162 if let Some(m) = fs::read_to_string(&manifest_path)
163 .ok()
164 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
165 {
166 if m.head.len() < 8 {
169 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
170 } else {
171 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
173 self.startup_ready.store(true, Ordering::SeqCst);
174 println!(" [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
175 return Ok(());
176 }
177 } else {
178 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
179 }
180 }
181
182 println!(" [nedbd] cold start — background scan will start after heap allocation");
188 Ok(())
189 }
190
191 pub fn start_cold_scan(self_arc: Arc<Self>) {
195 if self_arc.startup_ready.load(Ordering::SeqCst) {
196 return; }
198 if self_arc.objects.all_hashes().next().is_none() {
201 self_arc.startup_ready.store(true, Ordering::SeqCst);
202 return;
203 }
204 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
205 std::thread::spawn(move || {
206 let db = self_arc;
207 cold_scan_background_arc(db);
208 });
209 }
210
211 pub fn put(
213 &self,
214 coll: &str,
215 id: &str,
216 data: Value,
217 caused_by: Vec<String>,
218 valid_from: Option<String>,
219 valid_to: Option<String>,
220 ) -> Result<Node> {
221 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
222 let prev = self.id_index.get(coll, id);
223
224 if let Some(old_hash) = &prev {
226 if let Ok(old_node) = self.objects.read(old_hash) {
227 if let Value::Object(ref obj) = old_node.data {
228 for (field, value) in obj {
229 self.sorted_indexes.remove(coll, field, value, old_hash);
230 }
231 }
232 }
233 }
234
235 let mut node = Node {
236 id: id.to_string(),
237 coll: coll.to_string(),
238 seq,
239 data: data.clone(),
240 prev,
241 caused_by: caused_by.clone(),
242 ts: now(),
243 valid_from,
244 valid_to,
245 hash: String::new(),
246 };
247
248 let hash = self.objects.write(&mut node)?;
250 self.seq_index.insert(seq, hash.clone());
251
252 self.id_index.set(coll, id, &hash)?;
254
255 if let Value::Object(ref obj) = data {
257 for (field, value) in obj {
258 if self.sorted_indexes.has(coll, field) {
259 self.sorted_indexes.insert(coll, field, value, &hash);
260 }
261 }
262 }
263
264 for cause in &caused_by {
266 self.graph.add_edge(&hash, "caused_by", cause)?;
267 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
268 }
269
270 self.update_head(seq, &hash);
273
274 Ok(node)
275 }
276
277 pub fn put_batch(
282 &self,
283 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
284 ) -> Result<Vec<Node>> {
286 use rayon::prelude::*;
287
288 if ops.is_empty() { return Ok(vec![]); }
289 let n = ops.len() as u64;
290
291 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
293 let ts = now();
294
295 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
297 let prev = self.id_index.get(&coll, &id);
298 Node {
299 id, coll, seq: base_seq + i as u64,
300 data, prev, caused_by,
301 ts, valid_from, valid_to,
302 hash: String::new(),
303 }
304 }).collect();
305
306 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
308 .filter_map(|node| self.objects.write(node).err())
309 .collect();
310 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
311
312 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
314 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
315 .collect();
316 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
317
318 for node in &nodes {
320 self.seq_index.insert(node.seq, node.hash.clone());
321 if let Value::Object(ref obj) = node.data {
322 for (field, value) in obj {
323 if self.sorted_indexes.has(&node.coll, field) {
324 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
325 }
326 }
327 }
328 for cause in &node.caused_by {
329 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
330 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
331 }
332 }
333
334 for node in &nodes {
336 self.update_head(node.seq, &node.hash);
337 }
338
339 Ok(nodes)
340 }
341
342 fn update_head(&self, seq: u64, new_hash: &str) {
346 use blake2::{Blake2b512, Digest};
347 let prev = self.head.read().clone();
348 let mut h = Blake2b512::new();
349 h.update(prev.as_bytes());
350 h.update(seq.to_le_bytes());
351 h.update(new_hash.as_bytes());
352 *self.head.write() = hex::encode(&h.finalize()[..32]);
353 self.manifest_dirty.store(true, Ordering::Release);
355 }
356
357 pub fn flush_all(&self) {
359 self.id_index.flush_write_buf();
360 if let Err(e) = self.objects.sync() {
363 eprintln!("nedb: segment sync failed: {}", e);
364 }
365 self.flush_manifest();
366 }
367
368 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
377 self.flush_all();
378 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
379 for coll in self.id_index.collections() {
380 for id in self.id_index.list_ids(&coll) {
381 if let Some(h) = self.id_index.get(&coll, &id) {
382 live.insert(h);
383 }
384 }
385 }
386 self.objects.compact(&live)
387 }
388
389 pub fn flush_manifest_if_dirty(&self) {
391 if self.root == std::path::PathBuf::from(":memory:") { return; }
392 if self.manifest_dirty.compare_exchange(
393 true, false, Ordering::AcqRel, Ordering::Relaxed
394 ).is_ok() {
395 self.flush_manifest();
396 }
397 }
398
399 pub fn flush_manifest(&self) {
401 if self.root == std::path::PathBuf::from(":memory:") { return; }
402 let seq = self.seq.load(Ordering::SeqCst);
403 let head = self.head.read().clone();
404 let m = Manifest { seq, head };
405 if let Ok(json) = serde_json::to_string(&m) {
406 let path = self.root.join("MANIFEST");
407 let tmp = self.root.join("MANIFEST.tmp");
408 let _ = fs::write(&tmp, &json);
409 let _ = fs::rename(&tmp, &path);
410 }
411 }
412
413 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
417 let db = self_arc;
418 std::thread::spawn(move || {
419 loop {
420 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
421 db.id_index.flush_write_buf();
423 db.flush_manifest_if_dirty();
425 }
426 });
427 }
428
429 pub fn head(&self) -> String {
431 self.head.read().clone()
432 }
433
434 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
437 let prev = match self.id_index.get(coll, id) {
438 None => return Ok(false), Some(h) => h,
440 };
441 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
442 let mut tombstone = Node {
443 id: format!("_del_{}", id),
444 coll: coll.to_string(),
445 seq,
446 data: serde_json::json!({"_deleted": id, "_prev": prev}),
447 prev: Some(prev),
448 caused_by: vec![],
449 ts: now(),
450 valid_from: None,
451 valid_to: None,
452 hash: String::new(),
453 };
454 let hash = self.objects.write(&mut tombstone)?;
455 self.update_head(seq, &hash);
456 self.id_index.remove(coll, id)?;
458 Ok(true)
459 }
460
461 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
463 let hash = self.id_index.get(coll, id)?;
464 self.objects.read(&hash).ok()
465 }
466
467 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
469 self.objects.read(hash).ok()
470 }
471
472 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
475 let hash = self.id_index.get(coll, id)?;
476 let mut current = self.objects.read(&hash).ok()?;
477 loop {
478 if current.seq <= target_seq {
479 return Some(current);
480 }
481 let prev_hash = current.prev.as_deref()?;
482 current = self.objects.read(prev_hash).ok()?;
483 }
484 }
485
486 pub fn list(&self, coll: &str) -> Vec<Node> {
488 self.id_index
489 .list_ids(coll)
490 .into_iter()
491 .filter_map(|id| self.get(coll, &id))
492 .collect()
493 }
494
495 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
497 if self.sorted_indexes.has(coll, field) {
498 self.sorted_indexes
499 .top_k_asc(coll, field, limit)
500 .into_iter()
501 .filter_map(|h| self.objects.read(&h).ok())
502 .collect()
503 } else {
504 let mut docs = self.list(coll);
505 docs.sort_by(|a, b| {
506 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
507 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
508 av.cmp(&bv)
509 });
510 docs.truncate(limit);
511 docs
512 }
513 }
514
515 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
517 if self.sorted_indexes.has(coll, field) {
518 self.sorted_indexes
519 .top_k_desc(coll, field, limit)
520 .into_iter()
521 .filter_map(|h| self.objects.read(&h).ok())
522 .collect()
523 } else {
524 let mut docs = self.list(coll);
525 docs.sort_by(|a, b| {
526 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
527 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
528 bv.cmp(&av)
529 });
530 docs.truncate(limit);
531 docs
532 }
533 }
534
535 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
537 self.graph
538 .trace(hash, "caused_by", reverse, limit)
539 .into_iter()
540 .filter_map(|h| self.objects.read(&h).ok())
541 .collect()
542 }
543
544 pub fn verify(&self) -> (usize, Vec<String>) {
546 self.objects.verify_all()
547 }
548
549 pub fn create_sorted_index(&self, coll: &str, field: &str) {
551 self.sorted_indexes.ensure(coll, field);
552 for id in self.id_index.list_ids(coll) {
554 if let Some(node) = self.get(coll, &id) {
555 if let Value::Object(ref obj) = node.data {
556 if let Some(value) = obj.get(field) {
557 self.sorted_indexes.insert(coll, field, value, &node.hash);
558 }
559 }
560 }
561 }
562 }
563
564 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
567 self.seq_index.get(&seq).map(|r| r.clone())
568 }
569
570 pub fn tip(&self) -> Option<Node> {
578 let next = self.seq.load(Ordering::SeqCst);
579 if next == 0 {
580 return None; }
582 let hash = self.get_hash_by_seq(next - 1)?;
583 self.get_by_hash(&hash)
584 }
585
586 pub fn tip_collection(&self, coll: &str) -> Option<Node> {
596 let mut s = self.seq.load(Ordering::SeqCst); while s > 0 {
598 s -= 1;
599 if let Some(hash) = self.get_hash_by_seq(s) {
600 if let Some(node) = self.get_by_hash(&hash) {
601 if node.coll.as_str() == coll {
602 return Some(node);
603 }
604 }
605 }
606 }
607 None
608 }
609
610 pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
621 let next = self.seq.load(Ordering::SeqCst); let head_seq = next.saturating_sub(1);
623 let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
624 let mut nodes: Vec<Node> = Vec::new();
625 let mut to_seq = after_seq;
626 let mut hit_limit = false;
627 let mut s = after_seq.saturating_add(1);
628 while s < next {
629 if nodes.len() >= cap { hit_limit = true; break; }
630 if let Some(hash) = self.get_hash_by_seq(s) {
631 if let Some(node) = self.get_by_hash(&hash) {
632 to_seq = node.seq;
633 nodes.push(node);
634 }
635 }
636 s += 1;
637 }
638 SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
639 }
640
641 pub fn scan_status(&self) -> ScanStatus {
648 let next = self.seq.load(Ordering::SeqCst);
649 let mut min = u64::MAX;
650 let mut max = 0u64;
651 let mut count = 0usize;
652 for kv in self.seq_index.iter() {
653 let s = *kv.key();
654 if s < min { min = s; }
655 if s > max { max = s; }
656 count += 1;
657 }
658 if count == 0 { min = 0; }
659 ScanStatus {
660 scan_complete: self.startup_ready.load(Ordering::SeqCst),
661 tip_seq: next.saturating_sub(1),
662 indexed_seq_min: min,
663 indexed_seq_max: max,
664 indexed_count: count,
665 }
666 }
667
668 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
673 let (frm_coll, frm_id) = frm.split_once(':')
674 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
675 let (to_coll, to_id) = to.split_once(':')
676 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
677 if self.id_index.get(frm_coll, frm_id).is_none() {
678 anyhow::bail!("link: frm not found: {}", frm);
679 }
680 if self.id_index.get(to_coll, to_id).is_none() {
681 anyhow::bail!("link: to not found: {}", to);
682 }
683 let link_id = format!("{}|{}|{}", frm, rel, to);
684 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
685 self.put("__links__", &link_id, doc, vec![], None, None)?;
686 Ok(())
687 }
688
689 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
691 let link_id = format!("{}|{}|{}", frm, rel, to);
692 self.delete("__links__", &link_id)
693 }
694
695 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
698 self.id_index
699 .list_ids("__links__")
700 .into_iter()
701 .filter_map(|id| self.get("__links__", &id))
702 .filter(|node| {
703 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
704 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
705 })
706 .filter_map(|node| {
707 let to = node.data.get("_to")?.as_str()?;
708 let (to_coll, to_id) = to.split_once(':')?;
709 self.get(to_coll, to_id)
710 })
711 .collect()
712 }
713}
714
715impl Drop for Db {
716 fn drop(&mut self) {
732 self.flush_all();
733 }
734}
735
736fn cold_scan_background_arc(db: Arc<Db>) {
738 use rayon::prelude::*;
739 use blake2::{Blake2b512, Digest};
740
741 let objects = &db.objects;
742 let head = &db.head;
743 let seq_atomic = &db.seq;
744 let sorted_indexes = &db.sorted_indexes;
745 let root = db.root.clone();
746 let ready_flag = Arc::clone(&db.startup_ready);
747
748 let hashes: Vec<String> = objects.all_hashes().collect();
749 let total = hashes.len();
750
751 if total == 0 {
752 ready_flag.store(true, Ordering::SeqCst);
753 return;
754 }
755
756 println!(" [nedbd] background scan — {} objects...", total);
757 let t0 = std::time::Instant::now();
758 let step = (total / 10).max(1000);
759
760 let nodes: Vec<Node> = hashes.par_iter()
761 .enumerate()
762 .filter_map(|(i, h)| {
763 if i > 0 && i % step == 0 {
764 let pct = i * 100 / total;
765 let elapsed = t0.elapsed().as_secs_f32();
766 let rate = i as f32 / elapsed;
767 let eta = (total - i) as f32 / rate;
768 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
769 pct, i, total, rate, eta);
770 }
771 objects.read(h).ok()
772 })
773 .collect();
774
775 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
776 total, total, t0.elapsed().as_secs_f32());
777
778 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
779 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
780
781 for node in &nodes {
782 db.seq_index.insert(node.seq, node.hash.clone());
783 if let Value::Object(ref obj) = node.data {
784 for (field, value) in obj {
785 if sorted_indexes.has(&node.coll, field) {
786 sorted_indexes.insert(&node.coll, field, value, &node.hash);
787 }
788 }
789 }
790 }
791
792 let mut sorted_hashes = hashes;
794 sorted_hashes.sort();
795 let mut h = Blake2b512::new();
796 h.update(max_seq.to_le_bytes());
797 for hash_str in &sorted_hashes {
798 h.update(hash_str.as_bytes());
799 }
800 let new_head = hex::encode(&h.finalize()[..32]);
801 *head.write() = new_head.clone();
802
803 let m = Manifest { seq: max_seq, head: new_head };
805 let json = serde_json::to_string(&m).unwrap_or_default();
806 let path = root.join("MANIFEST");
807 let tmp = root.join("MANIFEST.tmp");
808 let _ = fs::write(&tmp, &json);
809 let _ = fs::rename(&tmp, &path);
810
811 ready_flag.store(true, Ordering::SeqCst);
813 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
814}
815
816fn now() -> f64 {
817 std::time::SystemTime::now()
818 .duration_since(std::time::UNIX_EPOCH)
819 .map(|d| d.as_secs_f64())
820 .unwrap_or(0.0)
821}
822
823#[cfg(test)]
824mod tests {
825 use super::*;
826 use tempfile::tempdir;
827
828 #[test]
829 fn put_and_get() {
830 let dir = tempdir().unwrap();
831 let db = Db::open(dir.path(), None).unwrap();
832 db.put(
833 "blocks", "618000",
834 serde_json::json!({"height": 618000, "hash": "0000abc"}),
835 vec![], None, None,
836 ).unwrap();
837 let node = db.get("blocks", "618000").unwrap();
838 assert_eq!(node.id, "618000");
839 assert_eq!(node.data["height"], 618000);
840 }
841
842 #[test]
843 fn order_by_with_sorted_index() {
844 let dir = tempdir().unwrap();
845 let db = Db::open(dir.path(), None).unwrap();
846 db.create_sorted_index("blocks", "height");
847 for h in [3u64, 1, 5, 2, 4] {
848 db.put("blocks", &h.to_string(),
849 serde_json::json!({"height": h}),
850 vec![], None, None).unwrap();
851 }
852 let asc = db.order_by_asc("blocks", "height", 3);
853 let heights: Vec<u64> = asc.iter()
854 .filter_map(|n| n.data["height"].as_u64())
855 .collect();
856 assert_eq!(heights, vec![1, 2, 3]);
857 }
858
859 #[test]
860 fn causal_trace() {
861 let dir = tempdir().unwrap();
862 let db = Db::open(dir.path(), None).unwrap();
863 let a = db.put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None).unwrap();
864 let b = db.put("ops", "b", serde_json::json!({"op": "transfer"}), vec![a.hash.clone()], None, None).unwrap();
865 let c = db.put("ops", "c", serde_json::json!({"op": "burn"}), vec![b.hash.clone()], None, None).unwrap();
866
867 let trace = db.trace(&c.hash, false, 10);
868 assert_eq!(trace.len(), 3); }
870
871 #[test]
872 fn as_of() {
873 let dir = tempdir().unwrap();
874 let db = Db::open(dir.path(), None).unwrap();
875 let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
876 let _v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
877
878 let at_v1 = db.get_as_of("docs", "x", v1.seq).unwrap();
879 assert_eq!(at_v1.data["v"], 1);
880 let current = db.get("docs", "x").unwrap();
881 assert_eq!(current.data["v"], 2);
882 }
883}
884
885#[cfg(test)]
886mod tests_v2 {
887 use super::*;
888 use tempfile::tempdir;
889
890 #[test]
891 fn seq_index_populated_on_put() {
892 let db = Db::in_memory();
893 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
894 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
895 assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
896 assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
897 assert_eq!(db.get_hash_by_seq(9999), None);
898 }
899
900 #[test]
901 fn tip_and_since() {
902 let db = Db::in_memory();
903 assert!(db.tip().is_none());
905 assert!(db.since(0, 0).nodes.is_empty());
906
907 let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
908 let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
909
910 let t = db.tip().expect("tip after writes");
912 assert_eq!(t.seq, b.seq);
913 assert_eq!(t.id, "b");
914 assert_eq!(t.hash, b.hash);
915
916 let after_a = db.since(a.seq, 0);
918 assert_eq!(after_a.nodes.len(), 1);
919 assert_eq!(after_a.nodes[0].id, "b");
920 assert_eq!(after_a.from_seq, a.seq);
921 assert_eq!(after_a.to_seq, b.seq);
922 assert_eq!(after_a.head_seq, b.seq);
923 assert!(!after_a.has_more);
924
925 assert!(db.since(b.seq, 0).nodes.is_empty());
927
928 let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
930 let page = db.since(a.seq, 1); assert_eq!(page.nodes.len(), 1);
932 assert_eq!(page.nodes[0].id, "b");
933 assert_eq!(page.to_seq, b.seq);
934 assert!(page.has_more);
935 let page2 = db.since(page.to_seq, 1); assert_eq!(page2.nodes.len(), 1);
937 assert_eq!(page2.nodes[0].id, "c");
938 assert_eq!(page2.to_seq, c.seq);
939 assert!(!page2.has_more);
940 }
941
942 #[test]
943 fn tip_collection_per_chain() {
944 let db = Db::in_memory();
947 assert!(db.tip_collection("blocks").is_none());
948
949 db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
950 db.put("tx", "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
951 let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
952 let t1 = db.put("tx", "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
953
954 assert_eq!(db.tip().unwrap().id, "t1");
956 let bt = db.tip_collection("blocks").expect("blocks tip");
958 assert_eq!(bt.id, "b1");
959 assert_eq!(bt.seq, b1.seq);
960 assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
961 assert!(db.tip_collection("absent").is_none());
962 }
963
964 #[test]
965 fn seq_index_survives_batch() {
966 let db = Db::in_memory();
967 let nodes = db.put_batch(vec![
968 ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
969 ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
970 ]).unwrap();
971 for node in &nodes {
972 assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
973 }
974 }
975
976 #[test]
977 fn link_and_neighbors() {
978 let db = Db::in_memory();
979 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
980 db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
981 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
982 db.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
983
984 db.link("driver:d1", "handles", "trip:t1").unwrap();
985 db.link("driver:d1", "handles", "trip:t2").unwrap();
986 db.link("driver:d2", "handles", "trip:t1").unwrap();
987
988 let d1_trips = db.neighbors("driver:d1", "handles");
989 assert_eq!(d1_trips.len(), 2);
990 let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
991 assert!(ids.contains("t1") && ids.contains("t2"));
992
993 let d2_trips = db.neighbors("driver:d2", "handles");
994 assert_eq!(d2_trips.len(), 1);
995 assert_eq!(d2_trips[0].id, "t1");
996 }
997
998 #[test]
999 fn link_stored_in_links_collection() {
1000 let db = Db::in_memory();
1003 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
1004 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1005 db.link("driver:d1", "handles", "trip:t1").unwrap();
1006 let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
1008 assert!(link_doc.is_some(), "__links__ doc should exist");
1009 let doc = link_doc.unwrap();
1010 assert_eq!(doc.data["_from"], "driver:d1");
1011 assert_eq!(doc.data["_rel"], "handles");
1012 assert_eq!(doc.data["_to"], "trip:t1");
1013 let nb = db.neighbors("driver:d1", "handles");
1015 assert_eq!(nb.len(), 1);
1016 assert_eq!(nb[0].id, "t1");
1017 }
1018
1019 #[test]
1020 fn link_missing_node_errors() {
1021 let db = Db::in_memory();
1022 db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
1023 assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
1024 }
1025
1026 #[test]
1027 fn link_durable_survives_reopen() {
1028 let dir = tempdir().unwrap();
1029 {
1030 let db = Db::open(dir.path(), None).unwrap();
1031 db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
1032 db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1033 db.link("driver:d1", "handles", "trip:t1").unwrap();
1034 }
1035 let db2 = Db::open(dir.path(), None).unwrap();
1036 db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
1037 let trips = db2.neighbors("driver:d1", "handles");
1038 assert_eq!(trips.len(), 1);
1039 assert_eq!(trips[0].id, "t1");
1040 }
1041}