extern crate ordered_float;
extern crate indexmap;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate serde_json;
extern crate rayon;
extern crate multimap;
extern crate glob;
use ordered_float::OrderedFloat;
use indexmap::map::IndexMap;
use serde_json::Value;
use json_dotpath::DotPaths;
use std::cmp::Ordering;
use rayon::prelude::*;
use serde::{Serialize, Deserialize};
use std::collections::{HashSet, HashMap};
use std::sync::{Mutex, RwLock, Arc};
use std::borrow::Borrow;
use std::ops::{DerefMut, Deref};
use multimap::MultiMap;
use std::hash::{Hash, Hasher};
use std::collections::hash_map::RandomState;
use glob::{Pattern, PatternError};
/// Strategy an `Index` uses to validate, sort and index the values it stores.
///
/// The variant decides which values `Index::filter` accepts and how
/// `Index::build` orders the collection and fills the lookup trees.
#[derive(Serialize, Deserialize, Clone)]
pub enum Indexer {
/// Values are JSON documents; every configured dot path must resolve to a non-null value.
Json(IndexJson),
/// Values must satisfy `Value::is_i64`.
Integer(IndexInt),
/// Values must satisfy `Value::is_f64`.
Float(IndexFloat),
/// Values must satisfy `Value::is_string`.
String(IndexString),
}
/// Comparison operator understood by the index query functions.
///
/// The type is a plain tag, so it derives the usual value-type traits; this
/// lets callers compare, copy and debug-print operators.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryOperator {
    /// Indexed value equals the query value.
    EQ,
    /// Indexed value is strictly less than the query value.
    LT,
    /// Indexed value is strictly greater than the query value.
    GT,
    /// Indexed string matches the query glob pattern (string indexes only).
    LIKE,
    /// The operator text was not recognised; queries with it match nothing.
    UNKNOWN,
}

impl QueryOperator {
    /// Parses an operator name, ignoring case.
    ///
    /// Recognised names are `"eq"`, `"lt"`, `"gt"` and `"like"`. Any other
    /// input (including the empty string) yields [`QueryOperator::UNKNOWN`]
    /// rather than an error.
    pub fn from_str(op: &str) -> Self {
        match op.to_lowercase().as_str() {
            "eq" => QueryOperator::EQ,
            "lt" => QueryOperator::LT,
            "gt" => QueryOperator::GT,
            "like" => QueryOperator::LIKE,
            _ => QueryOperator::UNKNOWN,
        }
    }
}
/// Sort direction applied when an `Index` orders its collection.
#[derive(Serialize, Deserialize, Clone)]
pub enum IndexOrd {
/// Smallest value first.
ASC,
/// Largest value first.
DESC,
}
/// Settings for an index whose values are integers (`Indexer::Integer`).
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexInt {
/// Direction used when the collection is sorted by integer value.
pub ordering: IndexOrd
}
/// Settings for an index whose values are strings (`Indexer::String`).
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexString {
/// Direction used when the collection is sorted by string value.
pub ordering: IndexOrd
}
/// Settings for an index whose values are floats (`Indexer::Float`).
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexFloat {
/// Direction used when the collection is sorted by float value.
pub ordering: IndexOrd
}
/// Settings for an index whose values are JSON documents (`Indexer::Json`).
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexJson {
/// Dot paths to index, in sort priority order: earlier entries are compared
/// first and later ones only break ties.
pub path_orders: Vec<JsonPathOrder>
}
/// One indexed location inside a JSON document together with its sort direction.
#[derive(Serialize, Deserialize, Clone)]
pub struct JsonPathOrder {
/// Dot-separated path resolved with `json_dotpath` against each stored value.
pub path: String,
/// Direction used when comparing the values found at `path`.
pub ordering: IndexOrd,
}
/// `f64` wrapper usable as a hash-map key.
///
/// Hashing and equality are both delegated to `OrderedFloat`, so the two stay
/// consistent with each other, which is what `Eq + Hash` keys require.
#[derive(Serialize, Deserialize, Clone)]
struct FloatKey(f64);

impl Hash for FloatKey {
    /// Feeds the `OrderedFloat` hash of the wrapped number into `state`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let FloatKey(raw) = *self;
        OrderedFloat(raw).hash(state);
    }
}

