1use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// Snapshot of the database's sequencing state, stored as JSON in the
/// `MANIFEST` file under the database root.
///
/// A warm start restores `Db::seq`, `Db::head`, `Db::tip_hash` and the
/// per-collection tips directly from these fields.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Sequence counter value loaded back into `Db::seq` on a warm start.
    seq: u64,
    /// Head digest as a string; a warm start rejects values shorter than 8 bytes.
    head: String,
    /// Hash of the latest node. Defaults to an empty string when the field is
    /// absent, which the warm-start path treats as "needs a cold scan".
    #[serde(default)]
    tip_hash: String,
    /// Collection name -> hash of that collection's latest node. Defaults to
    /// an empty map when the field is absent.
    #[serde(default)]
    coll_tips: std::collections::HashMap<String, String>,
}
37
/// Number of nodes `Db::since` returns at most when called with `limit == 0`.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
42
/// One page of results produced by `Db::since`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SinceBatch {
    /// Nodes found for sequence numbers greater than `from_seq`, in ascending
    /// sequence order.
    pub nodes: Vec<Node>,
    /// The `after_seq` value the caller passed in.
    pub from_seq: u64,
    /// Sequence number of the last node in `nodes`, or `from_seq` when
    /// `nodes` is empty.
    pub to_seq: u64,
    /// `Db::seq - 1` (saturating at zero) at the time of the call.
    pub head_seq: u64,
    /// `true` when the page was cut off by the limit before every pending
    /// sequence number had been examined.
    pub has_more: bool,
}
61
/// Summary of the in-memory sequence index, as reported by `Db::scan_status`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ScanStatus {
    /// Current value of `Db::startup_ready`.
    pub scan_complete: bool,
    /// `Db::seq - 1`, saturating at zero.
    pub tip_seq: u64,
    /// Smallest sequence number present in the sequence index (0 when empty).
    pub indexed_seq_min: u64,
    /// Largest sequence number present in the sequence index (0 when empty).
    pub indexed_seq_max: u64,
    /// Number of entries in the sequence index.
    pub indexed_count: usize,
}
81
/// Database handle combining the object store, the id index, the sorted
/// indexes and the causal graph with the in-memory sequencing state.
pub struct Db {
    /// Object storage; nodes are written here and read back by hash.
    pub objects: ObjectStore,
    /// Maps `(collection, id)` to the hash of the current node.
    pub id_index: IdIndex,
    /// Sorted indexes keyed by `(collection, field)`, used by `order_by_*`.
    pub sorted_indexes: SortedIndexes,
    /// Edge store holding `caused_by` / `caused_by_rev` links between hashes.
    pub graph: GraphStore,
    /// Database directory, or the sentinel `":memory:"` for in-memory use.
    pub root: PathBuf,
    /// Set by `update_head`; cleared by `flush_manifest_if_dirty` right before
    /// it rewrites the manifest.
    manifest_dirty: Arc<AtomicBool>,
    /// Next sequence number to hand out; writers claim theirs via `fetch_add`.
    pub seq: AtomicU64,
    /// Head digest as a hex string.
    head: RwLock<String>,
    /// Hash of the node most recently passed to `update_head`.
    tip_hash: RwLock<String>,
    /// Collection name -> hash of that collection's latest node.
    coll_tip_hash: Arc<DashMap<String, String>>,
    /// `true` once startup state is usable: after a warm start, or when the
    /// background cold scan has finished.
    pub startup_ready: Arc<AtomicBool>,
    /// In-memory map from sequence number to node hash. A warm start does not
    /// repopulate it.
    seq_index: Arc<DashMap<u64, String>>,
}
114
115impl Db {
116 pub fn in_memory() -> Self {
120 Self {
121 objects: ObjectStore::in_memory(),
122 id_index: IdIndex::in_memory(),
123 sorted_indexes: SortedIndexes::new(),
124 graph: GraphStore::in_memory(),
125 root: std::path::PathBuf::from(":memory:"),
126 seq: AtomicU64::new(0),
127 head: RwLock::new(String::new()),
128 tip_hash: RwLock::new(String::new()),
129 coll_tip_hash: Arc::new(DashMap::new()),
130 startup_ready: Arc::new(AtomicBool::new(true)), manifest_dirty: Arc::new(AtomicBool::new(false)),
132 seq_index: Arc::new(DashMap::new()),
133 }
134 }
135
136 pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
138 std::fs::create_dir_all(db_root)?;
139
140 let objects = ObjectStore::new(db_root, dek.clone())?;
141 let id_index = IdIndex::new(db_root)?;
142 let sorted_indexes = SortedIndexes::new();
143 let graph = GraphStore::new(db_root)?;
144
145 let mut db = Self {
146 objects,
147 id_index,
148 sorted_indexes,
149 graph,
150 root: db_root.to_path_buf(),
151 seq: AtomicU64::new(0),
152 head: RwLock::new(String::new()),
153 tip_hash: RwLock::new(String::new()),
154 coll_tip_hash: Arc::new(DashMap::new()),
155 startup_ready: Arc::new(AtomicBool::new(false)),
156 manifest_dirty: Arc::new(AtomicBool::new(false)),
157 seq_index: Arc::new(DashMap::new()),
158 };
159
160 migrate::migrate_if_needed(
162 db_root,
163 &db.objects,
164 &db.id_index,
165 &db.sorted_indexes,
166 &db.graph,
167 dek.as_ref(),
168 )?;
169
170 db.startup_rebuild()?;
173
174 Ok(db)
175 }
176
177 fn startup_rebuild(&mut self) -> Result<()> {
182 let manifest_path = self.root.join("MANIFEST");
183 let needs_index_rebuild = !self.sorted_indexes.is_empty();
184
185 if manifest_path.exists() && !needs_index_rebuild {
187 if let Some(m) = fs::read_to_string(&manifest_path)
188 .ok()
189 .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
190 {
191 if m.head.len() < 8 {
194 eprintln!(" [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
195 } else if m.tip_hash.is_empty() {
196 eprintln!(" [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
200 } else {
201 self.seq.store(m.seq, Ordering::SeqCst); *self.head.write() = m.head.clone();
203 *self.tip_hash.write() = m.tip_hash.clone();
204 for (coll, hash) in &m.coll_tips {
205 self.coll_tip_hash.insert(coll.clone(), hash.clone());
206 }
207 self.startup_ready.store(true, Ordering::SeqCst);
208 println!(" [nedbd] warm start — seq={} head={}... tip={}...",
209 m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
210 return Ok(());
211 }
212 } else {
213 eprintln!(" [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
214 }
215 }
216
217 println!(" [nedbd] cold start — background scan will start after heap allocation");
223 Ok(())
224 }
225
226 pub fn start_cold_scan(self_arc: Arc<Self>) {
230 if self_arc.startup_ready.load(Ordering::SeqCst) {
231 return; }
233 if self_arc.objects.all_hashes().next().is_none() {
236 self_arc.startup_ready.store(true, Ordering::SeqCst);
237 return;
238 }
239 println!(" [nedbd] cold start — background scan starting, server accepting reads now");
240 std::thread::spawn(move || {
241 let db = self_arc;
242 cold_scan_background_arc(db);
243 });
244 }
245
246 pub fn put(
248 &self,
249 coll: &str,
250 id: &str,
251 data: Value,
252 caused_by: Vec<String>,
253 valid_from: Option<String>,
254 valid_to: Option<String>,
255 ) -> Result<Node> {
256 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
257 let prev = self.id_index.get(coll, id);
258
259 if let Some(old_hash) = &prev {
261 if let Ok(old_node) = self.objects.read(old_hash) {
262 if let Value::Object(ref obj) = old_node.data {
263 for (field, value) in obj {
264 self.sorted_indexes.remove(coll, field, value, old_hash);
265 }
266 }
267 }
268 }
269
270 let mut node = Node {
271 id: id.to_string(),
272 coll: coll.to_string(),
273 seq,
274 data: data.clone(),
275 prev,
276 caused_by: caused_by.clone(),
277 ts: now(),
278 valid_from,
279 valid_to,
280 hash: String::new(),
281 };
282
283 let hash = self.objects.write(&mut node)?;
285 self.seq_index.insert(seq, hash.clone());
286
287 self.id_index.set(coll, id, &hash)?;
289
290 if let Value::Object(ref obj) = data {
292 for (field, value) in obj {
293 if self.sorted_indexes.has(coll, field) {
294 self.sorted_indexes.insert(coll, field, value, &hash);
295 }
296 }
297 }
298
299 for cause in &caused_by {
301 self.graph.add_edge(&hash, "caused_by", cause)?;
302 self.graph.add_edge(cause, "caused_by_rev", &hash)?;
303 }
304
305 self.update_head(coll, seq, &hash);
308
309 Ok(node)
310 }
311
312 pub fn put_batch(
317 &self,
318 ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
319 ) -> Result<Vec<Node>> {
321 use rayon::prelude::*;
322
323 if ops.is_empty() { return Ok(vec![]); }
324 let n = ops.len() as u64;
325
326 let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
328 let ts = now();
329
330 let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
332 let prev = self.id_index.get(&coll, &id);
333 Node {
334 id, coll, seq: base_seq + i as u64,
335 data, prev, caused_by,
336 ts, valid_from, valid_to,
337 hash: String::new(),
338 }
339 }).collect();
340
341 let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
343 .filter_map(|node| self.objects.write(node).err())
344 .collect();
345 if let Some(e) = write_errors.into_iter().next() { return Err(e); }
346
347 let index_errors: Vec<anyhow::Error> = nodes.par_iter()
349 .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
350 .collect();
351 if let Some(e) = index_errors.into_iter().next() { return Err(e); }
352
353 for node in &nodes {
355 self.seq_index.insert(node.seq, node.hash.clone());
356 if let Value::Object(ref obj) = node.data {
357 for (field, value) in obj {
358 if self.sorted_indexes.has(&node.coll, field) {
359 self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
360 }
361 }
362 }
363 for cause in &node.caused_by {
364 self.graph.add_edge(&node.hash, "caused_by", cause).ok();
365 self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
366 }
367 }
368
369 for node in &nodes {
371 self.update_head(&node.coll, node.seq, &node.hash);
372 }
373
374 Ok(nodes)
375 }
376
377 fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
381 use blake2::{Blake2b512, Digest};
382 let prev = self.head.read().clone();
383 let mut h = Blake2b512::new();
384 h.update(prev.as_bytes());
385 h.update(seq.to_le_bytes());
386 h.update(new_hash.as_bytes());
387 *self.head.write() = hex::encode(&h.finalize()[..32]);
388 *self.tip_hash.write() = new_hash.to_string();
392 self.coll_tip_hash.insert(coll.to_string(), new_hash.to_string());
396 self.manifest_dirty.store(true, Ordering::Release);
398 }
399
400 pub fn flush_all(&self) {
402 self.id_index.flush_write_buf();
403 if let Err(e) = self.objects.sync() {
406 eprintln!("nedb: segment sync failed: {}", e);
407 }
408 self.flush_manifest();
409 }
410
411 pub fn compact(&self) -> Result<crate::segment::CompactStats> {
420 self.flush_all();
421 let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
422 for coll in self.id_index.collections() {
423 for id in self.id_index.list_ids(&coll) {
424 if let Some(h) = self.id_index.get(&coll, &id) {
425 live.insert(h);
426 }
427 }
428 }
429 self.objects.compact(&live)
430 }
431
432 pub fn flush_manifest_if_dirty(&self) {
434 if self.root == std::path::PathBuf::from(":memory:") { return; }
435 if self.manifest_dirty.compare_exchange(
436 true, false, Ordering::AcqRel, Ordering::Relaxed
437 ).is_ok() {
438 self.flush_manifest();
439 }
440 }
441
442 pub fn flush_manifest(&self) {
444 if self.root == std::path::PathBuf::from(":memory:") { return; }
445 let seq = self.seq.load(Ordering::SeqCst);
446 let head = self.head.read().clone();
447 let tip_hash = self.tip_hash.read().clone();
448 let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
449 .iter()
450 .map(|kv| (kv.key().clone(), kv.value().clone()))
451 .collect();
452 let m = Manifest { seq, head, tip_hash, coll_tips };
453 if let Ok(json) = serde_json::to_string(&m) {
454 let path = self.root.join("MANIFEST");
455 let tmp = self.root.join("MANIFEST.tmp");
456 let _ = fs::write(&tmp, &json);
457 let _ = fs::rename(&tmp, &path);
458 }
459 }
460
461 pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
465 let db = self_arc;
466 std::thread::spawn(move || {
467 loop {
468 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
469 db.id_index.flush_write_buf();
471 db.flush_manifest_if_dirty();
473 }
474 });
475 }
476
477 pub fn head(&self) -> String {
479 self.head.read().clone()
480 }
481
482 pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
485 let prev = match self.id_index.get(coll, id) {
486 None => return Ok(false), Some(h) => h,
488 };
489 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
490 let mut tombstone = Node {
491 id: format!("_del_{}", id),
492 coll: coll.to_string(),
493 seq,
494 data: serde_json::json!({"_deleted": id, "_prev": prev}),
495 prev: Some(prev),
496 caused_by: vec![],
497 ts: now(),
498 valid_from: None,
499 valid_to: None,
500 hash: String::new(),
501 };
502 let hash = self.objects.write(&mut tombstone)?;
503 self.update_head(coll, seq, &hash);
504 self.id_index.remove(coll, id)?;
506 Ok(true)
507 }
508
509 pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
511 let hash = self.id_index.get(coll, id)?;
512 self.objects.read(&hash).ok()
513 }
514
515 pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
517 self.objects.read(hash).ok()
518 }
519
520 pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
523 let hash = self.id_index.get(coll, id)?;
524 let mut current = self.objects.read(&hash).ok()?;
525 loop {
526 if current.seq <= target_seq {
527 return Some(current);
528 }
529 let prev_hash = current.prev.as_deref()?;
530 current = self.objects.read(prev_hash).ok()?;
531 }
532 }
533
534 pub fn list(&self, coll: &str) -> Vec<Node> {
536 self.id_index
537 .list_ids(coll)
538 .into_iter()
539 .filter_map(|id| self.get(coll, &id))
540 .collect()
541 }
542
543 pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
545 if self.sorted_indexes.has(coll, field) {
546 self.sorted_indexes
547 .top_k_asc(coll, field, limit)
548 .into_iter()
549 .filter_map(|h| self.objects.read(&h).ok())
550 .collect()
551 } else {
552 let mut docs = self.list(coll);
553 docs.sort_by(|a, b| {
554 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
555 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
556 av.cmp(&bv)
557 });
558 docs.truncate(limit);
559 docs
560 }
561 }
562
563 pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
565 if self.sorted_indexes.has(coll, field) {
566 self.sorted_indexes
567 .top_k_desc(coll, field, limit)
568 .into_iter()
569 .filter_map(|h| self.objects.read(&h).ok())
570 .collect()
571 } else {
572 let mut docs = self.list(coll);
573 docs.sort_by(|a, b| {
574 let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
575 let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
576 bv.cmp(&av)
577 });
578 docs.truncate(limit);
579 docs
580 }
581 }
582
583 pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
585 self.graph
586 .trace(hash, "caused_by", reverse, limit)
587 .into_iter()
588 .filter_map(|h| self.objects.read(&h).ok())
589 .collect()
590 }
591
592 pub fn verify(&self) -> (usize, Vec<String>) {
594 self.objects.verify_all()
595 }
596
597 pub fn create_sorted_index(&self, coll: &str, field: &str) {
599 self.sorted_indexes.ensure(coll, field);
600 for id in self.id_index.list_ids(coll) {
602 if let Some(node) = self.get(coll, &id) {
603 if let Value::Object(ref obj) = node.data {
604 if let Some(value) = obj.get(field) {
605 self.sorted_indexes.insert(coll, field, value, &node.hash);
606 }
607 }
608 }
609 }
610 }
611
612 pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
615 self.seq_index.get(&seq).map(|r| r.clone())
616 }
617
618 pub fn tip(&self) -> Option<Node> {
626 let next = self.seq.load(Ordering::SeqCst);
627 if next == 0 {
628 return None; }
630 if let Some(hash) = self.get_hash_by_seq(next - 1) {
633 return self.get_by_hash(&hash);
634 }
635 let th = self.tip_hash.read().clone();
639 if !th.is_empty() {
640 return self.get_by_hash(&th);
641 }
642 None
643 }
644
645 pub fn tip_collection(&self, coll: &str) -> Option<Node> {
656 let hash = self.coll_tip_hash.get(coll)?.clone();
657 self.get_by_hash(&hash)
658 }
659
660 pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
671 let next = self.seq.load(Ordering::SeqCst); let head_seq = next.saturating_sub(1);
673 let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
674 let mut nodes: Vec<Node> = Vec::new();
675 let mut to_seq = after_seq;
676 let mut hit_limit = false;
677 let mut s = after_seq.saturating_add(1);
678 while s < next {
679 if nodes.len() >= cap { hit_limit = true; break; }
680 if let Some(hash) = self.get_hash_by_seq(s) {
681 if let Some(node) = self.get_by_hash(&hash) {
682 to_seq = node.seq;
683 nodes.push(node);
684 }
685 }
686 s += 1;
687 }
688 SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
689 }
690
691 pub fn scan_status(&self) -> ScanStatus {
698 let next = self.seq.load(Ordering::SeqCst);
699 let mut min = u64::MAX;
700 let mut max = 0u64;
701 let mut count = 0usize;
702 for kv in self.seq_index.iter() {
703 let s = *kv.key();
704 if s < min { min = s; }
705 if s > max { max = s; }
706 count += 1;
707 }
708 if count == 0 { min = 0; }
709 ScanStatus {
710 scan_complete: self.startup_ready.load(Ordering::SeqCst),
711 tip_seq: next.saturating_sub(1),
712 indexed_seq_min: min,
713 indexed_seq_max: max,
714 indexed_count: count,
715 }
716 }
717
718 pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
723 let (frm_coll, frm_id) = frm.split_once(':')
724 .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
725 let (to_coll, to_id) = to.split_once(':')
726 .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
727 if self.id_index.get(frm_coll, frm_id).is_none() {
728 anyhow::bail!("link: frm not found: {}", frm);
729 }
730 if self.id_index.get(to_coll, to_id).is_none() {
731 anyhow::bail!("link: to not found: {}", to);
732 }
733 let link_id = format!("{}|{}|{}", frm, rel, to);
734 let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
735 self.put("__links__", &link_id, doc, vec![], None, None)?;
736 Ok(())
737 }
738
739 pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
741 let link_id = format!("{}|{}|{}", frm, rel, to);
742 self.delete("__links__", &link_id)
743 }
744
745 pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
748 self.id_index
749 .list_ids("__links__")
750 .into_iter()
751 .filter_map(|id| self.get("__links__", &id))
752 .filter(|node| {
753 node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
754 && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
755 })
756 .filter_map(|node| {
757 let to = node.data.get("_to")?.as_str()?;
758 let (to_coll, to_id) = to.split_once(':')?;
759 self.get(to_coll, to_id)
760 })
761 .collect()
762 }
763}
764
765impl Drop for Db {
766 fn drop(&mut self) {
782 self.flush_all();
783 }
784}
785
786fn cold_scan_background_arc(db: Arc<Db>) {
788 use rayon::prelude::*;
789 use blake2::{Blake2b512, Digest};
790
791 let objects = &db.objects;
792 let head = &db.head;
793 let seq_atomic = &db.seq;
794 let sorted_indexes = &db.sorted_indexes;
795 let root = db.root.clone();
796 let ready_flag = Arc::clone(&db.startup_ready);
797
798 let hashes: Vec<String> = objects.all_hashes().collect();
799 let total = hashes.len();
800
801 if total == 0 {
802 ready_flag.store(true, Ordering::SeqCst);
803 return;
804 }
805
806 println!(" [nedbd] background scan — {} objects...", total);
807 let t0 = std::time::Instant::now();
808 let step = (total / 10).max(1000);
809
810 let nodes: Vec<Node> = hashes.par_iter()
811 .enumerate()
812 .filter_map(|(i, h)| {
813 if i > 0 && i % step == 0 {
814 let pct = i * 100 / total;
815 let elapsed = t0.elapsed().as_secs_f32();
816 let rate = i as f32 / elapsed;
817 let eta = (total - i) as f32 / rate;
818 eprint!("\r [nedbd] {:>3}% {:>8} / {:>8} ({:>8.0}/s eta {:.0}s) ",
819 pct, i, total, rate, eta);
820 }
821 objects.read(h).ok()
822 })
823 .collect();
824
825 eprintln!("\r [nedbd] 100% {:>8} / {:>8} ({:.1}s) ",
826 total, total, t0.elapsed().as_secs_f32());
827
828 let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
829 seq_atomic.store(max_seq + 1, Ordering::SeqCst);
830
831 let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();
836
837 for node in &nodes {
838 db.seq_index.insert(node.seq, node.hash.clone());
839 coll_max.entry(node.coll.clone())
840 .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
841 .or_insert_with(|| (node.seq, node.hash.clone()));
842 if let Value::Object(ref obj) = node.data {
843 for (field, value) in obj {
844 if sorted_indexes.has(&node.coll, field) {
845 sorted_indexes.insert(&node.coll, field, value, &node.hash);
846 }
847 }
848 }
849 }
850
851 let coll_tips: std::collections::HashMap<String, String> = coll_max.into_iter()
852 .map(|(coll, (_seq, hash))| {
853 db.coll_tip_hash.insert(coll.clone(), hash.clone());
854 (coll, hash)
855 })
856 .collect();
857
858 let mut sorted_hashes = hashes;
860 sorted_hashes.sort();
861 let mut h = Blake2b512::new();
862 h.update(max_seq.to_le_bytes());
863 for hash_str in &sorted_hashes {
864 h.update(hash_str.as_bytes());
865 }
866 let new_head = hex::encode(&h.finalize()[..32]);
867 *head.write() = new_head.clone();
868
869 let tip_hash = db.seq_index.iter()
872 .max_by_key(|kv| *kv.key())
873 .map(|kv| kv.value().clone())
874 .unwrap_or_default();
875 *db.tip_hash.write() = tip_hash.clone();
876
877 let m = Manifest { seq: max_seq, head: new_head, tip_hash, coll_tips };
879 let json = serde_json::to_string(&m).unwrap_or_default();
880 let path = root.join("MANIFEST");
881 let tmp = root.join("MANIFEST.tmp");
882 let _ = fs::write(&tmp, &json);
883 let _ = fs::rename(&tmp, &path);
884
885 ready_flag.store(true, Ordering::SeqCst);
887 println!(" [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
888}
889
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
896
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A document written with `put` can be read back with `get`.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();

        let payload = serde_json::json!({"height": 618000, "hash": "0000abc"});
        store.put("blocks", "618000", payload, vec![], None, None).unwrap();

        let fetched = store.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// With a sorted index in place, `order_by_asc` returns the lowest values
    /// first and honours the limit.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();
        store.create_sorted_index("blocks", "height");

        for height in [3u64, 1, 5, 2, 4] {
            let doc = serde_json::json!({"height": height});
            store.put("blocks", &height.to_string(), doc, vec![], None, None).unwrap();
        }

        let lowest = store.order_by_asc("blocks", "height", 3);
        let mut heights: Vec<u64> = Vec::new();
        for node in &lowest {
            if let Some(h) = node.data["height"].as_u64() {
                heights.push(h);
            }
        }
        assert_eq!(heights, vec![1, 2, 3]);
    }

    /// Tracing from the end of a three-step causal chain yields three nodes.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();

        let created = store
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transferred = store
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![created.hash.clone()], None, None)
            .unwrap();
        let burned = store
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transferred.hash.clone()], None, None)
            .unwrap();

        let chain = store.trace(&burned.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    /// `get_as_of` returns the version that was current at a given sequence
    /// number while `get` returns the latest one.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let store = Db::open(tmp.path(), None).unwrap();

        let first = store
            .put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None)
            .unwrap();
        let _second = store
            .put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None)
            .unwrap();

        let historical = store.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = store.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
