dsq_filter/
executor.rs

1//! Filter execution engine for dsq
2//!
3//! This module provides the execution engine that runs compiled filters against
4//! data values, managing the execution lifecycle, error handling, and performance
5//! monitoring.
6
7use crate::compiler::{CompiledFilter, ErrorMode, FilterCompiler, FilterContext};
8use dsq_shared::value::Value;
9use dsq_shared::Result;
10use lru::LruCache;
11use std::num::NonZeroUsize;
12use std::sync::Arc;
13#[cfg(target_arch = "wasm32")]
14use std::time::Duration;
15#[cfg(not(target_arch = "wasm32"))]
16use std::time::{Duration, Instant};
17
18/// Execution configuration options
19#[derive(Debug, Clone)]
20pub struct ExecutorConfig {
21    /// Maximum execution time in milliseconds
22    pub timeout_ms: Option<u64>,
23    /// Error handling mode
24    pub error_mode: ErrorMode,
25    /// Whether to collect execution statistics
26    pub collect_stats: bool,
27    /// Maximum recursion depth
28    pub max_recursion_depth: usize,
29    /// Whether to enable debug mode
30    pub debug_mode: bool,
31    /// Batch size for DataFrame operations
32    pub batch_size: usize,
33    /// Variables available during execution
34    pub variables: std::collections::HashMap<String, Value>,
35    /// Maximum number of compiled filters to cache
36    pub filter_cache_size: usize,
37}
38
39impl Default for ExecutorConfig {
40    fn default() -> Self {
41        Self {
42            timeout_ms: None,
43            error_mode: ErrorMode::Strict,
44            collect_stats: false,
45            max_recursion_depth: 1000,
46            debug_mode: false,
47            batch_size: 10000,
48            variables: std::collections::HashMap::new(),
49            filter_cache_size: 1000, // Cache up to 1000 compiled filters
50        }
51    }
52}
53
/// Selects the strategy used to run an operation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Standard execution.
    Standard,
    /// Lazy evaluation.
    Lazy,
    /// Streaming execution, intended for large datasets.
    Streaming,
}
64
65/// Result of filter execution
66#[derive(Debug, Clone)]
67pub struct ExecutionResult {
68    /// The output value
69    pub value: Value,
70    /// Execution statistics (if collected)
71    pub stats: Option<ExecutionStats>,
72    /// Any warnings generated during execution
73    pub warnings: Vec<String>,
74}
75
/// Counters describing the work performed by an executor.
///
/// `ExecutionStats::default()` is the all-zero starting point (zero duration,
/// zero counters, hit rate `0.0`). `PartialEq` is derived so snapshots can be
/// compared directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExecutionStats {
    /// Total execution time.
    pub execution_time: Duration,
    /// Number of operations executed.
    pub operations_executed: usize,
    /// Peak memory usage in bytes.
    pub peak_memory_bytes: usize,
    /// Number of function calls.
    pub function_calls: usize,
    /// Number of DataFrame operations.
    pub dataframe_operations: usize,
    /// Cache hit rate, from 0.0 (no hits) to 1.0 (only hits).
    pub cache_hit_rate: f64,
}
92
93/// Filter executor that runs compiled filters against data
94pub struct FilterExecutor {
95    /// Compiler for filters
96    compiler: FilterCompiler,
97    /// Execution configuration
98    config: ExecutorConfig,
99    /// LRU cache for compiled filters (provides O(1) get/put operations)
100    /// Arc allows cheap cloning for concurrent access
101    filter_cache: LruCache<String, Arc<CompiledFilter>>,
102    /// Cache hit/miss counters
103    cache_hits: usize,
104    cache_misses: usize,
105    /// Statistics accumulator
106    stats_accumulator: Option<ExecutionStats>,
107}
108
109impl FilterExecutor {
110    /// Create a new filter executor with default configuration
111    pub fn new() -> Self {
112        Self::with_config(ExecutorConfig::default())
113    }
114
115    /// Create a new filter executor with custom configuration
116    pub fn with_config(config: ExecutorConfig) -> Self {
117        let collect_stats = config.collect_stats;
118        let cache_size = config.filter_cache_size;
119        // Use LruCache for O(1) get/put operations instead of manual LRU tracking
120        let cache_capacity =
121            NonZeroUsize::new(cache_size).unwrap_or(NonZeroUsize::new(1000).unwrap());
122        Self {
123            compiler: FilterCompiler::new(),
124            config,
125            filter_cache: LruCache::new(cache_capacity),
126            cache_hits: 0,
127            cache_misses: 0,
128            stats_accumulator: if collect_stats {
129                Some(ExecutionStats {
130                    execution_time: Duration::ZERO,
131                    operations_executed: 0,
132                    peak_memory_bytes: 0,
133                    function_calls: 0,
134                    dataframe_operations: 0,
135                    cache_hit_rate: 0.0,
136                })
137            } else {
138                None
139            },
140        }
141    }
142
143    /// Execute a filter string against a value
144    pub fn execute_str(&mut self, filter: &str, input: Value) -> Result<ExecutionResult> {
145        #[cfg(not(target_arch = "wasm32"))]
146        let start_time = Instant::now();
147        let collect_stats = self.config.collect_stats;
148
149        // Check cache and get Arc clone (cheap - just increments reference count)
150        let compiled = if let Some(cached) = self.filter_cache.get(filter) {
151            self.cache_hits += 1;
152            Arc::clone(cached)
153        } else {
154            self.cache_misses += 1;
155
156            #[cfg(feature = "profiling")]
157            coz::progress!("filter_compilation");
158
159            // Compile the filter
160            let compiled = self.compiler.compile_str(filter)?;
161            let arc_compiled = Arc::new(compiled);
162
163            // Insert into cache - LRU eviction handled automatically by LruCache
164            self.filter_cache
165                .put(filter.to_string(), Arc::clone(&arc_compiled));
166
167            arc_compiled
168        };
169
170        // Update cache hit rate in stats
171        if collect_stats {
172            if let Some(ref mut stats) = self.stats_accumulator {
173                let total_requests = self.cache_hits + self.cache_misses;
174                stats.cache_hit_rate = if total_requests > 0 {
175                    self.cache_hits as f64 / total_requests as f64
176                } else {
177                    0.0
178                };
179            }
180        }
181
182        let operations_count = compiled.operations.len();
183
184        #[cfg(feature = "profiling")]
185        coz::progress!("filter_execute_start");
186
187        let mut result = self.execute_compiled(&compiled, input)?;
188
189        #[cfg(feature = "profiling")]
190        coz::progress!("filter_execute_end");
191
192        if collect_stats {
193            if let Some(ref mut stats) = self.stats_accumulator {
194                #[cfg(not(target_arch = "wasm32"))]
195                {
196                    stats.execution_time += start_time.elapsed();
197                }
198                stats.operations_executed += operations_count;
199            }
200            result.stats = self.stats_accumulator.clone();
201        }
202
203        Ok(result)
204    }
205
206    /// Execute a compiled filter against a value
207    pub fn execute_compiled(
208        &self,
209        filter: &CompiledFilter,
210        input: Value,
211    ) -> Result<ExecutionResult> {
212        #[cfg(not(target_arch = "wasm32"))]
213        let start_time = Instant::now();
214
215        // Create execution context
216        let mut context = FilterContext::new();
217        context.set_error_mode(self.config.error_mode);
218        context.set_debug_mode(self.config.debug_mode);
219        context.set_input(input);
220        context.set_functions(filter.functions.clone());
221
222        // Set variables from config
223        for (name, value) in &self.config.variables {
224            context.set_variable(name, value.clone());
225        }
226
227        // Execute operations
228        let mut current_value = context.get_input().cloned().unwrap_or(Value::Null);
229        let mut warnings = Vec::new();
230
231        for operation in &filter.operations {
232            #[cfg(feature = "profiling")]
233            coz::progress!("operation_exec");
234
235            let mut ctx = Some(&mut context as &mut dyn dsq_shared::ops::Context);
236            match operation.apply_with_context(&current_value, &mut ctx) {
237                Ok(new_value) => {
238                    current_value = new_value;
239                }
240                Err(e) => {
241                    match self.config.error_mode {
242                        ErrorMode::Strict => return Err(e),
243                        ErrorMode::Collect => {
244                            warnings.push(format!("Operation failed: {}", e));
245                            // Continue with null value
246                            current_value = Value::Null;
247                        }
248                        ErrorMode::Ignore => {
249                            // Continue with null value
250                            current_value = Value::Null;
251                        }
252                    }
253                }
254            }
255
256            // Check timeout
257            #[cfg(not(target_arch = "wasm32"))]
258            if let Some(timeout_ms) = self.config.timeout_ms {
259                if start_time.elapsed() > Duration::from_millis(timeout_ms) {
260                    return Err(dsq_shared::error::operation_error("Execution timeout"));
261                }
262            }
263        }
264
265        let stats = if self.config.collect_stats {
266            self.stats_accumulator.clone()
267        } else {
268            None
269        };
270
271        Ok(ExecutionResult {
272            value: current_value,
273            stats,
274            warnings,
275        })
276    }
277
278    /// Execute a filter in streaming mode for large datasets
279    pub fn execute_streaming(
280        &mut self,
281        filter: &str,
282        input_stream: impl Iterator<Item = Result<Value>>,
283    ) -> Result<Vec<ExecutionResult>> {
284        let compiled = self.compiler.compile_str(filter)?;
285        let mut results = Vec::new();
286
287        for item_result in input_stream {
288            let input = item_result?;
289            let result = self.execute_compiled(&compiled, input)?;
290            results.push(result);
291
292            // TODO: Check if we should yield control (for async/streaming).
293            //       placeholder for future async implementation
294        }
295
296        Ok(results)
297    }
298
299    /// Validate a filter string without executing it
300    pub fn validate_filter(&self, filter: &str) -> Result<()> {
301        self.compiler.compile_str(filter)?;
302        Ok(())
303    }
304
305    /// Get execution statistics
306    pub fn get_stats(&self) -> Option<&ExecutionStats> {
307        self.stats_accumulator.as_ref()
308    }
309
310    /// Clear the filter cache
311    pub fn clear_cache(&mut self) {
312        self.filter_cache.clear();
313    }
314
315    /// Get cache size
316    pub fn cache_size(&self) -> usize {
317        self.filter_cache.len()
318    }
319
320    /// Precompile and cache a filter
321    pub fn precompile(&mut self, filter: &str) -> Result<()> {
322        let compiled = self.compiler.compile_str(filter)?;
323        self.filter_cache
324            .put(filter.to_string(), Arc::new(compiled));
325        Ok(())
326    }
327
328    /// Set execution configuration
329    pub fn set_config(&mut self, config: ExecutorConfig) {
330        let collect_stats = config.collect_stats;
331        self.config = config;
332        if collect_stats && self.stats_accumulator.is_none() {
333            self.stats_accumulator = Some(ExecutionStats {
334                execution_time: Duration::ZERO,
335                operations_executed: 0,
336                peak_memory_bytes: 0,
337                function_calls: 0,
338                dataframe_operations: 0,
339                cache_hit_rate: 0.0,
340            });
341        } else if !collect_stats {
342            self.stats_accumulator = None;
343        }
344    }
345
346    /// Get current configuration
347    pub fn get_config(&self) -> &ExecutorConfig {
348        &self.config
349    }
350}
351
352impl Default for FilterExecutor {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361    use dsq_shared::value::Value;
362
363    #[test]
364    fn test_execute_identity_filter() {
365        let mut executor = FilterExecutor::new();
366        let input = Value::int(42);
367        let result = executor.execute_str(".", input.clone()).unwrap();
368
369        assert_eq!(result.value, input);
370        assert!(result.warnings.is_empty());
371    }
372
373    #[test]
374    fn test_execute_field_access() {
375        let mut executor = FilterExecutor::new();
376        let input = Value::object(std::collections::HashMap::from([
377            ("name".to_string(), Value::string("Alice")),
378            ("age".to_string(), Value::int(30)),
379        ]));
380
381        let result = executor.execute_str(".name", input).unwrap();
382        assert_eq!(result.value, Value::string("Alice"));
383    }
384
385    #[test]
386    fn test_filter_validation() {
387        let executor = FilterExecutor::new();
388
389        // Valid filter
390        assert!(executor.validate_filter(".").is_ok());
391        assert!(executor.validate_filter(".name").is_ok());
392
393        // Invalid filter
394        assert!(executor.validate_filter("invalid syntax +++").is_err());
395    }
396
397    #[test]
398    fn test_cache_functionality() {
399        let mut executor = FilterExecutor::new();
400
401        // First execution should compile
402        let input = Value::int(42);
403        let result1 = executor.execute_str(". + 1", input.clone()).unwrap();
404        assert_eq!(result1.value, Value::int(43));
405
406        // Second execution should use cache
407        let result2 = executor.execute_str(". + 1", input).unwrap();
408        assert_eq!(result2.value, Value::int(43));
409
410        assert_eq!(executor.cache_size(), 1);
411    }
412
413    #[test]
414    fn test_error_handling() {
415        let mut executor = FilterExecutor::new();
416
417        // Test with strict error mode (default)
418        let input = Value::int(42);
419        let result = executor.execute_str(".invalid_field", input);
420        assert!(result.is_err());
421
422        // Test with ignore error mode
423        let config = ExecutorConfig {
424            error_mode: ErrorMode::Ignore,
425            ..Default::default()
426        };
427        executor.set_config(config);
428
429        let input = Value::int(42);
430        let result = executor.execute_str(".invalid_field", input).unwrap();
431        assert_eq!(result.value, Value::Null);
432    }
433
434    #[test]
435    fn test_assignment_operation() {
436        let mut executor = FilterExecutor::new();
437
438        // Test field assignment on object
439        let mut obj = std::collections::HashMap::new();
440        obj.insert("salary".to_string(), Value::int(75000));
441        obj.insert("name".to_string(), Value::string("Alice"));
442        let input = Value::object(obj);
443
444        let result = executor.execute_str(".salary += 5000", input).unwrap();
445
446        if let Value::Object(result_obj) = result.value {
447            assert_eq!(result_obj.get("salary"), Some(&Value::int(80000)));
448            assert_eq!(result_obj.get("name"), Some(&Value::string("Alice")));
449        } else {
450            panic!("Expected object result");
451        }
452    }
453
454    #[test]
455    fn test_assignment_in_map_pipeline() {
456        let mut executor = FilterExecutor::new();
457
458        // Test the query from example_095: map(.salary += 5000) | map({name, new_salary: .salary, department})
459        let mut obj = std::collections::HashMap::new();
460        obj.insert("id".to_string(), Value::int(1));
461        obj.insert("name".to_string(), Value::string("Alice Johnson"));
462        obj.insert("age".to_string(), Value::int(28));
463        obj.insert("city".to_string(), Value::string("New York"));
464        obj.insert("salary".to_string(), Value::int(75000));
465        obj.insert("department".to_string(), Value::string("Engineering"));
466        let input = Value::Array(vec![Value::Object(obj)]);
467
468        let result = executor
469            .execute_str(
470                r#"map(.salary += 5000) | map({name, new_salary: .salary, department})"#,
471                input,
472            )
473            .unwrap();
474
475        if let Value::Array(arr) = result.value {
476            assert_eq!(arr.len(), 1);
477            if let Value::Object(obj) = &arr[0] {
478                assert_eq!(obj.get("name"), Some(&Value::string("Alice Johnson")));
479                assert_eq!(obj.get("new_salary"), Some(&Value::int(80000)));
480                assert_eq!(obj.get("department"), Some(&Value::string("Engineering")));
481            } else {
482                panic!("Expected object in array");
483            }
484        } else {
485            panic!("Expected array result");
486        }
487    }
488
489    #[test]
490    fn test_stats_collection() {
491        let config = ExecutorConfig {
492            collect_stats: true,
493            ..Default::default()
494        };
495        let mut executor = FilterExecutor::with_config(config);
496
497        let input = Value::int(42);
498        let result = executor.execute_str(".", input).unwrap();
499
500        let stats = result.stats.unwrap();
501        assert!(stats.execution_time > Duration::ZERO);
502        assert!(stats.operations_executed > 0);
503        // Other stats are initialized to 0 and not updated yet
504        assert_eq!(stats.peak_memory_bytes, 0);
505        assert_eq!(stats.function_calls, 0);
506        assert_eq!(stats.dataframe_operations, 0);
507        assert_eq!(stats.cache_hit_rate, 0.0);
508    }
509
510    #[test]
511    fn test_streaming_execution() {
512        let mut executor = FilterExecutor::new();
513        let inputs = vec![Ok(Value::int(1)), Ok(Value::int(2)), Ok(Value::int(3))];
514
515        let results = executor.execute_streaming(".", inputs.into_iter()).unwrap();
516        assert_eq!(results.len(), 3);
517        assert_eq!(results[0].value, Value::int(1));
518        assert_eq!(results[1].value, Value::int(2));
519        assert_eq!(results[2].value, Value::int(3));
520    }
521
522    #[test]
523    fn test_precompile() {
524        let mut executor = FilterExecutor::new();
525        executor.precompile(". + 1").unwrap();
526        assert_eq!(executor.cache_size(), 1);
527
528        let input = Value::int(42);
529        let result = executor.execute_str(". + 1", input).unwrap();
530        assert_eq!(result.value, Value::int(43));
531    }
532
533    #[test]
534    fn test_clear_cache() {
535        let mut executor = FilterExecutor::new();
536        executor.execute_str(".", Value::int(1)).unwrap();
537        assert_eq!(executor.cache_size(), 1);
538
539        executor.clear_cache();
540        assert_eq!(executor.cache_size(), 0);
541    }
542
543    #[test]
544    fn test_config_management() {
545        let mut executor = FilterExecutor::new();
546        let config = executor.get_config();
547        assert_eq!(config.timeout_ms, None);
548
549        let new_config = ExecutorConfig {
550            timeout_ms: Some(1000),
551            ..Default::default()
552        };
553        executor.set_config(new_config);
554
555        let config = executor.get_config();
556        assert_eq!(config.timeout_ms, Some(1000));
557    }
558
559    #[test]
560    fn test_error_collect_mode() {
561        let mut executor = FilterExecutor::new();
562        let config = ExecutorConfig {
563            error_mode: ErrorMode::Collect,
564            ..Default::default()
565        };
566        executor.set_config(config);
567
568        let input = Value::int(42);
569        let result = executor.execute_str(".invalid_field", input).unwrap();
570        assert_eq!(result.value, Value::Null);
571        assert!(!result.warnings.is_empty());
572        assert!(result.warnings[0].contains("Operation failed"));
573    }
574
575    #[test]
576    fn test_timeout_configuration() {
577        let mut executor = FilterExecutor::new();
578        let config = ExecutorConfig {
579            timeout_ms: Some(1000),
580            ..Default::default()
581        };
582        executor.set_config(config);
583
584        let input = Value::int(42);
585        let result = executor.execute_str(".", input).unwrap();
586        // Should succeed since operation is fast
587        assert_eq!(result.value, Value::int(42));
588    }
589}