impl PartialEq for FloatKey {
    /// Two keys are equal when `OrderedFloat` considers their numbers equal.
    fn eq(&self, other: &Self) -> bool {
        let lhs = OrderedFloat(self.0);
        let rhs = OrderedFloat(other.0);
        lhs == rhs
    }
}

// Sound because equality is delegated to `OrderedFloat`, which implements `Eq`.
impl Eq for FloatKey {}
/// Sorted key/value collection with secondary lookup trees.
///
/// Writes go to `ws`; `build` then sorts `ws`, refills the three trees from it
/// and copies it into `rs`, which is what the read APIs expose.
///
/// NOTE(review): `Clone` is derived, so a clone copies the `Arc` handles and
/// therefore shares `ws` and the trees with the original, while `rs` is an
/// independent copy — confirm this sharing is intended.
#[derive(Serialize, Deserialize, Clone)]
pub struct Index {
/// Strategy that decides which values are accepted and how they are ordered.
pub indexer: Indexer,
// Field name (a JSON dot path, or "*" for scalar indexers) -> integer value -> matching (key, value) entries.
int_tree: Arc<RwLock<HashMap<String, MultiMap<i64, (String, Value)>>>>,
// Same layout as `int_tree`, keyed by string values.
str_tree: Arc<RwLock<HashMap<String, MultiMap<String, (String, Value)>>>>,
// Same layout as `int_tree`, keyed by float values wrapped in `FloatKey`.
float_tree: Arc<RwLock<HashMap<String, MultiMap<FloatKey, (String, Value)>>>>,
// Read side: copy of `ws` taken at the end of the last `build`.
rs: IndexMap<String, Value>,
// Write side: the collection that inserts, removals and batch commits mutate.
ws: Arc<RwLock<IndexMap<String, Value>>>,
}
/// Operations that can be staged and then applied together.
///
/// The descriptions below follow the `Batch` implementation in this file.
pub trait BatchTransaction {
/// Stages `v` to be stored under `k`.
fn insert(&mut self, k: String, v: Value);
/// Stages a replacement value for `k`; on commit it is applied only if `k` is present.
fn update(&mut self, k: String, v: Value);
/// Stages the removal of `k`.
fn delete(&mut self, k: String);
/// Applies everything staged so far and clears the staging areas.
fn commit(&mut self);
}
/// Staging area for a group of changes to one `Index`.
///
/// Nothing touches the index until `commit` is called.
pub struct Batch<'a> {
// Index the staged changes will be applied to.
index: &'a mut Index,
// Entries staged by `insert`, keyed by entry key.
inserts: HashMap<String, Value>,
// Entries staged by `update`, keyed by entry key.
updates: HashMap<String, Value>,
// Keys staged by `delete`.
deletes: HashSet<String>,
}
impl<'a> Batch<'a> {
    /// Creates an empty batch that will apply its changes to `idx`.
    fn new(idx: &'a mut Index) -> Self {
        Batch {
            index: idx,
            inserts: HashMap::new(),
            updates: HashMap::new(),
            deletes: HashSet::new(),
        }
    }

    /// Checks whether `v` is acceptable for the target index's indexer.
    ///
    /// Returns `Ok((k, v))` when the value qualifies and `Err(())` otherwise:
    /// * `Json` — every configured dot path resolves to a non-null value
    ///   (an indexer with no paths accepts everything);
    /// * `Integer` / `Float` / `String` — the value is of that JSON type.
    ///
    /// The returned references borrow only from `k` and `v`, never from the
    /// batch, and the indexer is inspected in place rather than cloned.
    fn filter<'b>(&self, k: &'b String, v: &'b Value) -> Result<(&'b String, &'b Value), ()> {
        let accepted = match &self.index.indexer {
            Indexer::Json(j) => j.path_orders.iter().all(|p| {
                // A lookup error is treated the same as a missing (null) value.
                !v.dot_get_or(&p.path, Value::Null).unwrap_or(Value::Null).is_null()
            }),
            Indexer::Integer(_) => v.is_i64(),
            Indexer::Float(_) => v.is_f64(),
            Indexer::String(_) => v.is_string(),
        };
        if accepted {
            Ok((k, v))
        } else {
            Err(())
        }
    }
}
impl<'a> BatchTransaction for Batch<'a> {
    /// Stages `v` under `k` if the index's indexer accepts it; otherwise the
    /// entry is silently dropped.
    fn insert(&mut self, k: String, v: Value) {
        if self.filter(&k, &v).is_ok() {
            // The batch owns `k` and `v`, so they are moved, not cloned.
            self.inserts.insert(k, v);
        }
    }

