interstellar 0.2.0

A high-performance graph database with Gremlin-style traversals and GQL query language
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
//! Execution context for traversal operations.
//!
//! The `ExecutionContext` provides graph access at execution time, decoupling
//! traversal construction from graph binding. This is key to supporting
//! anonymous traversals - graph access is provided when the traversal executes,
//! not when it's constructed.
//!
//! # SnapshotLike Trait
//!
//! This module also defines the [`SnapshotLike`] trait, which abstracts over
//! different snapshot types. Both `GraphSnapshot` and COW snapshot types
//! implement this trait, enabling unified traversal and GQL execution.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

use crate::storage::interner::StringInterner;
use crate::storage::GraphStorage;
use crate::value::Value;

// =============================================================================
// SnapshotLike Trait
// =============================================================================

/// A trait for types that provide snapshot-like access to graph data.
///
/// This trait abstracts over different snapshot implementations:
/// - `GraphSnapshot<'g>` - Borrows from a `Graph` with read lock
/// - `CowSnapshot` - Owned snapshot with structural sharing
/// - `CowMmapSnapshot` - Persistent snapshot with mmap backing
///
/// Both `GraphTraversalSource` and the GQL compiler use this trait to
/// work with any snapshot type generically.
///
/// # Required and provided methods
///
/// Implementors must supply [`storage`](SnapshotLike::storage),
/// [`interner`](SnapshotLike::interner) and [`as_dyn`](SnapshotLike::as_dyn).
/// The `arc_*` methods have default bodies that panic, so a snapshot type
/// only needs to override them if it is meant to be used for streaming.
///
/// # Example
///
/// ```ignore
/// fn count_vertices<S: SnapshotLike>(snapshot: &S) -> u64 {
///     snapshot.storage().vertex_count()
/// }
/// ```
pub trait SnapshotLike {
    /// Get a reference to the underlying graph storage.
    fn storage(&self) -> &dyn GraphStorage;

    /// Get a reference to the string interner for label resolution.
    fn interner(&self) -> &StringInterner;

    /// Get a reference to self as a trait object.
    ///
    /// This enables coercion from generic `&S where S: SnapshotLike` to
    /// `&dyn SnapshotLike` for storage in structures that need trait object references.
    fn as_dyn(&self) -> &dyn SnapshotLike;

    /// Get Arc-wrapped storage for streaming execution.
    ///
    /// This enables streaming pipelines to own the storage without lifetime
    /// constraints. There is no generic way to build the `Arc` here, so the
    /// default implementation does not attempt to: it panics unconditionally.
    ///
    /// # Panics
    ///
    /// The default implementation panics. Types that support streaming must
    /// override this method.
    fn arc_storage(&self) -> Arc<dyn GraphStorage + Send + Sync> {
        panic!("arc_storage() not implemented for this snapshot type - streaming not supported")
    }

    /// Get Arc-wrapped interner for streaming execution.
    ///
    /// This enables streaming pipelines to own the interner without lifetime
    /// constraints.
    ///
    /// # Panics
    ///
    /// The default implementation panics. Types that support streaming must
    /// override this method.
    fn arc_interner(&self) -> Arc<StringInterner> {
        panic!("arc_interner() not implemented for this snapshot type - streaming not supported")
    }

    /// Get Arc-wrapped streamable storage for true O(1) streaming execution.
    ///
    /// This enables streaming pipelines to use `StreamableStorage` methods
    /// like `stream_out_neighbors()` which return owned iterators without
    /// upfront collection.
    ///
    /// # Panics
    ///
    /// The default implementation panics. Types that support streaming must
    /// override this method.
    fn arc_streamable(&self) -> Arc<dyn crate::storage::StreamableStorage> {
        panic!("arc_streamable() not implemented for this snapshot type - streaming not supported")
    }