958
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    /// Every `put` records its hash under its sequence number.
    #[test]
    fn seq_index_populated_on_put() {
        let store = Db::in_memory();
        let first = store
            .put("item", "a", serde_json::json!({"x": 1}), vec![], None, None)
            .unwrap();
        let second = store
            .put("item", "b", serde_json::json!({"x": 2}), vec![], None, None)
            .unwrap();

        assert_eq!(store.get_hash_by_seq(first.seq), Some(first.hash.clone()));
        assert_eq!(store.get_hash_by_seq(second.seq), Some(second.hash.clone()));
        assert_eq!(store.get_hash_by_seq(9999), None);
    }

    /// `tip` follows the latest write and `since` pages through newer nodes.
    #[test]
    fn tip_and_since() {
        let store = Db::in_memory();

        // Empty database: no tip, empty feed.
        assert!(store.tip().is_none());
        assert!(store.since(0, 0).nodes.is_empty());

        let first = store
            .put("item", "a", serde_json::json!({"x": 1}), vec![], None, None)
            .unwrap();
        let second = store
            .put("item", "b", serde_json::json!({"x": 2}), vec![], None, None)
            .unwrap();

        let tip = store.tip().expect("tip after writes");
        assert_eq!(tip.seq, second.seq);
        assert_eq!(tip.id, "b");
        assert_eq!(tip.hash, second.hash);

        // Everything after the first write is just the second one.
        let after_first = store.since(first.seq, 0);
        assert_eq!(after_first.nodes.len(), 1);
        assert_eq!(after_first.nodes[0].id, "b");
        assert_eq!(after_first.from_seq, first.seq);
        assert_eq!(after_first.to_seq, second.seq);
        assert_eq!(after_first.head_seq, second.seq);
        assert!(!after_first.has_more);

        // Nothing is newer than the latest write.
        assert!(store.since(second.seq, 0).nodes.is_empty());

        let third = store
            .put("item", "c", serde_json::json!({"x": 3}), vec![], None, None)
            .unwrap();

        // Page size 1: first page stops at "b" and signals more.
        let page_one = store.since(first.seq, 1);
        assert_eq!(page_one.nodes.len(), 1);
        assert_eq!(page_one.nodes[0].id, "b");
        assert_eq!(page_one.to_seq, second.seq);
        assert!(page_one.has_more);

        // Second page picks up at "c" and is the last one.
        let page_two = store.since(page_one.to_seq, 1);
        assert_eq!(page_two.nodes.len(), 1);
        assert_eq!(page_two.nodes[0].id, "c");
        assert_eq!(page_two.to_seq, third.seq);
        assert!(!page_two.has_more);
    }

    /// Each collection keeps its own tip, independent of the global one.
    #[test]
    fn tip_collection_per_chain() {
        let store = Db::in_memory();
        assert!(store.tip_collection("blocks").is_none());

        store.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        store.put("tx", "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let last_block = store
            .put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None)
            .unwrap();
        let last_tx = store
            .put("tx", "t1", serde_json::json!({"v": 2}), vec![], None, None)
            .unwrap();

        assert_eq!(store.tip().unwrap().id, "t1");

        let blocks_tip = store.tip_collection("blocks").expect("blocks tip");
        assert_eq!(blocks_tip.id, "b1");
        assert_eq!(blocks_tip.seq, last_block.seq);
        assert_eq!(store.tip_collection("tx").unwrap().seq, last_tx.seq);
        assert!(store.tip_collection("absent").is_none());
    }

    /// `put_batch` records every node in the sequence index too.
    #[test]
    fn seq_index_survives_batch() {
        let store = Db::in_memory();
        let written = store
            .put_batch(vec![
                ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
                ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
            ])
            .unwrap();

        for node in written.iter() {
            assert_eq!(store.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    /// `neighbors` returns exactly the targets linked from a given source.
    #[test]
    fn link_and_neighbors() {
        let store = Db::in_memory();
        store.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        store.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
        store.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        store.put("trip", "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();

        store.link("driver:d1", "handles", "trip:t1").unwrap();
        store.link("driver:d1", "handles", "trip:t2").unwrap();
        store.link("driver:d2", "handles", "trip:t1").unwrap();

        let bob_trips = store.neighbors("driver:d1", "handles");
        assert_eq!(bob_trips.len(), 2);
        let bob_ids: std::collections::HashSet<&str> =
            bob_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(bob_ids.contains("t1") && bob_ids.contains("t2"));

        let carol_trips = store.neighbors("driver:d2", "handles");
        assert_eq!(carol_trips.len(), 1);
        assert_eq!(carol_trips[0].id, "t1");
    }

    /// A link is an ordinary document in the `__links__` collection.
    #[test]
    fn link_stored_in_links_collection() {
        let store = Db::in_memory();
        store.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        store.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        store.link("driver:d1", "handles", "trip:t1").unwrap();

        let stored = store.get("__links__", "driver:d1|handles|trip:t1");
        assert!(stored.is_some(), "__links__ doc should exist");
        let stored = stored.unwrap();
        assert_eq!(stored.data["_from"], "driver:d1");
        assert_eq!(stored.data["_rel"], "handles");
        assert_eq!(stored.data["_to"], "trip:t1");

        let reachable = store.neighbors("driver:d1", "handles");
        assert_eq!(reachable.len(), 1);
        assert_eq!(reachable[0].id, "t1");
    }

    /// Linking to a document that does not exist is an error.
    #[test]
    fn link_missing_node_errors() {
        let store = Db::in_memory();
        store.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();

        let outcome = store.link("driver:d1", "handles", "trip:ghost");
        assert!(outcome.is_err());
    }

    /// Links written in one session are visible after reopening the directory.
    #[test]
    fn link_durable_survives_reopen() {
        let tmp = tempdir().unwrap();
        {
            let store = Db::open(tmp.path(), None).unwrap();
            store.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
            store.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            store.link("driver:d1", "handles", "trip:t1").unwrap();
        }

        let reopened = Db::open(tmp.path(), None).unwrap();
        reopened.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);

        let trips = reopened.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }

    /// `tip` still answers after a warm start, when the sequence index is
    /// empty.
    #[test]
    fn tip_survives_warm_restart() {
        let tmp = tempdir().unwrap();
        {
            let store = Db::open(tmp.path(), None).unwrap();
            store.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            store.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            store.flush_all();
            assert_eq!(store.tip().expect("tip in-session").id, "b2");
        }

        let reopened = Db::open(tmp.path(), None).unwrap();
        assert!(reopened.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");

        let tip = reopened.tip().expect("tip() must survive a warm restart");
        assert_eq!(tip.id, "b2");
        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
    }

    /// Per-collection tips are restored by a warm start as well.
    #[test]
    fn tip_collection_survives_warm_restart() {
        let tmp = tempdir().unwrap();
        {
            let store = Db::open(tmp.path(), None).unwrap();
            store.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            store.put("tx", "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
            let last_block = store
                .put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None)
                .unwrap();
            store.flush_all();
            assert_eq!(store.tip_collection("blocks").unwrap().id, "b2");
            assert_eq!(store.tip_collection("blocks").unwrap().seq, last_block.seq);
        }

        let reopened = Db::open(tmp.path(), None).unwrap();
        assert!(reopened.get_hash_by_seq(0).is_none(), "seq_index is cold on a warm boot");

        let blocks_tip = reopened
            .tip_collection("blocks")
            .expect("tip_collection must survive a warm restart");
        assert_eq!(blocks_tip.id, "b2");
        assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));

        let tx_tip = reopened.tip_collection("tx").expect("tx tip must also survive");
        assert_eq!(tx_tip.id, "t1");
        assert!(reopened.tip_collection("absent").is_none());
    }
}