kql_panopticon/execution/processing/context.rs
1//! Processing phase execution context
2//!
3//! Provides read access to acquisition results and write capabilities for processing steps.
4
5use crate::error::Result;
6use crate::execution::result::{ResultContext, ResultHandle, ResultWriter};
7use serde_json::Value as JsonValue;
8use std::path::Path;
9
10/// Context for processing phase execution
11///
12/// Provides:
13/// - Read access to acquisition results
14/// - Write capabilities for processing step outputs
15/// - Result accumulation for downstream phases
16pub struct ProcessingContext<'a> {
17 /// Acquisition step results (read-only)
18 acquisition_results: &'a ResultContext,
19
20 /// Output directory for processing result files
21 output_dir: &'a Path,
22
23 /// Accumulated processing step results
24 results: ResultContext,
25}
26
27impl<'a> ProcessingContext<'a> {
28 /// Create a new processing context
29 pub fn new(acquisition_results: &'a ResultContext, output_dir: &'a Path) -> Self {
30 Self {
31 acquisition_results,
32 output_dir,
33 results: ResultContext::new(),
34 }
35 }
36
37 // =========================================================================
38 // Output and Writing
39 // =========================================================================
40
41 /// Get the output directory
42 pub fn output_dir(&self) -> &Path {
43 self.output_dir
44 }
45
46 /// Create a result writer for a processing step
47 pub fn writer(&self, step_name: &str) -> Result<ResultWriter> {
48 ResultWriter::for_step(self.output_dir, step_name)
49 }
50
51 /// Register a completed step's result handle
52 pub fn register_result(&mut self, step_name: impl Into<String>, handle: ResultHandle) {
53 self.results.insert(step_name, handle);
54 }
55
56 /// Get the accumulated processing results
57 pub fn results(&self) -> &ResultContext {
58 &self.results
59 }
60
61 /// Take ownership of the accumulated processing results
62 ///
63 /// Called at the end of the processing phase.
64 pub fn take_results(self) -> ResultContext {
65 self.results
66 }
67
68 // =========================================================================
69 // Acquisition Results Access (read-only)
70 // =========================================================================
71
72 /// Get the acquisition results context
73 pub fn acquisition_results(&self) -> &ResultContext {
74 self.acquisition_results
75 }
76
77 /// Get all rows for an acquisition step (materializes data)
78 ///
79 /// WARNING: This loads all rows into memory.
80 pub fn get_rows(&self, step_name: &str) -> Result<Vec<JsonValue>> {
81 self.acquisition_results.materialize(step_name)
82 }
83
84 /// Get row count for an acquisition step
85 pub fn row_count(&self, step_name: &str) -> Result<usize> {
86 self.acquisition_results.row_count(step_name)
87 }
88
89 /// Check if acquisition step has results
90 pub fn has_results(&self, step_name: &str) -> bool {
91 self.acquisition_results.has_results(step_name)
92 }
93
94 /// Check if acquisition step is empty
95 pub fn is_step_empty(&self, step_name: &str) -> Result<bool> {
96 self.acquisition_results.is_step_empty(step_name)
97 }
98
99 /// Get first row for an acquisition step
100 pub fn first(&self, step_name: &str) -> Result<Option<JsonValue>> {
101 self.acquisition_results.first(step_name)
102 }
103
104 /// Get column values for an acquisition step
105 pub fn column_values(&self, step_name: &str, column: &str) -> Result<Vec<String>> {
106 self.acquisition_results.column_values(step_name, column)
107 }
108}