use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Result;
use dashmap::DashMap;
use serde_json::Value;
/// A JSON value projected into a totally ordered form so it can key a [`BTreeMap`].
///
/// Values sort by kind first (`Null < Bool < Number < Str < Array < Object`) and then by
/// payload within the same kind.
#[derive(Debug, Clone)]
pub enum OrderedValue {
    /// JSON `null`.
    Null,
    /// Boolean; `false < true`.
    Bool(bool),
    /// Number widened to `f64`, ordered with [`f64::total_cmp`] (so `-0.0 < 0.0` and NaN has a
    /// fixed position).
    Number(f64),
    /// String, ordered lexicographically by bytes.
    Str(String),
    /// Array, ordered lexicographically element by element.
    Array(Vec<OrderedValue>),
    /// Object; carries no payload, so all objects compare equal.
    Object,
}

// Equality is defined through `Ord` instead of being derived. A derived `PartialEq` compares the
// `f64` payload with IEEE `==`, which makes `NaN != NaN` (breaking `Eq` reflexivity) and
// `0.0 == -0.0` even though `total_cmp` orders them. Either case violates the requirement that
// `a == b` exactly when `a.cmp(b) == Equal`.
impl PartialEq for OrderedValue {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    /// Total order: kind rank first, then payload (see the type-level docs).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use OrderedValue::*;
        use std::cmp::Ordering::*;
        match (self, other) {
            (Null, Null) => Equal,
            (Null, _) => Less,
            (_, Null) => Greater,
            (Bool(a), Bool(b)) => a.cmp(b),
            (Bool(_), _) => Less,
            (_, Bool(_)) => Greater,
            (Number(a), Number(b)) => a.total_cmp(b),
            (Number(_), _) => Less,
            (_, Number(_)) => Greater,
            (Str(a), Str(b)) => a.cmp(b),
            (Str(_), _) => Less,
            (_, Str(_)) => Greater,
            (Array(a), Array(b)) => a.cmp(b),
            (Array(_), _) => Less,
            (_, Array(_)) => Greater,
            (Object, Object) => Equal,
        }
    }
}
/// Projects a borrowed JSON value onto its [`OrderedValue`] sort key.
///
/// Numbers are widened with `as_f64`; a number that cannot be represented that way becomes NaN.
/// Arrays are converted element by element, and objects collapse to `OrderedValue::Object`
/// (their contents are not kept).
impl From<&Value> for OrderedValue {
    fn from(v: &Value) -> Self {
        match v {
            Value::Null => Self::Null,
            Value::Bool(flag) => Self::Bool(*flag),
            Value::Number(num) => Self::Number(num.as_f64().unwrap_or(f64::NAN)),
            Value::String(text) => Self::Str(text.clone()),
            Value::Array(items) => Self::Array(items.iter().map(Self::from).collect()),
            Value::Object(_) => Self::Object,
        }
    }
}
/// Picks the on-disk shard directory name for `id`.
///
/// Hashes the id's bytes with 32-bit FNV-1a and renders the low byte as two lowercase hex
/// digits, so ids spread over 256 directories (`"00"` through `"ff"`).
fn id_shard(id: &str) -> String {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;
    let digest = id
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME));
    format!("{:02x}", digest & 0xff)
}
pub struct IdIndex {
root: PathBuf,
mem: Option<Arc<dashmap::DashMap<(String, String), String>>>,
write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>, }
impl IdIndex {
    /// Opens the on-disk index rooted at `<db_root>/indexes`, creating the directory if needed.
    ///
    /// # Errors
    /// Returns an error if the `indexes` directory cannot be created.
    pub fn new(db_root: &Path) -> Result<Self> {
        let root = db_root.join("indexes");
        fs::create_dir_all(&root)?;
        Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
    }

    /// Creates an index that lives entirely in memory; nothing is ever written to disk.
    pub fn in_memory() -> Self {
        Self {
            root: PathBuf::from(":memory:"),
            mem: Some(Arc::new(dashmap::DashMap::new())),
            write_buf: Arc::new(dashmap::DashMap::new()),
        }
    }