    /// Stages a replacement for `k` if the index's indexer accepts `v`;
    /// otherwise the entry is silently dropped.
    fn update(&mut self, k: String, v: Value) {
        if self.filter(&k, &v).is_ok() {
            self.updates.insert(k, v);
        }
    }

    /// Stages the removal of `k`.
    fn delete(&mut self, k: String) {
        self.deletes.insert(k);
    }

    /// Applies staged inserts, then updates, then deletes, and rebuilds the index.
    ///
    /// Updates only touch keys that exist once the inserts have been applied.
    /// The staging areas are emptied and their storage released.
    ///
    /// # Panics
    /// Panics if the index's write lock is poisoned.
    fn commit(&mut self) {
        {
            // One write guard for the whole batch. It must be released before
            // `build`, which locks the write side again.
            let mut collection = self.index.ws.write().unwrap();
            for (k, v) in self.inserts.drain() {
                collection.insert(k, v);
            }
            for (k, v) in self.updates.drain() {
                if let Some(slot) = collection.get_mut(&k) {
                    *slot = v;
                }
            }
            for k in self.deletes.drain() {
                collection.remove(&k);
            }
        }
        self.inserts.shrink_to_fit();
        self.updates.shrink_to_fit();
        self.deletes.shrink_to_fit();
        self.index.build();
    }
}
impl<'a> Index {
/// Creates an empty index that validates and orders values with `indexer`.
///
/// The read side, write side and all three lookup trees start empty; `build`
/// is run once so the index is in its normal post-build state.
pub fn new(indexer: Indexer) -> Self {
    let mut idx = Index {
        indexer,
        int_tree: Arc::new(RwLock::new(HashMap::new())),
        str_tree: Arc::new(RwLock::new(HashMap::new())),
        float_tree: Arc::new(RwLock::new(HashMap::new())),
        rs: IndexMap::new(),
        ws: Arc::new(RwLock::new(IndexMap::new())),
    };
    idx.build();
    idx
}
/// Stores `v` under `k` if the indexer accepts it, then rebuilds the index.
///
/// Rejected values are dropped silently. The rebuild runs either way, so any
/// other pending write-side changes are published as well.
///
/// # Panics
/// Panics if the write lock is poisoned.
pub fn insert(&mut self, k: String, v: Value) {
    if self.filter(&k, &v).is_ok() {
        // `k` and `v` are owned here, so move them instead of cloning.
        // The guard is a temporary and is released before `build` re-locks.
        self.ws.write().unwrap().insert(k, v);
    }
    self.build();
}
/// Removes the entry stored under `k`, if any, and rebuilds the index.
///
/// The rebuild keeps the read side and the lookup trees in step with the
/// write side; without it `read()` and queries would keep returning the
/// removed entry until some later insert or commit.
///
/// # Panics
/// Panics if the write lock is poisoned.
pub fn remove(&mut self, k: &String) {
    // The guard is a temporary, so it is released before `build` re-locks.
    self.ws.write().unwrap().remove(k);
    self.build();
}
/// Runs `f` against a fresh `Batch` bound to this index.
///
/// The batch is not committed automatically; `f` has to call `commit`
/// itself for the staged changes to take effect.
pub fn batch(&mut self, f: impl Fn(&mut Batch) + std::marker::Sync + std::marker::Send) {
    f(&mut Batch::new(self));
}
/// Calls `f` with every `(key, value)` pair of the read side, in stored order.
pub fn iter(&self, f: impl Fn((&String, &Value)) + std::marker::Sync + std::marker::Send) {
    for entry in &self.rs {
        f(entry);
    }
}
/// Calls `f` with every `(key, value)` pair of the read side using rayon's
/// parallel iterator; the visiting order is therefore unspecified.
pub fn par_iter(&self, f: impl Fn((&String, &Value)) + std::marker::Sync + std::marker::Send) {
    let entries = self.rs.par_iter();
    entries.for_each(|entry| f(entry));
}
pub fn find_all(&self, by: &str, cond: &str, eval: Value) -> Index {
let op = QueryOperator::from_str(cond);
let mut indexer = self.indexer.clone();
let matches = match &indexer {
Indexer::Json(j) => {
if eval.is_i64() {
let q = eval.as_i64().unwrap();
self.query_int_index(by, q, op)
} else if eval.is_f64() {
let q = eval.as_f64().unwrap();
self.query_float_index(by, q, op)
} else if eval.is_string() {
let q = String::from(eval.as_str().unwrap());
self.query_string_index(by, q, op)
} else {
vec![]
}
}
Indexer::Integer(i) => {
let q = eval.as_i64().unwrap();
self.query_int_index(by, q, op)
}
Indexer::Float(f) => {
let q = eval.as_f64().unwrap();
self.query_float_index(by, q, op)
}
Indexer::String(s) => {
let q = String::from(eval.as_str().unwrap());
self.query_string_index(by, q, op)
}
};
let mut new_index = Index::new(indexer);
new_index.batch(|b| {
matches.iter().for_each(|(k, v)| {
b.insert(k.to_owned(), v.clone())
});
b.commit()
});
new_index
}
/// Looks up entries in the integer tree named `key`.
///
/// `EQ` returns the entries indexed under exactly `q`; `LT` / `GT` scan the
/// tree for keys strictly below / above `q`. `LIKE`, `UNKNOWN` and an
/// unknown `key` all produce an empty vector. Matching entries are cloned.
///
/// # Panics
/// Panics if the tree lock is poisoned.
fn query_int_index(&self, key: &str, q: i64, op: QueryOperator) -> Vec<(String, Value)> {
    let trees = self.int_tree.read().unwrap();
    let tree = match trees.get(key) {
        Some(tree) => tree,
        // No tree under this name: nothing can match, whatever the operator.
        None => return Vec::new(),
    };
    match op {
        QueryOperator::EQ => tree.get_vec(&q).cloned().unwrap_or_default(),
        QueryOperator::LT => tree
            .iter_all()
            .filter(|(indexed, _)| **indexed < q)
            .flat_map(|(_, hits)| hits.iter().cloned())
            .collect(),
        QueryOperator::GT => tree
            .iter_all()
            .filter(|(indexed, _)| **indexed > q)
            .flat_map(|(_, hits)| hits.iter().cloned())
            .collect(),
        QueryOperator::LIKE | QueryOperator::UNKNOWN => Vec::new(),
    }
}
/// Looks up entries in the float tree named `key`.
///
/// `EQ` returns the entries indexed under `q`; `LT` / `GT` scan the tree
/// for keys strictly below / above `q`, comparing through `OrderedFloat`.
/// `LIKE`, `UNKNOWN` and an unknown `key` all produce an empty vector.
/// Matching entries are cloned.
///
/// # Panics
/// Panics if the tree lock is poisoned.
fn query_float_index(&self, key: &str, q: f64, op: QueryOperator) -> Vec<(String, Value)> {
    let trees = self.float_tree.read().unwrap();
    let tree = match trees.get(key) {
        Some(tree) => tree,
        // No tree under this name: nothing can match, whatever the operator.
        None => return Vec::new(),
    };
    let target = OrderedFloat(q);
    match op {
        QueryOperator::EQ => tree.get_vec(&FloatKey(q)).cloned().unwrap_or_default(),
        QueryOperator::LT => tree
            .iter_all()
            .filter(|(indexed, _)| OrderedFloat(indexed.0) < target)
            .flat_map(|(_, hits)| hits.iter().cloned())
            .collect(),
        QueryOperator::GT => tree
            .iter_all()
            .filter(|(indexed, _)| OrderedFloat(indexed.0) > target)
            .flat_map(|(_, hits)| hits.iter().cloned())
            .collect(),
        QueryOperator::LIKE | QueryOperator::UNKNOWN => Vec::new(),
    }
}
/// Looks up entries in the string tree named `key`.
///
/// `EQ` returns the entries indexed under exactly `q`; `LT` / `GT` scan the
/// tree for keys that sort strictly before / after `q`; `LIKE` treats `q`
/// as a glob pattern and matches it case-insensitively against each key.
/// `UNKNOWN`, an invalid pattern and an unknown `key` all produce an empty
/// vector. Matching entries are cloned.
///
/// # Panics
/// Panics if the tree lock is poisoned.
fn query_string_index(&self, key: &str, q: String, op: QueryOperator) -> Vec<(String, Value)> {
    let trees = self.str_tree.read().unwrap();
    let tree = match trees.get(key) {
        Some(tree) => tree,
        // No tree under this name: nothing can match, whatever the operator.
        None => return Vec::new(),
    };
    match op {
        QueryOperator::EQ => tree.get_vec(&q).cloned().unwrap_or_default(),
        QueryOperator::LT => tree
            .iter_all()
            .filter(|(indexed, _)| **indexed < q)
            .flat_map(|(_, hits)| hits.iter().cloned())
            .collect(),
        QueryOperator::GT => tree
            .iter_all()
            .filter(|(indexed, _)| **indexed > q)
            .flat_map(|(_, hits)| hits.iter().cloned())
            .collect(),
        QueryOperator::LIKE => {
            let pattern = match glob::Pattern::new(&q) {
                Ok(pattern) => pattern,
                Err(_) => return Vec::new(),
            };
            let options = glob::MatchOptions {
                case_sensitive: false,
                require_literal_separator: false,
                require_literal_leading_dot: false
            };
            let found: Vec<(String, Value)> = tree
                .iter_all()
                .filter(|(indexed, _)| pattern.matches_with(indexed.as_str(), options))
                .flat_map(|(_, hits)| hits.iter().cloned())
                .collect();
            found
        }
        QueryOperator::UNKNOWN => Vec::new(),
    }
}
/// Records the entry `(k, v)` under integer `iv` in the tree named `field`,
/// creating that tree on first use.
///
/// # Panics
/// Panics if `iv` is not an `i64` or if the tree lock is poisoned.
fn insert_int_index(&self, field: &str, iv: &Value, k: &str, v: &Value) {
    let mut trees = self.int_tree.write().unwrap();
    let indexed = iv.as_i64().unwrap();
    let entry = (k.to_string(), v.clone());
    if let Some(tree) = trees.get_mut(field) {
        tree.insert(indexed, entry);
    } else {
        let mut tree = MultiMap::new();
        tree.insert(indexed, entry);
        trees.insert(field.to_string(), tree);
    }
}
/// Records the entry `(k, v)` under float `iv` in the tree named `field`,
/// creating that tree on first use.
///
/// # Panics
/// Panics if `iv` is not a number or if the tree lock is poisoned.
fn insert_float_index(&self, field: &str, iv: &Value, k: &str, v: &Value) {
    let mut trees = self.float_tree.write().unwrap();
    let indexed = FloatKey(iv.as_f64().unwrap());
    let entry = (k.to_string(), v.clone());
    if let Some(tree) = trees.get_mut(field) {
        tree.insert(indexed, entry);
    } else {
        let mut tree = MultiMap::new();
        tree.insert(indexed, entry);
        trees.insert(field.to_string(), tree);
    }
}
/// Records the entry `(k, v)` under string `iv` in the tree named `field`,
/// creating that tree on first use.
///
/// # Panics
/// Panics if `iv` is not a string or if the tree lock is poisoned.
fn insert_string_index(&self, field: &str, iv: &Value, k: &str, v: &Value) {
    let mut trees = self.str_tree.write().unwrap();
    let indexed = iv.as_str().unwrap().to_owned();
    let entry = (k.to_string(), v.clone());
    if let Some(tree) = trees.get_mut(field) {
        tree.insert(indexed, entry);
    } else {
        let mut tree = MultiMap::new();
        tree.insert(indexed, entry);
        trees.insert(field.to_string(), tree);
    }
}
/// Returns the read-side collection, i.e. the copy of the write side taken
/// at the end of the most recent `build`.
pub fn read(&self) -> &IndexMap<String, Value> {
&self.rs
}
fn filter(&mut self, k: &'a String, v: &'a Value) -> Result<(&'a String, &'a Value), ()> {
match &self.indexer {
Indexer::Json(j) => {
let mut found = 0;
j.path_orders.iter().for_each(|p| {
let value = v.dot_get_or(&p.path, Value::Null).unwrap_or(Value::Null);
if !value.is_null() {
found += 1
}
});
if found == j.path_orders.len() {
Ok((k, v))
} else {
Err(())
}
}
Indexer::Integer(_) => {
if v.is_i64() {
Ok((k, v))
} else {
Err(())
}
}
Indexer::Float(_) => {
if v.is_f64() {
Ok((k, v))
} else {
Err(())
}
}
Indexer::String(_) => {
if v.is_string() {
Ok((k, v))
} else {
Err(())
}
}
}
}
fn build(&mut self) {
let mut indexer = self.indexer.clone();
match indexer {
Indexer::Json(j) => {
self.ws.write().unwrap().par_sort_by(|_, lhs, _, rhs| {
let ordering: Vec<Ordering> = j.path_orders.iter().map(|path_order| {
let lvalue = lhs.dot_get_or(&path_order.path, Value::Null).unwrap_or(Value::Null);
let rvalue = rhs.dot_get_or(&path_order.path, Value::Null).unwrap_or(Value::Null);
match (lvalue, rvalue) {
(Value::String(ls), Value::String(rs)) => {
match path_order.ordering {
IndexOrd::ASC => {
ls.cmp(&rs)
}
IndexOrd::DESC => {
rs.cmp(&ls)
}
}
}
(Value::Number(ls), Value::Number(rs)) => {
let ln = ls.as_f64().unwrap_or(0.0);
let rn = rs.as_f64().unwrap_or(0.0);
match path_order.ordering {
IndexOrd::ASC => {
OrderedFloat(ln).cmp(&OrderedFloat(rn))
}
IndexOrd::DESC => {
OrderedFloat(rn).cmp(&OrderedFloat(ln))
}
}
}
_ => {
Ordering::Equal
}
}
}).collect();
let mut itr = ordering.iter();
let mut order_chain = itr.next().unwrap_or(&Ordering::Equal).to_owned();
while let Some(t) = itr.next() {
order_chain = order_chain.then(t.to_owned()).to_owned();
}
order_chain
});
}
Indexer::Integer(i) => {
self.ws.write().unwrap().par_sort_by(|_, lhs, _, rhs| {
let lvalue = lhs.as_i64().unwrap_or(0);
let rvalue = rhs.as_i64().unwrap_or(0);
match i.ordering {
IndexOrd::ASC => {
lvalue.cmp(&rvalue)
}
IndexOrd::DESC => {
rvalue.cmp(&lvalue)
}
}
});
}
Indexer::Float(f) => {
self.ws.write().unwrap().par_sort_by(|_, lhs, _, rhs| {
let lvalue = lhs.as_f64().unwrap_or(0.0);
let rvalue = rhs.as_f64().unwrap_or(0.0);
match f.ordering {
IndexOrd::ASC => {
OrderedFloat(lvalue).cmp(&OrderedFloat(rvalue))
}
IndexOrd::DESC => {
OrderedFloat(rvalue).cmp(&OrderedFloat(lvalue))
}
}
});
}
Indexer::String(s) => {
self.ws.write().unwrap().par_sort_by(|_, lhs, _, rhs| {
let lvalue = lhs.as_str().unwrap_or("");
let rvalue = rhs.as_str().unwrap_or("");
match s.ordering {
IndexOrd::ASC => {
lvalue.cmp(&rvalue)
}
IndexOrd::DESC => {
rvalue.cmp(&lvalue)
}
}
});
}
}
let reader = self.ws.read().unwrap();
{
let mut int_tree_writer = self.int_tree.write().unwrap();
let mut float_tree_writer = self.float_tree.write().unwrap();
let mut str_tree_writer = self.str_tree.write().unwrap();
int_tree_writer.clear();
float_tree_writer.clear();
str_tree_writer.clear();
}
reader.par_iter().for_each(|(k, v)| {
let mut indexer = self.indexer.clone();
match indexer {
Indexer::Json(j) => {
j.path_orders.iter().for_each(|path_order| {
let value: Value = v.dot_get_or(&path_order.path, Value::Null).unwrap_or(Value::Null);
if value.is_i64() {
self.insert_int_index(&path_order.path, &value, k, v)
} else if value.is_f64() {
self.insert_float_index(&path_order.path, &value, k, v)
} else if value.is_string() {
self.insert_string_index(&path_order.path, &value, k, v)
}
})
}
Indexer::Integer(i) => {
let value: Value = v.clone();
self.insert_int_index("*", &value, k, v)
}
Indexer::Float(f) => {
let value: Value = v.clone();
self.insert_float_index("*", &value, k, v)
}
Indexer::String(s) => {
let value: Value = v.clone();
self.insert_string_index("*", &value, k, v)
}
}
});
self.rs.clone_from(reader.deref())
}
}
#[cfg(test)]
#[macro_use]
extern crate log;
#[cfg(test)]
mod tests;