use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use crate::storage::interner::StringInterner;
use crate::storage::GraphStorage;
use crate::value::Value;
/// Common interface for snapshot types that expose a graph's storage and
/// string interner.
///
/// Implementors must supply the three borrowing accessors (`storage`,
/// `interner`, `as_dyn`). The `arc_*` accessors ship with default bodies that
/// panic, so a snapshot type only supports them if it overrides them.
pub trait SnapshotLike {
/// Returns the snapshot's storage as a borrowed `GraphStorage` trait object.
fn storage(&self) -> &dyn GraphStorage;
/// Returns the string interner associated with this snapshot.
fn interner(&self) -> &StringInterner;
/// Returns this snapshot as a `&dyn SnapshotLike` trait object.
fn as_dyn(&self) -> &dyn SnapshotLike;
/// Returns a shared, owned handle to the storage.
///
/// # Panics
///
/// The default implementation always panics; its message states that
/// streaming is not supported for the snapshot type.
fn arc_storage(&self) -> Arc<dyn GraphStorage + Send + Sync> {
panic!("arc_storage() not implemented for this snapshot type - streaming not supported")
}
/// Returns a shared, owned handle to the string interner.
///
/// # Panics
///
/// The default implementation always panics; its message states that
/// streaming is not supported for the snapshot type.
fn arc_interner(&self) -> Arc<StringInterner> {
panic!("arc_interner() not implemented for this snapshot type - streaming not supported")
}
/// Returns a shared, owned handle to the storage as a `StreamableStorage`.
///
/// # Panics
///
/// The default implementation always panics; its message states that
/// streaming is not supported for the snapshot type.
fn arc_streamable(&self) -> Arc<dyn crate::storage::StreamableStorage> {
panic!("arc_streamable() not implemented for this snapshot type - streaming not supported")
}
/// Returns the subscription manager attached to this snapshot, if any.
///
/// Only compiled with the `reactive` feature on non-`wasm32` targets.
/// The default implementation returns `None`.
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
fn subscription_manager(&self) -> Option<&crate::traversal::reactive::SubscriptionManager> {
None
}
/// Returns, if available, a shareable closure that produces boxed
/// `SnapshotLike + Send` values.
///
/// Only compiled with the `reactive` feature on non-`wasm32` targets.
/// The default implementation returns `None`.
// NOTE(review): presumably used to obtain fresh snapshots for reactive
// subscriptions - confirm against the implementors.
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
fn reactive_snapshot_fn(
&self,
) -> Option<
std::sync::Arc<dyn Fn() -> Box<dyn SnapshotLike + Send> + Send + Sync>,
> {
None
}
}
/// Borrowed execution context: references to a graph storage and a string
/// interner that live for `'g`, plus side-effect storage and a path-tracking
/// flag.
///
/// The storage type parameter `S` may be unsized and defaults to
/// `dyn GraphStorage + 'g`.
pub struct ExecutionContext<'g, S: GraphStorage + ?Sized + 'g = dyn GraphStorage + 'g> {
// Borrowed graph storage; exposed through `storage()`.
storage: &'g S,
// Borrowed interner used for label <-> id lookups.
interner: &'g StringInterner,
/// Side-effect storage attached to this context.
pub side_effects: SideEffects,
/// Whether path tracking is enabled for this context.
pub track_paths: bool,
}
impl<'g, S: GraphStorage + ?Sized + 'g> ExecutionContext<'g, S> {
    /// Creates a context over `storage` and `interner` with fresh, empty side
    /// effects and path tracking disabled.
    pub fn new(storage: &'g S, interner: &'g StringInterner) -> Self {
        Self {
            storage,
            interner,
            side_effects: SideEffects::new(),
            track_paths: false,
        }
    }

    /// Creates a context exactly like [`ExecutionContext::new`] but with path
    /// tracking enabled.
    pub fn with_path_tracking(storage: &'g S, interner: &'g StringInterner) -> Self {
        let mut ctx = Self::new(storage, interner);
        ctx.track_paths = true;
        ctx
    }

    /// Returns `true` when path tracking is enabled for this context.
    #[inline]
    pub fn is_tracking_paths(&self) -> bool {
        self.track_paths
    }

    /// Returns the borrowed storage. The reference lives for `'g`, not just
    /// for the borrow of the context.
    #[inline]
    pub fn storage(&self) -> &'g S {
        self.storage
    }

    /// Returns the borrowed interner. The reference lives for `'g`, not just
    /// for the borrow of the context.
    #[inline]
    pub fn interner(&self) -> &'g StringInterner {
        self.interner
    }

    /// Looks up the interned id of `label`; `None` when the interner does not
    /// know the label.
    #[inline]
    pub fn resolve_label(&self, label: &str) -> Option<u32> {
        self.interner.lookup(label)
    }

    /// Resolves every label in `labels`, silently skipping those the interner
    /// does not know. The result keeps the input order of the labels found.
    pub fn resolve_labels(&self, labels: &[&str]) -> Vec<u32> {
        labels
            .iter()
            .filter_map(|label| self.resolve_label(label))
            .collect()
    }

    /// Returns the label string for an interned `id`, or `None` when the
    /// interner has no entry for it.
    ///
    /// The string is borrowed from the `'g` interner rather than from this
    /// context, so it may outlive the context (consistent with `storage()`
    /// and `interner()`).
    #[inline]
    pub fn get_label(&self, id: u32) -> Option<&'g str> {
        self.interner.resolve(id)
    }
}
/// Handle to side-effect storage.
///
/// The state lives behind an `Arc`, so cloning a `SideEffects` yields another
/// handle to the same underlying maps rather than an independent copy.
#[derive(Clone, Default)]
pub struct SideEffects {
// Shared state; every clone of this handle points at the same allocation.
inner: Arc<SideEffectsInner>,
}
/// Shared state behind a `SideEffects` handle: two independently locked maps.
#[derive(Default)]
struct SideEffectsInner {
// Named, append-only lists of values (written by `store`).
collections: RwLock<HashMap<String, Vec<Value>>>,
// Named, type-erased single values (written by `set_data`).
data: RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>,
}
impl SideEffects {
    /// Creates an empty side-effect store.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(SideEffectsInner::default()),
        }
    }

    /// Appends `value` to the collection named `key`, creating the collection
    /// on first use.
    pub fn store(&self, key: &str, value: Value) {
        let mut collections = self.inner.collections.write();
        // Probe with the borrowed key first so an owned `String` is only
        // allocated the first time a key is seen.
        if let Some(values) = collections.get_mut(key) {
            values.push(value);
        } else {
            collections.insert(key.to_owned(), vec![value]);
        }
    }

    /// Returns a cloned copy of the collection named `key`, or `None` when no
    /// value has been stored under that key.
    pub fn get(&self, key: &str) -> Option<Vec<Value>> {
        self.inner.collections.read().get(key).cloned()
    }

    /// Returns a read guard over the collection named `key` without cloning
    /// it, or `None` when the key is absent.
    ///
    /// The read lock on the collections map stays held until the returned
    /// guard is dropped, so writers such as [`SideEffects::store`] wait for it.
    pub fn get_ref(&self, key: &str) -> Option<parking_lot::MappedRwLockReadGuard<'_, Vec<Value>>> {
        let guard = self.inner.collections.read();
        // `try_map` does a single lookup and hands the guard back (dropped by
        // `.ok()`) when the key is missing, so no `unwrap` is needed.
        parking_lot::RwLockReadGuard::try_map(guard, |collections| collections.get(key)).ok()
    }

    /// Stores a single typed value under `key`, replacing any previous value
    /// (of any type) stored under the same key.
    pub fn set_data<T: Any + Send + Sync>(&self, key: &str, value: T) {
        self.inner
            .data
            .write()
            .insert(key.to_string(), Box::new(value));
    }

    /// Returns a clone of the typed value stored under `key`.
    ///
    /// Yields `None` when the key is absent or when the stored value is not
    /// of type `T`.
    pub fn get_data<T: Any + Clone>(&self, key: &str) -> Option<T> {
        self.inner
            .data
            .read()
            .get(key)
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
    }

    /// Returns `true` when a collection named `key` exists.
    ///
    /// Only collections written by `store` are consulted; values written by
    /// `set_data` are not.
    pub fn contains_key(&self, key: &str) -> bool {
        self.inner.collections.read().contains_key(key)
    }

    /// Returns the number of values in the collection named `key`, or `0`
    /// when the collection does not exist.
    pub fn collection_len(&self, key: &str) -> usize {
        self.inner
            .collections
            .read()
            .get(key)
            .map_or(0, Vec::len)
    }

    /// Removes every collection and every typed data entry.
    pub fn clear(&self) {
        // Each write guard is a temporary released at the end of its own
        // statement, so the two locks are never held at the same time.
        self.inner.collections.write().clear();
        self.inner.data.write().clear();
    }

    /// Returns the names of all collections, in unspecified order.
    ///
    /// Keys written by `set_data` are not included.
    pub fn keys(&self) -> Vec<String> {
        self.inner.collections.read().keys().cloned().collect()
    }
}
/// Owned counterpart of `ExecutionContext`: holds `Arc` handles to the storage
/// and interner instead of borrows, together with side-effect storage and a
/// path-tracking flag.
///
/// Cloning copies the `Arc` handles and the `SideEffects` handle, so clones
/// refer to the same storage, interner and side-effect state.
#[derive(Clone)]
pub struct StreamingContext {
// Shared storage handle; also served as `&dyn GraphStorage` by `storage()`.
storage: Arc<dyn crate::storage::StreamableStorage>,
// Shared interner used for label <-> id lookups.
interner: Arc<StringInterner>,
// Side-effect storage handle.
side_effects: SideEffects,
// Whether path tracking is enabled.
track_paths: bool,
}
impl StreamingContext {
    /// Builds a context from shared storage and interner handles, starting
    /// with empty side effects and path tracking turned off.
    pub fn new(
        storage: Arc<dyn crate::storage::StreamableStorage>,
        interner: Arc<StringInterner>,
    ) -> Self {
        let side_effects = SideEffects::new();
        Self {
            storage,
            interner,
            side_effects,
            track_paths: false,
        }
    }

