manifoldb_query/exec/
context.rs

1//! Execution context for query execution.
2//!
3//! The execution context provides access to transaction state,
4//! query parameters, and runtime configuration.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Instant;
10
11use manifoldb_core::{CollectionId, EntityId, Value};
12use manifoldb_vector::{Embedding, SearchResult, VectorData, VectorError};
13
14/// A trait for providing access to vector indexes.
15///
16/// This trait allows type-erased access to vector indexes for query execution.
17/// Implementations can wrap HNSW indexes or other vector index types.
18pub trait VectorIndexProvider: Send + Sync {
19    /// Search for nearest neighbors in the specified index.
20    ///
21    /// # Arguments
22    ///
23    /// * `index_name` - The name of the vector index to search
24    /// * `query` - The query embedding
25    /// * `k` - Number of nearest neighbors to return
26    /// * `ef_search` - Optional HNSW ef_search parameter
27    ///
28    /// # Returns
29    ///
30    /// A vector of search results sorted by distance, or an error if the
31    /// index is not found or the search fails.
32    fn search(
33        &self,
34        index_name: &str,
35        query: &Embedding,
36        k: usize,
37        ef_search: Option<usize>,
38    ) -> Result<Vec<SearchResult>, VectorError>;
39
40    /// Check if a vector index exists.
41    fn has_index(&self, index_name: &str) -> bool;
42
43    /// Get the dimension of vectors in the specified index.
44    fn dimension(&self, index_name: &str) -> Option<usize>;
45}
46
47/// A trait for providing collection-based vector operations.
48///
49/// This trait allows type-erased access to the separated vector storage
50/// used by collections with named vectors. Unlike `VectorIndexProvider` which
51/// works with entity-property-based vectors, this trait works with the new
52/// collection-based vector storage architecture.
53pub trait CollectionVectorProvider: Send + Sync {
54    /// Store a vector for an entity in a collection.
55    ///
56    /// # Arguments
57    ///
58    /// * `collection_id` - The collection ID
59    /// * `entity_id` - The entity ID
60    /// * `collection_name` - The collection name (for index lookup)
61    /// * `vector_name` - The named vector within the collection
62    /// * `data` - The vector data to store
63    fn upsert_vector(
64        &self,
65        collection_id: CollectionId,
66        entity_id: EntityId,
67        collection_name: &str,
68        vector_name: &str,
69        data: &VectorData,
70    ) -> Result<(), VectorError>;
71
72    /// Delete a vector from storage and any associated index.
73    fn delete_vector(
74        &self,
75        collection_id: CollectionId,
76        entity_id: EntityId,
77        collection_name: &str,
78        vector_name: &str,
79    ) -> Result<bool, VectorError>;
80
81    /// Delete all vectors for an entity in a collection.
82    fn delete_entity_vectors(
83        &self,
84        collection_id: CollectionId,
85        entity_id: EntityId,
86        collection_name: &str,
87    ) -> Result<usize, VectorError>;
88
89    /// Get a vector from storage.
90    fn get_vector(
91        &self,
92        collection_id: CollectionId,
93        entity_id: EntityId,
94        vector_name: &str,
95    ) -> Result<Option<VectorData>, VectorError>;
96
97    /// Get all vectors for an entity.
98    fn get_all_vectors(
99        &self,
100        collection_id: CollectionId,
101        entity_id: EntityId,
102    ) -> Result<std::collections::HashMap<String, VectorData>, VectorError>;
103
104    /// Search for similar vectors using HNSW (if index exists).
105    fn search(
106        &self,
107        collection_name: &str,
108        vector_name: &str,
109        query: &Embedding,
110        k: usize,
111        ef_search: Option<usize>,
112    ) -> Result<Vec<SearchResult>, VectorError>;
113}
114
115use super::graph_accessor::{GraphAccessor, NullGraphAccessor};
116
117/// Execution context for a query.
118///
119/// The context provides access to:
120/// - Query parameters (bound values for placeholders)
121/// - Cancellation support
122/// - Execution statistics
123/// - Runtime configuration
124/// - Graph storage access (for graph traversal queries)
125/// - Vector index access (optional)
126/// - Collection vector storage access (for named vectors)
127pub struct ExecutionContext {
128    /// Query parameters (1-indexed).
129    parameters: HashMap<u32, Value>,
130    /// Whether the query has been cancelled.
131    cancelled: AtomicBool,
132    /// Execution statistics.
133    stats: ExecutionStats,
134    /// Configuration options.
135    config: ExecutionConfig,
136    /// Graph accessor for graph traversal operations.
137    graph: Arc<dyn GraphAccessor>,
138    /// Optional vector index provider for HNSW searches (entity-property based).
139    vector_index_provider: Option<Arc<dyn VectorIndexProvider>>,
140    /// Optional collection vector provider for named vector storage.
141    collection_vector_provider: Option<Arc<dyn CollectionVectorProvider>>,
142}
143
144impl ExecutionContext {
145    /// Creates a new execution context without graph storage.
146    ///
147    /// Use [`with_graph`](Self::with_graph) to add graph storage access.
148    #[must_use]
149    pub fn new() -> Self {
150        Self {
151            parameters: HashMap::new(),
152            cancelled: AtomicBool::new(false),
153            stats: ExecutionStats::new(),
154            config: ExecutionConfig::default(),
155            graph: Arc::new(NullGraphAccessor),
156            vector_index_provider: None,
157            collection_vector_provider: None,
158        }
159    }
160
161    /// Creates a context with parameters.
162    #[must_use]
163    pub fn with_parameters(parameters: HashMap<u32, Value>) -> Self {
164        Self {
165            parameters,
166            cancelled: AtomicBool::new(false),
167            stats: ExecutionStats::new(),
168            config: ExecutionConfig::default(),
169            graph: Arc::new(NullGraphAccessor),
170            vector_index_provider: None,
171            collection_vector_provider: None,
172        }
173    }
174
175    /// Sets the graph accessor for graph traversal operations.
176    ///
177    /// This enables the query executor to perform actual graph traversals
178    /// using the underlying storage.
179    #[must_use]
180    pub fn with_graph(mut self, graph: Arc<dyn GraphAccessor>) -> Self {
181        self.graph = graph;
182        self
183    }
184
185    /// Returns a reference to the graph accessor.
186    #[inline]
187    #[must_use]
188    pub fn graph(&self) -> &dyn GraphAccessor {
189        self.graph.as_ref()
190    }
191
192    /// Returns the graph accessor as an Arc.
193    #[inline]
194    #[must_use]
195    pub fn graph_arc(&self) -> Arc<dyn GraphAccessor> {
196        Arc::clone(&self.graph)
197    }
198
199    /// Creates a context with a vector index provider.
200    #[must_use]
201    pub fn with_vector_index_provider(mut self, provider: Arc<dyn VectorIndexProvider>) -> Self {
202        self.vector_index_provider = Some(provider);
203        self
204    }
205
206    /// Sets the vector index provider.
207    pub fn set_vector_index_provider(&mut self, provider: Arc<dyn VectorIndexProvider>) {
208        self.vector_index_provider = Some(provider);
209    }
210
211    /// Returns a reference to the vector index provider if one is set.
212    #[must_use]
213    pub fn vector_index_provider(&self) -> Option<&dyn VectorIndexProvider> {
214        self.vector_index_provider.as_deref()
215    }
216
217    /// Returns a clone of the vector index provider Arc if one is set.
218    ///
219    /// This is useful for operators that need to hold onto the provider
220    /// for the duration of their execution.
221    #[must_use]
222    pub fn vector_index_provider_arc(&self) -> Option<Arc<dyn VectorIndexProvider>> {
223        self.vector_index_provider.clone()
224    }
225
226    /// Sets the collection vector provider for named vector storage.
227    #[must_use]
228    pub fn with_collection_vector_provider(
229        mut self,
230        provider: Arc<dyn CollectionVectorProvider>,
231    ) -> Self {
232        self.collection_vector_provider = Some(provider);
233        self
234    }
235
236    /// Sets the collection vector provider.
237    pub fn set_collection_vector_provider(&mut self, provider: Arc<dyn CollectionVectorProvider>) {
238        self.collection_vector_provider = Some(provider);
239    }
240
241    /// Returns a reference to the collection vector provider if one is set.
242    #[must_use]
243    pub fn collection_vector_provider(&self) -> Option<&dyn CollectionVectorProvider> {
244        self.collection_vector_provider.as_deref()
245    }
246
247    /// Returns a clone of the collection vector provider Arc if one is set.
248    #[must_use]
249    pub fn collection_vector_provider_arc(&self) -> Option<Arc<dyn CollectionVectorProvider>> {
250        self.collection_vector_provider.clone()
251    }
252
253    /// Adds a parameter value.
254    pub fn set_parameter(&mut self, index: u32, value: Value) {
255        self.parameters.insert(index, value);
256    }
257
258    /// Gets a parameter value.
259    #[inline]
260    #[must_use]
261    pub fn get_parameter(&self, index: u32) -> Option<&Value> {
262        self.parameters.get(&index)
263    }
264
265    /// Returns all parameters.
266    #[inline]
267    #[must_use]
268    pub fn parameters(&self) -> &HashMap<u32, Value> {
269        &self.parameters
270    }
271
272    /// Cancels the query execution.
273    #[inline]
274    pub fn cancel(&self) {
275        self.cancelled.store(true, Ordering::SeqCst);
276    }
277
278    /// Checks if the query has been cancelled.
279    #[inline]
280    #[must_use]
281    pub fn is_cancelled(&self) -> bool {
282        self.cancelled.load(Ordering::SeqCst)
283    }
284
285    /// Returns the execution statistics.
286    #[inline]
287    #[must_use]
288    pub fn stats(&self) -> &ExecutionStats {
289        &self.stats
290    }
291
292    /// Returns the configuration.
293    #[inline]
294    #[must_use]
295    pub fn config(&self) -> &ExecutionConfig {
296        &self.config
297    }
298
299    /// Returns mutable configuration.
300    pub fn config_mut(&mut self) -> &mut ExecutionConfig {
301        &mut self.config
302    }
303
304    /// Records that rows were read.
305    #[inline]
306    pub fn record_rows_read(&self, count: u64) {
307        self.stats.rows_read.fetch_add(count, Ordering::Relaxed);
308    }
309
310    /// Records that rows were produced.
311    #[inline]
312    pub fn record_rows_produced(&self, count: u64) {
313        self.stats.rows_produced.fetch_add(count, Ordering::Relaxed);
314    }
315
316    /// Records that rows were filtered.
317    #[inline]
318    pub fn record_rows_filtered(&self, count: u64) {
319        self.stats.rows_filtered.fetch_add(count, Ordering::Relaxed);
320    }
321
322    /// Sets the execution configuration.
323    #[must_use]
324    pub fn with_config(mut self, config: ExecutionConfig) -> Self {
325        self.config = config;
326        self
327    }
328
329    /// Returns the maximum rows in memory limit.
330    ///
331    /// Returns 0 if the limit is disabled.
332    #[inline]
333    #[must_use]
334    pub fn max_rows_in_memory(&self) -> usize {
335        self.config.max_rows_in_memory
336    }
337}
338
339impl Default for ExecutionContext {
340    fn default() -> Self {
341        Self::new()
342    }
343}
344
345impl std::fmt::Debug for ExecutionContext {
346    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347        f.debug_struct("ExecutionContext")
348            .field("parameters", &self.parameters)
349            .field("cancelled", &self.cancelled)
350            .field("stats", &self.stats)
351            .field("config", &self.config)
352            .field("graph", &"<GraphAccessor>")
353            .field("vector_index_provider", &self.vector_index_provider.is_some())
354            .finish_non_exhaustive()
355    }
356}
357
/// Counters and timing collected while a query executes.
///
/// The row counters are atomics so they can be bumped through a shared
/// reference; they are read here with `Ordering::Relaxed`.
#[derive(Debug)]
pub struct ExecutionStats {
    /// The instant these statistics were created.
    start_time: Instant,
    /// Rows read from storage.
    rows_read: AtomicU64,
    /// Rows produced by the query.
    rows_produced: AtomicU64,
    /// Rows filtered out.
    rows_filtered: AtomicU64,
}