    /// Get a reference to the subscription manager for reactive queries.
    ///
    /// Returns `None` by default. Override for snapshot types that
    /// support reactive subscriptions.
    ///
    /// Only compiled when the `reactive` feature is enabled and the target
    /// is not `wasm32`.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    fn subscription_manager(&self) -> Option<&crate::traversal::reactive::SubscriptionManager> {
        None
    }

    /// Get a factory function that creates fresh snapshots of the current
    /// graph state. Used by reactive subscriptions for re-evaluation.
    ///
    /// The factory, when present, is a shareable (`Send + Sync`) closure that
    /// yields a boxed `SnapshotLike + Send` each time it is called.
    ///
    /// Returns `None` by default. Override for snapshot types that
    /// support reactive subscriptions.
    ///
    /// Only compiled when the `reactive` feature is enabled and the target
    /// is not `wasm32`.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    fn reactive_snapshot_fn(
        &self,
    ) -> Option<
        std::sync::Arc<dyn Fn() -> Box<dyn SnapshotLike + Send> + Send + Sync>,
    > {
        None
    }
}

/// Execution context passed to steps at runtime.
///
/// This is the key to supporting anonymous traversals - graph access
/// is provided when the traversal executes, not when it's constructed.
///
/// The context is generic over the storage type `S`, allowing it to work
/// with any type that implements `GraphStorage`. This enables both
/// `GraphSnapshot` and COW snapshot types to use the same traversal engine.
///
/// The storage and interner are held as plain borrows, so a context can
/// never outlive the data it was created from. `side_effects` and
/// `track_paths` are public fields and may be read or replaced directly.
///
/// # Type Parameters
///
/// * `'g` - Lifetime of the borrowed storage and interner
/// * `S` - The storage type, defaults to `dyn GraphStorage` for trait object usage
///
/// # Example
///
/// ```ignore
/// let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
/// let label_id = ctx.resolve_label("person");
/// ```
pub struct ExecutionContext<'g, S: GraphStorage + ?Sized + 'g = dyn GraphStorage + 'g> {
    /// Graph storage for consistent reads (borrowed for `'g`)
    storage: &'g S,
    /// String interner for label lookups (borrowed for `'g`)
    interner: &'g StringInterner,
    /// Side effects storage (for store(), aggregate(), etc.)
    pub side_effects: SideEffects,
    /// Whether to automatically track paths for navigation steps
    pub track_paths: bool,
}

impl<'g, S: GraphStorage + ?Sized + 'g> ExecutionContext<'g, S> {
    /// Build a context over the given storage and interner.
    ///
    /// The context starts with an empty side-effect store and with path
    /// tracking switched off.
    ///
    /// # Arguments
    ///
    /// * `storage` - Graph storage the traversal reads from
    /// * `interner` - String interner used to resolve labels
    pub fn new(storage: &'g S, interner: &'g StringInterner) -> Self {
        let side_effects = SideEffects::new();
        Self {
            storage,
            interner,
            side_effects,
            track_paths: false,
        }
    }

    /// Build a context exactly like [`ExecutionContext::new`], but with path
    /// tracking switched on.
    ///
    /// With tracking on, navigation steps are expected to record the
    /// elements they visit on the traverser's path.
    ///
    /// # Arguments
    ///
    /// * `storage` - Graph storage the traversal reads from
    /// * `interner` - String interner used to resolve labels
    pub fn with_path_tracking(storage: &'g S, interner: &'g StringInterner) -> Self {
        let mut ctx = Self::new(storage, interner);
        ctx.track_paths = true;
        ctx
    }

    /// Report whether this context asks steps to track paths.
    #[inline]
    pub fn is_tracking_paths(&self) -> bool {
        self.track_paths
    }

    /// Borrow the graph storage for the full `'g` lifetime.
    #[inline]
    pub fn storage(&self) -> &'g S {
        self.storage
    }

    /// Borrow the string interner for the full `'g` lifetime.
    #[inline]
    pub fn interner(&self) -> &'g StringInterner {
        self.interner
    }

    /// Look up the interned ID of `label`.
    ///
    /// Yields `None` when the interner has no entry for the label.
    #[inline]
    pub fn resolve_label(&self, label: &str) -> Option<u32> {
        self.interner.lookup(label)
    }

    /// Look up the interned IDs of several labels at once.
    ///
    /// Labels the interner does not know are skipped, so the result may be
    /// shorter than `labels`; the relative order of the hits is preserved.
    pub fn resolve_labels(&self, labels: &[&str]) -> Vec<u32> {
        labels
            .iter()
            .copied()
            .filter_map(|label| self.resolve_label(label))
            .collect()
    }

    /// Map an interned ID back to its label string, if the ID is known.
    #[inline]
    pub fn get_label(&self, id: u32) -> Option<&str> {
        self.interner.resolve(id)
    }
}