    /// Builder-style setter: returns the context with path tracking set to
    /// `enabled`.
    pub fn with_path_tracking(self, enabled: bool) -> Self {
        let mut ctx = self;
        ctx.track_paths = enabled;
        ctx
    }

    /// Builder-style setter: returns the context with its side-effect store
    /// replaced by `side_effects`.
    pub fn with_side_effects(self, side_effects: SideEffects) -> Self {
        let mut ctx = self;
        ctx.side_effects = side_effects;
        ctx
    }

    /// Reports whether path tracking is enabled.
    #[inline]
    pub fn is_tracking_paths(&self) -> bool {
        self.track_paths
    }

    /// Borrows the storage as a plain `GraphStorage` trait object.
    #[inline]
    pub fn storage(&self) -> &dyn GraphStorage {
        // Upcast from the streamable trait object at the return site.
        let streamable: &dyn crate::storage::StreamableStorage = &*self.storage;
        streamable
    }

    /// Borrows the storage as a `StreamableStorage` trait object.
    #[inline]
    pub fn streamable_storage(&self) -> &dyn crate::storage::StreamableStorage {
        &*self.storage
    }

    /// Returns another shared handle to the streamable storage.
    #[inline]
    pub fn arc_streamable(&self) -> Arc<dyn crate::storage::StreamableStorage> {
        self.storage.clone()
    }

