kql_panopticon/execution/processing/
context.rs

1//! Processing phase execution context
2//!
3//! Provides read access to acquisition results and write capabilities for processing steps.
4
5use crate::error::Result;
6use crate::execution::result::{ResultContext, ResultHandle, ResultWriter};
7use serde_json::Value as JsonValue;
8use std::path::Path;
9
10/// Context for processing phase execution
11///
12/// Provides:
13/// - Read access to acquisition results
14/// - Write capabilities for processing step outputs
15/// - Result accumulation for downstream phases
16pub struct ProcessingContext<'a> {
17    /// Acquisition step results (read-only)
18    acquisition_results: &'a ResultContext,
19
20    /// Output directory for processing result files
21    output_dir: &'a Path,
22
23    /// Accumulated processing step results
24    results: ResultContext,
25}
26
27impl<'a> ProcessingContext<'a> {
28    /// Create a new processing context
29    pub fn new(acquisition_results: &'a ResultContext, output_dir: &'a Path) -> Self {
30        Self {
31            acquisition_results,
32            output_dir,
33            results: ResultContext::new(),
34        }
35    }
36
37    // =========================================================================
38    // Output and Writing
39    // =========================================================================
40
41    /// Get the output directory
42    pub fn output_dir(&self) -> &Path {
43        self.output_dir
44    }
45
46    /// Create a result writer for a processing step
47    pub fn writer(&self, step_name: &str) -> Result<ResultWriter> {
48        ResultWriter::for_step(self.output_dir, step_name)
49    }
50
51    /// Register a completed step's result handle
52    pub fn register_result(&mut self, step_name: impl Into<String>, handle: ResultHandle) {
53        self.results.insert(step_name, handle);
54    }
55
56    /// Get the accumulated processing results
57    pub fn results(&self) -> &ResultContext {
58        &self.results
59    }
60
61    /// Take ownership of the accumulated processing results
62    ///
63    /// Called at the end of the processing phase.
64    pub fn take_results(self) -> ResultContext {
65        self.results
66    }
67
68    // =========================================================================
69    // Acquisition Results Access (read-only)
70    // =========================================================================
71
72    /// Get the acquisition results context
73    pub fn acquisition_results(&self) -> &ResultContext {
74        self.acquisition_results
75    }
76
77    /// Get all rows for an acquisition step (materializes data)
78    ///
79    /// WARNING: This loads all rows into memory.
80    pub fn get_rows(&self, step_name: &str) -> Result<Vec<JsonValue>> {
81        self.acquisition_results.materialize(step_name)
82    }
83
84    /// Get row count for an acquisition step
85    pub fn row_count(&self, step_name: &str) -> Result<usize> {
86        self.acquisition_results.row_count(step_name)
87    }
88
89    /// Check if acquisition step has results
90    pub fn has_results(&self, step_name: &str) -> bool {
91        self.acquisition_results.has_results(step_name)
92    }
93
94    /// Check if acquisition step is empty
95    pub fn is_step_empty(&self, step_name: &str) -> Result<bool> {
96        self.acquisition_results.is_step_empty(step_name)
97    }
98
99    /// Get first row for an acquisition step
100    pub fn first(&self, step_name: &str) -> Result<Option<JsonValue>> {
101        self.acquisition_results.first(step_name)
102    }
103
104    /// Get column values for an acquisition step
105    pub fn column_values(&self, step_name: &str, column: &str) -> Result<Vec<String>> {
106        self.acquisition_results.column_values(step_name, column)
107    }
108}