use sparrowdb_catalog::catalog::Catalog;
use sparrowdb_common::col_id_of;
use sparrowdb_storage::csr::CsrForward;
use sparrowdb_storage::edge_store::{EdgeStore, RelTableId};
use sparrowdb_storage::node_store::{NodeStore, Value};
use std::collections::{HashMap, HashSet};
use std::path::Path;
/// Returns the column id that [`col_id_of`] assigns to the property name `key`.
///
/// This is a thin public alias: the id computation lives entirely in
/// `sparrowdb_common::col_id_of`, and this function forwards to it unchanged.
// NOTE(review): the name suggests an FNV-1a hash, but the algorithm is not
// visible in this file — confirm against `col_id_of` before relying on it.
pub fn fnv1a_col_id(key: &str) -> u32 {
col_id_of(key)
}
/// Escapes `s` so it can be embedded inside a single-quoted Cypher string literal.
///
/// Every backslash and every single quote in the input is prefixed with a
/// backslash; all other characters are copied through unchanged.
pub fn cypher_escape_string(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        // Only `\` and `'` need an escaping backslash in front of them.
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
/// Converts a Cypher AST literal into a storage [`Value`] without parameter
/// resolution.
///
/// * integers and floats map to `Value::Int64` / `Value::Float`,
/// * booleans are stored as `Value::Int64` (`1` for true, `0` for false),
/// * strings are stored as their UTF-8 bytes in `Value::Bytes`,
/// * `Null` and unresolved `$param` placeholders both collapse to
///   `Value::Int64(0)`.
pub(crate) fn literal_to_value(lit: &sparrowdb_cypher::ast::Literal) -> Value {
    use sparrowdb_cypher::ast::Literal as Lit;

    match lit {
        Lit::Null | Lit::Param(_) => Value::Int64(0),
        Lit::String(text) => Value::Bytes(Vec::from(text.as_bytes())),
        Lit::Bool(flag) => Value::Int64(i64::from(*flag)),
        Lit::Float(x) => Value::Float(*x),
        Lit::Int(i) => Value::Int64(*i),
    }
}
/// Converts an AST expression into a storage [`Value`] without parameter
/// resolution.
///
/// Literal expressions are converted by `literal_to_value`; every other
/// expression kind yields `Value::Int64(0)`.
pub(crate) fn expr_to_value(expr: &sparrowdb_cypher::ast::Expr) -> Value {
    if let sparrowdb_cypher::ast::Expr::Literal(lit) = expr {
        literal_to_value(lit)
    } else {
        Value::Int64(0)
    }
}
pub(crate) fn literal_to_value_with_params(
lit: &sparrowdb_cypher::ast::Literal,
params: &HashMap<String, sparrowdb_execution::Value>,
) -> crate::Result<Value> {
use sparrowdb_cypher::ast::Literal;
match lit {
Literal::Int(n) => Ok(Value::Int64(*n)),
Literal::Float(f) => Ok(Value::Float(*f)),
Literal::Bool(b) => Ok(Value::Int64(if *b { 1 } else { 0 })),
Literal::String(s) => Ok(Value::Bytes(s.as_bytes().to_vec())),
Literal::Null => Ok(Value::Int64(0)),
Literal::Param(p) => match params.get(p.as_str()) {
Some(v) => Ok(exec_value_to_storage(v)),
None => Err(sparrowdb_common::Error::InvalidArgument(format!(
"parameter ${p} was referenced in the query but not supplied"
))),
},
}
}
pub(crate) fn expr_to_value_with_params(
expr: &sparrowdb_cypher::ast::Expr,
params: &HashMap<String, sparrowdb_execution::Value>,
) -> crate::Result<Value> {
use sparrowdb_cypher::ast::Expr;
match expr {
Expr::Literal(lit) => literal_to_value_with_params(lit, params),
_ => Err(sparrowdb_common::Error::InvalidArgument(
"property value must be a literal or $parameter".into(),
)),
}
}
/// Converts an execution-layer value into a storage [`Value`].
///
/// Integers, floats, booleans (as `Int64` 0/1) and strings (as UTF-8 bytes)
/// are carried over; every other execution value variant is stored as
/// `Value::Int64(0)`.
pub(crate) fn exec_value_to_storage(v: &sparrowdb_execution::Value) -> Value {
    use sparrowdb_execution::Value as Exec;

    match v {
        Exec::String(text) => Value::Bytes(Vec::from(text.as_bytes())),
        Exec::Bool(flag) => Value::Int64(i64::from(*flag)),
        Exec::Float64(x) => Value::Float(*x),
        Exec::Int64(i) => Value::Int64(*i),
        // Anything without a storage representation falls back to zero.
        _ => Value::Int64(0),
    }
}
pub(crate) fn storage_value_to_exec(val: &Value) -> sparrowdb_execution::Value {
match val {
Value::Int64(n) => sparrowdb_execution::Value::Int64(*n),
Value::Bytes(b) => {
sparrowdb_execution::Value::String(String::from_utf8_lossy(b).into_owned())
}
Value::Float(f) => sparrowdb_execution::Value::Float64(*f),
}
}
pub(crate) fn eval_expr_merge(
expr: &sparrowdb_cypher::ast::Expr,
vals: &HashMap<String, sparrowdb_execution::Value>,
) -> sparrowdb_execution::Value {
use sparrowdb_cypher::ast::{Expr, Literal};
match expr {
Expr::PropAccess { var, prop } => {
let key = format!("{var}.{prop}");
vals.get(&key)
.cloned()
.unwrap_or(sparrowdb_execution::Value::Null)
}
Expr::Literal(lit) => match lit {
Literal::Int(n) => sparrowdb_execution::Value::Int64(*n),
Literal::Float(f) => sparrowdb_execution::Value::Float64(*f),
Literal::Bool(b) => sparrowdb_execution::Value::Bool(*b),
Literal::String(s) => sparrowdb_execution::Value::String(s.clone()),
Literal::Null | Literal::Param(_) => sparrowdb_execution::Value::Null,
},
Expr::Var(v) => vals
.get(v.as_str())
.cloned()
.unwrap_or(sparrowdb_execution::Value::Null),
_ => sparrowdb_execution::Value::Null,
}
}
/// Reports whether `mm` is a `DELETE` whose target variable is bound to a
/// relationship in one of the match patterns.
///
/// Returns `false` when the mutation is not a `Delete`, or when no
/// relationship in any pattern has a non-empty variable equal to the deleted
/// variable.
pub(crate) fn is_edge_delete_mutation(mm: &sparrowdb_cypher::ast::MatchMutateStatement) -> bool {
    use sparrowdb_cypher::ast::Mutation;

    if let Mutation::Delete { var } = &mm.mutation {
        for pattern in mm.match_patterns.iter() {
            for rel in pattern.rels.iter() {
                // Anonymous relationships (empty variable) can never be the target.
                if !rel.var.is_empty() && &rel.var == var {
                    return true;
                }
            }
        }
    }
    false
}
/// Returns `true` when `label` begins with the `__SO_` prefix.
///
/// The check is case-sensitive and matches only at the very start of the
/// label.
#[inline]
pub(crate) fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
/// File name of the persisted constraint set; `save_constraints` and
/// `load_constraints` join it onto the database root directory.
pub(crate) const CONSTRAINTS_FILE: &str = "constraints.bin";
pub(crate) fn save_constraints(
db_root: &Path,
constraints: &HashSet<(u32, u32)>,
) -> crate::Result<()> {
use std::io::Write;
let path = db_root.join(CONSTRAINTS_FILE);
let mut buf = Vec::with_capacity(4 + constraints.len() * 8);
buf.extend_from_slice(&(constraints.len() as u32).to_le_bytes());
for &(label_id, col_id) in constraints {
buf.extend_from_slice(&label_id.to_le_bytes());
buf.extend_from_slice(&col_id.to_le_bytes());
}
let tmp_path = db_root.join("constraints.bin.tmp");
let mut f = std::fs::File::create(&tmp_path)?;
f.write_all(&buf)?;
f.sync_all()?;
std::fs::rename(&tmp_path, &path)?;
Ok(())
}
/// Loads the constraint set persisted in `CONSTRAINTS_FILE` inside `db_root`.
///
/// The file is expected to hold a little-endian `u32` record count followed
/// by that many `(label_id: u32, col_id: u32)` little-endian pairs. Bytes
/// after the last declared record are ignored.
///
/// This is a best-effort loader: it returns an empty set when the file cannot
/// be read, is shorter than the 4-byte header, or is shorter than the length
/// implied by the header.
///
/// The implied length is computed with checked arithmetic, so a corrupt
/// header cannot overflow the size calculation (which on 32-bit targets could
/// previously bypass the length check and panic while indexing).
pub(crate) fn load_constraints(db_root: &Path) -> HashSet<(u32, u32)> {
    let Ok(data) = std::fs::read(db_root.join(CONSTRAINTS_FILE)) else {
        return HashSet::new();
    };
    if data.len() < 4 {
        return HashSet::new();
    }

    let (header, body) = data.split_at(4);
    let count = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;

    // Each record is 8 bytes; reject headers whose implied size overflows or
    // exceeds what is actually present.
    let Some(body_len) = count.checked_mul(8) else {
        return HashSet::new();
    };
    if body.len() < body_len {
        return HashSet::new();
    }

    body[..body_len]
        .chunks_exact(8)
        .map(|rec| {
            let label_id = u32::from_le_bytes([rec[0], rec[1], rec[2], rec[3]]);
            let col_id = u32::from_le_bytes([rec[4], rec[5], rec[6], rec[7]]);
            (label_id, col_id)
        })
        .collect()
}
/// Builds a per-label row-count map from the node store under `db_root`.
///
/// For every label listed by `catalog`, the value reported by
/// `NodeStore::hwm_for_label` is used as the count; labels whose value is `0`
/// (or whose lookup fails) are omitted. Returns an empty map when the node
/// store cannot be opened; a failure to list labels is treated as "no labels".
// NOTE(review): "hwm" presumably means high-water mark, i.e. an upper bound
// on allocated slots rather than a live-row count — confirm against NodeStore.
pub(crate) fn build_label_row_counts_from_disk(
    catalog: &Catalog,
    db_root: &Path,
) -> HashMap<sparrowdb_catalog::catalog::LabelId, usize> {
    let Ok(store) = NodeStore::open(db_root) else {
        return HashMap::new();
    };

    let mut counts = HashMap::new();
    for (lid, _name) in catalog.list_labels().unwrap_or_default() {
        let hwm = store.hwm_for_label(lid as u32).unwrap_or(0);
        if hwm > 0 {
            counts.insert(lid, hwm as usize);
        }
    }
    counts
}
pub(crate) fn open_csr_map(path: &Path) -> HashMap<u32, CsrForward> {
let catalog = match Catalog::open(path) {
Ok(c) => c,
Err(_) => return HashMap::new(),
};
let mut map = HashMap::new();
let mut rel_ids: Vec<u32> = catalog
.list_rel_table_ids()
.into_iter()
.map(|(id, _, _, _)| id as u32)
.collect();
if !rel_ids.contains(&0u32) {
rel_ids.push(0u32);
}
for rid in rel_ids {
if let Ok(store) = EdgeStore::open(path, RelTableId(rid)) {
if let Ok(csr) = store.open_fwd() {
map.insert(rid, csr);
}
}
}
map
}
/// Opens the forward CSR for every relationship table under `path`.
///
/// The set of table ids is taken from the catalog, with id `0` always added
/// if the catalog does not list it. Tables whose edge store or forward CSR
/// cannot be opened are skipped silently.
///
/// # Errors
///
/// Returns the error from `Catalog::open` when the catalog cannot be opened.
// NOTE(review): id 0 is presumably the default/untyped relationship table —
// confirm against the catalog's id allocation.
pub(crate) fn try_open_csr_map(path: &Path) -> crate::Result<HashMap<u32, CsrForward>> {
    let catalog = Catalog::open(path)?;

    // Gather catalog ids, remembering whether table 0 was among them.
    let mut rel_ids = Vec::new();
    let mut saw_rel_zero = false;
    for (id, _, _, _) in catalog.list_rel_table_ids() {
        let rid = id as u32;
        saw_rel_zero |= rid == 0u32;
        rel_ids.push(rid);
    }
    if !saw_rel_zero {
        rel_ids.push(0u32);
    }

    let mut csr_by_rel = HashMap::new();
    for rid in rel_ids {
        let Ok(store) = EdgeStore::open(path, RelTableId(rid)) else {
            continue;
        };
        let Ok(csr) = store.open_fwd() else {
            continue;
        };
        csr_by_rel.insert(rid, csr);
    }
    Ok(csr_by_rel)
}
/// Returns the total size in bytes of all files under `dir`, recursively.
///
/// This is a best-effort measurement: a directory that cannot be read
/// contributes `0`, and entries whose type or metadata cannot be obtained are
/// skipped. Directories themselves contribute nothing; only their contents
/// are counted.
///
/// Symbolic links to directories are **not** followed. The entry type comes
/// from `DirEntry::file_type`, which does not traverse symlinks, so a symlink
/// cycle or a link pointing back up the tree can no longer cause the same
/// files to be counted many times. Symlinks to non-directories are still
/// counted with the size of their target.
pub(crate) fn dir_size_bytes(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };

    let mut total: u64 = 0;
    for entry in entries.flatten() {
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        let path = entry.path();
        if file_type.is_dir() {
            total += dir_size_bytes(&path);
        } else if let Ok(meta) = std::fs::metadata(&path) {
            // `metadata` follows symlinks; a link that resolves to a
            // directory is deliberately skipped rather than descended into.
            if !meta.is_dir() {
                total += meta.len();
            }
        }
    }
    total
}
/// Computes, for every relationship table, the node count to use for
/// maintenance, returned as `(rel_table_id, n_nodes)` pairs.
///
/// The tables considered are those listed by `catalog`, plus table `0` if the
/// catalog does not list it. For each table `n_nodes` is the maximum of:
///
/// * the larger `hwm_for_label` value of the table's source and destination
///   labels — or, for the implicitly added table `0`, the largest value across
///   all catalog labels;
/// * one more than the largest slot (low 32 bits of a node id) that appears as
///   a source or destination in the table's delta records, or `0` when the
///   edge store or its delta cannot be read or is empty;
/// * `1`, so the result is never zero.
///
/// Lookup failures in the node store are treated as `0`.
pub(crate) fn collect_maintenance_params(
    catalog: &Catalog,
    node_store: &NodeStore,
    db_root: &Path,
) -> Vec<(u32, u64)> {
    let hwm_of = |label: u32| -> u64 { node_store.hwm_for_label(label).unwrap_or(0) };

    // (rel id, source label, destination label); labels are None only for
    // the implicitly added table 0.
    let rel_table_entries = catalog.list_rel_table_ids();
    let mut rel_specs: Vec<(u32, Option<u16>, Option<u16>)> =
        Vec::with_capacity(rel_table_entries.len() + 1);
    let mut saw_rel_zero = false;
    for (id, src, dst, _) in rel_table_entries.iter() {
        let rel_id = *id as u32;
        saw_rel_zero |= rel_id == 0u32;
        rel_specs.push((rel_id, Some(*src), Some(*dst)));
    }
    if !saw_rel_zero {
        rel_specs.push((0u32, None, None));
    }

    // Fallback bound for tables without known endpoint labels.
    let mut global_max_hwm: u64 = 0;
    for (label_id, _name) in catalog.list_labels().unwrap_or_default().iter() {
        global_max_hwm = global_max_hwm.max(hwm_of(*label_id as u32));
    }

    let mut params = Vec::with_capacity(rel_specs.len());
    for (rel_id, src_label, dst_label) in rel_specs {
        let hwm_n_nodes = if let (Some(src), Some(dst)) = (src_label, dst_label) {
            let src_hwm = hwm_of(src as u32);
            let dst_hwm = hwm_of(dst as u32);
            src_hwm.max(dst_hwm)
        } else {
            global_max_hwm
        };

        // Largest slot referenced by pending delta records, plus one.
        let mut delta_max: u64 = 0;
        if let Ok(store) = EdgeStore::open(db_root, RelTableId(rel_id)) {
            if let Ok(records) = store.read_delta() {
                for rec in records.iter() {
                    let src_slot = rec.src.0 & 0xFFFF_FFFF;
                    let dst_slot = rec.dst.0 & 0xFFFF_FFFF;
                    delta_max = delta_max.max(src_slot.max(dst_slot) + 1);
                }
            }
        }

        let n_nodes = hwm_n_nodes.max(delta_max).max(1);
        params.push((rel_id, n_nodes));
    }
    params
}