    /// Returns another shared handle to the storage, typed as
    /// `GraphStorage + Send + Sync`.
    #[inline]
    pub fn arc_storage(&self) -> Arc<dyn GraphStorage + Send + Sync> {
        // Method-call `clone()` keeps the concrete `Arc` type; the typed
        // binding then performs the trait-object upcast.
        let shared: Arc<dyn GraphStorage + Send + Sync> = self.storage.clone();
        shared
    }

    /// Borrows the string interner.
    #[inline]
    pub fn interner(&self) -> &StringInterner {
        &*self.interner
    }

    /// Returns another shared handle to the string interner.
    #[inline]
    pub fn arc_interner(&self) -> Arc<StringInterner> {
        self.interner.clone()
    }

    /// Borrows the side-effect store.
    #[inline]
    pub fn side_effects(&self) -> &SideEffects {
        &self.side_effects
    }

    /// Looks up the interned id of `label`; `None` when the interner does not
    /// know the label.
    #[inline]
    pub fn resolve_label(&self, label: &str) -> Option<u32> {
        self.interner.lookup(label)
    }

    /// Resolves every label in `labels`, dropping the ones the interner does
    /// not know. Found ids keep the order of their labels.
    pub fn resolve_labels(&self, labels: &[&str]) -> Vec<u32> {
        let mut ids = Vec::new();
        for label in labels {
            if let Some(id) = self.interner.lookup(label) {
                ids.push(id);
            }
        }
        ids
    }