impl ExecutionStats {
    /// Starts a fresh set of statistics: every counter at zero and the clock
    /// started now.
    #[must_use]
    pub fn new() -> Self {
        let zeroed = || AtomicU64::new(0);
        Self {
            start_time: Instant::now(),
            rows_read: zeroed(),
            rows_produced: zeroed(),
            rows_filtered: zeroed(),
        }
    }

    /// Total rows read so far.
    #[inline]
    #[must_use]
    pub fn rows_read(&self) -> u64 {
        Self::snapshot(&self.rows_read)
    }

    /// Total rows produced so far.
    #[inline]
    #[must_use]
    pub fn rows_produced(&self) -> u64 {
        Self::snapshot(&self.rows_produced)
    }

    /// Total rows filtered out so far.
    #[inline]
    #[must_use]
    pub fn rows_filtered(&self) -> u64 {
        Self::snapshot(&self.rows_filtered)
    }

    /// Wall-clock time since these statistics were created.
    #[inline]
    #[must_use]
    pub fn elapsed(&self) -> std::time::Duration {
        self.start_time.elapsed()
    }

    /// Reads the current value of one counter.
    #[inline]
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }
}

impl Default for ExecutionStats {
    /// Equivalent to [`ExecutionStats::new`].
    fn default() -> Self {
        ExecutionStats::new()
    }
}
417
/// Default maximum rows in memory (1 million rows).
pub const DEFAULT_MAX_ROWS_IN_MEMORY: usize = 1_000_000;