/// Storage for traversal side effects.
///
/// Used by steps like `store()`, `aggregate()`, `sack()`, etc.
///
/// Two kinds of state are kept: named collections of `Value`s (appended to
/// with `store`) and arbitrary typed data keyed by name (`set_data`).
///
/// # Thread Safety
///
/// Uses interior mutability via `RwLock` to allow mutation through
/// shared references (since `ExecutionContext` is passed as `&'a`).
/// This enables side-effect steps to accumulate data during traversal.
///
/// # Cloning
///
/// `SideEffects` uses internal `Arc` wrapping for cheap cloning.
/// All clones share the same underlying data, enabling streaming
/// pipelines to share side effect state. A write made through one clone is
/// therefore visible through every other clone.
///
/// # Example
///
/// ```ignore
/// let side_effects = SideEffects::new();
/// side_effects.store("collected", Value::Int(42));
/// let values = side_effects.get("collected");
/// ```
#[derive(Clone, Default)]
pub struct SideEffects {
    /// Internal state wrapped in Arc for cheap cloning.
    inner: Arc<SideEffectsInner>,
}

/// Internal state for SideEffects.
///
/// Each map sits behind its own `RwLock`, so the collections and the typed
/// data are locked independently of one another.
#[derive(Default)]
struct SideEffectsInner {
    /// Named collections of values (one growable list per key)
    collections: RwLock<HashMap<String, Vec<Value>>>,
    /// Arbitrary side effect data, type-erased; one value per key
    data: RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>,
}

impl SideEffects {
    /// Create a new empty side effects store.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(SideEffectsInner::default()),
        }
    }

    /// Store a value in a named collection.
    ///
    /// Values are appended to the collection (multiple values per key); the
    /// collection is created on first use.
    pub fn store(&self, key: &str, value: Value) {
        let mut collections = self.inner.collections.write();
        // Look the key up by `&str` first so the owned `String` key is only
        // allocated when the collection is created, not on every append.
        match collections.get_mut(key) {
            Some(values) => values.push(value),
            None => {
                collections.insert(key.to_string(), vec![value]);
            }
        }
    }

    /// Get values from a named collection (returns a clone).
    ///
    /// Returns `None` if the key doesn't exist.
    pub fn get(&self, key: &str) -> Option<Vec<Value>> {
        self.inner.collections.read().get(key).cloned()
    }

    /// Get values from a named collection by reference (for iteration).
    ///
    /// Returns a guard that holds the read lock, or `None` if the key
    /// doesn't exist (in which case the lock is released before returning).
    ///
    /// # Note
    ///
    /// The returned guard holds the read lock. Use sparingly and drop
    /// the guard as soon as possible to avoid blocking other operations.
    pub fn get_ref(&self, key: &str) -> Option<parking_lot::MappedRwLockReadGuard<'_, Vec<Value>>> {
        let guard = self.inner.collections.read();
        // `try_map` narrows the guard with a single lookup; on a miss it hands
        // the original guard back as `Err`, which `.ok()` drops (unlocking).
        parking_lot::RwLockReadGuard::try_map(guard, |collections| collections.get(key)).ok()
    }

    /// Store arbitrary typed data.
    ///
    /// Overwrites any existing value with the same key.
    pub fn set_data<T: Any + Send + Sync>(&self, key: &str, value: T) {
        self.inner
            .data
            .write()
            .insert(key.to_string(), Box::new(value));
    }

    /// Get arbitrary typed data (returned as a clone of the stored value).
    ///
    /// Returns `None` if the key doesn't exist or the type doesn't match.
    pub fn get_data<T: Any + Clone>(&self, key: &str) -> Option<T> {
        self.inner
            .data
            .read()
            .get(key)
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
    }

    /// Check if a collection key exists.
    ///
    /// Only the named collections are consulted, not the typed data map.
    pub fn contains_key(&self, key: &str) -> bool {
        self.inner.collections.read().contains_key(key)
    }

    /// Get the number of values in a collection.
    ///
    /// A missing collection counts as length `0`.
    pub fn collection_len(&self, key: &str) -> usize {
        self.inner
            .collections
            .read()
            .get(key)
            .map_or(0, Vec::len)
    }

    /// Clear all side effects (both the collections and the typed data).
    pub fn clear(&self) {
        // Each write guard is a statement-scoped temporary, so the two locks
        // are taken one after the other and never held together.
        self.inner.collections.write().clear();
        self.inner.data.write().clear();
    }

    /// Get all collection keys (in unspecified order).
    pub fn keys(&self) -> Vec<String> {
        self.inner.collections.read().keys().cloned().collect()
    }
}