    /// Returns the label string for an interned `id`, or `None` when the
    /// interner has no entry for it.
    #[inline]
    pub fn get_label(&self, id: u32) -> Option<&str> {
        self.interner.resolve(id)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Graph;
    use std::collections::HashMap;

    /// Builds a property map containing a single `name` string property.
    fn name_props(name: &str) -> HashMap<String, Value> {
        let mut props = HashMap::new();
        props.insert("name".to_string(), Value::String(name.to_string()));
        props
    }

    /// Shared fixture for the context tests: two `person` vertices and one
    /// `software` vertex, each with a `name` property.
    fn create_test_graph() -> Graph {
        let graph = Graph::new();
        graph.add_vertex("person", name_props("Alice"));
        graph.add_vertex("person", name_props("Bob"));
        graph.add_vertex("software", name_props("Graph DB"));
        graph
    }

    // ---- SideEffects ----

    #[test]
    fn side_effects_new_is_empty() {
        let se = SideEffects::new();
        assert!(se.keys().is_empty());
        assert_eq!(se.get("nonexistent"), None);
    }

    #[test]
    fn side_effects_store_and_get() {
        let se = SideEffects::new();
        se.store("numbers", Value::Int(1));
        se.store("numbers", Value::Int(2));
        se.store("numbers", Value::Int(3));
        let values = se.get("numbers").unwrap();
        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Value::Int(1));
        assert_eq!(values[1], Value::Int(2));
        assert_eq!(values[2], Value::Int(3));
    }

    #[test]
    fn side_effects_get_ref() {
        let se = SideEffects::new();
        se.store("items", Value::String("hello".to_string()));
        se.store("items", Value::String("world".to_string()));
        {
            // Scope the guard so the read lock is released before the next call.
            let guard = se.get_ref("items").unwrap();
            assert_eq!(guard.len(), 2);
            assert_eq!(guard[0], Value::String("hello".to_string()));
        }
        assert_eq!(se.collection_len("items"), 2);
    }

    #[test]
    fn side_effects_get_ref_missing_key() {
        let se = SideEffects::new();
        assert!(se.get_ref("missing").is_none());
    }

    #[test]
    fn side_effects_set_and_get_data() {
        let se = SideEffects::new();
        se.set_data("count", 42i32);
        se.set_data("name", "Alice".to_string());
        assert_eq!(se.get_data::<i32>("count"), Some(42));
        assert_eq!(se.get_data::<String>("name"), Some("Alice".to_string()));
    }

    #[test]
    fn side_effects_get_data_wrong_type() {
        let se = SideEffects::new();
        se.set_data("count", 42i32);
        assert_eq!(se.get_data::<String>("count"), None);
        assert_eq!(se.get_data::<i64>("count"), None);
    }

    #[test]
    fn side_effects_get_data_missing_key() {
        let se = SideEffects::new();
        assert_eq!(se.get_data::<i32>("missing"), None);
    }

    #[test]
    fn side_effects_contains_key() {
        let se = SideEffects::new();
        assert!(!se.contains_key("test"));
        se.store("test", Value::Null);
        assert!(se.contains_key("test"));
    }

    #[test]
    fn side_effects_collection_len() {
        let se = SideEffects::new();
        assert_eq!(se.collection_len("items"), 0);
        se.store("items", Value::Int(1));
        assert_eq!(se.collection_len("items"), 1);
        se.store("items", Value::Int(2));
        assert_eq!(se.collection_len("items"), 2);
    }

    #[test]
    fn side_effects_clear() {
        let se = SideEffects::new();
        se.store("a", Value::Int(1));
        se.store("b", Value::Int(2));
        se.set_data("c", 3i32);
        se.clear();
        assert!(se.keys().is_empty());
        assert_eq!(se.get("a"), None);
        assert_eq!(se.get("b"), None);
        assert_eq!(se.get_data::<i32>("c"), None);
    }