/// Configuration options for query execution.
///
/// Start from [`ExecutionConfig::new`] (identical to `Default`) and adjust
/// with the `const fn` `with_*` builder methods. Configurations compare field
/// by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionConfig {
    /// Maximum batch size, in rows (default: 1024).
    ///
    /// NOTE(review): presumably a per-batch size, distinct from
    /// `max_rows_in_memory` (which caps rows materialized by blocking
    /// operators); confirm against the operators that read it.
    pub max_batch_size: usize,
    /// Whether to collect detailed statistics (default: `false`).
    ///
    /// NOTE(review): `ExecutionContext::record_rows_*` update the row counters
    /// regardless of this flag.
    pub collect_stats: bool,
    /// Memory limit in bytes; 0 means no limit (default: 0).
    pub memory_limit: usize,
    /// Maximum number of rows that operators can materialize in memory.
    ///
    /// This limit applies to blocking operators like sort, join, and aggregate
    /// that need to collect rows before producing output. When an operator
    /// exceeds this limit, it returns a `QueryTooLarge` error.
    ///
    /// Set to 0 to disable the limit (not recommended for production).
    /// Default: 1,000,000 rows.
    pub max_rows_in_memory: usize,
}

impl ExecutionConfig {
    /// Creates a new configuration with defaults.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            max_batch_size: 1024,
            collect_stats: false,
            memory_limit: 0,
            max_rows_in_memory: DEFAULT_MAX_ROWS_IN_MEMORY,
        }
    }

    /// Sets the maximum batch size.
    #[must_use]
    pub const fn with_batch_size(mut self, size: usize) -> Self {
        self.max_batch_size = size;
        self
    }

    /// Enables statistics collection.
    #[must_use]
    pub const fn with_stats(mut self) -> Self {
        self.collect_stats = true;
        self
    }

    /// Sets the memory limit in bytes (0 for no limit).
    #[must_use]
    pub const fn with_memory_limit(mut self, limit: usize) -> Self {
        self.memory_limit = limit;
        self
    }

    /// Sets the maximum rows that can be materialized in memory.
    ///
    /// This limit applies to blocking operators like sort, join, and aggregate.
    /// Set to 0 to disable the limit (not recommended for production).
    #[must_use]
    pub const fn with_max_rows_in_memory(mut self, limit: usize) -> Self {
        self.max_rows_in_memory = limit;
        self
    }
}