// =============================================================================
// StreamingContext - Owned context for streaming pipelines
// =============================================================================

/// Owned execution context for streaming pipelines.
///
/// Unlike [`ExecutionContext`] which borrows storage and interner,
/// `StreamingContext` owns Arc references that can be cloned into
/// iterator closures for `'static` lifetimes.
///
/// This is the key enabler for true O(1) streaming: steps can clone
/// this context into their returned iterators without lifetime constraints.
///
/// # Cloning
///
/// `StreamingContext` is designed for cheap cloning. All internal state
/// is `Arc`-wrapped, so cloning only increments reference counts. Clones
/// share one side-effect store; the `track_paths` flag is copied by value.
///
/// # Example
///
/// ```ignore
/// let ctx = StreamingContext::new(storage, interner);
/// let iter = step.apply_streaming(ctx.clone(), traverser);
/// // `iter` is 'static - can be stored, returned, etc.
/// ```
#[derive(Clone)]
pub struct StreamingContext {
    /// Streamable storage (shared ownership) - provides both GraphStorage
    /// and streaming methods like stream_out_neighbors()
    storage: Arc<dyn crate::storage::StreamableStorage>,
    /// String interner (shared ownership)
    interner: Arc<StringInterner>,
    /// Side effects (already Arc-wrapped internally)
    side_effects: SideEffects,
    /// Whether to track traversal paths
    track_paths: bool,
}

impl StreamingContext {
    /// Build a streaming context that owns shared handles to the storage and
    /// the interner.
    ///
    /// The context starts with an empty side-effect store and with path
    /// tracking switched off.
    ///
    /// # Arguments
    ///
    /// * `storage` - Arc-wrapped streamable storage
    /// * `interner` - Arc-wrapped string interner
    pub fn new(
        storage: Arc<dyn crate::storage::StreamableStorage>,
        interner: Arc<StringInterner>,
    ) -> Self {
        let side_effects = SideEffects::new();
        Self {
            storage,
            interner,
            side_effects,
            track_paths: false,
        }
    }

    /// Builder-style setter: switch path tracking on or off.
    pub fn with_path_tracking(mut self, enabled: bool) -> Self {
        self.track_paths = enabled;
        self
    }

    /// Builder-style setter: replace the side-effect store.
    ///
    /// Pass in an existing store to have this context write to the same
    /// side-effect state as the rest of the pipeline.
    pub fn with_side_effects(mut self, side_effects: SideEffects) -> Self {
        self.side_effects = side_effects;
        self
    }

    /// Report whether this context asks steps to track paths.
    #[inline]
    pub fn is_tracking_paths(&self) -> bool {
        self.track_paths
    }

    /// Borrow the storage through its `GraphStorage` interface.
    #[inline]
    pub fn storage(&self) -> &dyn GraphStorage {
        // Same object as `streamable_storage()`, viewed as its supertrait.
        self.streamable_storage()
    }

    /// Borrow the storage through its `StreamableStorage` interface.
    ///
    /// This is the entry point for streaming methods such as
    /// `stream_out_neighbors()` that hand back owned iterators.
    #[inline]
    pub fn streamable_storage(&self) -> &dyn crate::storage::StreamableStorage {
        &*self.storage
    }

