1use std::collections::BTreeMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18use dashmap::DashMap;
19use serde_json::Value;
20
/// A JSON value projected onto a total order so it can be used as a `BTreeMap` key.
///
/// Values sort by kind first — `Null < Bool < Number < Str < Array < Object` — and
/// then by content. Numbers are compared with `f64::total_cmp`, so every value
/// (including `NaN`) has a defined position and `-0.0` sorts before `0.0`.
/// `Object` carries no content, so all objects compare equal.
#[derive(Debug, Clone)]
pub enum OrderedValue {
    Null,
    Bool(bool),
    Number(f64),
    Str(String),
    Array(Vec<OrderedValue>),
    Object,
}

// Equality is defined through `Ord` instead of being derived so that `==` always
// agrees with `cmp`. A derived impl would make `Number(NaN) != Number(NaN)`
// (violating the reflexivity `Eq` promises) and `Number(-0.0) == Number(0.0)`
// even though `cmp` orders those two apart.
impl PartialEq for OrderedValue {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering::*;
        use OrderedValue::*;
        match (self, other) {
            (Null, Null) => Equal,
            (Null, _) => Less,
            (_, Null) => Greater,
            (Bool(a), Bool(b)) => a.cmp(b),
            (Bool(_), _) => Less,
            (_, Bool(_)) => Greater,
            (Number(a), Number(b)) => a.total_cmp(b),
            (Number(_), _) => Less,
            (_, Number(_)) => Greater,
            (Str(a), Str(b)) => a.cmp(b),
            (Str(_), _) => Less,
            (_, Str(_)) => Greater,
            // Element-wise lexicographic comparison using this same ordering.
            (Array(a), Array(b)) => a.cmp(b),
            (Array(_), _) => Less,
            (_, Array(_)) => Greater,
            (Object, Object) => Equal,
        }
    }
}
64
65impl From<&Value> for OrderedValue {
66 fn from(v: &Value) -> Self {
67 match v {
68 Value::Null => OrderedValue::Null,
69 Value::Bool(b) => OrderedValue::Bool(*b),
70 Value::Number(n) => OrderedValue::Number(n.as_f64().unwrap_or(f64::NAN)),
71 Value::String(s) => OrderedValue::Str(s.clone()),
72 Value::Array(a) => OrderedValue::Array(a.iter().map(|x| x.into()).collect()),
73 Value::Object(_) => OrderedValue::Object,
74 }
75 }
76}
77
/// Returns the shard directory name for `id`.
///
/// The name is the low byte of the id's 32-bit FNV-1a hash, written as two
/// lowercase hex digits, which spreads ids over at most 256 directories.
fn id_shard(id: &str) -> String {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;
    let digest = id
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME));
    // Truncation to the low byte is intentional: it selects one of 256 shards.
    format!("{:02x}", digest as u8)
}
90
/// Encodes `id` into a filesystem-safe file name.
///
/// ASCII alphanumerics and `-`, `_`, `.` are kept; every other byte becomes an
/// upper-case `%XX` escape (so `%` itself becomes `%25`). Ids made only of kept
/// characters are returned unchanged. `decode_id` reverses the encoding.
///
/// Dots are escaped as well in three cases:
/// - `"."` and `".."`, which would otherwise resolve to the shard directory or
///   its parent;
/// - ids ending in `".tmp"`, which would otherwise look like in-flight temp files
///   (those are skipped when listing ids).
fn encode_id(id: &str) -> String {
    fn is_unreserved(b: u8) -> bool {
        b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.')
    }
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let escape_dots = id == "." || id == ".." || id.ends_with(".tmp");
    let keep = |b: u8| is_unreserved(b) && !(escape_dots && b == b'.');

    if id.bytes().all(keep) {
        return id.to_string();
    }
    let mut out = String::with_capacity(id.len() + 8);
    for b in id.bytes() {
        if keep(b) {
            out.push(char::from(b));
        } else {
            // Push the escape directly instead of allocating a `format!` string per byte.
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0f)]));
        }
    }
    out
}
120
/// Reverses `encode_id`, turning `%XX` escapes (hex digits in either case) back into bytes.
///
/// Names containing no `%` are returned as-is. A `%` that is not followed by two
/// hex digits is kept literally. Decoded bytes that are not valid UTF-8 are
/// replaced with U+FFFD by `String::from_utf8_lossy`.
fn decode_id(name: &str) -> String {
    if !name.contains('%') {
        return name.to_string();
    }
    // Value of a single ASCII hex digit; `None` for any other byte.
    let nibble = |c: u8| char::from(c).to_digit(16).map(|d| d as u8);

    let src = name.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(src.len());
    let mut pos = 0;
    while let Some(&byte) = src.get(pos) {
        let escaped = match (byte, src.get(pos + 1), src.get(pos + 2)) {
            (b'%', Some(&hi), Some(&lo)) => nibble(hi).zip(nibble(lo)).map(|(h, l)| (h << 4) | l),
            _ => None,
        };
        match escaped {
            Some(value) => {
                decoded.push(value);
                pos += 3;
            }
            None => {
                decoded.push(byte);
                pos += 1;
            }
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
152
153pub struct IdIndex {
160 root: PathBuf,
161 mem: Option<Arc<dashmap::DashMap<(String, String), String>>>,
163 write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>, }
166
167impl IdIndex {
168 pub fn new(db_root: &Path) -> Result<Self> {
169 let root = db_root.join("indexes");
170 fs::create_dir_all(&root)?;
171 Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
172 }
173
174 pub fn in_memory() -> Self {
176 Self {
177 root: PathBuf::from(":memory:"),
178 mem: Some(Arc::new(dashmap::DashMap::new())),
179 write_buf: Arc::new(dashmap::DashMap::new()),
180 }
181 }
182
183 pub fn flush_write_buf(&self) {
186 if self.mem.is_some() || self.write_buf.is_empty() { return; }
187 use rayon::prelude::*;
188 let entries: Vec<((String, String), Option<String>)> = self.write_buf
190 .iter()
191 .map(|e| (e.key().clone(), e.value().clone()))
192 .collect();
193 entries.par_iter().for_each(|((coll, id), hash_opt)| {
194 match hash_opt {
195 Some(hash) => {
196 let path = self.path(coll, id);
198 if let Some(parent) = path.parent() {
199 let _ = fs::create_dir_all(parent);
200 }
201 let tmp = path.with_extension("tmp");
202 if fs::write(&tmp, hash).is_ok() {
203 let _ = fs::rename(&tmp, &path);
204 }
205 }
206 None => {
207 let path = self.path(coll, id);
209 let _ = fs::remove_file(&path);
210 let raw = self.raw_path(coll, id);
211 if raw != path { let _ = fs::remove_file(&raw); }
212 }
213 }
214 });
215 for ((coll, id), _) in &entries {
217 self.write_buf.remove(&(coll.clone(), id.clone()));
218 }
219 }
220
221 fn path(&self, coll: &str, id: &str) -> PathBuf {
222 let shard = id_shard(id);
229 self.root.join(coll).join("id").join(&shard).join(encode_id(id))
230 }
231
232 fn raw_path(&self, coll: &str, id: &str) -> PathBuf {
237 let shard = id_shard(id);
238 self.root.join(coll).join("id").join(&shard).join(id)
239 }
240
241 pub fn get(&self, coll: &str, id: &str) -> Option<String> {
244 if let Some(ref mem) = self.mem {
245 return mem.get(&(coll.to_string(), id.to_string())).map(|v| v.clone());
246 }
247 let key = (coll.to_string(), id.to_string());
249 if let Some(entry) = self.write_buf.get(&key) {
250 return entry.value().clone(); }
252 let p = self.path(coll, id);
256 let content = match fs::read_to_string(&p) {
257 Ok(c) => c,
258 Err(_) => {
259 let raw = self.raw_path(coll, id);
260 if raw == p { return None; }
261 fs::read_to_string(&raw).ok()?
262 }
263 };
264 let h = content.trim().to_string();
265 if h.is_empty() { None } else { Some(h) }
266 }
267
268 pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
272 if let Some(ref mem) = self.mem {
273 mem.insert((coll.to_string(), id.to_string()), hash.to_string());
274 return Ok(());
275 }
276 self.write_buf.insert(
278 (coll.to_string(), id.to_string()),
279 Some(hash.to_string()),
280 );
281 Ok(())
282 }
283
284 pub fn list_ids(&self, coll: &str) -> Vec<String> {
286 if let Some(ref mem) = self.mem {
287 return mem.iter()
288 .filter(|e| e.key().0 == coll)
289 .map(|e| e.key().1.clone())
290 .collect();
291 }
292 let id_root = self.root.join(coll).join("id");
294 fs::read_dir(&id_root)
296 .into_iter()
297 .flatten()
298 .filter_map(|e| e.ok())
299 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
300 .flat_map(|shard_dir| {
301 fs::read_dir(shard_dir.path())
302 .into_iter()
303 .flatten()
304 .filter_map(|e| e.ok())
305 .filter_map(|e| {
306 let name = e.file_name().to_string_lossy().to_string();
307 if name.ends_with(".tmp") { return None; }
308 Some(decode_id(&name))
311 })
312 .collect::<Vec<_>>()
313 })
314 .collect::<std::collections::HashSet<_>>()
315 .into_iter()
316 .chain(
318 self.write_buf.iter()
319 .filter(|e| e.key().0 == coll && e.value().is_some())
320 .map(|e| e.key().1.clone())
321 )
322 .collect::<std::collections::HashSet<_>>()
323 .into_iter()
324 .filter(|id| {
325 self.write_buf.get(&(coll.to_string(), id.clone()))
327 .map(|v| v.is_some())
328 .unwrap_or(true)
329 })
330 .collect()
331 }
332
333 pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
336 if let Some(ref mem) = self.mem {
337 mem.remove(&(coll.to_string(), id.to_string()));
338 return Ok(());
339 }
340 self.write_buf.insert((coll.to_string(), id.to_string()), None);
342 Ok(())
343 }
344
345 pub fn collections(&self) -> Vec<String> {
347 if let Some(ref mem) = self.mem {
348 let mut colls: Vec<String> = mem.iter()
349 .map(|e| e.key().0.clone())
350 .collect::<std::collections::HashSet<_>>()
351 .into_iter().collect();
352 colls.sort();
353 return colls;
354 }
355 fs::read_dir(&self.root)
356 .into_iter()
357 .flatten()
358 .filter_map(|e| e.ok())
359 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
360 .map(|e| e.file_name().to_string_lossy().to_string())
361 .collect()
362 }
363}
364
365pub struct SortedIndexes {
368 inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
370}
371
372impl SortedIndexes {
373 pub fn new() -> Self {
374 Self { inner: DashMap::new() }
375 }
376
377 pub fn ensure(&self, coll: &str, field: &str) {
380 self.inner
381 .entry((coll.to_string(), field.to_string()))
382 .or_default();
383 }
384
385 pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
387 let key = (coll.to_string(), field.to_string());
388 if let Some(mut idx) = self.inner.get_mut(&key) {
389 let ov = OrderedValue::from(value);
390 idx.entry(ov)
391 .or_default()
392 .push(hash.to_string());
393 }
394 }
395
396 pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
398 let key = (coll.to_string(), field.to_string());
399 if let Some(mut idx) = self.inner.get_mut(&key) {
400 let ov = OrderedValue::from(value);
401 if let Some(hashes) = idx.get_mut(&ov) {
402 hashes.retain(|h| h != hash);
403 if hashes.is_empty() { idx.remove(&ov); }
404 }
405 }
406 }
407
408 pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
410 let key = (coll.to_string(), field.to_string());
411 self.inner.get(&key).map(|idx| {
412 idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
413 }).unwrap_or_default()
414 }
415
416 pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
418 let key = (coll.to_string(), field.to_string());
419 self.inner.get(&key).map(|idx| {
420 idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
421 }).unwrap_or_default()
422 }
423
424 pub fn has(&self, coll: &str, field: &str) -> bool {
426 self.inner.contains_key(&(coll.to_string(), field.to_string()))
427 }
428
429 pub fn is_empty(&self) -> bool {
431 self.inner.is_empty()
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn id_index_roundtrip() {
        let tmp = tempdir().unwrap();
        let index = IdIndex::new(tmp.path()).unwrap();
        index.set("blocks", "618000", "abcdef1234").unwrap();
        assert_eq!(index.get("blocks", "618000").as_deref(), Some("abcdef1234"));
    }

    #[test]
    fn encode_decode_id_bijective() {
        const SAFE: [&str; 4] = ["618000", "utxo-000000042", "abc_DEF.123", "deadBEEF"];
        // Ids containing characters that are reserved on common filesystems.
        const WEIRD: [&str; 4] = ["driver:d1|handles|trip:t1", "a/b\\c", "x<y>z?\"*", "100%done"];
        const FS_HOSTILE: &[char] = &[':', '|', '/', '\\', '<', '>', '?', '"', '*'];

        for id in SAFE {
            let encoded = encode_id(id);
            assert_eq!(encoded, id, "safe id must be identity");
            assert_eq!(decode_id(&encoded), id);
        }
        for id in WEIRD {
            let encoded = encode_id(id);
            assert!(
                !encoded.contains(FS_HOSTILE),
                "encoded leaf must be filesystem-safe: {}",
                encoded
            );
            assert_eq!(decode_id(&encoded), id, "encode/decode must round-trip");
        }
    }

    #[test]
    fn id_index_fs_unsafe_id_survives_disk_roundtrip() {
        let tmp = tempdir().unwrap();
        let weird = "driver:d1|handles|trip:t1";

        // Write and flush through one handle, then close it.
        let writer = IdIndex::new(tmp.path()).unwrap();
        writer.set("__links__", weird, "deadbeefcafe").unwrap();
        writer.flush_write_buf();
        drop(writer);

        // A fresh handle has an empty write buffer, so it must read from disk.
        let reader = IdIndex::new(tmp.path()).unwrap();
        assert_eq!(
            reader.get("__links__", weird),
            Some("deadbeefcafe".to_string()),
            "FS-unsafe id must be readable from disk after reopen"
        );
        assert_eq!(
            reader.list_ids("__links__"),
            vec![weird.to_string()],
            "list_ids must decode the on-disk filename back to the id"
        );
    }

    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;
        let pairs = [
            (Null, Bool(false)),
            (Bool(false), Bool(true)),
            (Bool(true), Number(0.0)),
            (Number(1.0), Number(2.0)),
            (Number(2.0), Str("a".to_string())),
            (Str("a".to_string()), Str("b".to_string())),
        ];
        for (lo, hi) in &pairs {
            assert!(lo < hi, "{:?} should sort before {:?}", lo, hi);
        }
    }

    #[test]
    fn sorted_index_top_k() {
        let idx = SortedIndexes::new();
        idx.ensure("blocks", "height");
        for (height, hash) in [(3, "hash3"), (1, "hash1"), (2, "hash2")] {
            idx.insert("blocks", "height", &serde_json::json!(height), hash);
        }
        assert_eq!(idx.top_k_asc("blocks", "height", 2), vec!["hash1", "hash2"]);
        assert_eq!(idx.top_k_desc("blocks", "height", 2), vec!["hash3", "hash2"]);
    }
}
509}