1use std::collections::BTreeMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18use dashmap::DashMap;
19use serde_json::Value;
20
/// A JSON value with a total order, so it can be used as a `BTreeMap` key.
///
/// Values are ordered by kind first (`Null < Bool < Number < Str < Array < Object`) and then
/// by content within a kind. Numbers are compared with [`f64::total_cmp`], so `NaN` has a
/// fixed position and `-0.0` sorts before `0.0`. Objects carry no data and are not compared
/// structurally: every `Object` is equal to every other `Object`.
///
/// Equality is defined through [`Ord::cmp`] rather than derived, so `==`, `partial_cmp` and
/// `cmp` always agree (a derived `PartialEq` would use f64 `==`, making `Number(NaN)` unequal
/// to itself and breaking the reflexivity promised by `Eq`).
#[derive(Debug, Clone)]
pub enum OrderedValue {
    Null,
    Bool(bool),
    Number(f64),
    Str(String),
    Array(Vec<OrderedValue>),
    Object,
}

impl PartialEq for OrderedValue {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering::*;
        use OrderedValue::*;
        match (self, other) {
            (Null, Null) => Equal,
            (Null, _) => Less,
            (_, Null) => Greater,
            (Bool(a), Bool(b)) => a.cmp(b),
            (Bool(_), _) => Less,
            (_, Bool(_)) => Greater,
            // total_cmp gives f64 a total order (NaN included), unlike partial_cmp.
            (Number(a), Number(b)) => a.total_cmp(b),
            (Number(_), _) => Less,
            (_, Number(_)) => Greater,
            (Str(a), Str(b)) => a.cmp(b),
            (Str(_), _) => Less,
            (_, Str(_)) => Greater,
            // Lexicographic, element by element, using this same ordering.
            (Array(a), Array(b)) => a.cmp(b),
            (Array(_), _) => Less,
            (_, Array(_)) => Greater,
            (Object, Object) => Equal,
        }
    }
}
64
65impl From<&Value> for OrderedValue {
66 fn from(v: &Value) -> Self {
67 match v {
68 Value::Null => OrderedValue::Null,
69 Value::Bool(b) => OrderedValue::Bool(*b),
70 Value::Number(n) => OrderedValue::Number(n.as_f64().unwrap_or(f64::NAN)),
71 Value::String(s) => OrderedValue::Str(s.clone()),
72 Value::Array(a) => OrderedValue::Array(a.iter().map(|x| x.into()).collect()),
73 Value::Object(_) => OrderedValue::Object,
74 }
75 }
76}
77
/// Picks the shard directory for `id`: the low byte of the 32-bit FNV-1a hash of its bytes,
/// rendered as two lowercase hex digits (`"00"`..=`"ff"`).
fn id_shard(id: &str) -> String {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;
    let digest = id
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME));
    format!("{:02x}", digest & 0xff)
}
90
/// Encodes an id as a single path component for the on-disk index.
///
/// Ids made only of ASCII alphanumerics, `-`, `_` and `.` are returned unchanged. Every other
/// byte is written as an uppercase `%XX` escape, so the output only ever contains ASCII
/// alphanumerics, `-`, `_`, `.` and `%`. The special ids `"."` and `".."` are escaped in
/// full (`"%2E"`, `"%2E%2E"`): unescaped, they would name the shard directory itself or its
/// parent, and nothing could ever be stored under them. `decode_id` reverses the mapping.
///
/// NOTE(review): an empty id still encodes to `""`, which is not a usable file name.
fn encode_id(id: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";
    fn is_unreserved(b: u8) -> bool {
        b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.')
    }
    let is_dot_segment = id == "." || id == "..";
    if !is_dot_segment && id.bytes().all(is_unreserved) {
        return id.to_string();
    }
    let mut out = String::with_capacity(id.len() + 8);
    for b in id.bytes() {
        if is_unreserved(b) && !is_dot_segment {
            out.push(char::from(b));
        } else {
            out.push('%');
            out.push(char::from(HEX_UPPER[usize::from(b >> 4)]));
            out.push(char::from(HEX_UPPER[usize::from(b & 0x0f)]));
        }
    }
    out
}
120
/// Inverse of `encode_id`: turns an on-disk file name back into the id it stores.
///
/// Each `%` followed by two hex digits (either case) becomes that byte. A `%` not followed by
/// two hex digits is kept literally. The resulting bytes are interpreted as UTF-8, with
/// invalid sequences replaced by U+FFFD.
fn decode_id(name: &str) -> String {
    if !name.contains('%') {
        return name.to_string();
    }
    fn nibble(c: u8) -> Option<u8> {
        char::from(c).to_digit(16).map(|d| d as u8)
    }
    let mut decoded: Vec<u8> = Vec::with_capacity(name.len());
    let mut rest = name.as_bytes();
    while let Some((&first, tail)) = rest.split_first() {
        if first == b'%' {
            if let &[hi, lo, ..] = tail {
                if let (Some(h), Some(l)) = (nibble(hi), nibble(lo)) {
                    decoded.push((h << 4) | l);
                    rest = &tail[2..];
                    continue;
                }
            }
        }
        decoded.push(first);
        rest = tail;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
152
153pub struct IdIndex {
160 root: PathBuf,
161 mem: Option<Arc<dashmap::DashMap<(String, String), String>>>,
163 write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>, }
166
167impl IdIndex {
168 pub fn new(db_root: &Path) -> Result<Self> {
169 let root = db_root.join("indexes");
170 fs::create_dir_all(&root)?;
171 Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
172 }
173
174 pub fn in_memory() -> Self {
176 Self {
177 root: PathBuf::from(":memory:"),
178 mem: Some(Arc::new(dashmap::DashMap::new())),
179 write_buf: Arc::new(dashmap::DashMap::new()),
180 }
181 }
182
183 pub fn flush_write_buf(&self) {
186 if self.mem.is_some() || self.write_buf.is_empty() { return; }
187 use rayon::prelude::*;
188 let entries: Vec<((String, String), Option<String>)> = self.write_buf
190 .iter()
191 .map(|e| (e.key().clone(), e.value().clone()))
192 .collect();
193 entries.par_iter().for_each(|((coll, id), hash_opt)| {
194 match hash_opt {
195 Some(hash) => {
196 let path = self.path(coll, id);
198 if let Some(parent) = path.parent() {
199 let _ = fs::create_dir_all(parent);
200 }
201 let tmp = path.with_extension("tmp");
202 if fs::write(&tmp, hash).is_ok() {
203 let _ = fs::rename(&tmp, &path);
204 }
205 }
206 None => {
207 let path = self.path(coll, id);
209 let _ = fs::remove_file(&path);
210 let raw = self.raw_path(coll, id);
211 if raw != path { let _ = fs::remove_file(&raw); }
212 }
213 }
214 });
215 for (key, flushed_val) in &entries {
223 self.write_buf.remove_if(key, |_, current| current == flushed_val);
224 }
225 }
226
227 fn path(&self, coll: &str, id: &str) -> PathBuf {
228 let shard = id_shard(id);
235 self.root.join(coll).join("id").join(&shard).join(encode_id(id))
236 }
237
238 fn raw_path(&self, coll: &str, id: &str) -> PathBuf {
243 let shard = id_shard(id);
244 self.root.join(coll).join("id").join(&shard).join(id)
245 }
246
247 pub fn get(&self, coll: &str, id: &str) -> Option<String> {
250 if let Some(ref mem) = self.mem {
251 return mem.get(&(coll.to_string(), id.to_string())).map(|v| v.clone());
252 }
253 let key = (coll.to_string(), id.to_string());
255 if let Some(entry) = self.write_buf.get(&key) {
256 return entry.value().clone(); }
258 let p = self.path(coll, id);
262 let content = match fs::read_to_string(&p) {
263 Ok(c) => c,
264 Err(_) => {
265 let raw = self.raw_path(coll, id);
266 if raw == p { return None; }
267 fs::read_to_string(&raw).ok()?
268 }
269 };
270 let h = content.trim().to_string();
271 if h.is_empty() { None } else { Some(h) }
272 }
273
274 pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
278 if let Some(ref mem) = self.mem {
279 mem.insert((coll.to_string(), id.to_string()), hash.to_string());
280 return Ok(());
281 }
282 self.write_buf.insert(
284 (coll.to_string(), id.to_string()),
285 Some(hash.to_string()),
286 );
287 Ok(())
288 }
289
290 pub fn list_ids(&self, coll: &str) -> Vec<String> {
292 if let Some(ref mem) = self.mem {
293 return mem.iter()
294 .filter(|e| e.key().0 == coll)
295 .map(|e| e.key().1.clone())
296 .collect();
297 }
298 let id_root = self.root.join(coll).join("id");
300 fs::read_dir(&id_root)
302 .into_iter()
303 .flatten()
304 .filter_map(|e| e.ok())
305 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
306 .flat_map(|shard_dir| {
307 fs::read_dir(shard_dir.path())
308 .into_iter()
309 .flatten()
310 .filter_map(|e| e.ok())
311 .filter_map(|e| {
312 let name = e.file_name().to_string_lossy().to_string();
313 if name.ends_with(".tmp") { return None; }
314 Some(decode_id(&name))
317 })
318 .collect::<Vec<_>>()
319 })
320 .collect::<std::collections::HashSet<_>>()
321 .into_iter()
322 .chain(
324 self.write_buf.iter()
325 .filter(|e| e.key().0 == coll && e.value().is_some())
326 .map(|e| e.key().1.clone())
327 )
328 .collect::<std::collections::HashSet<_>>()
329 .into_iter()
330 .filter(|id| {
331 self.write_buf.get(&(coll.to_string(), id.clone()))
333 .map(|v| v.is_some())
334 .unwrap_or(true)
335 })
336 .collect()
337 }
338
339 pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
342 if let Some(ref mem) = self.mem {
343 mem.remove(&(coll.to_string(), id.to_string()));
344 return Ok(());
345 }
346 self.write_buf.insert((coll.to_string(), id.to_string()), None);
348 Ok(())
349 }
350
351 pub fn collections(&self) -> Vec<String> {
353 if let Some(ref mem) = self.mem {
354 let mut colls: Vec<String> = mem.iter()
355 .map(|e| e.key().0.clone())
356 .collect::<std::collections::HashSet<_>>()
357 .into_iter().collect();
358 colls.sort();
359 return colls;
360 }
361 fs::read_dir(&self.root)
362 .into_iter()
363 .flatten()
364 .filter_map(|e| e.ok())
365 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
366 .map(|e| e.file_name().to_string_lossy().to_string())
367 .collect()
368 }
369}
370
371pub struct SortedIndexes {
374 inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
376}
377
378impl SortedIndexes {
379 pub fn new() -> Self {
380 Self { inner: DashMap::new() }
381 }
382
383 pub fn ensure(&self, coll: &str, field: &str) {
386 self.inner
387 .entry((coll.to_string(), field.to_string()))
388 .or_default();
389 }
390
391 pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
393 let key = (coll.to_string(), field.to_string());
394 if let Some(mut idx) = self.inner.get_mut(&key) {
395 let ov = OrderedValue::from(value);
396 idx.entry(ov)
397 .or_default()
398 .push(hash.to_string());
399 }
400 }
401
402 pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
404 let key = (coll.to_string(), field.to_string());
405 if let Some(mut idx) = self.inner.get_mut(&key) {
406 let ov = OrderedValue::from(value);
407 if let Some(hashes) = idx.get_mut(&ov) {
408 hashes.retain(|h| h != hash);
409 if hashes.is_empty() { idx.remove(&ov); }
410 }
411 }
412 }
413
414 pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
416 let key = (coll.to_string(), field.to_string());
417 self.inner.get(&key).map(|idx| {
418 idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
419 }).unwrap_or_default()
420 }
421
422 pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
424 let key = (coll.to_string(), field.to_string());
425 self.inner.get(&key).map(|idx| {
426 idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
427 }).unwrap_or_default()
428 }
429
430 pub fn has(&self, coll: &str, field: &str) -> bool {
432 self.inner.contains_key(&(coll.to_string(), field.to_string()))
433 }
434
435 pub fn is_empty(&self) -> bool {
437 self.inner.is_empty()
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn id_index_roundtrip() {
        let tmp = tempdir().unwrap();
        let index = IdIndex::new(tmp.path()).unwrap();
        index.set("blocks", "618000", "abcdef1234").unwrap();
        let fetched = index.get("blocks", "618000");
        assert_eq!(fetched.as_deref(), Some("abcdef1234"));
    }

    #[test]
    fn encode_decode_id_bijective() {
        // Ids made only of unreserved bytes must map to themselves.
        let safe_ids = ["618000", "utxo-000000042", "abc_DEF.123", "deadBEEF"];
        for id in safe_ids {
            let encoded = encode_id(id);
            assert_eq!(encoded, id, "safe id must be identity");
            assert_eq!(decode_id(&encoded), id);
        }

        // Ids with filesystem-hostile characters must encode to a safe leaf and round-trip.
        const FS_UNSAFE: [char; 9] = [':', '|', '/', '\\', '<', '>', '?', '"', '*'];
        let unsafe_ids = ["driver:d1|handles|trip:t1", "a/b\\c", "x<y>z?\"*", "100%done"];
        for id in unsafe_ids {
            let encoded = encode_id(id);
            assert!(
                !encoded.contains(&FS_UNSAFE[..]),
                "encoded leaf must be filesystem-safe: {}",
                encoded
            );
            assert_eq!(decode_id(&encoded), id, "encode/decode must round-trip");
        }
    }

    #[test]
    fn id_index_fs_unsafe_id_survives_disk_roundtrip() {
        let tmp = tempdir().unwrap();
        let weird = "driver:d1|handles|trip:t1";

        let writer = IdIndex::new(tmp.path()).unwrap();
        writer.set("__links__", weird, "deadbeefcafe").unwrap();
        writer.flush_write_buf();
        drop(writer);

        let reopened = IdIndex::new(tmp.path()).unwrap();
        assert_eq!(
            reopened.get("__links__", weird),
            Some("deadbeefcafe".to_string()),
            "FS-unsafe id must be readable from disk after reopen"
        );
        assert_eq!(
            reopened.list_ids("__links__"),
            vec![weird.to_string()],
            "list_ids must decode the on-disk filename back to the id"
        );
    }

    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;
        let strictly_ascending_pairs = [
            (Null, Bool(false)),
            (Bool(false), Bool(true)),
            (Bool(true), Number(0.0)),
            (Number(1.0), Number(2.0)),
            (Number(2.0), Str("a".to_string())),
            (Str("a".to_string()), Str("b".to_string())),
        ];
        for (lo, hi) in &strictly_ascending_pairs {
            assert!(lo < hi, "{:?} should sort before {:?}", lo, hi);
        }
    }

    #[test]
    fn sorted_index_top_k() {
        let idx = SortedIndexes::new();
        idx.ensure("blocks", "height");
        for (height, hash) in [(3, "hash3"), (1, "hash1"), (2, "hash2")] {
            idx.insert("blocks", "height", &serde_json::json!(height), hash);
        }
        assert_eq!(idx.top_k_asc("blocks", "height", 2), vec!["hash1", "hash2"]);
        assert_eq!(idx.top_k_desc("blocks", "height", 2), vec!["hash3", "hash2"]);
    }

    /// A flusher running concurrently with writers must never drop a newer buffered value
    /// in favour of an older one it already snapshotted.
    #[test]
    fn flush_never_drops_a_concurrent_newer_write() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const N: usize = 2000;
        let key = |i: usize| format!("k{}", i);

        let dir = tempdir().unwrap();
        let idx = Arc::new(IdIndex::new(dir.path()).unwrap());
        let stop = Arc::new(AtomicBool::new(false));

        // Background thread flushing in a tight loop, racing the writers below.
        let flusher = std::thread::spawn({
            let idx = Arc::clone(&idx);
            let stop = Arc::clone(&stop);
            move || {
                while !stop.load(Ordering::Relaxed) {
                    idx.flush_write_buf();
                }
            }
        });

        // Write every key with v1, then overwrite every key with v2.
        for value in ["v1", "v2"] {
            for i in 0..N {
                idx.set("c", &key(i), value).unwrap();
            }
        }

        stop.store(true, Ordering::Relaxed);
        flusher.join().unwrap();
        // Drain whatever is left once no writer or flusher is running.
        idx.flush_write_buf();
        idx.flush_write_buf();

        for i in 0..N {
            let k = key(i);
            assert_eq!(
                idx.get("c", &k),
                Some("v2".to_string()),
                "key {} lost its newer write (buffer path)",
                k
            );
        }

        let cold = IdIndex::new(dir.path()).unwrap();
        for i in 0..N {
            let k = key(i);
            assert_eq!(
                cold.get("c", &k),
                Some("v2".to_string()),
                "key {} lost its newer write (disk path)",
                k
            );
        }
    }
}