    /// Hand out another shared handle to the streamable storage, suitable
    /// for moving into an iterator.
    #[inline]
    pub fn arc_streamable(&self) -> Arc<dyn crate::storage::StreamableStorage> {
        Arc::clone(&self.storage)
    }

    /// Hand out another shared handle to the storage, typed as
    /// `GraphStorage`, suitable for moving into an iterator.
    #[inline]
    pub fn arc_storage(&self) -> Arc<dyn GraphStorage + Send + Sync> {
        // StreamableStorage: GraphStorage, so the handle can be re-typed as
        // the supertrait object without touching the underlying value.
        self.arc_streamable() as Arc<dyn GraphStorage + Send + Sync>
    }

    /// Borrow the string interner.
    #[inline]
    pub fn interner(&self) -> &StringInterner {
        &*self.interner
    }

    /// Hand out another shared handle to the interner, suitable for moving
    /// into an iterator.
    #[inline]
    pub fn arc_interner(&self) -> Arc<StringInterner> {
        Arc::clone(&self.interner)
    }

    /// Borrow the side-effect store.
    #[inline]
    pub fn side_effects(&self) -> &SideEffects {
        &self.side_effects
    }

    /// Look up the interned ID of `label`.
    ///
    /// Yields `None` when the interner has no entry for the label.
    #[inline]
    pub fn resolve_label(&self, label: &str) -> Option<u32> {
        self.interner.lookup(label)
    }

    /// Look up the interned IDs of several labels at once.
    ///
    /// Labels the interner does not know are skipped, so the result may be
    /// shorter than `labels`; the relative order of the hits is preserved.
    pub fn resolve_labels(&self, labels: &[&str]) -> Vec<u32> {
        labels
            .iter()
            .copied()
            .filter_map(|label| self.resolve_label(label))
            .collect()
    }

