use crate::codec::CompressionCodec;
#[cfg(not(feature = "temporal"))]
use crate::codec::{CompressedData, DictionaryBuilder, DictionaryEncoding, TypeSpecificCompressor};
use crate::index::zone_map::ZoneMapEntry;
#[cfg(not(feature = "temporal"))]
use arcstr::ArcStr;
#[cfg(feature = "temporal")]
use grafeo_common::temporal::VersionLog;
#[cfg(feature = "temporal")]
use grafeo_common::types::EpochId;
use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
use grafeo_common::utils::hash::FxHashMap;
use parking_lot::RwLock;
use std::cmp::Ordering;
use std::hash::Hash;
use std::marker::PhantomData;
/// Column compression policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum CompressionMode {
    /// Never compress (default).
    #[default]
    None,
    /// Compress automatically once a column's hot buffer grows large enough
    /// (see `PropertyColumn::set`).
    Auto,
    /// Compressed by `compress_all`/`force_compress_all`; unlike `Auto`,
    /// there is no per-insert trigger visible in this file.
    Eager,
}
/// Minimum total values in a column before an `Auto` compression pass runs.
#[cfg(not(feature = "temporal"))]
const COMPRESSION_THRESHOLD: usize = 1000;
/// Hot-buffer fill level that, together with `COMPRESSION_THRESHOLD`,
/// triggers an `Auto` compression pass on insert.
#[cfg(not(feature = "temporal"))]
const HOT_BUFFER_SIZE: usize = 4096;
/// Comparison operators used for zone-map pruning checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompareOp {
    /// Equal.
    Eq,
    /// Not equal.
    Ne,
    /// Less than.
    Lt,
    /// Less than or equal.
    Le,
    /// Greater than.
    Gt,
    /// Greater than or equal.
    Ge,
}
/// Abstraction over entity identifiers (nodes and edges) that can be
/// losslessly round-tripped through a `u64` for compressed storage.
pub trait EntityId: Copy + Eq + Hash + 'static {
    /// Returns the raw `u64` representation of this id.
    fn as_u64(self) -> u64;
    /// Reconstructs an id from its raw `u64` representation.
    fn from_u64(v: u64) -> Self;
}
impl EntityId for NodeId {
#[inline]
fn as_u64(self) -> u64 {
self.0
}
#[inline]
fn from_u64(v: u64) -> Self {
Self(v)
}
}
impl EntityId for EdgeId {
#[inline]
fn as_u64(self) -> u64 {
self.0
}
#[inline]
fn from_u64(v: u64) -> Self {
Self(v)
}
}
/// Column-oriented property store: one [`PropertyColumn`] per property key,
/// all guarded by a single `RwLock` over the column map.
pub struct PropertyStorage<Id: EntityId = NodeId> {
    /// Property key -> column of values for that key.
    columns: RwLock<FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    /// Compression mode applied to columns created after construction.
    default_compression: CompressionMode,
    // NOTE(review): `Id` already appears in `columns`, so this marker looks
    // redundant — kept as-is since constructors elsewhere rely on the field.
    _marker: PhantomData<Id>,
}
impl<Id: EntityId> PropertyStorage<Id> {
#[must_use]
pub fn new() -> Self {
Self {
columns: RwLock::new(FxHashMap::default()),
default_compression: CompressionMode::None,
_marker: PhantomData,
}
}
/// Creates an empty store whose new columns use `mode` for compression.
#[must_use]
pub fn with_compression(mode: CompressionMode) -> Self {
    Self {
        columns: RwLock::new(FxHashMap::default()),
        default_compression: mode,
        _marker: PhantomData,
    }
}
/// Changes the compression mode used for columns created after this call.
/// Existing columns are left untouched.
pub fn set_default_compression(&mut self, mode: CompressionMode) {
    self.default_compression = mode;
}
/// Sets `key` to `value` for entity `id`, creating the column on demand.
#[cfg(not(feature = "temporal"))]
pub fn set(&self, id: Id, key: PropertyKey, value: Value) {
    let mut columns = self.columns.write();
    // Copy the mode out so the closure below doesn't capture `self`.
    let mode = self.default_compression;
    columns
        .entry(key)
        .or_insert_with(|| PropertyColumn::with_compression(mode))
        .set(id, value);
}
/// Sets `key` to `value` for entity `id` at `epoch` (temporal build).
#[cfg(feature = "temporal")]
pub fn set(&self, id: Id, key: PropertyKey, value: Value, epoch: EpochId) {
    let mut columns = self.columns.write();
    let mode = self.default_compression;
    columns
        .entry(key)
        .or_insert_with(|| PropertyColumn::with_compression(mode))
        .set(id, value, epoch);
}
/// Sets the compression mode of an existing column; no-op if `key` is absent.
pub fn enable_compression(&self, key: &PropertyKey, mode: CompressionMode) {
    let mut columns = self.columns.write();
    if let Some(col) = columns.get_mut(key) {
        col.set_compression_mode(mode);
    }
}
/// Compresses every column whose mode allows it (`Auto` or `Eager`).
pub fn compress_all(&self) {
    let mut columns = self.columns.write();
    for col in columns.values_mut() {
        if col.compression_mode() != CompressionMode::None {
            col.compress();
        }
    }
}
/// Compresses every column regardless of its configured mode.
pub fn force_compress_all(&self) {
    let mut columns = self.columns.write();
    for col in columns.values_mut() {
        col.force_compress();
    }
}
/// Per-column compression statistics, keyed by property name.
#[must_use]
pub fn compression_stats(&self) -> FxHashMap<PropertyKey, CompressionStats> {
    let columns = self.columns.read();
    columns
        .iter()
        .map(|(key, col)| (key.clone(), col.compression_stats()))
        .collect()
}
/// Current (post-compression) memory used by all columns, in bytes.
#[must_use]
pub fn memory_usage(&self) -> usize {
    let columns = self.columns.read();
    columns
        .values()
        .map(|col| col.compression_stats().compressed_size)
        .sum()
}
/// Estimated heap footprint of the column map plus all columns.
#[must_use]
pub fn heap_memory_bytes(&self) -> usize {
    let columns = self.columns.read();
    // Rough hash-map overhead: one key + one column slot (+1 control byte)
    // per allocated bucket.
    let map_overhead = columns.capacity()
        * (std::mem::size_of::<PropertyKey>() + std::mem::size_of::<PropertyColumn<Id>>() + 1);
    let column_bytes: usize = columns.values().map(|col| col.heap_memory_bytes()).sum();
    map_overhead + column_bytes
}
/// Returns the current value of `key` for `id`, if any.
#[must_use]
pub fn get(&self, id: Id, key: &PropertyKey) -> Option<Value> {
    let columns = self.columns.read();
    columns.get(key).and_then(|col| col.get(id))
}
/// Removes `key` for `id`, returning the previous value.
#[cfg(not(feature = "temporal"))]
pub fn remove(&self, id: Id, key: &PropertyKey) -> Option<Value> {
    let mut columns = self.columns.write();
    columns.get_mut(key).and_then(|col| col.remove(id))
}
/// Removes `key` for `id` at `epoch` (tombstone append), returning the
/// previously visible value.
#[cfg(feature = "temporal")]
pub fn remove(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
    let mut columns = self.columns.write();
    columns.get_mut(key).and_then(|col| col.remove(id, epoch))
}
/// Removes every property of `id`.
#[cfg(not(feature = "temporal"))]
pub fn remove_all(&self, id: Id) {
    let mut columns = self.columns.write();
    for col in columns.values_mut() {
        col.remove(id);
    }
}
/// Removes every property of `id` at `epoch` (temporal build).
#[cfg(feature = "temporal")]
pub fn remove_all(&self, id: Id, epoch: EpochId) {
    let mut columns = self.columns.write();
    for col in columns.values_mut() {
        col.remove(id, epoch);
    }
}
#[must_use]
pub fn get_all(&self, id: Id) -> FxHashMap<PropertyKey, Value> {
let columns = self.columns.read();
let mut result = FxHashMap::default();
for (key, col) in columns.iter() {
if let Some(value) = col.get(id) {
result.insert(key.clone(), value);
}
}
result
}
/// Looks up `key` for each id in order; missing entries become `None`.
#[must_use]
pub fn get_batch(&self, ids: &[Id], key: &PropertyKey) -> Vec<Option<Value>> {
    let columns = self.columns.read();
    columns.get(key).map_or_else(
        || vec![None; ids.len()],
        |col| ids.iter().map(|&id| col.get(id)).collect(),
    )
}
/// Collects all properties for each id, preserving the input order.
#[must_use]
pub fn get_all_batch(&self, ids: &[Id]) -> Vec<FxHashMap<PropertyKey, Value>> {
    let columns = self.columns.read();
    // Upper bound on per-entity map size: one entry per column.
    let per_entity_capacity = columns.len();
    ids.iter()
        .map(|&id| {
            let mut props =
                FxHashMap::with_capacity_and_hasher(per_entity_capacity, Default::default());
            props.extend(
                columns
                    .iter()
                    .filter_map(|(key, col)| col.get(id).map(|value| (key.clone(), value))),
            );
            props
        })
        .collect()
}
/// Collects only the requested `keys` for each id, preserving input order.
/// With an empty key list every entity yields an empty map.
#[must_use]
pub fn get_selective_batch(
    &self,
    ids: &[Id],
    keys: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
    if keys.is_empty() {
        return vec![FxHashMap::default(); ids.len()];
    }
    let columns = self.columns.read();
    // Resolve each requested key to its column once, up front.
    let selected: Vec<_> = keys
        .iter()
        .filter_map(|key| columns.get(key).map(|col| (key, col)))
        .collect();
    ids.iter()
        .map(|&id| {
            let mut props =
                FxHashMap::with_capacity_and_hasher(selected.len(), Default::default());
            for &(key, col) in &selected {
                if let Some(value) = col.get(id) {
                    props.insert(key.clone(), value);
                }
            }
            props
        })
        .collect()
}
/// Number of distinct property columns.
#[must_use]
pub fn column_count(&self) -> usize {
    self.columns.read().len()
}
/// All property keys currently stored.
#[must_use]
pub fn keys(&self) -> Vec<PropertyKey> {
    self.columns.read().keys().cloned().collect()
}
/// Drops every column and value.
pub fn clear(&self) {
    self.columns.write().clear();
}
/// Evicts a column's in-memory values (for spilling to secondary storage).
/// Returns `(value_count, freed_bytes)`; `(0, 0)` if the column is absent.
#[cfg(not(feature = "temporal"))]
pub fn evict_column(&self, key: &PropertyKey) -> (usize, usize) {
    let mut columns = self.columns.write();
    if let Some(column) = columns.get_mut(key) {
        column.evict_values()
    } else {
        (0, 0)
    }
}
/// Re-inserts previously spilled values, creating the column if needed.
#[cfg(not(feature = "temporal"))]
pub fn restore_column(&self, key: &PropertyKey, values: impl Iterator<Item = (Id, Value)>) {
    let mut columns = self.columns.write();
    let column = columns
        .entry(key.clone())
        .or_insert_with(|| PropertyColumn::with_compression(self.default_compression));
    column.restore_values(values);
}
/// Moves a column's values out for spilling, marking the column as spilled.
/// Returns an empty vec if the column is absent.
#[cfg(not(feature = "temporal"))]
pub fn drain_column(&self, key: &PropertyKey) -> Vec<(Id, Value)> {
    let mut columns = self.columns.write();
    if let Some(column) = columns.get_mut(key) {
        column.drain_values()
    } else {
        Vec::new()
    }
}
/// Whether the column for `key` is currently marked as spilled.
#[cfg(not(feature = "temporal"))]
#[must_use]
pub fn is_column_spilled(&self, key: &PropertyKey) -> bool {
    self.columns
        .read()
        .get(key)
        .is_some_and(|col| col.is_spilled())
}
/// Marks the column for `key` as spilled, creating an empty column if absent.
#[cfg(not(feature = "temporal"))]
pub fn mark_column_spilled(&self, key: &PropertyKey) {
    let mut columns = self.columns.write();
    let column = columns.entry(key.clone()).or_default();
    column.mark_spilled();
}
/// Returns a handle proving the column for `key` exists; the handle keeps
/// the storage read lock held for its lifetime.
///
/// NOTE(review): the returned handle exposes no accessors (all fields are
/// private and underscore-prefixed), so it currently only serves as an
/// existence witness — confirm whether callers need more.
#[must_use]
pub fn column(&self, key: &PropertyKey) -> Option<PropertyColumnRef<'_, Id>> {
    let columns = self.columns.read();
    if columns.contains_key(key) {
        Some(PropertyColumnRef {
            _guard: columns,
            _key: key.clone(),
            _marker: PhantomData,
        })
    } else {
        None
    }
}
/// Zone-map pruning check for `key op value`.
///
/// A missing column conservatively reports `true` (a match is possible).
#[must_use]
pub fn might_match(&self, key: &PropertyKey, op: CompareOp, value: &Value) -> bool {
    let columns = self.columns.read();
    match columns.get(key) {
        Some(col) => col.might_match(op, value),
        None => true,
    }
}
/// Clones the zone map for `key`, if that column exists.
#[must_use]
pub fn zone_map(&self, key: &PropertyKey) -> Option<ZoneMapEntry> {
    self.columns
        .read()
        .get(key)
        .map(|col| col.zone_map().clone())
}
/// Zone-map pruning check for a (possibly half-open) range on `key`.
///
/// A missing column conservatively reports `true`.
#[must_use]
pub fn might_match_range(
    &self,
    key: &PropertyKey,
    min: Option<&Value>,
    max: Option<&Value>,
    min_inclusive: bool,
    max_inclusive: bool,
) -> bool {
    let columns = self.columns.read();
    match columns.get(key) {
        None => true,
        Some(col) => col
            .zone_map()
            .might_contain_range(min, max, min_inclusive, max_inclusive),
    }
}
pub fn rebuild_zone_maps(&self) {
let mut columns = self.columns.write();
for col in columns.values_mut() {
col.rebuild_zone_map();
}
}
}
/// Defaults to an empty, uncompressed store (same as [`PropertyStorage::new`]).
impl<Id: EntityId> Default for PropertyStorage<Id> {
    fn default() -> Self {
        Self::new()
    }
}
/// Temporal-only extensions: versioned reads, epoch finalization, and GC.
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyStorage<Id> {
    /// Grants crate-internal write access to the raw column map.
    pub(crate) fn columns_write(
        &self,
    ) -> parking_lot::RwLockWriteGuard<'_, FxHashMap<PropertyKey, PropertyColumn<Id>>> {
        self.columns.write()
    }
    /// Value of `key` for `id` as of `epoch`.
    #[must_use]
    pub fn get_at(&self, id: Id, key: &PropertyKey, epoch: EpochId) -> Option<Value> {
        let columns = self.columns.read();
        columns.get(key).and_then(|col| col.get_at(id, epoch))
    }
    /// All properties of `id` as of `epoch`.
    #[must_use]
    pub fn get_all_at(&self, id: Id, epoch: EpochId) -> FxHashMap<PropertyKey, Value> {
        let columns = self.columns.read();
        let mut result = FxHashMap::default();
        for (key, col) in columns.iter() {
            if let Some(value) = col.get_at(id, epoch) {
                result.insert(key.clone(), value);
            }
        }
        result
    }
    /// Stamps all pending (uncommitted) versions with `real_epoch`.
    pub fn finalize_pending(&self, real_epoch: EpochId) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.finalize_pending(real_epoch);
        }
    }
    /// Discards all pending (uncommitted) versions.
    pub fn remove_pending(&self) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.remove_pending();
        }
    }
    /// Garbage-collects versions older than `min_epoch` in every column.
    pub fn gc(&self, min_epoch: EpochId) {
        let mut columns = self.columns.write();
        for col in columns.values_mut() {
            col.gc(min_epoch);
        }
    }
    /// Full version history of every property of `id`, skipping empty logs.
    #[must_use]
    pub fn get_all_history(&self, id: Id) -> Vec<(PropertyKey, Vec<(EpochId, Value)>)> {
        let columns = self.columns.read();
        let mut result = Vec::new();
        for (key, col) in columns.iter() {
            if let Some(log) = col.values.get(&id) {
                let entries: Vec<(EpochId, Value)> = log
                    .history()
                    .iter()
                    .map(|(epoch, value)| (*epoch, value.clone()))
                    .collect();
                if !entries.is_empty() {
                    result.push((key.clone(), entries));
                }
            }
        }
        result
    }
    /// Version history of a single property of `id` (empty if absent).
    #[must_use]
    pub fn get_history(&self, id: Id, key: &PropertyKey) -> Vec<(EpochId, Value)> {
        let columns = self.columns.read();
        columns
            .get(key)
            .and_then(|col| col.values.get(&id))
            .map(|log| log.history().iter().map(|(e, v)| (*e, v.clone())).collect())
            .unwrap_or_default()
    }
}
/// Type-specialized compressed payload of a column, plus the id/index
/// lookup tables needed to locate a value by entity id.
///
/// Both `id_to_index` and `index_to_id` hold the sorted entity ids of the
/// compressed values (see the `compress_as_*` builders, which clone one
/// from the other).
#[cfg(not(feature = "temporal"))]
#[derive(Debug)]
#[non_exhaustive]
pub enum CompressedColumnData {
    /// `Value::Int64` payload, integer-codec compressed.
    Integers {
        data: CompressedData,
        id_to_index: Vec<u64>,
        index_to_id: Vec<u64>,
    },
    /// `Value::String` payload, dictionary encoded.
    Strings {
        encoding: DictionaryEncoding,
        id_to_index: Vec<u64>,
        index_to_id: Vec<u64>,
    },
    /// `Value::Bool` payload, boolean-codec compressed.
    Booleans {
        data: CompressedData,
        id_to_index: Vec<u64>,
        index_to_id: Vec<u64>,
    },
}
#[cfg(not(feature = "temporal"))]
impl CompressedColumnData {
#[must_use]
pub fn memory_usage(&self) -> usize {
match self {
CompressedColumnData::Integers {
data,
id_to_index,
index_to_id,
} => {
data.data.len()
+ id_to_index.len() * std::mem::size_of::<u64>()
+ index_to_id.len() * std::mem::size_of::<u64>()
}
CompressedColumnData::Strings {
encoding,
id_to_index,
index_to_id,
} => {
encoding.codes().len() * std::mem::size_of::<u32>()
+ encoding.dictionary().iter().map(|s| s.len()).sum::<usize>()
+ id_to_index.len() * std::mem::size_of::<u64>()
+ index_to_id.len() * std::mem::size_of::<u64>()
}
CompressedColumnData::Booleans {
data,
id_to_index,
index_to_id,
} => {
data.data.len()
+ id_to_index.len() * std::mem::size_of::<u64>()
+ index_to_id.len() * std::mem::size_of::<u64>()
}
}
}
}
/// Size/ratio bookkeeping for a single property column.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Estimated size with no compression applied.
    pub uncompressed_size: usize,
    /// Current size (hot values plus compressed payload).
    pub compressed_size: usize,
    /// Number of values accounted for.
    pub value_count: usize,
    /// Codec in use, or `None` when nothing is compressed.
    pub codec: Option<CompressionCodec>,
}
impl CompressionStats {
    /// Ratio of uncompressed to compressed size; reports `1.0` when the
    /// compressed size is zero (nothing stored / nothing compressed).
    #[must_use]
    pub fn compression_ratio(&self) -> f64 {
        match self.compressed_size {
            0 => 1.0,
            size => self.uncompressed_size as f64 / size as f64,
        }
    }
}
/// A single property's values for all entities, plus zone-map metadata.
pub struct PropertyColumn<Id: EntityId = NodeId> {
    /// Hot (uncompressed) values by entity id.
    #[cfg(not(feature = "temporal"))]
    values: FxHashMap<Id, Value>,
    /// Versioned values by entity id (temporal build).
    #[cfg(feature = "temporal")]
    values: FxHashMap<Id, VersionLog<Value>>,
    /// Min/max/null statistics used for predicate pruning.
    zone_map: ZoneMapEntry,
    /// Set after removals; a dirty zone map is treated as "might match".
    zone_map_dirty: bool,
    compression_mode: CompressionMode,
    /// Compressed payload, if a compression pass committed one.
    #[cfg(not(feature = "temporal"))]
    compressed: Option<CompressedColumnData>,
    /// Number of values held in `compressed` (moved out of `values`).
    #[cfg(not(feature = "temporal"))]
    compressed_count: usize,
    /// Whether this column's values have been spilled out of memory.
    #[cfg(not(feature = "temporal"))]
    spilled: bool,
}
#[cfg(not(feature = "temporal"))]
impl<Id: EntityId> PropertyColumn<Id> {
/// Creates an empty, uncompressed column.
#[must_use]
pub fn new() -> Self {
    Self {
        values: FxHashMap::default(),
        zone_map: ZoneMapEntry::new(),
        zone_map_dirty: false,
        compression_mode: CompressionMode::None,
        compressed: None,
        compressed_count: 0,
        spilled: false,
    }
}
/// Creates an empty column with the given compression mode.
#[must_use]
pub fn with_compression(mode: CompressionMode) -> Self {
    Self {
        values: FxHashMap::default(),
        zone_map: ZoneMapEntry::new(),
        zone_map_dirty: false,
        compression_mode: mode,
        compressed: None,
        compressed_count: 0,
        spilled: false,
    }
}
/// Changes the compression mode.
///
/// Switching to [`CompressionMode::None`] decompresses any existing
/// compressed payload back into the hot map so values remain reachable.
pub fn set_compression_mode(&mut self, mode: CompressionMode) {
    self.compression_mode = mode;
    // Collapsed the original nested `if` (clippy::collapsible_if);
    // behavior is unchanged.
    if mode == CompressionMode::None && self.compressed.is_some() {
        self.decompress_all();
    }
}
/// Current compression mode.
#[must_use]
pub fn compression_mode(&self) -> CompressionMode {
    self.compression_mode
}
/// Inserts or overwrites the value for `id`, updating the zone map.
///
/// In `Auto` mode a compression pass is triggered once the hot buffer
/// reaches `HOT_BUFFER_SIZE` and the column holds at least
/// `COMPRESSION_THRESHOLD` values in total.
///
/// NOTE(review): overwriting an existing id still bumps the zone map's
/// `row_count` (the update runs before `insert`), so `row_count` can
/// overcount — confirm whether that matters downstream.
pub fn set(&mut self, id: Id, value: Value) {
    self.update_zone_map_on_insert(&value);
    self.values.insert(id, value);
    if self.compression_mode == CompressionMode::Auto {
        let total_count = self.values.len() + self.compressed_count;
        let hot_buffer_count = self.values.len();
        if hot_buffer_count >= HOT_BUFFER_SIZE && total_count >= COMPRESSION_THRESHOLD {
            self.compress();
        }
    }
}
/// Folds `value` into the zone map (row count, null count, min/max).
///
/// Values incomparable with the current min/max (mixed types —
/// `compare_values` returns `None`) leave min/max unchanged.
/// NOTE(review): confirm downstream pruning stays conservative for
/// mixed-type columns in that case.
fn update_zone_map_on_insert(&mut self, value: &Value) {
    self.zone_map.row_count += 1;
    if matches!(value, Value::Null) {
        self.zone_map.null_count += 1;
        return;
    }
    match &self.zone_map.min {
        None => self.zone_map.min = Some(value.clone()),
        Some(current) => {
            if compare_values(value, current) == Some(Ordering::Less) {
                self.zone_map.min = Some(value.clone());
            }
        }
    }
    match &self.zone_map.max {
        None => self.zone_map.max = Some(value.clone()),
        Some(current) => {
            if compare_values(value, current) == Some(Ordering::Greater) {
                self.zone_map.max = Some(value.clone());
            }
        }
    }
}
/// Returns the hot value for `id`, if present.
///
/// NOTE(review): values migrated into `compressed` by a compression pass
/// are not consulted here, so `get` returns `None` for them (the tests
/// accept `value.is_some() || col.is_compressed()`); confirm this
/// limitation is intended.
#[must_use]
pub fn get(&self, id: Id) -> Option<Value> {
    if let Some(value) = self.values.get(&id) {
        return Some(value.clone());
    }
    None
}
/// Removes the hot value for `id`; marks the zone map dirty on success.
pub fn remove(&mut self, id: Id) -> Option<Value> {
    let removed = self.values.remove(&id);
    if removed.is_some() {
        self.zone_map_dirty = true;
    }
    removed
}
/// Flags the column as spilled without touching its data.
pub fn mark_spilled(&mut self) {
    self.spilled = true;
}
/// Whether the column's values live in secondary storage.
#[must_use]
pub fn is_spilled(&self) -> bool {
    self.spilled
}
/// Drops all values (hot and compressed) and marks the column spilled.
/// Returns `(dropped_hot_value_count, freed_heap_bytes)`.
pub fn evict_values(&mut self) -> (usize, usize) {
    let count = self.values.len();
    // Measure before clearing so the estimate reflects what is freed.
    let freed_bytes = self.heap_memory_bytes();
    self.values.clear();
    self.values.shrink_to_fit();
    self.compressed = None;
    self.compressed_count = 0;
    self.spilled = true;
    (count, freed_bytes)
}
/// Moves all hot values out, drops any compressed payload, and marks the
/// column spilled.
pub fn drain_values(&mut self) -> Vec<(Id, Value)> {
    let drained: Vec<(Id, Value)> = self.values.drain().collect();
    self.values.shrink_to_fit();
    self.compressed = None;
    self.compressed_count = 0;
    self.spilled = true;
    drained
}
/// Loads values back after a spill and clears the spilled flag.
pub fn restore_values(&mut self, values: impl Iterator<Item = (Id, Value)>) {
    self.spilled = false;
    for (id, value) in values {
        self.values.insert(id, value);
    }
}
/// Total number of values (hot plus compressed).
#[must_use]
pub fn len(&self) -> usize {
    self.values.len() + self.compressed_count
}
/// Whether the column holds no values at all.
#[cfg(test)]
#[must_use]
pub fn is_empty(&self) -> bool {
    self.values.is_empty() && self.compressed_count == 0
}
/// Size statistics for this column.
#[must_use]
pub fn compression_stats(&self) -> CompressionStats {
    let hot_size = self.values.len() * std::mem::size_of::<Value>();
    let compressed_size = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
    let codec = match &self.compressed {
        Some(CompressedColumnData::Integers { data, .. }) => Some(data.codec),
        Some(CompressedColumnData::Strings { .. }) => Some(CompressionCodec::Dictionary),
        Some(CompressedColumnData::Booleans { data, .. }) => Some(data.codec),
        None => None,
    };
    CompressionStats {
        // What the column would occupy fully uncompressed.
        uncompressed_size: hot_size + self.compressed_count * std::mem::size_of::<Value>(),
        // What it occupies now: hot values plus the compressed payload.
        compressed_size: hot_size + compressed_size,
        value_count: self.len(),
        codec,
    }
}
/// Estimated heap bytes: hash-map buckets plus compressed payload.
#[must_use]
pub fn heap_memory_bytes(&self) -> usize {
    // One key + one value slot (+1 control byte) per allocated bucket.
    let hot_bytes =
        self.values.capacity() * (std::mem::size_of::<Id>() + std::mem::size_of::<Value>() + 1);
    let compressed_bytes = self.compressed.as_ref().map_or(0, |c| c.memory_usage());
    hot_bytes + compressed_bytes
}
/// Whether a compressed payload currently exists.
#[must_use]
#[cfg(test)]
pub fn is_compressed(&self) -> bool {
    self.compressed.is_some()
}
/// Attempts one compression pass over the hot values.
///
/// Picks the dominant value type (strictly more than half of the hot
/// values) and delegates to the matching type-specific pass; does nothing
/// when the column is empty, already compressed, or has no dominant type.
pub fn compress(&mut self) {
    if self.values.is_empty() {
        return;
    }
    if self.compressed.is_some() {
        return;
    }
    let (int_count, str_count, bool_count) = self.count_types();
    let total = self.values.len();
    if int_count > total / 2 {
        self.compress_as_integers();
    } else if str_count > total / 2 {
        self.compress_as_strings();
    } else if bool_count > total / 2 {
        self.compress_as_booleans();
    }
}
/// Tallies how many hot values are `(Int64, String, Bool)`; other
/// variants are ignored.
fn count_types(&self) -> (usize, usize, usize) {
    self.values
        .values()
        .fold((0, 0, 0), |(ints, strs, bools), value| match value {
            Value::Int64(_) => (ints + 1, strs, bools),
            Value::String(_) => (ints, strs + 1, bools),
            Value::Bool(_) => (ints, strs, bools + 1),
            _ => (ints, strs, bools),
        })
}
/// Compresses the `Int64` values with the integer codec.
///
/// On success (at least 8 ints and ratio > 1.2) the ints move out of the
/// hot map into `compressed`; non-int values stay hot. Otherwise the
/// column is left untouched.
fn compress_as_integers(&mut self) {
    let mut values: Vec<(u64, i64)> = Vec::new();
    let mut non_int_values: FxHashMap<Id, Value> = FxHashMap::default();
    for (&id, value) in &self.values {
        match value {
            Value::Int64(v) => {
                let id_u64 = id.as_u64();
                values.push((id_u64, *v));
            }
            _ => {
                non_int_values.insert(id, value.clone());
            }
        }
    }
    // Too few values for compression to pay off.
    if values.len() < 8 {
        return;
    }
    // Sort by id so the index arrays are binary-searchable.
    values.sort_by_key(|(id, _)| *id);
    let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
    let index_to_id: Vec<u64> = id_to_index.clone();
    let int_values: Vec<i64> = values.iter().map(|(_, v)| *v).collect();
    let Ok(compressed) = TypeSpecificCompressor::compress_signed_integers(&int_values) else {
        return;
    };
    // Only commit when compression actually saves space.
    if compressed.compression_ratio() > 1.2 {
        self.compressed = Some(CompressedColumnData::Integers {
            data: compressed,
            id_to_index,
            index_to_id,
        });
        self.compressed_count = values.len();
        self.values = non_int_values;
    }
}
/// Dictionary-encodes the `String` values.
///
/// On success (at least 8 strings and ratio > 1.2) the strings move out of
/// the hot map into `compressed`; other values stay hot.
fn compress_as_strings(&mut self) {
    let mut values: Vec<(u64, ArcStr)> = Vec::new();
    let mut non_str_values: FxHashMap<Id, Value> = FxHashMap::default();
    for (&id, value) in &self.values {
        match value {
            Value::String(s) => {
                values.push((id.as_u64(), s.clone()));
            }
            _ => {
                non_str_values.insert(id, value.clone());
            }
        }
    }
    // Too few values for compression to pay off.
    if values.len() < 8 {
        return;
    }
    // Sort by id so the index arrays are binary-searchable.
    values.sort_by_key(|(id, _)| *id);
    let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
    let index_to_id: Vec<u64> = id_to_index.clone();
    let mut builder = DictionaryBuilder::new();
    for (_, s) in &values {
        builder.add(s.as_ref());
    }
    let encoding = builder.build();
    // Only commit when the dictionary actually saves space.
    if encoding.compression_ratio() > 1.2 {
        self.compressed = Some(CompressedColumnData::Strings {
            encoding,
            id_to_index,
            index_to_id,
        });
        self.compressed_count = values.len();
        self.values = non_str_values;
    }
}
/// Compresses the `Bool` values with the boolean codec.
///
/// NOTE(review): unlike the int/string paths there is no compression-ratio
/// guard here — presumably packed booleans always win, but confirm the
/// asymmetry is intentional.
fn compress_as_booleans(&mut self) {
    let mut values: Vec<(u64, bool)> = Vec::new();
    let mut non_bool_values: FxHashMap<Id, Value> = FxHashMap::default();
    for (&id, value) in &self.values {
        match value {
            Value::Bool(b) => {
                values.push((id.as_u64(), *b));
            }
            _ => {
                non_bool_values.insert(id, value.clone());
            }
        }
    }
    // Too few values for compression to pay off.
    if values.len() < 8 {
        return;
    }
    // Sort by id so the index arrays are binary-searchable.
    values.sort_by_key(|(id, _)| *id);
    let id_to_index: Vec<u64> = values.iter().map(|(id, _)| *id).collect();
    let index_to_id: Vec<u64> = id_to_index.clone();
    let bool_values: Vec<bool> = values.iter().map(|(_, v)| *v).collect();
    let Ok(compressed) = TypeSpecificCompressor::compress_booleans(&bool_values) else {
        return;
    };
    self.compressed = Some(CompressedColumnData::Booleans {
        data: compressed,
        id_to_index,
        index_to_id,
    });
    self.compressed_count = values.len();
    self.values = non_bool_values;
}
/// Moves every compressed value back into the hot map and clears the
/// compressed payload.
///
/// NOTE(review): the payload is `take`n up front, so if a codec
/// `decompress_*` call fails the values are silently dropped — confirm
/// failure is impossible or acceptable here.
fn decompress_all(&mut self) {
    let Some(compressed) = self.compressed.take() else {
        return;
    };
    match compressed {
        CompressedColumnData::Integers {
            data, index_to_id, ..
        } => {
            if let Ok(values) = TypeSpecificCompressor::decompress_integers(&data) {
                // Integers were stored zigzag-encoded; undo that here.
                let signed: Vec<i64> = values
                    .iter()
                    .map(|&v| crate::codec::zigzag_decode(v))
                    .collect();
                for (i, id_u64) in index_to_id.iter().enumerate() {
                    if let Some(&value) = signed.get(i) {
                        let id = Id::from_u64(*id_u64);
                        self.values.insert(id, Value::Int64(value));
                    }
                }
            }
        }
        CompressedColumnData::Strings {
            encoding,
            index_to_id,
            ..
        } => {
            for (i, id_u64) in index_to_id.iter().enumerate() {
                if let Some(s) = encoding.get(i) {
                    let id = Id::from_u64(*id_u64);
                    self.values.insert(id, Value::String(ArcStr::from(s)));
                }
            }
        }
        CompressedColumnData::Booleans {
            data, index_to_id, ..
        } => {
            if let Ok(values) = TypeSpecificCompressor::decompress_booleans(&data) {
                for (i, id_u64) in index_to_id.iter().enumerate() {
                    if let Some(&value) = values.get(i) {
                        let id = Id::from_u64(*id_u64);
                        self.values.insert(id, Value::Bool(value));
                    }
                }
            }
        }
    }
    self.compressed_count = 0;
}
/// Compresses regardless of the configured mode.
///
/// NOTE(review): this delegates to `compress`, which keeps its
/// minimum-size (8 values), dominant-type, and ratio guards — so "force"
/// can still be a no-op for small or mixed columns.
pub fn force_compress(&mut self) {
    self.compress();
}
/// The column's zone map.
#[must_use]
pub fn zone_map(&self) -> &ZoneMapEntry {
    &self.zone_map
}
/// Conservative predicate test: `false` means no value can match `op
/// value`; `true` means a match is possible. A dirty zone map always
/// answers `true`.
#[must_use]
pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
    if self.zone_map_dirty {
        return true;
    }
    match op {
        CompareOp::Eq => self.zone_map.might_contain_equal(value),
        CompareOp::Ne => {
            // Ne can only be ruled out when every row equals `value`
            // (min == value == max).
            match (&self.zone_map.min, &self.zone_map.max) {
                (Some(min), Some(max)) => {
                    !(compare_values(min, value) == Some(Ordering::Equal)
                        && compare_values(max, value) == Some(Ordering::Equal))
                }
                _ => true,
            }
        }
        CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
        CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
        CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
        CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
    }
}
/// Rebuilds the zone map from the hot values and clears the dirty flag.
///
/// NOTE(review): only `self.values` is scanned, so values currently held
/// in `compressed` do not contribute to min/max — confirm callers
/// decompress first (or tolerate a narrower zone map).
pub fn rebuild_zone_map(&mut self) {
    let mut zone_map = ZoneMapEntry::new();
    for value in self.values.values() {
        zone_map.row_count += 1;
        if matches!(value, Value::Null) {
            zone_map.null_count += 1;
            continue;
        }
        match &zone_map.min {
            None => zone_map.min = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Less) {
                    zone_map.min = Some(value.clone());
                }
            }
        }
        match &zone_map.max {
            None => zone_map.max = Some(value.clone()),
            Some(current) => {
                if compare_values(value, current) == Some(Ordering::Greater) {
                    zone_map.max = Some(value.clone());
                }
            }
        }
    }
    self.zone_map = zone_map;
    self.zone_map_dirty = false;
}
}
#[cfg(feature = "temporal")]
impl<Id: EntityId> PropertyColumn<Id> {
/// Creates an empty temporal column.
#[must_use]
pub fn new() -> Self {
    Self {
        values: FxHashMap::default(),
        zone_map: ZoneMapEntry::new(),
        zone_map_dirty: false,
        compression_mode: CompressionMode::None,
    }
}
/// Creates an empty temporal column with the given mode (compression is a
/// no-op in temporal builds; see `compress`).
#[must_use]
pub fn with_compression(mode: CompressionMode) -> Self {
    Self {
        values: FxHashMap::default(),
        zone_map: ZoneMapEntry::new(),
        zone_map_dirty: false,
        compression_mode: mode,
    }
}
/// Records the compression mode; temporal columns never compress.
pub fn set_compression_mode(&mut self, mode: CompressionMode) {
    self.compression_mode = mode;
}
/// Current compression mode.
#[must_use]
pub fn compression_mode(&self) -> CompressionMode {
    self.compression_mode
}
/// Appends a new version of the value for `id` at `epoch`.
pub fn set(&mut self, id: Id, value: Value, epoch: EpochId) {
    self.update_zone_map_on_insert(&value);
    self.values.entry(id).or_default().append(epoch, value);
}
/// Folds `value` into the zone map (row count, null count, min/max).
/// Incomparable (mixed-type) values leave min/max unchanged.
fn update_zone_map_on_insert(&mut self, value: &Value) {
    self.zone_map.row_count += 1;
    if matches!(value, Value::Null) {
        self.zone_map.null_count += 1;
        return;
    }
    match &self.zone_map.min {
        None => self.zone_map.min = Some(value.clone()),
        Some(current) => {
            if compare_values(value, current) == Some(Ordering::Less) {
                self.zone_map.min = Some(value.clone());
            }
        }
    }
    match &self.zone_map.max {
        None => self.zone_map.max = Some(value.clone()),
        Some(current) => {
            if compare_values(value, current) == Some(Ordering::Greater) {
                self.zone_map.max = Some(value.clone());
            }
        }
    }
}
/// Latest value for `id`; `Null` tombstones read as absent.
#[must_use]
pub fn get(&self, id: Id) -> Option<Value> {
    self.values
        .get(&id)
        .and_then(|log| log.latest())
        .filter(|v| !v.is_null())
        .cloned()
}
/// Logically removes `id` at `epoch` by appending a `Null` tombstone.
/// Returns the previously visible value, if any.
pub fn remove(&mut self, id: Id, epoch: EpochId) -> Option<Value> {
    let previous = self.get(id);
    if previous.is_some() {
        self.values
            .entry(id)
            .or_default()
            .append(epoch, Value::Null);
        self.zone_map_dirty = true;
    }
    previous
}
/// Number of ids whose latest version is a live (non-null) value.
#[must_use]
pub fn len(&self) -> usize {
    self.values
        .values()
        .filter(|log| log.latest().is_some_and(|v| !v.is_null()))
        .count()
}
/// Whether no live values exist.
#[cfg(test)]
#[must_use]
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
/// Size statistics; temporal columns report equal sizes (no compression).
#[must_use]
pub fn compression_stats(&self) -> CompressionStats {
    let live_count = self.len();
    let hot_size = live_count * std::mem::size_of::<Value>();
    CompressionStats {
        uncompressed_size: hot_size,
        compressed_size: hot_size,
        value_count: live_count,
        codec: None,
    }
}
/// Estimated heap bytes of the version-log map.
#[must_use]
pub fn heap_memory_bytes(&self) -> usize {
    // One key + one log slot (+1 control byte) per allocated bucket.
    self.values.capacity()
        * (std::mem::size_of::<Id>() + std::mem::size_of::<VersionLog<Value>>() + 1)
}
/// Compression is not implemented for temporal columns.
pub fn compress(&mut self) {}
/// Compression is not implemented for temporal columns.
pub fn force_compress(&mut self) {}
/// The column's zone map.
#[must_use]
pub fn zone_map(&self) -> &ZoneMapEntry {
    &self.zone_map
}
/// Conservative predicate test; a dirty zone map always answers `true`.
#[must_use]
pub fn might_match(&self, op: CompareOp, value: &Value) -> bool {
    if self.zone_map_dirty {
        return true;
    }
    match op {
        CompareOp::Eq => self.zone_map.might_contain_equal(value),
        // Ne can only be ruled out when every row equals `value`.
        CompareOp::Ne => match (&self.zone_map.min, &self.zone_map.max) {
            (Some(min), Some(max)) => {
                !(compare_values(min, value) == Some(Ordering::Equal)
                    && compare_values(max, value) == Some(Ordering::Equal))
            }
            _ => true,
        },
        CompareOp::Lt => self.zone_map.might_contain_less_than(value, false),
        CompareOp::Le => self.zone_map.might_contain_less_than(value, true),
        CompareOp::Gt => self.zone_map.might_contain_greater_than(value, false),
        CompareOp::Ge => self.zone_map.might_contain_greater_than(value, true),
    }
}
/// Rebuilds the zone map from each id's latest version and clears the
/// dirty flag.
pub fn rebuild_zone_map(&mut self) {
    let mut zone_map = ZoneMapEntry::new();
    for log in self.values.values() {
        if let Some(value) = log.latest() {
            zone_map.row_count += 1;
            if matches!(value, Value::Null) {
                zone_map.null_count += 1;
                continue;
            }
            match &zone_map.min {
                None => zone_map.min = Some(value.clone()),
                Some(current) => {
                    if compare_values(value, current) == Some(Ordering::Less) {
                        zone_map.min = Some(value.clone());
                    }
                }
            }
            match &zone_map.max {
                None => zone_map.max = Some(value.clone()),
                Some(current) => {
                    if compare_values(value, current) == Some(Ordering::Greater) {
                        zone_map.max = Some(value.clone());
                    }
                }
            }
        }
    }
    self.zone_map = zone_map;
    self.zone_map_dirty = false;
}
/// Value of `id` as of `epoch`; `Null` tombstones read as absent.
#[must_use]
pub fn get_at(&self, id: Id, epoch: EpochId) -> Option<Value> {
    self.values
        .get(&id)
        .and_then(|log| log.at(epoch))
        .filter(|v| !v.is_null())
        .cloned()
}
/// Stamps all pending versions with `real_epoch`.
pub fn finalize_pending(&mut self, real_epoch: EpochId) {
    for log in self.values.values_mut() {
        log.finalize_pending(real_epoch);
    }
}
/// Discards all pending versions, dropping ids left with empty logs.
pub fn remove_pending(&mut self) {
    for log in self.values.values_mut() {
        log.remove_pending();
    }
    self.values.retain(|_, log| !log.is_empty());
}
/// Garbage-collects versions older than `min_epoch`, dropping empty logs.
pub fn gc(&mut self, min_epoch: EpochId) {
    for log in self.values.values_mut() {
        log.gc(min_epoch);
    }
    self.values.retain(|_, log| !log.is_empty());
}
/// Discards pending versions of a single id, removing it if emptied.
pub fn remove_pending_for(&mut self, id: Id) {
    if let Some(log) = self.values.get_mut(&id) {
        log.remove_pending();
        if log.is_empty() {
            self.values.remove(&id);
        }
    }
}
/// Pops up to `n` pending versions of a single id, removing it if emptied.
pub fn pop_n_pending_for(&mut self, id: Id, n: usize) {
    if let Some(log) = self.values.get_mut(&id) {
        log.pop_n_pending(n);
        if log.is_empty() {
            self.values.remove(&id);
        }
    }
}
}
/// Partial ordering between two property values.
///
/// Same-type numeric, string, bool, and temporal values compare directly;
/// mixed int/float pairs compare via an `as f64` cast.
/// NOTE(review): that cast is lossy for |i64| > 2^53, which could misorder
/// extreme values in zone-map bounds — confirm acceptable.
/// Any other mixed-type pair is incomparable (`None`).
fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
    match (a, b) {
        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
        (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
        (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
        _ => None,
    }
}
/// Defaults to an empty, uncompressed column (same as [`PropertyColumn::new`]).
impl<Id: EntityId> Default for PropertyColumn<Id> {
    fn default() -> Self {
        Self::new()
    }
}
/// Existence witness for a column: holds the storage read lock while alive.
///
/// NOTE(review): no accessors are exposed yet; the fields exist only to
/// keep the guard (and thus the lock) alive.
pub struct PropertyColumnRef<'a, Id: EntityId = NodeId> {
    _guard: parking_lot::RwLockReadGuard<'a, FxHashMap<PropertyKey, PropertyColumn<Id>>>,
    _key: PropertyKey,
    _marker: PhantomData<Id>,
}
#[cfg(test)]
#[cfg(not(feature = "temporal"))]
mod tests {
use super::*;
use arcstr::ArcStr;
// Set/get round-trip across two entities and two keys; absent pairs read
// as None.
#[test]
fn test_property_storage_basic() {
    let storage = PropertyStorage::new();
    let node1 = NodeId::new(1);
    let node2 = NodeId::new(2);
    let name_key = PropertyKey::new("name");
    let age_key = PropertyKey::new("age");
    storage.set(node1, name_key.clone(), "Alix".into());
    storage.set(node1, age_key.clone(), 30i64.into());
    storage.set(node2, name_key.clone(), "Gus".into());
    assert_eq!(
        storage.get(node1, &name_key),
        Some(Value::String("Alix".into()))
    );
    assert_eq!(storage.get(node1, &age_key), Some(Value::Int64(30)));
    assert_eq!(
        storage.get(node2, &name_key),
        Some(Value::String("Gus".into()))
    );
    assert!(storage.get(node2, &age_key).is_none());
}
// Removing a key returns the old value and clears subsequent reads.
#[test]
fn test_property_storage_remove() {
    let storage = PropertyStorage::new();
    let node = NodeId::new(1);
    let key = PropertyKey::new("name");
    storage.set(node, key.clone(), "Alix".into());
    assert!(storage.get(node, &key).is_some());
    let removed = storage.remove(node, &key);
    assert!(removed.is_some());
    assert!(storage.get(node, &key).is_none());
}
// get_all gathers every property set on an entity.
#[test]
fn test_property_storage_get_all() {
    let storage = PropertyStorage::new();
    let node = NodeId::new(1);
    storage.set(node, PropertyKey::new("name"), "Alix".into());
    storage.set(node, PropertyKey::new("age"), 30i64.into());
    storage.set(node, PropertyKey::new("active"), true.into());
    let props = storage.get_all(node);
    assert_eq!(props.len(), 3);
}
// remove_all clears every property of an entity across columns.
#[test]
fn test_property_storage_remove_all() {
    let storage = PropertyStorage::new();
    let node = NodeId::new(1);
    storage.set(node, PropertyKey::new("name"), "Alix".into());
    storage.set(node, PropertyKey::new("age"), 30i64.into());
    storage.remove_all(node);
    assert!(storage.get(node, &PropertyKey::new("name")).is_none());
    assert!(storage.get(node, &PropertyKey::new("age")).is_none());
}
// Column-level set/get/remove and len bookkeeping.
#[test]
fn test_property_column() {
    let mut col = PropertyColumn::new();
    col.set(NodeId::new(1), "Alix".into());
    col.set(NodeId::new(2), "Gus".into());
    assert_eq!(col.len(), 2);
    assert!(!col.is_empty());
    assert_eq!(col.get(NodeId::new(1)), Some(Value::String("Alix".into())));
    col.remove(NodeId::new(1));
    assert!(col.get(NodeId::new(1)).is_none());
    assert_eq!(col.len(), 1);
}
// Constructors record the requested compression mode.
#[test]
fn test_compression_mode() {
    let col: PropertyColumn<NodeId> = PropertyColumn::new();
    assert_eq!(col.compression_mode(), CompressionMode::None);
    let col: PropertyColumn<NodeId> = PropertyColumn::with_compression(CompressionMode::Auto);
    assert_eq!(col.compression_mode(), CompressionMode::Auto);
}
// Small Auto-mode columns stay below the compression threshold, so reads
// keep working unchanged.
#[test]
fn test_property_storage_with_compression() {
    let storage = PropertyStorage::with_compression(CompressionMode::Auto);
    for i in 0..100 {
        storage.set(
            NodeId::new(i),
            PropertyKey::new("age"),
            Value::Int64(20 + (i as i64 % 50)),
        );
    }
    assert_eq!(
        storage.get(NodeId::new(0), &PropertyKey::new("age")),
        Some(Value::Int64(20))
    );
    assert_eq!(
        storage.get(NodeId::new(50), &PropertyKey::new("age")),
        Some(Value::Int64(20))
    );
}
// After bulk int inserts the count is preserved; a value is either still
// hot or the column has compressed (get does not read compressed data).
#[test]
fn test_compress_integer_column() {
    let mut col: PropertyColumn<NodeId> =
        PropertyColumn::with_compression(CompressionMode::Auto);
    for i in 0..2000 {
        col.set(NodeId::new(i), Value::Int64(1000 + i as i64));
    }
    let stats = col.compression_stats();
    assert_eq!(stats.value_count, 2000);
    let last_value = col.get(NodeId::new(1999));
    assert!(last_value.is_some() || col.is_compressed());
}
// Same invariant for low-cardinality strings (dictionary encoding).
#[test]
fn test_compress_string_column() {
    let mut col: PropertyColumn<NodeId> =
        PropertyColumn::with_compression(CompressionMode::Auto);
    let categories = ["Person", "Company", "Product", "Location"];
    for i in 0..2000 {
        let cat = categories[i % 4];
        col.set(NodeId::new(i as u64), Value::String(ArcStr::from(cat)));
    }
    assert_eq!(col.len(), 2000);
    let last_value = col.get(NodeId::new(1999));
    assert!(last_value.is_some() || col.is_compressed());
}
// Same invariant for booleans (bit-packing).
#[test]
fn test_compress_boolean_column() {
    let mut col: PropertyColumn<NodeId> =
        PropertyColumn::with_compression(CompressionMode::Auto);
    for i in 0..2000 {
        col.set(NodeId::new(i as u64), Value::Bool(i % 2 == 0));
    }
    assert_eq!(col.len(), 2000);
    let last_value = col.get(NodeId::new(1999));
    assert!(last_value.is_some() || col.is_compressed());
}
// force_compress must not lose values even when compression declines.
#[test]
fn test_force_compress() {
    let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
    for i in 0..100 {
        col.set(NodeId::new(i), Value::Int64(i as i64));
    }
    col.force_compress();
    let stats = col.compression_stats();
    assert_eq!(stats.value_count, 100);
}
// Stats on an uncompressed column still report counts and sizes.
#[test]
fn test_compression_stats() {
    let mut col: PropertyColumn<NodeId> = PropertyColumn::new();
    for i in 0..50 {
        col.set(NodeId::new(i), Value::Int64(i as i64));
    }
    let stats = col.compression_stats();
    assert_eq!(stats.value_count, 50);
    assert!(stats.uncompressed_size > 0);
}
// Storage stats report exactly one entry per distinct property key.
#[test]
fn test_storage_compression_stats() {
    let storage = PropertyStorage::with_compression(CompressionMode::Auto);
    for i in 0..100 {
        let id = NodeId::new(i);
        storage.set(id, PropertyKey::new("age"), Value::Int64(i as i64));
        storage.set(
            id,
            PropertyKey::new("name"),
            Value::String(ArcStr::from("Alix")),
        );
    }
    let stats = storage.compression_stats();
    assert_eq!(stats.len(), 2);
    assert!(stats.contains_key(&PropertyKey::new("age")));
    assert!(stats.contains_key(&PropertyKey::new("name")));
}
// A populated store reports a non-zero memory footprint.
#[test]
fn test_memory_usage() {
    let storage = PropertyStorage::new();
    for i in 0..100 {
        storage.set(
            NodeId::new(i),
            PropertyKey::new("value"),
            Value::Int64(i as i64),
        );
    }
    let usage = storage.memory_usage();
    assert!(usage > 0);
}
// Batch lookup preserves id order and yields None for missing entities.
#[test]
fn test_get_batch_single_property() {
    let storage: PropertyStorage<NodeId> = PropertyStorage::new();
    let node1 = NodeId::new(1);
    let node2 = NodeId::new(2);
    let node3 = NodeId::new(3);
    let age_key = PropertyKey::new("age");
    storage.set(node1, age_key.clone(), 25i64.into());
    storage.set(node2, age_key.clone(), 30i64.into());
    let ids = vec![node1, node2, node3];
    let values = storage.get_batch(&ids, &age_key);
    assert_eq!(values.len(), 3);
    assert_eq!(values[0], Some(Value::Int64(25)));
    assert_eq!(values[1], Some(Value::Int64(30)));
    assert_eq!(values[2], None);
}
// A missing column yields all-None, one slot per requested id.
#[test]
fn test_get_batch_missing_column() {
    let storage: PropertyStorage<NodeId> = PropertyStorage::new();
    let node1 = NodeId::new(1);
    let node2 = NodeId::new(2);
    let missing_key = PropertyKey::new("nonexistent");
    let ids = vec![node1, node2];
    let values = storage.get_batch(&ids, &missing_key);
    assert_eq!(values.len(), 2);
    assert_eq!(values[0], None);
    assert_eq!(values[1], None);
}
// No ids in, no values out.
#[test]
fn test_get_batch_empty_ids() {
    let storage: PropertyStorage<NodeId> = PropertyStorage::new();
    let key = PropertyKey::new("any");
    let values = storage.get_batch(&[], &key);
    assert!(values.is_empty());
}
// Batched full retrieval preserves id order and per-entity property sets.
#[test]
fn test_get_all_batch() {
    let storage: PropertyStorage<NodeId> = PropertyStorage::new();
    let (node1, node2, node3) = (NodeId::new(1), NodeId::new(2), NodeId::new(3));
    storage.set(node1, PropertyKey::new("name"), "Alix".into());
    storage.set(node1, PropertyKey::new("age"), 25i64.into());
    storage.set(node2, PropertyKey::new("name"), "Gus".into());
    let all_props = storage.get_all_batch(&[node1, node2, node3]);
    assert_eq!(all_props.len(), 3);
    assert_eq!(all_props[0].len(), 2);
    assert_eq!(all_props[1].len(), 1);
    assert_eq!(all_props[2].len(), 0);
    assert_eq!(
        all_props[0].get(&PropertyKey::new("name")),
        Some(&Value::String("Alix".into()))
    );
    assert_eq!(
        all_props[1].get(&PropertyKey::new("name")),
        Some(&Value::String("Gus".into()))
    );
}
// No ids in, no maps out.
#[test]
fn test_get_all_batch_empty_ids() {
    let storage: PropertyStorage<NodeId> = PropertyStorage::new();
    let all_props = storage.get_all_batch(&[]);
    assert!(all_props.is_empty());
}
}