impl Default for ExecutionConfig {
    /// Equivalent to [`ExecutionConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
490
/// A cloneable, thread-safe handle for requesting query cancellation.
///
/// Every clone shares the same flag, so a clone handed to another thread can
/// cancel from outside the query execution thread, and all holders observe it.
///
/// NOTE(review): `ExecutionContext` keeps its own, separate cancellation flag;
/// nothing in this module links the two.
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a token that has not been cancelled.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Requests cancellation; visible through every clone of this token.
    #[inline]
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Returns `true` once this token or any of its clones has been cancelled.
    #[inline]
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}
526
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn context_parameters() {
        let mut ctx = ExecutionContext::new();
        ctx.set_parameter(1, Value::Int(42));
        ctx.set_parameter(2, Value::from("hello"));

        assert_eq!(ctx.get_parameter(1), Some(&Value::Int(42)));
        assert_eq!(ctx.get_parameter(2), Some(&Value::from("hello")));
        assert_eq!(ctx.get_parameter(3), None);
    }

    /// `with_parameters` must bind the given values and otherwise match `new()`.
    #[test]
    fn context_with_parameters() {
        let mut params = HashMap::new();
        params.insert(1, Value::Int(7));
        let ctx = ExecutionContext::with_parameters(params);

        assert_eq!(ctx.get_parameter(1), Some(&Value::Int(7)));
        assert_eq!(ctx.get_parameter(2), None);
        assert_eq!(ctx.parameters().len(), 1);
        assert!(!ctx.is_cancelled());
        assert!(ctx.vector_index_provider().is_none());
        assert!(ctx.collection_vector_provider().is_none());
        assert_eq!(ctx.max_rows_in_memory(), DEFAULT_MAX_ROWS_IN_MEMORY);
    }

    /// `Default` must produce the same empty state as `new()`.
    #[test]
    fn context_default() {
        let ctx = ExecutionContext::default();
        assert!(ctx.parameters().is_empty());
        assert!(!ctx.is_cancelled());
        assert_eq!(ctx.stats().rows_read(), 0);
        assert!(!ctx.config().collect_stats);
        assert!(ctx.vector_index_provider_arc().is_none());
        assert!(ctx.collection_vector_provider_arc().is_none());
    }

    /// Config set via the builder and `config_mut` is visible through the context.
    #[test]
    fn context_config() {
        let mut ctx = ExecutionContext::new().with_config(
            ExecutionConfig::new().with_batch_size(16).with_max_rows_in_memory(10),
        );
        assert_eq!(ctx.config().max_batch_size, 16);
        assert_eq!(ctx.max_rows_in_memory(), 10);

        ctx.config_mut().max_rows_in_memory = 0;
        assert_eq!(ctx.max_rows_in_memory(), 0);
    }

    #[test]
    fn context_cancellation() {
        let ctx = ExecutionContext::new();
        assert!(!ctx.is_cancelled());
        ctx.cancel();
        assert!(ctx.is_cancelled());
    }

    #[test]
    fn context_stats() {
        let ctx = ExecutionContext::new();
        ctx.record_rows_read(100);
        ctx.record_rows_produced(50);
        ctx.record_rows_filtered(50);

        assert_eq!(ctx.stats().rows_read(), 100);
        assert_eq!(ctx.stats().rows_produced(), 50);
        assert_eq!(ctx.stats().rows_filtered(), 50);
    }

    #[test]
    fn cancellation_token() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        let token2 = token.clone();
        token.cancel();

        assert!(token.is_cancelled());
        assert!(token2.is_cancelled());
    }

    #[test]
    fn cancellation_token_default_is_not_cancelled() {
        assert!(!CancellationToken::default().is_cancelled());
    }
}