1use crate::compiler::{CompiledFilter, ErrorMode, FilterCompiler, FilterContext};
8use dsq_shared::value::Value;
9use dsq_shared::Result;
10use lru::LruCache;
11use std::num::NonZeroUsize;
12use std::sync::Arc;
13#[cfg(target_arch = "wasm32")]
14use std::time::Duration;
15#[cfg(not(target_arch = "wasm32"))]
16use std::time::{Duration, Instant};
17
18#[derive(Debug, Clone)]
20pub struct ExecutorConfig {
21 pub timeout_ms: Option<u64>,
23 pub error_mode: ErrorMode,
25 pub collect_stats: bool,
27 pub max_recursion_depth: usize,
29 pub debug_mode: bool,
31 pub batch_size: usize,
33 pub variables: std::collections::HashMap<String, Value>,
35 pub filter_cache_size: usize,
37}
38
39impl Default for ExecutorConfig {
40 fn default() -> Self {
41 Self {
42 timeout_ms: None,
43 error_mode: ErrorMode::Strict,
44 collect_stats: false,
45 max_recursion_depth: 1000,
46 debug_mode: false,
47 batch_size: 10000,
48 variables: std::collections::HashMap::new(),
49 filter_cache_size: 1000, }
51 }
52}
53
/// Selects a strategy for running a filter.
///
/// NOTE(review): nothing in this file reads the mode, so the meaning of each variant is
/// defined by whichever caller consumes it. The per-variant notes are inferred from the
/// names and should be confirmed.
///
/// The type is `Copy` and hashable, so it can be passed by value and used as a map or
/// set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExecutionMode {
    /// Presumably ordinary eager execution — TODO confirm.
    Standard,
    /// Presumably deferred (lazy) evaluation — TODO confirm.
    Lazy,
    /// Presumably item-at-a-time processing of an input stream — TODO confirm.
    Streaming,
}
64
65#[derive(Debug, Clone)]
67pub struct ExecutionResult {
68 pub value: Value,
70 pub stats: Option<ExecutionStats>,
72 pub warnings: Vec<String>,
74}
75
/// Counters accumulated by a `FilterExecutor` while statistics collection is enabled.
///
/// `ExecutionStats::default()` is the all-zero value, i.e. the state of a freshly
/// created accumulator.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExecutionStats {
    /// Total wall-clock time spent in `execute_str` calls (never advanced on `wasm32`,
    /// where timing is compiled out).
    pub execution_time: Duration,
    /// Sum of the operation counts of every filter executed through `execute_str`.
    pub operations_executed: usize,
    /// Peak memory use in bytes.
    ///
    /// NOTE(review): initialised to 0 and never updated in this file — confirm whether
    /// another component is meant to fill it in.
    pub peak_memory_bytes: usize,
    /// Number of function calls.
    ///
    /// NOTE(review): initialised to 0 and never updated in this file — confirm.
    pub function_calls: usize,
    /// Number of DataFrame operations.
    ///
    /// NOTE(review): initialised to 0 and never updated in this file — confirm.
    pub dataframe_operations: usize,
    /// Fraction of compiled-filter cache lookups that were hits, in `0.0..=1.0`.
    pub cache_hit_rate: f64,
}
92
93pub struct FilterExecutor {
95 compiler: FilterCompiler,
97 config: ExecutorConfig,
99 filter_cache: LruCache<String, Arc<CompiledFilter>>,
102 cache_hits: usize,
104 cache_misses: usize,
105 stats_accumulator: Option<ExecutionStats>,
107}
108
109impl FilterExecutor {
110 pub fn new() -> Self {
112 Self::with_config(ExecutorConfig::default())
113 }
114
115 pub fn with_config(config: ExecutorConfig) -> Self {
117 let collect_stats = config.collect_stats;
118 let cache_size = config.filter_cache_size;
119 let cache_capacity =
121 NonZeroUsize::new(cache_size).unwrap_or(NonZeroUsize::new(1000).unwrap());
122 Self {
123 compiler: FilterCompiler::new(),
124 config,
125 filter_cache: LruCache::new(cache_capacity),
126 cache_hits: 0,
127 cache_misses: 0,
128 stats_accumulator: if collect_stats {
129 Some(ExecutionStats {
130 execution_time: Duration::ZERO,
131 operations_executed: 0,
132 peak_memory_bytes: 0,
133 function_calls: 0,
134 dataframe_operations: 0,
135 cache_hit_rate: 0.0,
136 })
137 } else {
138 None
139 },
140 }
141 }
142
143 pub fn execute_str(&mut self, filter: &str, input: Value) -> Result<ExecutionResult> {
145 #[cfg(not(target_arch = "wasm32"))]
146 let start_time = Instant::now();
147 let collect_stats = self.config.collect_stats;
148
149 let compiled = if let Some(cached) = self.filter_cache.get(filter) {
151 self.cache_hits += 1;
152 Arc::clone(cached)
153 } else {
154 self.cache_misses += 1;
155
156 #[cfg(feature = "profiling")]
157 coz::progress!("filter_compilation");
158
159 let compiled = self.compiler.compile_str(filter)?;
161 let arc_compiled = Arc::new(compiled);
162
163 self.filter_cache
165 .put(filter.to_string(), Arc::clone(&arc_compiled));
166
167 arc_compiled
168 };
169
170 if collect_stats {
172 if let Some(ref mut stats) = self.stats_accumulator {
173 let total_requests = self.cache_hits + self.cache_misses;
174 stats.cache_hit_rate = if total_requests > 0 {
175 self.cache_hits as f64 / total_requests as f64
176 } else {
177 0.0
178 };
179 }
180 }
181
182 let operations_count = compiled.operations.len();
183
184 #[cfg(feature = "profiling")]
185 coz::progress!("filter_execute_start");
186
187 let mut result = self.execute_compiled(&compiled, input)?;
188
189 #[cfg(feature = "profiling")]
190 coz::progress!("filter_execute_end");
191
192 if collect_stats {
193 if let Some(ref mut stats) = self.stats_accumulator {
194 #[cfg(not(target_arch = "wasm32"))]
195 {
196 stats.execution_time += start_time.elapsed();
197 }
198 stats.operations_executed += operations_count;
199 }
200 result.stats = self.stats_accumulator.clone();
201 }
202
203 Ok(result)
204 }
205
206 pub fn execute_compiled(
208 &self,
209 filter: &CompiledFilter,
210 input: Value,
211 ) -> Result<ExecutionResult> {
212 #[cfg(not(target_arch = "wasm32"))]
213 let start_time = Instant::now();
214
215 let mut context = FilterContext::new();
217 context.set_error_mode(self.config.error_mode);
218 context.set_debug_mode(self.config.debug_mode);
219 context.set_input(input);
220 context.set_functions(filter.functions.clone());
221
222 for (name, value) in &self.config.variables {
224 context.set_variable(name, value.clone());
225 }
226
227 let mut current_value = context.get_input().cloned().unwrap_or(Value::Null);
229 let mut warnings = Vec::new();
230
231 for operation in &filter.operations {
232 #[cfg(feature = "profiling")]
233 coz::progress!("operation_exec");
234
235 let mut ctx = Some(&mut context as &mut dyn dsq_shared::ops::Context);
236 match operation.apply_with_context(¤t_value, &mut ctx) {
237 Ok(new_value) => {
238 current_value = new_value;
239 }
240 Err(e) => {
241 match self.config.error_mode {
242 ErrorMode::Strict => return Err(e),
243 ErrorMode::Collect => {
244 warnings.push(format!("Operation failed: {}", e));
245 current_value = Value::Null;
247 }
248 ErrorMode::Ignore => {
249 current_value = Value::Null;
251 }
252 }
253 }
254 }
255
256 #[cfg(not(target_arch = "wasm32"))]
258 if let Some(timeout_ms) = self.config.timeout_ms {
259 if start_time.elapsed() > Duration::from_millis(timeout_ms) {
260 return Err(dsq_shared::error::operation_error("Execution timeout"));
261 }
262 }
263 }
264
265 let stats = if self.config.collect_stats {
266 self.stats_accumulator.clone()
267 } else {
268 None
269 };
270
271 Ok(ExecutionResult {
272 value: current_value,
273 stats,
274 warnings,
275 })
276 }
277
278 pub fn execute_streaming(
280 &mut self,
281 filter: &str,
282 input_stream: impl Iterator<Item = Result<Value>>,
283 ) -> Result<Vec<ExecutionResult>> {
284 let compiled = self.compiler.compile_str(filter)?;
285 let mut results = Vec::new();
286
287 for item_result in input_stream {
288 let input = item_result?;
289 let result = self.execute_compiled(&compiled, input)?;
290 results.push(result);
291
292 }
295
296 Ok(results)
297 }
298
299 pub fn validate_filter(&self, filter: &str) -> Result<()> {
301 self.compiler.compile_str(filter)?;
302 Ok(())
303 }
304
305 pub fn get_stats(&self) -> Option<&ExecutionStats> {
307 self.stats_accumulator.as_ref()
308 }
309
310 pub fn clear_cache(&mut self) {
312 self.filter_cache.clear();
313 }
314
315 pub fn cache_size(&self) -> usize {
317 self.filter_cache.len()
318 }
319
320 pub fn precompile(&mut self, filter: &str) -> Result<()> {
322 let compiled = self.compiler.compile_str(filter)?;
323 self.filter_cache
324 .put(filter.to_string(), Arc::new(compiled));
325 Ok(())
326 }
327
328 pub fn set_config(&mut self, config: ExecutorConfig) {
330 let collect_stats = config.collect_stats;
331 self.config = config;
332 if collect_stats && self.stats_accumulator.is_none() {
333 self.stats_accumulator = Some(ExecutionStats {
334 execution_time: Duration::ZERO,
335 operations_executed: 0,
336 peak_memory_bytes: 0,
337 function_calls: 0,
338 dataframe_operations: 0,
339 cache_hit_rate: 0.0,
340 });
341 } else if !collect_stats {
342 self.stats_accumulator = None;
343 }
344 }
345
346 pub fn get_config(&self) -> &ExecutorConfig {
348 &self.config
349 }
350}
351
352impl Default for FilterExecutor {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use dsq_shared::value::Value;

    /// The identity filter returns its input unchanged and produces no warnings.
    #[test]
    fn test_execute_identity_filter() {
        let mut executor = FilterExecutor::new();
        let input = Value::int(42);
        let result = executor.execute_str(".", input.clone()).unwrap();

        assert_eq!(result.value, input);
        assert!(result.warnings.is_empty());
    }

    /// `.name` extracts a single field from an object.
    #[test]
    fn test_execute_field_access() {
        let mut executor = FilterExecutor::new();
        let input = Value::object(std::collections::HashMap::from([
            ("name".to_string(), Value::string("Alice")),
            ("age".to_string(), Value::int(30)),
        ]));

        let result = executor.execute_str(".name", input).unwrap();
        assert_eq!(result.value, Value::string("Alice"));
    }

    /// Valid filters pass validation; malformed ones are rejected.
    #[test]
    fn test_filter_validation() {
        let executor = FilterExecutor::new();

        assert!(executor.validate_filter(".").is_ok());
        assert!(executor.validate_filter(".name").is_ok());

        assert!(executor.validate_filter("invalid syntax +++").is_err());
    }

    /// Running the same filter twice keeps one cache entry, and the second run is
    /// reported as a cache hit.
    #[test]
    fn test_cache_functionality() {
        // Statistics are enabled so the hit rate is observable from the results.
        let mut executor = FilterExecutor::with_config(ExecutorConfig {
            collect_stats: true,
            ..Default::default()
        });

        let input = Value::int(42);
        let result1 = executor.execute_str(". + 1", input.clone()).unwrap();
        assert_eq!(result1.value, Value::int(43));
        // First lookup: 0 hits out of 1 request.
        assert_eq!(result1.stats.unwrap().cache_hit_rate, 0.0);

        let result2 = executor.execute_str(". + 1", input).unwrap();
        assert_eq!(result2.value, Value::int(43));
        // Second lookup: 1 hit out of 2 requests.
        assert_eq!(result2.stats.unwrap().cache_hit_rate, 0.5);

        assert_eq!(executor.cache_size(), 1);
    }

    /// Strict mode surfaces operation errors; `Ignore` mode yields `Null` instead.
    #[test]
    fn test_error_handling() {
        let mut executor = FilterExecutor::new();

        let input = Value::int(42);
        let result = executor.execute_str(".invalid_field", input);
        assert!(result.is_err());

        let config = ExecutorConfig {
            error_mode: ErrorMode::Ignore,
            ..Default::default()
        };
        executor.set_config(config);

        let input = Value::int(42);
        let result = executor.execute_str(".invalid_field", input).unwrap();
        assert_eq!(result.value, Value::Null);
    }

    /// `+=` updates the targeted field and leaves the others untouched.
    #[test]
    fn test_assignment_operation() {
        let mut executor = FilterExecutor::new();

        let mut obj = std::collections::HashMap::new();
        obj.insert("salary".to_string(), Value::int(75000));
        obj.insert("name".to_string(), Value::string("Alice"));
        let input = Value::object(obj);

        let result = executor.execute_str(".salary += 5000", input).unwrap();

        match result.value {
            Value::Object(result_obj) => {
                assert_eq!(result_obj.get("salary"), Some(&Value::int(80000)));
                assert_eq!(result_obj.get("name"), Some(&Value::string("Alice")));
            }
            _ => panic!("Expected object result"),
        }
    }

    /// An assignment inside `map` feeds its result into a later pipeline stage.
    #[test]
    fn test_assignment_in_map_pipeline() {
        let mut executor = FilterExecutor::new();

        let mut obj = std::collections::HashMap::new();
        obj.insert("id".to_string(), Value::int(1));
        obj.insert("name".to_string(), Value::string("Alice Johnson"));
        obj.insert("age".to_string(), Value::int(28));
        obj.insert("city".to_string(), Value::string("New York"));
        obj.insert("salary".to_string(), Value::int(75000));
        obj.insert("department".to_string(), Value::string("Engineering"));
        let input = Value::Array(vec![Value::Object(obj)]);

        let result = executor
            .execute_str(
                r#"map(.salary += 5000) | map({name, new_salary: .salary, department})"#,
                input,
            )
            .unwrap();

        let arr = match result.value {
            Value::Array(arr) => arr,
            _ => panic!("Expected array result"),
        };
        assert_eq!(arr.len(), 1);

        match &arr[0] {
            Value::Object(obj) => {
                assert_eq!(obj.get("name"), Some(&Value::string("Alice Johnson")));
                assert_eq!(obj.get("new_salary"), Some(&Value::int(80000)));
                assert_eq!(obj.get("department"), Some(&Value::string("Engineering")));
            }
            _ => panic!("Expected object in array"),
        }
    }

    /// With `collect_stats` enabled the result carries a statistics snapshot.
    #[test]
    fn test_stats_collection() {
        let config = ExecutorConfig {
            collect_stats: true,
            ..Default::default()
        };
        let mut executor = FilterExecutor::with_config(config);

        let input = Value::int(42);
        let result = executor.execute_str(".", input).unwrap();

        let stats = result.stats.unwrap();
        // Timing is compiled out on wasm32, so the elapsed time stays at zero there.
        #[cfg(not(target_arch = "wasm32"))]
        assert!(stats.execution_time > Duration::ZERO);
        #[cfg(target_arch = "wasm32")]
        assert_eq!(stats.execution_time, Duration::ZERO);
        assert!(stats.operations_executed > 0);
        assert_eq!(stats.peak_memory_bytes, 0);
        assert_eq!(stats.function_calls, 0);
        assert_eq!(stats.dataframe_operations, 0);
        assert_eq!(stats.cache_hit_rate, 0.0);
    }

    /// Streaming execution yields one result per input item, in order.
    #[test]
    fn test_streaming_execution() {
        let mut executor = FilterExecutor::new();
        let inputs = vec![Ok(Value::int(1)), Ok(Value::int(2)), Ok(Value::int(3))];

        let results = executor.execute_streaming(".", inputs.into_iter()).unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].value, Value::int(1));
        assert_eq!(results[1].value, Value::int(2));
        assert_eq!(results[2].value, Value::int(3));
    }

    /// A precompiled filter is cached and can then be executed.
    #[test]
    fn test_precompile() {
        let mut executor = FilterExecutor::new();
        executor.precompile(". + 1").unwrap();
        assert_eq!(executor.cache_size(), 1);

        let input = Value::int(42);
        let result = executor.execute_str(". + 1", input).unwrap();
        assert_eq!(result.value, Value::int(43));
    }

    /// `clear_cache` empties the compiled-filter cache.
    #[test]
    fn test_clear_cache() {
        let mut executor = FilterExecutor::new();
        executor.execute_str(".", Value::int(1)).unwrap();
        assert_eq!(executor.cache_size(), 1);

        executor.clear_cache();
        assert_eq!(executor.cache_size(), 0);
    }

    /// `set_config` replaces the configuration returned by `get_config`.
    #[test]
    fn test_config_management() {
        let mut executor = FilterExecutor::new();
        let config = executor.get_config();
        assert_eq!(config.timeout_ms, None);

        let new_config = ExecutorConfig {
            timeout_ms: Some(1000),
            ..Default::default()
        };
        executor.set_config(new_config);

        let config = executor.get_config();
        assert_eq!(config.timeout_ms, Some(1000));
    }

    /// `Collect` mode turns an operation failure into a warning and a `Null` value.
    #[test]
    fn test_error_collect_mode() {
        let mut executor = FilterExecutor::new();
        let config = ExecutorConfig {
            error_mode: ErrorMode::Collect,
            ..Default::default()
        };
        executor.set_config(config);

        let input = Value::int(42);
        let result = executor.execute_str(".invalid_field", input).unwrap();
        assert_eq!(result.value, Value::Null);
        assert!(!result.warnings.is_empty());
        assert!(result.warnings[0].contains("Operation failed"));
    }

    /// A generous timeout does not interfere with a fast filter.
    #[test]
    fn test_timeout_configuration() {
        let mut executor = FilterExecutor::new();
        let config = ExecutorConfig {
            timeout_ms: Some(1000),
            ..Default::default()
        };
        executor.set_config(config);

        let input = Value::int(42);
        let result = executor.execute_str(".", input).unwrap();
        assert_eq!(result.value, Value::int(42));
    }
}