    /// Persists every staged `set`/`remove` to disk. Does nothing for in-memory indexes.
    ///
    /// Entries are written in parallel. An entry leaves the buffer only if two things hold: its
    /// disk change succeeded, and the buffered value was not replaced while the flush ran.
    /// Failed changes and newer values stay buffered, so `get`/`list_ids` keep seeing them and the
    /// next flush retries them. They are never silently dropped.
    pub fn flush_write_buf(&self) {
        if self.mem.is_some() || self.write_buf.is_empty() {
            return;
        }
        use rayon::prelude::*;
        // Snapshot first so no DashMap guard is held while doing file I/O.
        let entries: Vec<((String, String), Option<String>)> = self
            .write_buf
            .iter()
            .map(|e| (e.key().clone(), e.value().clone()))
            .collect();
        entries
            .par_iter()
            .filter(|((coll, id), hash)| self.persist(coll, id, hash.as_deref()).is_ok())
            .for_each(|(key, flushed)| {
                // Compare-and-remove: a concurrent `set`/`remove` may have replaced the value
                // after the snapshot. That newer value is not on disk yet and must stay buffered.
                self.write_buf.remove_if(key, |_, current| current == flushed);
            });
    }

    /// Applies one staged change to disk. `Some(hash)` is written atomically through a temp file
    /// plus rename. `None` deletes the id's file, and a file that is already absent counts as
    /// success.
    fn persist(&self, coll: &str, id: &str, hash: Option<&str>) -> std::io::Result<()> {
        let path = self.path(coll, id);
        match hash {
            Some(hash) => {
                if let Some(parent) = path.parent() {
                    fs::create_dir_all(parent)?;
                }
                // Append `.tmp` to the whole file name rather than using `with_extension`. That
                // call would map ids such as "a.1" and "a.2" to the same "a.tmp" and let parallel
                // writers clobber each other.
                let mut tmp = path.clone().into_os_string();
                tmp.push(".tmp");
                let tmp = PathBuf::from(tmp);
                fs::write(&tmp, hash)?;
                fs::rename(&tmp, &path).map_err(|err| {
                    let _ = fs::remove_file(&tmp);
                    err
                })
            }
            None => match fs::remove_file(&path) {
                Err(err) if err.kind() != std::io::ErrorKind::NotFound => Err(err),
                _ => Ok(()),
            },
        }
    }

    /// On-disk location of `(coll, id)`: `<root>/<coll>/id/<shard>/<id>`.
    ///
    /// NOTE(review): `coll` and `id` are joined verbatim. Values containing `/` or `..`, or an
    /// absolute path, would resolve outside `root`. Confirm that callers only pass trusted names.
    fn path(&self, coll: &str, id: &str) -> PathBuf {
        let shard = id_shard(id);
        self.root.join(coll).join("id").join(&shard).join(id)
    }

    /// Looks up the hash stored for `(coll, id)`.
    ///
    /// In disk mode the write buffer is consulted first, and a pending removal yields `None`.
    /// Otherwise the id's file is read; a missing, unreadable, or blank file yields `None`.
    pub fn get(&self, coll: &str, id: &str) -> Option<String> {
        let key = (coll.to_string(), id.to_string());
        if let Some(ref mem) = self.mem {
            return mem.get(&key).map(|v| v.value().clone());
        }
        if let Some(entry) = self.write_buf.get(&key) {
            return entry.value().clone();
        }
        let content = fs::read_to_string(self.path(coll, id)).ok()?;
        let h = content.trim();
        if h.is_empty() { None } else { Some(h.to_string()) }
    }