    /// Map an interned ID back to its label string, if the ID is known.
    #[inline]
    pub fn get_label(&self, id: u32) -> Option<&str> {
        self.interner.resolve(id)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Graph;

    /// Build the fixture shared by the context test modules: two `person`
    /// vertices and one `software` vertex, each carrying a `name` property.
    fn create_test_graph() -> Graph {
        let graph = Graph::new();

        for (label, name) in [
            ("person", "Alice"),
            ("person", "Bob"),
            ("software", "Graph DB"),
        ] {
            let mut props = HashMap::new();
            props.insert("name".to_string(), Value::String(name.to_string()));
            graph.add_vertex(label, props);
        }

        graph
    }

    // -------------------------------------------------------------------------
    // SideEffects
    // -------------------------------------------------------------------------

    #[test]
    fn side_effects_new_is_empty() {
        let se = SideEffects::new();
        assert!(se.keys().is_empty());
        assert_eq!(se.get("nonexistent"), None);
    }

    #[test]
    fn side_effects_store_and_get() {
        let se = SideEffects::new();

        se.store("numbers", Value::Int(1));
        se.store("numbers", Value::Int(2));
        se.store("numbers", Value::Int(3));

        let values = se.get("numbers").unwrap();
        assert_eq!(values.len(), 3);
        assert_eq!(values[0], Value::Int(1));
        assert_eq!(values[1], Value::Int(2));
        assert_eq!(values[2], Value::Int(3));
    }

    #[test]
    fn side_effects_get_ref() {
        let se = SideEffects::new();
        se.store("items", Value::String("hello".to_string()));
        se.store("items", Value::String("world".to_string()));

        {
            let guard = se.get_ref("items").unwrap();
            assert_eq!(guard.len(), 2);
            assert_eq!(guard[0], Value::String("hello".to_string()));
        }

        // Guard dropped, can access again
        assert_eq!(se.collection_len("items"), 2);
    }

    #[test]
    fn side_effects_get_ref_missing_key() {
        let se = SideEffects::new();
        assert!(se.get_ref("missing").is_none());
    }

    #[test]
    fn side_effects_set_and_get_data() {
        let se = SideEffects::new();

        se.set_data("count", 42i32);
        se.set_data("name", "Alice".to_string());

        assert_eq!(se.get_data::<i32>("count"), Some(42));
        assert_eq!(se.get_data::<String>("name"), Some("Alice".to_string()));
    }

    #[test]
    fn side_effects_get_data_wrong_type() {
        let se = SideEffects::new();
        se.set_data("count", 42i32);

        // Wrong type returns None
        assert_eq!(se.get_data::<String>("count"), None);
        assert_eq!(se.get_data::<i64>("count"), None);
    }

    #[test]
    fn side_effects_get_data_missing_key() {
        let se = SideEffects::new();
        assert_eq!(se.get_data::<i32>("missing"), None);
    }

    #[test]
    fn side_effects_contains_key() {
        let se = SideEffects::new();
        assert!(!se.contains_key("test"));

        se.store("test", Value::Null);
        assert!(se.contains_key("test"));
    }

    #[test]
    fn side_effects_collection_len() {
        let se = SideEffects::new();
        assert_eq!(se.collection_len("items"), 0);

        se.store("items", Value::Int(1));
        assert_eq!(se.collection_len("items"), 1);

        se.store("items", Value::Int(2));
        assert_eq!(se.collection_len("items"), 2);
    }

    #[test]
    fn side_effects_clear() {
        let se = SideEffects::new();
        se.store("a", Value::Int(1));
        se.store("b", Value::Int(2));
        se.set_data("c", 3i32);

        se.clear();

        assert!(se.keys().is_empty());
        assert_eq!(se.get("a"), None);
        assert_eq!(se.get("b"), None);
        assert_eq!(se.get_data::<i32>("c"), None);
    }

    #[test]
    fn side_effects_keys() {
        let se = SideEffects::new();
        se.store("alpha", Value::Int(1));
        se.store("beta", Value::Int(2));
        se.store("gamma", Value::Int(3));

        // HashMap iteration order is unspecified, so sort before comparing.
        let mut keys = se.keys();
        keys.sort();

        assert_eq!(keys, vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn side_effects_multiple_stores_same_key() {
        let se = SideEffects::new();

        for i in 0..100 {
            se.store("many", Value::Int(i));
        }

        assert_eq!(se.collection_len("many"), 100);
        let values = se.get("many").unwrap();
        for (i, v) in values.iter().enumerate() {
            assert_eq!(*v, Value::Int(i as i64));
        }
    }

    #[test]
    fn side_effects_set_data_overwrites() {
        let se = SideEffects::new();

        se.set_data("key", 1i32);
        assert_eq!(se.get_data::<i32>("key"), Some(1));

        se.set_data("key", 2i32);
        assert_eq!(se.get_data::<i32>("key"), Some(2));
    }

    #[test]
    fn side_effects_clone_shares_data() {
        let se1 = SideEffects::new();
        se1.store("key", Value::Int(1));

        // Clone and verify shared data
        let se2 = se1.clone();
        assert_eq!(se2.get("key"), Some(vec![Value::Int(1)]));

        // Modify through clone
        se2.store("key", Value::Int(2));

        // Original should see the change
        assert_eq!(se1.get("key"), Some(vec![Value::Int(1), Value::Int(2)]));
    }

    // Tests for ExecutionContext require integration with Graph
    mod execution_context_tests {
        use super::*;

        #[test]
        fn execution_context_new_compiles() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let _ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            // If this compiles and doesn't panic, the test passes
        }

        #[test]
        fn execution_context_path_tracking_flag() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();

            // `new` leaves path tracking off...
            let plain = ExecutionContext::new(snapshot.storage(), snapshot.interner());
            assert!(!plain.is_tracking_paths());

            // ...while `with_path_tracking` turns it on.
            let tracked =
                ExecutionContext::with_path_tracking(snapshot.storage(), snapshot.interner());
            assert!(tracked.is_tracking_paths());
        }

        #[test]
        fn execution_context_resolve_label_existing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // "person" label exists (added vertices with this label)
            let person_id = ctx.resolve_label("person");
            assert!(person_id.is_some());

            // "software" label exists
            let software_id = ctx.resolve_label("software");
            assert!(software_id.is_some());

            // Different labels should have different IDs
            assert_ne!(person_id, software_id);
        }

        #[test]
        fn execution_context_resolve_label_missing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // "unknown" label was never added
            let unknown_id = ctx.resolve_label("unknown");
            assert!(unknown_id.is_none());
        }