    #[test]
    fn side_effects_keys() {
        let se = SideEffects::new();
        se.store("alpha", Value::Int(1));
        se.store("beta", Value::Int(2));
        se.store("gamma", Value::Int(3));
        // Key order is unspecified, so sort before comparing.
        let mut keys = se.keys();
        keys.sort();
        assert_eq!(keys, vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn side_effects_multiple_stores_same_key() {
        let se = SideEffects::new();
        for i in 0..100 {
            se.store("many", Value::Int(i));
        }
        assert_eq!(se.collection_len("many"), 100);
        let values = se.get("many").unwrap();
        for (i, v) in values.iter().enumerate() {
            assert_eq!(*v, Value::Int(i as i64));
        }
    }

    #[test]
    fn side_effects_set_data_overwrites() {
        let se = SideEffects::new();
        se.set_data("key", 1i32);
        assert_eq!(se.get_data::<i32>("key"), Some(1));
        se.set_data("key", 2i32);
        assert_eq!(se.get_data::<i32>("key"), Some(2));
    }

    #[test]
    fn side_effects_clone_shares_data() {
        let se1 = SideEffects::new();
        se1.store("key", Value::Int(1));
        let se2 = se1.clone();
        assert_eq!(se2.get("key"), Some(vec![Value::Int(1)]));
        // A write through the clone must be visible through the original.
        se2.store("key", Value::Int(2));
        assert_eq!(se1.get("key"), Some(vec![Value::Int(1), Value::Int(2)]));
    }

    // ---- ExecutionContext ----

    mod execution_context_tests {
        use super::*;

        #[test]
        fn execution_context_new_compiles() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let _ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
        }

        #[test]
        fn execution_context_path_tracking_flag() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let plain = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            assert!(!plain.is_tracking_paths());
            let tracking =
                ExecutionContext::with_path_tracking(snapshot.storage(), snapshot.interner());
            assert!(tracking.is_tracking_paths());
        }

        #[test]
        fn execution_context_resolve_label_existing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let person_id = ctx.resolve_label("person");
            assert!(person_id.is_some());
            let software_id = ctx.resolve_label("software");
            assert!(software_id.is_some());
            assert_ne!(person_id, software_id);
        }

        #[test]
        fn execution_context_resolve_label_missing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let unknown_id = ctx.resolve_label("unknown");
            assert!(unknown_id.is_none());
        }

        #[test]
        fn execution_context_resolve_labels_multiple() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            // "unknown" is not interned and must be skipped.
            let ids = ctx.resolve_labels(&["person", "software", "unknown"]);
            assert_eq!(ids.len(), 2);
        }

        #[test]
        fn execution_context_resolve_labels_all_missing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let ids = ctx.resolve_labels(&["unknown1", "unknown2"]);
            assert!(ids.is_empty());
        }

        #[test]
        fn execution_context_get_label() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let person_id = ctx.resolve_label("person").unwrap();
            let label_str = ctx.get_label(person_id);
            assert_eq!(label_str, Some("person"));
        }

        #[test]
        fn execution_context_get_label_missing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let label_str = ctx.get_label(999);
            assert!(label_str.is_none());
        }

        #[test]
        fn execution_context_storage_accessor() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let storage = ctx.storage();
            assert_eq!(storage.vertex_count(), 3);
        }

        #[test]
        fn execution_context_interner_accessor() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            let interner = ctx.interner();
            assert!(interner.lookup("person").is_some());
        }

        #[test]
        fn execution_context_side_effects_accessible() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            ctx.side_effects.store("test", Value::Int(42));
            let values = ctx.side_effects.get("test");
            assert_eq!(values, Some(vec![Value::Int(42)]));
        }
    }

    // ---- StreamingContext ----

    mod streaming_context_tests {
        use super::*;

        #[test]
        fn streaming_context_new_compiles() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let _ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
        }

        #[test]
        fn streaming_context_is_cloneable() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx1 = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            let ctx2 = ctx1.clone();
            assert_eq!(ctx1.resolve_label("person"), ctx2.resolve_label("person"));
        }

        #[test]
        fn streaming_context_with_path_tracking() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner())
                .with_path_tracking(true);
            assert!(ctx.is_tracking_paths());
        }

        #[test]
        fn streaming_context_with_side_effects() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let side_effects = SideEffects::new();
            side_effects.store("test", Value::Int(42));
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner())
                .with_side_effects(side_effects);
            assert_eq!(ctx.side_effects().get("test"), Some(vec![Value::Int(42)]));
        }

        #[test]
        fn streaming_context_resolve_label() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            assert!(ctx.resolve_label("person").is_some());
            assert!(ctx.resolve_label("software").is_some());
            assert!(ctx.resolve_label("unknown").is_none());
        }

        #[test]
        fn streaming_context_resolve_labels() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            // "unknown" is not interned and must be skipped.
            let ids = ctx.resolve_labels(&["person", "software", "unknown"]);
            assert_eq!(ids.len(), 2);
        }

        #[test]
        fn streaming_context_storage_accessor() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            assert_eq!(ctx.storage().vertex_count(), 3);
        }

        #[test]
        fn streaming_context_arc_accessors() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            let _storage = ctx.arc_storage();
            let _interner = ctx.arc_interner();
        }

        #[test]
        fn streaming_context_clones_share_side_effects() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx1 = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            let ctx2 = ctx1.clone();
            ctx1.side_effects().store("shared", Value::Int(1));
            assert_eq!(ctx2.side_effects().get("shared"), Some(vec![Value::Int(1)]));
        }
    }
}