use std::path::Path;
use rocksdb::{ColumnFamilyDescriptor, Direction, IteratorMode, Options, WriteBatch, DB};
use crate::types::DbError;
// Column family names. Each name below is opened (and created when missing)
// by `RocksStore::open` through `ALL_CFS`.

/// Node records, keyed by the 16-byte big-endian node id.
const CF_NODES: &str = "nodes";
/// Edge records, keyed by the 16-byte big-endian edge id.
const CF_EDGES: &str = "edges";
/// Outgoing adjacency: key is `from ++ edge_id`, value is `to ++ label`.
const CF_ADJ_OUT: &str = "adj_out";
/// Incoming adjacency: key is `to ++ edge_id`, value is `from ++ label`.
const CF_ADJ_IN: &str = "adj_in";
/// Node label index: key is `label ++ 0x00 ++ node_id`, value is empty.
const CF_LABEL_IDX: &str = "label_idx";
/// Edge label index: key is `label ++ 0x00 ++ edge_id`, value is empty.
const CF_EDGE_LABEL_IDX: &str = "edge_label_idx";
/// Property index: key is `label ++ 0 ++ prop ++ 0 ++ encoded_val ++ 0 ++ node_id`,
/// value is empty.
const CF_PROP_IDX: &str = "prop_idx";
/// Free-form metadata such as the node/edge counters.
const CF_META: &str = "meta";
/// Every column family the store opens; `clear_all` wipes all of them except
/// `CF_META`, whose counters it resets instead.
pub(crate) const ALL_CFS: &[&str] = &[
CF_NODES,
CF_EDGES,
CF_ADJ_OUT,
CF_ADJ_IN,
CF_LABEL_IDX,
CF_EDGE_LABEL_IDX,
CF_PROP_IDX,
CF_META,
];
/// Meta key holding the node counter (read as a little-endian `u64`).
pub(crate) const META_NODE_COUNT: &[u8] = b"node_count";
/// Meta key holding the edge counter (read as a little-endian `u64`).
pub(crate) const META_EDGE_COUNT: &[u8] = b"edge_count";
/// Meta key deleted by `clear_all`. Presumably a marker that the edge-label
/// index has been built (format v1) — NOTE(review): confirm against the code
/// that writes it, which is not in this file.
pub(crate) const META_EDGE_LABEL_IDX_BUILT: &[u8] = b"edge_label_idx_v1";
/// Thin wrapper around a RocksDB handle that exposes the graph-specific
/// column families (nodes, edges, adjacency, indexes, meta) through typed
/// helper methods.
pub(crate) struct RocksStore {
/// The underlying RocksDB handle, opened with every column family in `ALL_CFS`.
pub(crate) db: DB,
}
impl RocksStore {
/// Opens the RocksDB database at `path`, creating the directory, the
/// database itself and any column family from `ALL_CFS` that is missing.
///
/// # Errors
/// Propagates the error from `create_dir_all` via `?` and maps a RocksDB
/// open failure to `DbError::RocksDb`.
pub fn open(path: &Path) -> Result<Self, DbError> {
    std::fs::create_dir_all(path)?;

    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    // One descriptor per column family, each with default options.
    let mut descriptors = Vec::with_capacity(ALL_CFS.len());
    for name in ALL_CFS {
        descriptors.push(ColumnFamilyDescriptor::new(*name, Options::default()));
    }

    match DB::open_cf_descriptors(&opts, path, descriptors) {
        Ok(db) => Ok(Self { db }),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
/// Encodes an id as 16 big-endian bytes, so that byte-wise key order matches
/// numeric id order.
#[inline]
pub(crate) fn id_key(id: u128) -> [u8; 16] {
    u128::to_be_bytes(id)
}
/// Decodes the first 16 bytes of `b` as a big-endian `u128`; any trailing
/// bytes are ignored.
///
/// # Panics
/// Panics when `b` holds fewer than 16 bytes.
#[inline]
pub(crate) fn bytes_to_id(b: &[u8]) -> u128 {
    let head: [u8; 16] = b[..16].try_into().expect("slice must be ≥16 bytes");
    u128::from_be_bytes(head)
}
/// Builds the 32-byte adjacency key `node_id ++ edge_id`, both halves
/// big-endian, so all edges of one node share a 16-byte prefix.
fn adj_key(node_id: u128, edge_id: u128) -> [u8; 32] {
    let mut key = [0u8; 32];
    {
        let (node_half, edge_half) = key.split_at_mut(16);
        node_half.copy_from_slice(&node_id.to_be_bytes());
        edge_half.copy_from_slice(&edge_id.to_be_bytes());
    }
    key
}
/// Builds a label-index key: the label bytes, a NUL separator, then the
/// 16-byte big-endian node id.
fn label_key(label: &str, node_id: u128) -> Vec<u8> {
    let id_bytes = node_id.to_be_bytes();
    [label.as_bytes(), &[0u8][..], &id_bytes[..]].concat()
}
/// Builds the scan prefix for a label: the label bytes followed by the NUL
/// separator, so that "Per" never matches keys written for "Person".
fn label_prefix(label: &str) -> Vec<u8> {
    label.bytes().chain(std::iter::once(0u8)).collect()
}
/// Builds a property-index key:
/// `label ++ 0 ++ prop ++ 0 ++ encoded_val ++ 0 ++ node_id` (id big-endian).
fn prop_key(label: &str, prop: &str, encoded_val: &str, node_id: u128) -> Vec<u8> {
    const SEP: &[u8] = &[0u8];
    let id_bytes = node_id.to_be_bytes();
    [
        label.as_bytes(),
        SEP,
        prop.as_bytes(),
        SEP,
        encoded_val.as_bytes(),
        SEP,
        &id_bytes[..],
    ]
    .concat()
}
/// Builds the exact-match scan prefix for a property value:
/// `label ++ 0 ++ prop ++ 0 ++ encoded_val ++ 0`.
fn prop_prefix(label: &str, prop: &str, encoded_val: &str) -> Vec<u8> {
    let mut out = Vec::new();
    // Every component is followed by its own NUL terminator.
    for part in &[label, prop, encoded_val] {
        out.extend_from_slice(part.as_bytes());
        out.push(0);
    }
    out
}
/// Builds the prefix shared by every index entry of one `(label, prop)`
/// pair: `label ++ 0 ++ prop ++ 0`.
fn prop_range_prefix(label: &str, prop: &str) -> Vec<u8> {
    let nul = || std::iter::once(0u8);
    label
        .bytes()
        .chain(nul())
        .chain(prop.bytes())
        .chain(nul())
        .collect()
}
/// Test-only helper: writes `data` for node `id` directly (no batch).
///
/// # Panics
/// Panics if the `nodes` column family handle is missing.
#[cfg(test)]
pub fn put_node_raw(&self, id: u128, data: &[u8]) -> Result<(), DbError> {
    let nodes_cf = self.db.cf_handle(CF_NODES).expect("nodes CF");
    let key = Self::id_key(id);
    match self.db.put_cf(&nodes_cf, key, data) {
        Ok(()) => Ok(()),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
/// Reads the raw bytes stored for node `id`; `Ok(None)` when absent.
///
/// # Panics
/// Panics if the `nodes` column family handle is missing.
pub fn get_node_raw(&self, id: u128) -> Result<Option<Vec<u8>>, DbError> {
    let nodes_cf = self.db.cf_handle(CF_NODES).expect("nodes CF");
    let lookup = self.db.get_cf(&nodes_cf, Self::id_key(id));
    lookup.map_err(|err| DbError::RocksDb(err.to_string()))
}
/// Queues deletion of node `id` in `batch`; nothing is written until the
/// batch is committed.
pub fn delete_node_batch(&self, batch: &mut WriteBatch, id: u128) {
    let key = Self::id_key(id);
    let nodes_cf = self.db.cf_handle(CF_NODES).expect("nodes CF");
    batch.delete_cf(&nodes_cf, key);
}
/// Queues a write of `data` for node `id` in `batch`; nothing is written
/// until the batch is committed.
pub fn put_node_batch(&self, batch: &mut WriteBatch, id: u128, data: &[u8]) {
    let key = Self::id_key(id);
    let nodes_cf = self.db.cf_handle(CF_NODES).expect("nodes CF");
    batch.put_cf(&nodes_cf, key, data);
}
/// Collects the id of every entry in the `nodes` column family, in key
/// order. Stops at the first iterator error and returns it.
pub fn all_node_ids(&self) -> Result<Vec<u128>, DbError> {
    let nodes_cf = self.db.cf_handle(CF_NODES).expect("nodes CF");
    self.db
        .iterator_cf(&nodes_cf, IteratorMode::Start)
        .map(|entry| match entry {
            Ok((raw_key, _)) => Ok(Self::bytes_to_id(&raw_key)),
            Err(err) => Err(DbError::RocksDb(err.to_string())),
        })
        .collect()
}
/// Writes `data` for edge `id` directly (no batch).
///
/// # Panics
/// Panics if the `edges` column family handle is missing.
pub fn put_edge_raw(&self, id: u128, data: &[u8]) -> Result<(), DbError> {
    let edges_cf = self.db.cf_handle(CF_EDGES).expect("edges CF");
    let key = Self::id_key(id);
    match self.db.put_cf(&edges_cf, key, data) {
        Ok(()) => Ok(()),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
/// Reads the raw bytes stored for edge `id`; `Ok(None)` when absent.
///
/// # Panics
/// Panics if the `edges` column family handle is missing.
pub fn get_edge_raw(&self, id: u128) -> Result<Option<Vec<u8>>, DbError> {
    let edges_cf = self.db.cf_handle(CF_EDGES).expect("edges CF");
    let lookup = self.db.get_cf(&edges_cf, Self::id_key(id));
    lookup.map_err(|err| DbError::RocksDb(err.to_string()))
}
/// Queues deletion of edge `id` in `batch`; nothing is written until the
/// batch is committed.
pub fn delete_edge_batch(&self, batch: &mut WriteBatch, id: u128) {
    let key = Self::id_key(id);
    let edges_cf = self.db.cf_handle(CF_EDGES).expect("edges CF");
    batch.delete_cf(&edges_cf, key);
}
/// Queues a write of `data` for edge `id` in `batch`; nothing is written
/// until the batch is committed.
pub fn put_edge_batch(&self, batch: &mut WriteBatch, id: u128, data: &[u8]) {
    let key = Self::id_key(id);
    let edges_cf = self.db.cf_handle(CF_EDGES).expect("edges CF");
    batch.put_cf(&edges_cf, key, data);
}
/// Collects the id of every entry in the `edges` column family, in key
/// order. Stops at the first iterator error and returns it.
pub fn all_edge_ids(&self) -> Result<Vec<u128>, DbError> {
    let edges_cf = self.db.cf_handle(CF_EDGES).expect("edges CF");
    self.db
        .iterator_cf(&edges_cf, IteratorMode::Start)
        .map(|entry| match entry {
            Ok((raw_key, _)) => Ok(Self::bytes_to_id(&raw_key)),
            Err(err) => Err(DbError::RocksDb(err.to_string())),
        })
        .collect()
}
/// Queues an outgoing-adjacency entry in `batch`.
///
/// Key: `from ++ edge_id`. Value: the 16-byte big-endian `to` id followed by
/// the raw label bytes.
pub fn put_adj_out(
    &self,
    batch: &mut WriteBatch,
    from: u128,
    edge_id: u128,
    to: u128,
    label: &str,
) {
    let out_cf = self.db.cf_handle(CF_ADJ_OUT).expect("adj_out CF");
    let target = to.to_be_bytes();
    let value = [&target[..], label.as_bytes()].concat();
    batch.put_cf(&out_cf, Self::adj_key(from, edge_id), value);
}
/// Queues an incoming-adjacency entry in `batch`.
///
/// Key: `to ++ edge_id`. Value: the 16-byte big-endian `from` id followed by
/// the raw label bytes.
pub fn put_adj_in(
    &self,
    batch: &mut WriteBatch,
    to: u128,
    edge_id: u128,
    from: u128,
    label: &str,
) {
    let in_cf = self.db.cf_handle(CF_ADJ_IN).expect("adj_in CF");
    let source = from.to_be_bytes();
    let value = [&source[..], label.as_bytes()].concat();
    batch.put_cf(&in_cf, Self::adj_key(to, edge_id), value);
}
/// Queues removal of the outgoing-adjacency entry `(from, edge_id)` in `batch`.
pub fn delete_adj_out_batch(&self, batch: &mut WriteBatch, from: u128, edge_id: u128) {
    let key = Self::adj_key(from, edge_id);
    let out_cf = self.db.cf_handle(CF_ADJ_OUT).expect("adj_out CF");
    batch.delete_cf(&out_cf, key);
}
/// Queues removal of the incoming-adjacency entry `(to, edge_id)` in `batch`.
pub fn delete_adj_in_batch(&self, batch: &mut WriteBatch, to: u128, edge_id: u128) {
    let key = Self::adj_key(to, edge_id);
    let in_cf = self.db.cf_handle(CF_ADJ_IN).expect("adj_in CF");
    batch.delete_cf(&in_cf, key);
}
pub fn scan_adj_out(&self, from: u128) -> Result<Vec<(u128, u128, String)>, DbError> {
self.scan_adj(CF_ADJ_OUT, from)
}
pub fn scan_adj_in(&self, to: u128) -> Result<Vec<(u128, u128, String)>, DbError> {
self.scan_adj(CF_ADJ_IN, to)
}
/// Scans the adjacency column family `cf_name` for every entry whose key
/// starts with `node_id`, returning `(edge_id, other_node_id, label)` tuples
/// in key order.
///
/// The label is decoded lossily, so invalid UTF-8 in a stored label is
/// replaced rather than rejected.
///
/// # Errors
/// Returns `DbError::RocksDb` when the iterator fails, or when a stored
/// value is shorter than the 16 bytes needed for the neighbour id (a
/// malformed record). Previously such a value caused a slice-index panic.
///
/// # Panics
/// Panics if the column family `cf_name` does not exist.
fn scan_adj(&self, cf_name: &str, node_id: u128) -> Result<Vec<(u128, u128, String)>, DbError> {
    let prefix = node_id.to_be_bytes();
    let cf = self.db.cf_handle(cf_name).expect("adjacency CF");
    let mode = IteratorMode::From(prefix.as_ref(), Direction::Forward);
    let iter = self.db.iterator_cf(&cf, mode);
    let mut results = Vec::new();
    for item in iter {
        let (key, val) = item.map_err(|e| DbError::RocksDb(e.to_string()))?;
        // Keys are sorted, so the first key outside the node's prefix ends the scan.
        if key.len() < 32 || key[..16] != prefix {
            break;
        }
        // A well-formed value is `other_id (16 bytes) ++ label`; report a
        // truncated one as an error instead of panicking on the slices below.
        if val.len() < 16 {
            return Err(DbError::RocksDb(format!(
                "malformed adjacency value in column family {}: expected at least 16 bytes, found {}",
                cf_name,
                val.len()
            )));
        }
        let edge_id = Self::bytes_to_id(&key[16..]);
        let other_id = Self::bytes_to_id(&val);
        let label = String::from_utf8_lossy(&val[16..]).into_owned();
        results.push((edge_id, other_id, label));
    }
    Ok(results)
}
/// Queues a label-index entry `(label, node_id)` with an empty value in `batch`.
pub fn put_label_entry(&self, batch: &mut WriteBatch, label: &str, node_id: u128) {
    let key = Self::label_key(label, node_id);
    let idx_cf = self.db.cf_handle(CF_LABEL_IDX).expect("label_idx CF");
    batch.put_cf(&idx_cf, key, b"");
}
/// Queues removal of the label-index entry `(label, node_id)` in `batch`.
pub fn delete_label_entry(&self, batch: &mut WriteBatch, label: &str, node_id: u128) {
    let key = Self::label_key(label, node_id);
    let idx_cf = self.db.cf_handle(CF_LABEL_IDX).expect("label_idx CF");
    batch.delete_cf(&idx_cf, key);
}
/// Returns the ids of all nodes indexed under `label`, in key order.
///
/// Entries whose key has fewer than 16 bytes after the prefix are skipped.
pub fn scan_label(&self, label: &str) -> Result<Vec<u128>, DbError> {
    let prefix = Self::label_prefix(label);
    let idx_cf = self.db.cf_handle(CF_LABEL_IDX).expect("label_idx CF");
    let start = IteratorMode::From(prefix.as_slice(), Direction::Forward);
    let mut found = Vec::new();
    for entry in self.db.iterator_cf(&idx_cf, start) {
        let (raw_key, _) = entry.map_err(|e| DbError::RocksDb(e.to_string()))?;
        match raw_key.strip_prefix(prefix.as_slice()) {
            // Sorted keys: the first non-matching key ends the label's range.
            None => break,
            Some(tail) => {
                if tail.len() >= 16 {
                    found.push(Self::bytes_to_id(tail));
                }
            }
        }
    }
    Ok(found)
}
/// Builds an edge-label-index key: the label bytes, a NUL separator, then
/// the 16-byte big-endian edge id.
fn edge_label_key(label: &str, edge_id: u128) -> Vec<u8> {
    let id_bytes = edge_id.to_be_bytes();
    [label.as_bytes(), &[0u8][..], &id_bytes[..]].concat()
}
/// Builds the scan prefix for an edge label: the label bytes followed by the
/// NUL separator.
fn edge_label_prefix(label: &str) -> Vec<u8> {
    label.bytes().chain(std::iter::once(0u8)).collect()
}
/// Queues an edge-label-index entry `(label, edge_id)` with an empty value
/// in `batch`.
pub fn put_edge_label_entry(&self, batch: &mut WriteBatch, label: &str, edge_id: u128) {
    let key = Self::edge_label_key(label, edge_id);
    let idx_cf = self.db.cf_handle(CF_EDGE_LABEL_IDX).expect("edge_label_idx CF");
    batch.put_cf(&idx_cf, key, b"");
}
/// Queues removal of the edge-label-index entry `(label, edge_id)` in `batch`.
pub fn delete_edge_label_entry(&self, batch: &mut WriteBatch, label: &str, edge_id: u128) {
    let key = Self::edge_label_key(label, edge_id);
    let idx_cf = self.db.cf_handle(CF_EDGE_LABEL_IDX).expect("edge_label_idx CF");
    batch.delete_cf(&idx_cf, key);
}
/// Returns the ids of all edges indexed under `label`, in key order.
///
/// Entries whose key has fewer than 16 bytes after the prefix are skipped.
pub fn scan_edge_label(&self, label: &str) -> Result<Vec<u128>, DbError> {
    let prefix = Self::edge_label_prefix(label);
    let idx_cf = self.db.cf_handle(CF_EDGE_LABEL_IDX).expect("edge_label_idx CF");
    let start = IteratorMode::From(prefix.as_slice(), Direction::Forward);
    let mut found = Vec::new();
    for entry in self.db.iterator_cf(&idx_cf, start) {
        let (raw_key, _) = entry.map_err(|e| DbError::RocksDb(e.to_string()))?;
        match raw_key.strip_prefix(prefix.as_slice()) {
            // Sorted keys: the first non-matching key ends the label's range.
            None => break,
            Some(tail) => {
                if tail.len() >= 16 {
                    found.push(Self::bytes_to_id(tail));
                }
            }
        }
    }
    Ok(found)
}
/// Queues a property-index entry for `(label, prop, encoded_val, node_id)`
/// with an empty value in `batch`.
pub fn put_prop_entry(
    &self,
    batch: &mut WriteBatch,
    label: &str,
    prop: &str,
    encoded_val: &str,
    node_id: u128,
) {
    let key = Self::prop_key(label, prop, encoded_val, node_id);
    let idx_cf = self.db.cf_handle(CF_PROP_IDX).expect("prop_idx CF");
    batch.put_cf(&idx_cf, key, b"");
}
/// Queues removal of the property-index entry for
/// `(label, prop, encoded_val, node_id)` in `batch`.
pub fn delete_prop_entry(
    &self,
    batch: &mut WriteBatch,
    label: &str,
    prop: &str,
    encoded_val: &str,
    node_id: u128,
) {
    let key = Self::prop_key(label, prop, encoded_val, node_id);
    let idx_cf = self.db.cf_handle(CF_PROP_IDX).expect("prop_idx CF");
    batch.delete_cf(&idx_cf, key);
}
/// Returns the ids of all nodes indexed under exactly
/// `(label, prop, encoded_val)`, in key order.
///
/// Entries whose key has fewer than 16 bytes after the prefix are skipped.
pub fn scan_prop(
    &self,
    label: &str,
    prop: &str,
    encoded_val: &str,
) -> Result<Vec<u128>, DbError> {
    let prefix = Self::prop_prefix(label, prop, encoded_val);
    let idx_cf = self.db.cf_handle(CF_PROP_IDX).expect("prop_idx CF");
    let start = IteratorMode::From(prefix.as_slice(), Direction::Forward);
    let mut found = Vec::new();
    for entry in self.db.iterator_cf(&idx_cf, start) {
        let (raw_key, _) = entry.map_err(|e| DbError::RocksDb(e.to_string()))?;
        match raw_key.strip_prefix(prefix.as_slice()) {
            // Sorted keys: the first non-matching key ends the value's range.
            None => break,
            Some(tail) => {
                if tail.len() >= 16 {
                    found.push(Self::bytes_to_id(tail));
                }
            }
        }
    }
    Ok(found)
}
/// Returns node ids whose indexed value for `(label, prop)` lies between the
/// optional bounds `lo` and `hi`, comparing the *encoded* value strings.
///
/// Each bound is `(encoded_value, inclusive)`; `None` leaves that side open.
/// Results come back in index key order.
pub fn scan_prop_range(
    &self,
    label: &str,
    prop: &str,
    lo: Option<(&str, bool)>,
    hi: Option<(&str, bool)>,
) -> Result<Vec<u128>, DbError> {
    let range_pfx = Self::prop_range_prefix(label, prop);

    // Start at the lower bound when there is one, otherwise at the first
    // entry of this (label, prop) pair.
    let seek_key: Vec<u8> = match lo {
        Some((lo_val, _)) => {
            let mut k = range_pfx.clone();
            k.extend_from_slice(lo_val.as_bytes());
            k.push(0);
            k
        }
        None => range_pfx.clone(),
    };

    let idx_cf = self.db.cf_handle(CF_PROP_IDX).expect("prop_idx CF");
    let start = IteratorMode::From(seek_key.as_slice(), Direction::Forward);
    let mut ids = Vec::new();

    for entry in self.db.iterator_cf(&idx_cf, start) {
        let (key, _) = entry.map_err(|e| DbError::RocksDb(e.to_string()))?;
        if !key.starts_with(range_pfx.as_slice()) {
            break;
        }
        // After the prefix a key needs at least the NUL separator plus the
        // 16-byte id; anything shorter is skipped.
        if key.len() < range_pfx.len() + 17 {
            continue;
        }
        // The encoded value sits between the prefix and the trailing
        // `NUL ++ id` (17 bytes).
        let enc_end = key.len() - 17;
        let enc_val = match std::str::from_utf8(&key[range_pfx.len()..enc_end]) {
            Ok(s) => s,
            Err(_) => continue,
        };

        if let Some((lo_val, lo_incl)) = lo {
            match enc_val.cmp(lo_val) {
                std::cmp::Ordering::Less => continue,
                std::cmp::Ordering::Equal if !lo_incl => continue,
                _ => {}
            }
        }
        if let Some((hi_val, hi_incl)) = hi {
            // Past the upper bound: stop scanning altogether.
            match enc_val.cmp(hi_val) {
                std::cmp::Ordering::Greater => break,
                std::cmp::Ordering::Equal if !hi_incl => break,
                _ => {}
            }
        }

        ids.push(Self::bytes_to_id(&key[enc_end + 1..]));
    }
    Ok(ids)
}
/// Deletes every property-index entry of the `(label, prop)` pair in a
/// single write batch. Does nothing when no entry matches.
///
/// # Errors
/// Returns `DbError::RocksDb` when iterating the index or committing the
/// batch fails. An iterator error aborts the operation before anything is
/// deleted; previously it was swallowed, so a partial set of keys was
/// removed and `Ok(())` returned.
///
/// # Panics
/// Panics if the `prop_idx` column family handle is missing.
pub fn delete_prop_range(&self, label: &str, prop: &str) -> Result<(), DbError> {
    let prefix = Self::prop_range_prefix(label, prop);
    let cf = self.db.cf_handle(CF_PROP_IDX).expect("prop_idx CF");
    let mode = IteratorMode::From(prefix.as_slice(), Direction::Forward);

    // Gather the matching keys first so a mid-scan failure deletes nothing.
    let mut keys: Vec<Box<[u8]>> = Vec::new();
    for item in self.db.iterator_cf(&cf, mode) {
        let (key, _) = item.map_err(|e| DbError::RocksDb(e.to_string()))?;
        if !key.starts_with(prefix.as_slice()) {
            break;
        }
        keys.push(key);
    }
    if keys.is_empty() {
        return Ok(());
    }

    let mut batch = WriteBatch::default();
    for key in &keys {
        batch.delete_cf(&cf, key.as_ref());
    }
    self.db.write(batch).map_err(|e| DbError::RocksDb(e.to_string()))
}
/// Counts the property-index entries of the `(label, prop)` pair.
///
/// The count is best-effort: an iterator error ends the scan silently and
/// the entries counted so far are returned.
pub fn count_prop_entries(&self, label: &str, prop: &str) -> usize {
    let prefix = Self::prop_range_prefix(label, prop);
    let idx_cf = self.db.cf_handle(CF_PROP_IDX).expect("prop_idx CF");
    let start = IteratorMode::From(prefix.as_slice(), Direction::Forward);
    let mut total = 0usize;
    for entry in self.db.iterator_cf(&idx_cf, start) {
        match entry {
            Ok((key, _)) if key.starts_with(prefix.as_slice()) => total += 1,
            // Either an iterator error or the first key past the prefix.
            _ => break,
        }
    }
    total
}
/// Reads the value stored under `key` in the `meta` column family;
/// `Ok(None)` when absent.
pub fn get_meta(&self, key: &[u8]) -> Result<Option<Vec<u8>>, DbError> {
    let meta_cf = self.db.cf_handle(CF_META).expect("meta CF");
    match self.db.get_cf(&meta_cf, key) {
        Ok(value) => Ok(value),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
/// Writes `val` under `key` in the `meta` column family directly (no batch).
pub fn put_meta(&self, key: &[u8], val: &[u8]) -> Result<(), DbError> {
    let meta_cf = self.db.cf_handle(CF_META).expect("meta CF");
    match self.db.put_cf(&meta_cf, key, val) {
        Ok(()) => Ok(()),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
/// Queues a write of `val` under `key` in the `meta` column family in `batch`.
pub fn put_meta_batch(&self, batch: &mut WriteBatch, key: &[u8], val: &[u8]) {
    let meta_cf = self.db.cf_handle(CF_META).expect("meta CF");
    batch.put_cf(&meta_cf, key, val);
}
/// Reads the persisted node counter; `0` when it has never been written
/// (or when the stored value is too short to decode).
pub fn node_count(&self) -> Result<u64, DbError> {
    let stored = self.get_meta(META_NODE_COUNT)?;
    Ok(match stored {
        Some(bytes) => u64_from_le(&bytes),
        None => 0,
    })
}
/// Reads the persisted edge counter; `0` when it has never been written
/// (or when the stored value is too short to decode).
pub fn edge_count(&self) -> Result<u64, DbError> {
    let stored = self.get_meta(META_EDGE_COUNT)?;
    Ok(match stored {
        Some(bytes) => u64_from_le(&bytes),
        None => 0,
    })
}
/// Test-only helper: stores `n` as the node counter (little-endian `u64`).
#[cfg(test)]
pub fn set_node_count(&self, n: u64) -> Result<(), DbError> {
    let encoded = n.to_le_bytes();
    self.put_meta(META_NODE_COUNT, &encoded)
}
/// Test-only helper: stores `n` as the edge counter (little-endian `u64`).
#[cfg(test)]
pub fn set_edge_count(&self, n: u64) -> Result<(), DbError> {
    let encoded = n.to_le_bytes();
    self.put_meta(META_EDGE_COUNT, &encoded)
}
/// Wipes all graph data in one atomic write batch: range-deletes every data
/// column family, resets the node and edge counters to zero and removes the
/// `META_EDGE_LABEL_IDX_BUILT` marker. Other `meta` entries are left as-is.
///
/// # Errors
/// Returns `DbError::RocksDb` when committing the batch fails.
///
/// # Panics
/// Panics if any column family handle is missing.
pub fn clear_all(&self) -> Result<(), DbError> {
    // The end key of a range delete is exclusive, so the upper bound must
    // sort strictly after every possible key. The longest all-0xFF key this
    // store can write is a 32-byte adjacency key (`adj_key`); index keys
    // start with UTF-8 label bytes or a NUL, neither of which is 0xFF.
    // 33 bytes of 0xFF is therefore greater than any stored key. The former
    // 17-byte bound missed adjacency keys beginning with 17+ 0xFF bytes.
    const MAX_KEY: &[u8] = &[0xFF; 33];
    const MIN_KEY: &[u8] = b"";
    let data_cfs = [
        CF_NODES, CF_EDGES, CF_ADJ_OUT, CF_ADJ_IN,
        CF_LABEL_IDX, CF_EDGE_LABEL_IDX, CF_PROP_IDX,
    ];
    let mut batch = WriteBatch::default();
    for cf_name in &data_cfs {
        let cf = self.db.cf_handle(cf_name).expect("CF missing");
        batch.delete_range_cf(&cf, MIN_KEY, MAX_KEY);
    }
    self.put_meta_batch(&mut batch, META_NODE_COUNT, &0u64.to_le_bytes());
    self.put_meta_batch(&mut batch, META_EDGE_COUNT, &0u64.to_le_bytes());
    let meta_cf = self.db.cf_handle(CF_META).expect("meta CF missing");
    batch.delete_cf(&meta_cf, META_EDGE_LABEL_IDX_BUILT);
    self.write(batch)
}
/// Commits `batch` to the database, mapping a failure to `DbError::RocksDb`.
pub fn write(&self, batch: WriteBatch) -> Result<(), DbError> {
    match self.db.write(batch) {
        Ok(()) => Ok(()),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
pub fn batch() -> WriteBatch {
WriteBatch::default()
}
/// Calls `flush` on the underlying database handle, mapping a failure to
/// `DbError::RocksDb`.
pub fn flush(&self) -> Result<(), DbError> {
    match self.db.flush() {
        Ok(()) => Ok(()),
        Err(err) => Err(DbError::RocksDb(err.to_string())),
    }
}
}
/// Decodes the first 8 bytes of `b` as a little-endian `u64`.
///
/// Returns `0` when `b` holds fewer than 8 bytes; extra bytes are ignored.
fn u64_from_le(b: &[u8]) -> u64 {
    match b.get(..8) {
        Some(head) => {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(head);
            u64::from_le_bytes(buf)
        }
        None => 0,
    }
}
// Unit tests for `RocksStore`. Every test runs against a fresh database in a
// temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Opens a store in a new temp dir. The `TempDir` is returned so the caller
// keeps the directory alive for as long as the store is in use.
fn open() -> (RocksStore, TempDir) {
let dir = TempDir::new().unwrap();
let store = RocksStore::open(dir.path()).unwrap();
(store, dir)
}
// Smoke test: opening a fresh database (with all column families) succeeds.
#[test]
fn open_creates_all_column_families() {
let (_, _dir) = open();
}
// Data written before the store is dropped is readable after reopening.
#[test]
fn reopen_existing_db_succeeds() {
let dir = TempDir::new().unwrap();
{
let s = RocksStore::open(dir.path()).unwrap();
s.put_node_raw(1, b"data").unwrap();
}
let s2 = RocksStore::open(dir.path()).unwrap();
assert_eq!(s2.get_node_raw(1).unwrap().as_deref(), Some(b"data".as_ref()));
}
// `id_key` and `bytes_to_id` are inverses.
#[test]
fn id_key_round_trips() {
let id: u128 = 0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210;
assert_eq!(RocksStore::bytes_to_id(&RocksStore::id_key(id)), id);
}
// Big-endian encoding keeps byte order equal to numeric order.
#[test]
fn id_keys_sort_correctly() {
let a = RocksStore::id_key(1u128);
let b = RocksStore::id_key(2u128);
let big = RocksStore::id_key(u128::MAX);
assert!(a < b);
assert!(b < big);
}
// --- node column family ---
#[test]
fn node_put_get_roundtrip() {
let (s, _d) = open();
s.put_node_raw(42, b"node-payload").unwrap();
assert_eq!(s.get_node_raw(42).unwrap().as_deref(), Some(b"node-payload".as_ref()));
}
#[test]
fn missing_node_returns_none() {
let (s, _d) = open();
assert!(s.get_node_raw(999).unwrap().is_none());
}
// A second put on the same id replaces the first value.
#[test]
fn node_overwrite() {
let (s, _d) = open();
s.put_node_raw(1, b"v1").unwrap();
s.put_node_raw(1, b"v2").unwrap();
assert_eq!(s.get_node_raw(1).unwrap().as_deref(), Some(b"v2".as_ref()));
}
#[test]
fn node_batch_delete() {
let (s, _d) = open();
s.put_node_raw(10, b"x").unwrap();
let mut batch = RocksStore::batch();
s.delete_node_batch(&mut batch, 10);
s.write(batch).unwrap();
assert!(s.get_node_raw(10).unwrap().is_none());
}
#[test]
fn all_node_ids_empty() {
let (s, _d) = open();
assert!(s.all_node_ids().unwrap().is_empty());
}
#[test]
fn all_node_ids_returns_all() {
let (s, _d) = open();
for i in 0u128..5 {
s.put_node_raw(i, b"x").unwrap();
}
let mut ids = s.all_node_ids().unwrap();
ids.sort();
assert_eq!(ids, vec![0, 1, 2, 3, 4]);
}
// --- edge column family ---
#[test]
fn edge_put_get_roundtrip() {
let (s, _d) = open();
s.put_edge_raw(7, b"edge-data").unwrap();
assert_eq!(s.get_edge_raw(7).unwrap().as_deref(), Some(b"edge-data".as_ref()));
}
#[test]
fn edge_batch_delete() {
let (s, _d) = open();
s.put_edge_raw(5, b"e").unwrap();
let mut batch = RocksStore::batch();
s.delete_edge_batch(&mut batch, 5);
s.write(batch).unwrap();
assert!(s.get_edge_raw(5).unwrap().is_none());
}
// --- adjacency column families ---
// An outgoing entry scans back as (edge_id, to, label).
#[test]
fn adj_out_put_and_scan() {
let (s, _d) = open();
let from = 100u128;
let to = 200u128;
let edge_id = 1u128;
let mut batch = RocksStore::batch();
s.put_adj_out(&mut batch, from, edge_id, to, "KNOWS");
s.write(batch).unwrap();
let results = s.scan_adj_out(from).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0], (edge_id, to, "KNOWS".to_string()));
}
// An incoming entry scans back as (edge_id, from, label).
#[test]
fn adj_in_put_and_scan() {
let (s, _d) = open();
let from = 10u128;
let to = 20u128;
let edge_id = 99u128;
let mut batch = RocksStore::batch();
s.put_adj_in(&mut batch, to, edge_id, from, "LIKES");
s.write(batch).unwrap();
let results = s.scan_adj_in(to).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0], (edge_id, from, "LIKES".to_string()));
}
#[test]
fn adj_scan_multiple_edges_same_source() {
let (s, _d) = open();
let from = 1u128;
let mut batch = RocksStore::batch();
s.put_adj_out(&mut batch, from, 10, 100, "A");
s.put_adj_out(&mut batch, from, 11, 101, "B");
s.put_adj_out(&mut batch, from, 12, 102, "C");
s.write(batch).unwrap();
let results = s.scan_adj_out(from).unwrap();
assert_eq!(results.len(), 3);
}
// The scan stops at the node's 16-byte prefix and never returns a
// neighbouring node's edges.
#[test]
fn adj_scan_does_not_bleed_into_other_nodes() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_adj_out(&mut batch, 1, 10, 100, "X");
s.put_adj_out(&mut batch, 2, 11, 101, "Y");
s.write(batch).unwrap();
assert_eq!(s.scan_adj_out(1).unwrap().len(), 1);
assert_eq!(s.scan_adj_out(2).unwrap().len(), 1);
assert_eq!(s.scan_adj_out(3).unwrap().len(), 0);
}
#[test]
fn adj_delete() {
let (s, _d) = open();
let from = 5u128;
let edge_id = 50u128;
let mut batch = RocksStore::batch();
s.put_adj_out(&mut batch, from, edge_id, 999, "EDGE");
s.write(batch).unwrap();
let mut batch2 = RocksStore::batch();
s.delete_adj_out_batch(&mut batch2, from, edge_id);
s.write(batch2).unwrap();
assert!(s.scan_adj_out(from).unwrap().is_empty());
}
// --- node label index ---
#[test]
fn label_put_and_scan() {
let (s, _d) = open();
let node_a = 1u128;
let node_b = 2u128;
let mut batch = RocksStore::batch();
s.put_label_entry(&mut batch, "Person", node_a);
s.put_label_entry(&mut batch, "Person", node_b);
s.put_label_entry(&mut batch, "Company", 3);
s.write(batch).unwrap();
let mut persons = s.scan_label("Person").unwrap();
persons.sort();
assert_eq!(persons, vec![node_a, node_b]);
let companies = s.scan_label("Company").unwrap();
assert_eq!(companies, vec![3]);
}
#[test]
fn label_delete() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_label_entry(&mut batch, "Person", 1);
s.put_label_entry(&mut batch, "Person", 2);
s.write(batch).unwrap();
let mut batch2 = RocksStore::batch();
s.delete_label_entry(&mut batch2, "Person", 1);
s.write(batch2).unwrap();
assert_eq!(s.scan_label("Person").unwrap(), vec![2]);
}
#[test]
fn label_scan_empty_label() {
let (s, _d) = open();
assert!(s.scan_label("Ghost").unwrap().is_empty());
}
// The NUL separator prevents "Per" from matching entries of "Person".
#[test]
fn label_prefix_isolation() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_label_entry(&mut batch, "Person", 1);
s.write(batch).unwrap();
assert!(s.scan_label("Per").unwrap().is_empty());
}
// --- edge label index ---
#[test]
fn edge_label_put_and_scan() {
let (s, _d) = open();
let e1 = 10u128;
let e2 = 11u128;
let mut batch = RocksStore::batch();
s.put_edge_label_entry(&mut batch, "KNOWS", e1);
s.put_edge_label_entry(&mut batch, "KNOWS", e2);
s.put_edge_label_entry(&mut batch, "LIKES", 99);
s.write(batch).unwrap();
let mut knows = s.scan_edge_label("KNOWS").unwrap();
knows.sort();
assert_eq!(knows, vec![e1, e2]);
assert_eq!(s.scan_edge_label("LIKES").unwrap(), vec![99]);
assert!(s.scan_edge_label("HATES").unwrap().is_empty());
}
#[test]
fn edge_label_delete() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_edge_label_entry(&mut batch, "KNOWS", 1);
s.put_edge_label_entry(&mut batch, "KNOWS", 2);
s.write(batch).unwrap();
let mut batch2 = RocksStore::batch();
s.delete_edge_label_entry(&mut batch2, "KNOWS", 1);
s.write(batch2).unwrap();
assert_eq!(s.scan_edge_label("KNOWS").unwrap(), vec![2]);
}
// The NUL separator prevents "KNO" from matching entries of "KNOWS".
#[test]
fn edge_label_prefix_isolation() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_edge_label_entry(&mut batch, "KNOWS", 1);
s.write(batch).unwrap();
assert!(s.scan_edge_label("KNO").unwrap().is_empty());
}
// --- property index (exact match) ---
#[test]
fn prop_put_and_scan() {
let (s, _d) = open();
let n1 = 1u128;
let n2 = 2u128;
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "Person", "age", "I:25", n1);
s.put_prop_entry(&mut batch, "Person", "age", "I:25", n2);
s.put_prop_entry(&mut batch, "Person", "age", "I:30", 3);
s.write(batch).unwrap();
let mut age25 = s.scan_prop("Person", "age", "I:25").unwrap();
age25.sort();
assert_eq!(age25, vec![n1, n2]);
let age30 = s.scan_prop("Person", "age", "I:30").unwrap();
assert_eq!(age30, vec![3]);
}
#[test]
fn prop_delete() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "Person", "name", "S:Alice", 1);
s.write(batch).unwrap();
let mut batch2 = RocksStore::batch();
s.delete_prop_entry(&mut batch2, "Person", "name", "S:Alice", 1);
s.write(batch2).unwrap();
assert!(s.scan_prop("Person", "name", "S:Alice").unwrap().is_empty());
}
// A scan for a different encoded value finds nothing.
#[test]
fn prop_scan_isolation() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "Person", "age", "I:25", 1);
s.write(batch).unwrap();
assert!(s.scan_prop("Person", "age", "I:26").unwrap().is_empty());
}
// --- meta column family and counters ---
#[test]
fn meta_put_get() {
let (s, _d) = open();
s.put_meta(b"hello", b"world").unwrap();
assert_eq!(s.get_meta(b"hello").unwrap().as_deref(), Some(b"world".as_ref()));
assert!(s.get_meta(b"missing").unwrap().is_none());
}
// Counters that were never written read as zero.
#[test]
fn counters_start_at_zero() {
let (s, _d) = open();
assert_eq!(s.node_count().unwrap(), 0);
assert_eq!(s.edge_count().unwrap(), 0);
}
#[test]
fn counters_set_and_read() {
let (s, _d) = open();
s.set_node_count(42).unwrap();
s.set_edge_count(7).unwrap();
assert_eq!(s.node_count().unwrap(), 42);
assert_eq!(s.edge_count().unwrap(), 7);
}
#[test]
fn counters_persist_across_reopen() {
let dir = TempDir::new().unwrap();
{
let s = RocksStore::open(dir.path()).unwrap();
s.set_node_count(100).unwrap();
s.set_edge_count(50).unwrap();
}
let s2 = RocksStore::open(dir.path()).unwrap();
assert_eq!(s2.node_count().unwrap(), 100);
assert_eq!(s2.edge_count().unwrap(), 50);
}
// --- write batches ---
// Checks that every write in a committed batch is visible afterwards,
// across column families. It does not exercise a failed or partial commit.
#[test]
fn write_batch_is_atomic() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_node_batch(&mut batch, 1, b"n1");
s.put_node_batch(&mut batch, 2, b"n2");
s.put_edge_batch(&mut batch, 100, b"e1");
s.write(batch).unwrap();
assert!(s.get_node_raw(1).unwrap().is_some());
assert!(s.get_node_raw(2).unwrap().is_some());
assert!(s.get_edge_raw(100).unwrap().is_some());
}
// One batch may overwrite an existing key, insert a new one and delete a
// key that was never present.
#[test]
fn write_batch_mixed_ops() {
let (s, _d) = open();
s.put_node_raw(1, b"old").unwrap();
let mut batch = RocksStore::batch();
s.put_node_batch(&mut batch, 1, b"new"); s.put_node_batch(&mut batch, 2, b"fresh");
s.delete_node_batch(&mut batch, 3); s.write(batch).unwrap();
assert_eq!(s.get_node_raw(1).unwrap().as_deref(), Some(b"new".as_ref()));
assert_eq!(s.get_node_raw(2).unwrap().as_deref(), Some(b"fresh".as_ref()));
assert!(s.get_node_raw(3).unwrap().is_none());
}
// Volume check: 10,000 individual puts are all listed and readable.
#[test]
fn ten_thousand_nodes_round_trip() {
let (s, _d) = open();
let n = 10_000u128;
for i in 0..n {
s.put_node_raw(i, format!("node-{i}").as_bytes()).unwrap();
}
assert_eq!(s.all_node_ids().unwrap().len(), n as usize);
assert_eq!(
s.get_node_raw(9_999).unwrap().as_deref(),
Some(b"node-9999".as_ref())
);
}
// --- property index (range scans) ---
// Values are encoded with `crate::types::value_index_key`; in each test the
// node id equals (or maps to) the integer value so results are easy to read.
// No bounds: every entry of the (label, prop) pair is returned.
#[test]
fn prop_range_scan_all() {
let (s, _d) = open();
let enc20 = crate::types::value_index_key(&crate::types::Value::Int(20)).unwrap();
let enc30 = crate::types::value_index_key(&crate::types::Value::Int(30)).unwrap();
let enc40 = crate::types::value_index_key(&crate::types::Value::Int(40)).unwrap();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "P", "age", &enc20, 20);
s.put_prop_entry(&mut batch, "P", "age", &enc30, 30);
s.put_prop_entry(&mut batch, "P", "age", &enc40, 40);
s.write(batch).unwrap();
let mut ids = s.scan_prop_range("P", "age", None, None).unwrap();
ids.sort();
assert_eq!(ids, vec![20, 30, 40]);
}
// Exclusive lower bound of 25 keeps 30 and 40.
#[test]
fn prop_range_scan_lower_exclusive() {
let (s, _d) = open();
let enc20 = crate::types::value_index_key(&crate::types::Value::Int(20)).unwrap();
let enc30 = crate::types::value_index_key(&crate::types::Value::Int(30)).unwrap();
let enc40 = crate::types::value_index_key(&crate::types::Value::Int(40)).unwrap();
let lo_enc = crate::types::value_index_key(&crate::types::Value::Int(25)).unwrap();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "P", "age", &enc20, 20);
s.put_prop_entry(&mut batch, "P", "age", &enc30, 30);
s.put_prop_entry(&mut batch, "P", "age", &enc40, 40);
s.write(batch).unwrap();
let mut ids = s.scan_prop_range("P", "age", Some((&lo_enc, false)), None).unwrap();
ids.sort();
assert_eq!(ids, vec![30, 40]);
}
// Inclusive upper bound of 30 keeps 20 and 30.
#[test]
fn prop_range_scan_upper_inclusive() {
let (s, _d) = open();
let enc20 = crate::types::value_index_key(&crate::types::Value::Int(20)).unwrap();
let enc30 = crate::types::value_index_key(&crate::types::Value::Int(30)).unwrap();
let enc40 = crate::types::value_index_key(&crate::types::Value::Int(40)).unwrap();
let hi_enc = crate::types::value_index_key(&crate::types::Value::Int(30)).unwrap();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "P", "age", &enc20, 20);
s.put_prop_entry(&mut batch, "P", "age", &enc30, 30);
s.put_prop_entry(&mut batch, "P", "age", &enc40, 40);
s.write(batch).unwrap();
let mut ids = s.scan_prop_range("P", "age", None, Some((&hi_enc, true))).unwrap();
ids.sort();
assert_eq!(ids, vec![20, 30]);
}
// Despite the name, the interval here is half-open: [20, 40) keeps 20 and 30.
#[test]
fn prop_range_scan_closed_interval() {
let (s, _d) = open();
let enc10 = crate::types::value_index_key(&crate::types::Value::Int(10)).unwrap();
let enc20 = crate::types::value_index_key(&crate::types::Value::Int(20)).unwrap();
let enc30 = crate::types::value_index_key(&crate::types::Value::Int(30)).unwrap();
let enc40 = crate::types::value_index_key(&crate::types::Value::Int(40)).unwrap();
let lo_enc = crate::types::value_index_key(&crate::types::Value::Int(20)).unwrap();
let hi_enc = crate::types::value_index_key(&crate::types::Value::Int(40)).unwrap();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "P", "age", &enc10, 10);
s.put_prop_entry(&mut batch, "P", "age", &enc20, 20);
s.put_prop_entry(&mut batch, "P", "age", &enc30, 30);
s.put_prop_entry(&mut batch, "P", "age", &enc40, 40);
s.write(batch).unwrap();
let mut ids = s.scan_prop_range("P", "age", Some((&lo_enc, true)), Some((&hi_enc, false))).unwrap();
ids.sort();
assert_eq!(ids, vec![20, 30]);
}
// Negative values must order below zero in the encoded form: an exclusive
// lower bound of -3 drops -5 (node 1) and keeps 0 and 5 (nodes 2 and 3).
#[test]
fn prop_range_scan_negative_integers() {
let (s, _d) = open();
let enc_n5 = crate::types::value_index_key(&crate::types::Value::Int(-5)).unwrap();
let enc_0 = crate::types::value_index_key(&crate::types::Value::Int(0)).unwrap();
let enc_5 = crate::types::value_index_key(&crate::types::Value::Int(5)).unwrap();
let lo_enc = crate::types::value_index_key(&crate::types::Value::Int(-3)).unwrap();
let mut batch = RocksStore::batch();
s.put_prop_entry(&mut batch, "P", "x", &enc_n5, 1);
s.put_prop_entry(&mut batch, "P", "x", &enc_0, 2);
s.put_prop_entry(&mut batch, "P", "x", &enc_5, 3);
s.write(batch).unwrap();
let mut ids = s.scan_prop_range("P", "x", Some((&lo_enc, false)), None).unwrap();
ids.sort();
assert_eq!(ids, vec![2, 3]);
}
// --- clear_all ---
// After `clear_all` the nodes and label entries are gone and both counters
// read zero.
#[test]
fn clear_all_wipes_nodes_and_resets_counts() {
let (s, _d) = open();
let mut batch = RocksStore::batch();
s.put_node_raw(1, b"node-1").unwrap();
s.put_node_raw(2, b"node-2").unwrap();
s.put_label_entry(&mut batch, "Person", 1);
s.put_label_entry(&mut batch, "Person", 2);
s.write(batch).unwrap();
assert!(s.get_node_raw(1).unwrap().is_some());
s.clear_all().unwrap();
assert!(s.get_node_raw(1).unwrap().is_none());
assert!(s.get_node_raw(2).unwrap().is_none());
assert_eq!(s.node_count().unwrap(), 0);
assert_eq!(s.edge_count().unwrap(), 0);
assert!(s.scan_label("Person").unwrap().is_empty());
}
}