    /// Records `hash` for `(coll, id)`.
    ///
    /// In-memory mode stores it immediately. Disk mode stages it until `flush_write_buf`.
    /// This currently never fails.
    pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
        if let Some(ref mem) = self.mem {
            mem.insert((coll.to_string(), id.to_string()), hash.to_string());
            return Ok(());
        }
        self.write_buf.insert((coll.to_string(), id.to_string()), Some(hash.to_string()));
        Ok(())
    }

    /// Lists the ids in `coll`, in unspecified order.
    ///
    /// In disk mode the result combines ids found on disk with unflushed changes. Buffered sets
    /// add ids, and buffered removals hide them.
    pub fn list_ids(&self, coll: &str) -> Vec<String> {
        if let Some(ref mem) = self.mem {
            return mem
                .iter()
                .filter(|e| e.key().0 == coll)
                .map(|e| e.key().1.clone())
                .collect();
        }
        let id_root = self.root.join(coll).join("id");
        let mut ids: std::collections::HashSet<String> = fs::read_dir(&id_root)
            .into_iter()
            .flatten()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
            .flat_map(|shard_dir| {
                fs::read_dir(shard_dir.path())
                    .into_iter()
                    .flatten()
                    .filter_map(|e| e.ok())
                    .map(|e| e.file_name().to_string_lossy().into_owned())
                    // Skip temp files left behind by an interrupted flush.
                    .filter(|name| !name.ends_with(".tmp"))
            })
            .collect();
        for entry in self.write_buf.iter().filter(|e| e.key().0 == coll) {
            let id = &entry.key().1;
            if entry.value().is_some() {
                ids.insert(id.clone());
            } else {
                ids.remove(id);
            }
        }
        ids.into_iter().collect()
    }

    /// Removes `(coll, id)`.
    ///
    /// In-memory mode removes it immediately. Disk mode stages the removal until
    /// `flush_write_buf`. This currently never fails.
    pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
        if let Some(ref mem) = self.mem {
            mem.remove(&(coll.to_string(), id.to_string()));
            return Ok(());
        }
        self.write_buf.insert((coll.to_string(), id.to_string()), None);
        Ok(())
    }

    /// Names of all known collections, sorted and deduplicated.
    ///
    /// In disk mode this is every subdirectory of the index root, plus any collection that so far
    /// only has unflushed `set`s. Including those keeps the result consistent with `list_ids` and
    /// with in-memory mode.
    pub fn collections(&self) -> Vec<String> {
        let colls: std::collections::BTreeSet<String> = match self.mem {
            Some(ref mem) => mem.iter().map(|e| e.key().0.clone()).collect(),
            None => fs::read_dir(&self.root)
                .into_iter()
                .flatten()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
                .map(|e| e.file_name().to_string_lossy().into_owned())
                .chain(
                    self.write_buf
                        .iter()
                        .filter(|e| e.value().is_some())
                        .map(|e| e.key().0.clone()),
                )
                .collect(),
        };
        colls.into_iter().collect()
    }
}
/// Registry of sorted secondary indexes, one per `(collection, field)` pair.
///
/// Each index maps a field value (as an [`OrderedValue`]) to the hashes recorded under it. The
/// map is a `BTreeMap`, so the smallest or largest values can be read in order without sorting.
/// `Default` is derived so that `SortedIndexes::default()` matches `new()` (clippy
/// `new_without_default`).
#[derive(Default)]
pub struct SortedIndexes {
    /// Keyed by `(collection, field)`; each bucket lists hashes in insertion order.
    inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
}
impl SortedIndexes {
pub fn new() -> Self {
Self { inner: DashMap::new() }
}
pub fn ensure(&self, coll: &str, field: &str) {
self.inner
.entry((coll.to_string(), field.to_string()))
.or_default();
}
pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
let key = (coll.to_string(), field.to_string());
if let Some(mut idx) = self.inner.get_mut(&key) {
let ov = OrderedValue::from(value);
idx.entry(ov)
.or_default()
.push(hash.to_string());
}
}
pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
let key = (coll.to_string(), field.to_string());
if let Some(mut idx) = self.inner.get_mut(&key) {
let ov = OrderedValue::from(value);
if let Some(hashes) = idx.get_mut(&ov) {
hashes.retain(|h| h != hash);
if hashes.is_empty() { idx.remove(&ov); }
}
}
}
pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
let key = (coll.to_string(), field.to_string());
self.inner.get(&key).map(|idx| {
idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
}).unwrap_or_default()
}
pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
let key = (coll.to_string(), field.to_string());
self.inner.get(&key).map(|idx| {
idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
}).unwrap_or_default()
}
pub fn has(&self, coll: &str, field: &str) -> bool {
self.inner.contains_key(&(coll.to_string(), field.to_string()))
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // A hash staged with `set` is visible through `get` before any flush.
    #[test]
    fn id_index_roundtrip() {
        let tmp = tempdir().unwrap();
        let index = IdIndex::new(tmp.path()).unwrap();
        index.set("blocks", "618000", "abcdef1234").unwrap();
        assert_eq!(index.get("blocks", "618000").as_deref(), Some("abcdef1234"));
    }

    // Each value must sort strictly before the next: kind first, then payload.
    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;
        let ascending = [
            Null,
            Bool(false),
            Bool(true),
            Number(0.0),
            Number(1.0),
            Number(2.0),
            Str("a".to_string()),
            Str("b".to_string()),
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1], "{:?} should sort before {:?}", pair[0], pair[1]);
        }
    }

    // Inserting out of order still yields hashes sorted by the indexed value.
    #[test]
    fn sorted_index_top_k() {
        let index = SortedIndexes::new();
        index.ensure("blocks", "height");
        for (height, hash) in [(3, "hash3"), (1, "hash1"), (2, "hash2")] {
            index.insert("blocks", "height", &serde_json::json!(height), hash);
        }
        assert_eq!(index.top_k_asc("blocks", "height", 2), vec!["hash1", "hash2"]);
        assert_eq!(index.top_k_desc("blocks", "height", 2), vec!["hash3", "hash2"]);
    }
}