1use std::collections::BTreeMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18use dashmap::DashMap;
19use serde_json::Value;
20
/// A totally ordered projection of a JSON value, suitable as a `BTreeMap` key.
///
/// Values of different variants rank
/// `Null < Bool < Number < Str < Array < Object`. Within a variant the payloads
/// are compared; numbers use [`f64::total_cmp`], so every float (including NaN
/// and signed zeros) has a well-defined position. `Object` carries no payload,
/// so all objects compare equal to one another.
///
/// Equality is defined through [`Ord::cmp`] rather than derived, so that
/// `a == b` holds exactly when `a.cmp(&b)` is `Equal`. A derived `PartialEq`
/// would use IEEE float equality and report `Number(NaN) != Number(NaN)`,
/// which contradicts both the `Eq` marker below and the ordering. As a
/// consequence `Number(0.0)` and `Number(-0.0)` are distinct values, exactly as
/// they are distinct keys under the ordering.
#[derive(Debug, Clone)]
pub enum OrderedValue {
    Null,
    Bool(bool),
    Number(f64),
    Str(String),
    Array(Vec<OrderedValue>),
    Object,
}

impl PartialEq for OrderedValue {
    /// Two values are equal exactly when the total order says so.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

// Sound because `eq` is derived from a total order and is therefore reflexive.
impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    /// Always `Some`: the ordering is total, so this defers to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    /// Compares by variant rank first, then by payload within the same variant.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering::*;
        use OrderedValue::*;
        // Arms are ordered by rank. Each same-variant arm is followed by two
        // arms that settle every remaining pairing involving that variant, so
        // by the end only (Object, Object) is left and the match stays
        // exhaustive without a wildcard.
        match (self, other) {
            (Null, Null) => Equal,
            (Null, _) => Less,
            (_, Null) => Greater,
            (Bool(a), Bool(b)) => a.cmp(b),
            (Bool(_), _) => Less,
            (_, Bool(_)) => Greater,
            // total_cmp gives NaN and signed zeros a fixed position.
            (Number(a), Number(b)) => a.total_cmp(b),
            (Number(_), _) => Less,
            (_, Number(_)) => Greater,
            (Str(a), Str(b)) => a.cmp(b),
            (Str(_), _) => Less,
            (_, Str(_)) => Greater,
            // Lexicographic, element by element.
            (Array(a), Array(b)) => a.cmp(b),
            (Array(_), _) => Less,
            (_, Array(_)) => Greater,
            (Object, Object) => Equal,
        }
    }
}
64
65impl From<&Value> for OrderedValue {
66 fn from(v: &Value) -> Self {
67 match v {
68 Value::Null => OrderedValue::Null,
69 Value::Bool(b) => OrderedValue::Bool(*b),
70 Value::Number(n) => OrderedValue::Number(n.as_f64().unwrap_or(f64::NAN)),
71 Value::String(s) => OrderedValue::Str(s.clone()),
72 Value::Array(a) => OrderedValue::Array(a.iter().map(|x| x.into()).collect()),
73 Value::Object(_) => OrderedValue::Object,
74 }
75 }
76}
77
/// Maps an id to the name of its shard directory: two lowercase hex digits.
///
/// The id's bytes are hashed with 32-bit FNV-1a and only the low byte of the
/// digest is kept, giving 256 possible shard names (`"00"` through `"ff"`).
fn id_shard(id: &str) -> String {
    const OFFSET_BASIS: u32 = 2166136261;
    const PRIME: u32 = 16777619;

    // FNV-1a: xor the byte in first, then multiply (wrapping) by the prime.
    let digest = id
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u32::from(byte)).wrapping_mul(PRIME));
    format!("{:02x}", digest & 0xff)
}
90
91pub struct IdIndex {
98 root: PathBuf,
99 mem: Option<Arc<dashmap::DashMap<(String, String), String>>>,
101 write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>, }
104
105impl IdIndex {
106 pub fn new(db_root: &Path) -> Result<Self> {
107 let root = db_root.join("indexes");
108 fs::create_dir_all(&root)?;
109 Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
110 }
111
112 pub fn in_memory() -> Self {
114 Self {
115 root: PathBuf::from(":memory:"),
116 mem: Some(Arc::new(dashmap::DashMap::new())),
117 write_buf: Arc::new(dashmap::DashMap::new()),
118 }
119 }
120
121 pub fn flush_write_buf(&self) {
124 if self.mem.is_some() || self.write_buf.is_empty() { return; }
125 use rayon::prelude::*;
126 let entries: Vec<((String, String), Option<String>)> = self.write_buf
128 .iter()
129 .map(|e| (e.key().clone(), e.value().clone()))
130 .collect();
131 entries.par_iter().for_each(|((coll, id), hash_opt)| {
132 match hash_opt {
133 Some(hash) => {
134 let path = self.path(coll, id);
136 if let Some(parent) = path.parent() {
137 let _ = fs::create_dir_all(parent);
138 }
139 let tmp = path.with_extension("tmp");
140 if fs::write(&tmp, hash).is_ok() {
141 let _ = fs::rename(&tmp, &path);
142 }
143 }
144 None => {
145 let path = self.path(coll, id);
147 let _ = fs::remove_file(&path);
148 }
149 }
150 });
151 for ((coll, id), _) in &entries {
153 self.write_buf.remove(&(coll.clone(), id.clone()));
154 }
155 }
156
157 fn path(&self, coll: &str, id: &str) -> PathBuf {
158 let shard = id_shard(id);
163 self.root.join(coll).join("id").join(&shard).join(id)
164 }
165
166 pub fn get(&self, coll: &str, id: &str) -> Option<String> {
169 if let Some(ref mem) = self.mem {
170 return mem.get(&(coll.to_string(), id.to_string())).map(|v| v.clone());
171 }
172 let key = (coll.to_string(), id.to_string());
174 if let Some(entry) = self.write_buf.get(&key) {
175 return entry.value().clone(); }
177 let content = fs::read_to_string(self.path(coll, id)).ok()?;
179 let h = content.trim().to_string();
180 if h.is_empty() { None } else { Some(h) }
181 }
182
183 pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
187 if let Some(ref mem) = self.mem {
188 mem.insert((coll.to_string(), id.to_string()), hash.to_string());
189 return Ok(());
190 }
191 self.write_buf.insert(
193 (coll.to_string(), id.to_string()),
194 Some(hash.to_string()),
195 );
196 Ok(())
197 }
198
199 pub fn list_ids(&self, coll: &str) -> Vec<String> {
201 if let Some(ref mem) = self.mem {
202 return mem.iter()
203 .filter(|e| e.key().0 == coll)
204 .map(|e| e.key().1.clone())
205 .collect();
206 }
207 let id_root = self.root.join(coll).join("id");
209 fs::read_dir(&id_root)
211 .into_iter()
212 .flatten()
213 .filter_map(|e| e.ok())
214 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
215 .flat_map(|shard_dir| {
216 fs::read_dir(shard_dir.path())
217 .into_iter()
218 .flatten()
219 .filter_map(|e| e.ok())
220 .filter_map(|e| {
221 let name = e.file_name().to_string_lossy().to_string();
222 if name.ends_with(".tmp") { return None; }
223 Some(name)
224 })
225 .collect::<Vec<_>>()
226 })
227 .collect::<std::collections::HashSet<_>>()
228 .into_iter()
229 .chain(
231 self.write_buf.iter()
232 .filter(|e| e.key().0 == coll && e.value().is_some())
233 .map(|e| e.key().1.clone())
234 )
235 .collect::<std::collections::HashSet<_>>()
236 .into_iter()
237 .filter(|id| {
238 self.write_buf.get(&(coll.to_string(), id.clone()))
240 .map(|v| v.is_some())
241 .unwrap_or(true)
242 })
243 .collect()
244 }
245
246 pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
249 if let Some(ref mem) = self.mem {
250 mem.remove(&(coll.to_string(), id.to_string()));
251 return Ok(());
252 }
253 self.write_buf.insert((coll.to_string(), id.to_string()), None);
255 Ok(())
256 }
257
258 pub fn collections(&self) -> Vec<String> {
260 if let Some(ref mem) = self.mem {
261 let mut colls: Vec<String> = mem.iter()
262 .map(|e| e.key().0.clone())
263 .collect::<std::collections::HashSet<_>>()
264 .into_iter().collect();
265 colls.sort();
266 return colls;
267 }
268 fs::read_dir(&self.root)
269 .into_iter()
270 .flatten()
271 .filter_map(|e| e.ok())
272 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
273 .map(|e| e.file_name().to_string_lossy().to_string())
274 .collect()
275 }
276}
277
278pub struct SortedIndexes {
281 inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
283}
284
285impl SortedIndexes {
286 pub fn new() -> Self {
287 Self { inner: DashMap::new() }
288 }
289
290 pub fn ensure(&self, coll: &str, field: &str) {
293 self.inner
294 .entry((coll.to_string(), field.to_string()))
295 .or_default();
296 }
297
298 pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
300 let key = (coll.to_string(), field.to_string());
301 if let Some(mut idx) = self.inner.get_mut(&key) {
302 let ov = OrderedValue::from(value);
303 idx.entry(ov)
304 .or_default()
305 .push(hash.to_string());
306 }
307 }
308
309 pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
311 let key = (coll.to_string(), field.to_string());
312 if let Some(mut idx) = self.inner.get_mut(&key) {
313 let ov = OrderedValue::from(value);
314 if let Some(hashes) = idx.get_mut(&ov) {
315 hashes.retain(|h| h != hash);
316 if hashes.is_empty() { idx.remove(&ov); }
317 }
318 }
319 }
320
321 pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
323 let key = (coll.to_string(), field.to_string());
324 self.inner.get(&key).map(|idx| {
325 idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
326 }).unwrap_or_default()
327 }
328
329 pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
331 let key = (coll.to_string(), field.to_string());
332 self.inner.get(&key).map(|idx| {
333 idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
334 }).unwrap_or_default()
335 }
336
337 pub fn has(&self, coll: &str, field: &str) -> bool {
339 self.inner.contains_key(&(coll.to_string(), field.to_string()))
340 }
341
342 pub fn is_empty(&self) -> bool {
344 self.inner.is_empty()
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Exercises a disk-backed index through the buffer, a flush, and a
    /// buffered + flushed removal.
    #[test]
    fn id_index_roundtrip() {
        let dir = tempdir().unwrap();
        let idx = IdIndex::new(dir.path()).unwrap();

        // A buffered write is visible before any flush.
        idx.set("blocks", "618000", "abcdef1234").unwrap();
        assert_eq!(idx.get("blocks", "618000"), Some("abcdef1234".to_string()));

        // After a flush the same value is read back from disk.
        idx.flush_write_buf();
        assert_eq!(idx.get("blocks", "618000"), Some("abcdef1234".to_string()));
        assert_eq!(idx.list_ids("blocks"), vec!["618000".to_string()]);

        // A buffered removal hides the entry immediately...
        idx.remove("blocks", "618000").unwrap();
        assert_eq!(idx.get("blocks", "618000"), None);
        assert!(idx.list_ids("blocks").is_empty());

        // ...and the entry stays gone once the removal reaches the disk.
        idx.flush_write_buf();
        assert_eq!(idx.get("blocks", "618000"), None);
        assert!(idx.list_ids("blocks").is_empty());
    }

    /// The in-memory backend supports the same set/get/remove cycle.
    #[test]
    fn id_index_in_memory() {
        let idx = IdIndex::in_memory();
        idx.set("blocks", "1", "h1").unwrap();
        assert_eq!(idx.get("blocks", "1"), Some("h1".to_string()));
        assert_eq!(idx.list_ids("blocks"), vec!["1".to_string()]);
        assert_eq!(idx.collections(), vec!["blocks".to_string()]);

        idx.remove("blocks", "1").unwrap();
        assert_eq!(idx.get("blocks", "1"), None);
        assert!(idx.list_ids("blocks").is_empty());
    }

    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;
        assert!(Null < Bool(false));
        assert!(Bool(false) < Bool(true));
        assert!(Bool(true) < Number(0.0));
        assert!(Number(1.0) < Number(2.0));
        assert!(Number(2.0) < Str("a".to_string()));
        assert!(Str("a".to_string()) < Str("b".to_string()));
    }

    #[test]
    fn sorted_index_top_k() {
        let idx = SortedIndexes::new();
        idx.ensure("blocks", "height");
        idx.insert("blocks", "height", &serde_json::json!(3), "hash3");
        idx.insert("blocks", "height", &serde_json::json!(1), "hash1");
        idx.insert("blocks", "height", &serde_json::json!(2), "hash2");
        let asc = idx.top_k_asc("blocks", "height", 2);
        assert_eq!(asc, vec!["hash1", "hash2"]);
        let desc = idx.top_k_desc("blocks", "height", 2);
        assert_eq!(desc, vec!["hash3", "hash2"]);
    }
}