        #[test]
        fn execution_context_resolve_labels_multiple() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // Resolve multiple labels at once
            let ids = ctx.resolve_labels(&["person", "software", "unknown"]);

            // Should return 2 IDs (unknown is filtered out)
            assert_eq!(ids.len(), 2);
        }

        #[test]
        fn execution_context_resolve_labels_all_missing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            let ids = ctx.resolve_labels(&["unknown1", "unknown2"]);
            assert!(ids.is_empty());
        }

        #[test]
        fn execution_context_get_label() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // First resolve to get the ID
            let person_id = ctx.resolve_label("person").unwrap();

            // Then get the string back
            let label_str = ctx.get_label(person_id);
            assert_eq!(label_str, Some("person"));
        }

        #[test]
        fn execution_context_get_label_missing() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // ID that doesn't exist
            let label_str = ctx.get_label(999);
            assert!(label_str.is_none());
        }

        #[test]
        fn execution_context_storage_accessor() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // Should be able to access the storage through the context
            let storage = ctx.storage();
            assert_eq!(storage.vertex_count(), 3);
        }

        #[test]
        fn execution_context_interner_accessor() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // Should be able to access the interner through the context
            let interner = ctx.interner();

            // Interner should have the same labels
            assert!(interner.lookup("person").is_some());
        }

        #[test]
        fn execution_context_side_effects_accessible() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());

            // Side effects should be accessible and usable
            ctx.side_effects.store("test", Value::Int(42));
            let values = ctx.side_effects.get("test");
            assert_eq!(values, Some(vec![Value::Int(42)]));
        }
    }

    mod streaming_context_tests {
        use super::*;

        #[test]
        fn streaming_context_new_compiles() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let _ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            // If this compiles and doesn't panic, the test passes
        }

        #[test]
        fn streaming_context_is_cloneable() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx1 = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
            let ctx2 = ctx1.clone();

            // Both should resolve the same label
            assert_eq!(ctx1.resolve_label("person"), ctx2.resolve_label("person"));
        }

        #[test]
        fn streaming_context_path_tracking_defaults_off() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());

            assert!(!ctx.is_tracking_paths());

            // Explicitly disabling keeps it off
            assert!(!ctx.with_path_tracking(false).is_tracking_paths());
        }

        #[test]
        fn streaming_context_with_path_tracking() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner())
                .with_path_tracking(true);

            assert!(ctx.is_tracking_paths());
        }

        #[test]
        fn streaming_context_with_side_effects() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let side_effects = SideEffects::new();
            side_effects.store("test", Value::Int(42));

            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner())
                .with_side_effects(side_effects);

            assert_eq!(ctx.side_effects().get("test"), Some(vec![Value::Int(42)]));
        }

        #[test]
        fn streaming_context_resolve_label() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());

            assert!(ctx.resolve_label("person").is_some());
            assert!(ctx.resolve_label("software").is_some());
            assert!(ctx.resolve_label("unknown").is_none());
        }

        #[test]
        fn streaming_context_resolve_labels() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());

            let ids = ctx.resolve_labels(&["person", "software", "unknown"]);
            assert_eq!(ids.len(), 2); // unknown filtered out
        }

        #[test]
        fn streaming_context_storage_accessor() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());

            assert_eq!(ctx.storage().vertex_count(), 3);
        }

        #[test]
        fn streaming_context_arc_accessors() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());

            // Arc accessors should return cloneable Arc references
            let _storage = ctx.arc_storage();
            let _streamable = ctx.arc_streamable();
            let _interner = ctx.arc_interner();
        }

        #[test]
        fn streaming_context_clones_share_side_effects() {
            let graph = create_test_graph();
            let snapshot = graph.snapshot();
            let ctx1 = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());

            // Clone the context
            let ctx2 = ctx1.clone();

            // Store through one clone
            ctx1.side_effects().store("shared", Value::Int(1));

            // Other clone should see the change
            assert_eq!(ctx2.side_effects().get("shared"), Some(vec![Value::Int(1)]));
        }
    }
}