term_guard/analyzers/incremental/runner.rs

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json;
use tracing::{debug, info, instrument, warn};

use crate::analyzers::{Analyzer, AnalyzerContext, AnalyzerError, AnalyzerResult, MetricValue};

use super::state_store::{StateMap, StateStore};

#[derive(Debug, Clone)]
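/// Configuration for [`IncrementalAnalysisRunner`].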
pub struct IncrementalConfig {
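    /// Abort on the first analyzer error instead of recording it and continuing.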
    pub fail_fast: bool,
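    /// Persist analyzer states even when the serialized state is empty.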
    pub save_empty_states: bool,
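    /// Maximum number of partitions whose states are loaded per batch during
    /// multi-partition merges.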
    pub max_merge_batch_size: usize,
}

impl Default for IncrementalConfig {
    fn default() -> Self {
        Self {
            fail_fast: true,
            save_empty_states: false,
            max_merge_batch_size: 100,
        }
    }
}

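/// Object-safe adapter over [`Analyzer`] that passes typed states across the
/// trait-object boundary as serialized bytes.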
trait ErasedAnalyzer: Send + Sync {
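    /// Computes this analyzer's state from the data in `ctx` and serializes it.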
    fn compute_state<'a>(
        &'a self,
        ctx: &'a SessionContext,
    ) -> futures::future::BoxFuture<'a, AnalyzerResult<Vec<u8>>>;

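    /// Deserializes the given states, merges them, and re-serializes the result.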
    fn merge_states(&self, states: Vec<Vec<u8>>) -> AnalyzerResult<Vec<u8>>;

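    /// Deserializes a state and computes the final metric from it.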
    fn compute_metric(&self, state: &[u8]) -> AnalyzerResult<(String, MetricValue)>;

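    /// Returns the analyzer's name, used for logging and error reporting.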
    fn name(&self) -> &str;

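    /// Returns the key under which this analyzer's state and metric are stored.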
    fn metric_key(&self) -> String;
}

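/// Concrete [`ErasedAnalyzer`] backed by a typed [`Analyzer`].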
struct ErasedAnalyzerImpl<A>
where
    A: Analyzer + 'static,
{
    analyzer: Arc<A>,
}

impl<A> ErasedAnalyzer for ErasedAnalyzerImpl<A>
where
    A: Analyzer + 'static,
    A::State: Serialize + for<'de> Deserialize<'de>,
    A::Metric: Into<MetricValue>,
{
    fn compute_state<'a>(
        &'a self,
        ctx: &'a SessionContext,
    ) -> futures::future::BoxFuture<'a, AnalyzerResult<Vec<u8>>> {
        Box::pin(async move {
            let state = self.analyzer.compute_state_from_data(ctx).await?;
            let serialized = serde_json::to_vec(&state)
                .map_err(|e| AnalyzerError::Custom(format!("Failed to serialize state: {e}")))?;
            Ok(serialized)
        })
    }

    fn merge_states(&self, states: Vec<Vec<u8>>) -> AnalyzerResult<Vec<u8>> {
        let mut deserialized_states = Vec::new();
        for state_data in states {
            let state: A::State = serde_json::from_slice(&state_data)
                .map_err(|e| AnalyzerError::Custom(format!("Failed to deserialize state: {e}")))?;
            deserialized_states.push(state);
        }

        let merged = self.analyzer.merge_states(deserialized_states)?;
        let serialized = serde_json::to_vec(&merged)
            .map_err(|e| AnalyzerError::Custom(format!("Failed to serialize merged state: {e}")))?;
        Ok(serialized)
    }

    fn compute_metric(&self, state: &[u8]) -> AnalyzerResult<(String, MetricValue)> {
        let state: A::State = serde_json::from_slice(state)
            .map_err(|e| AnalyzerError::Custom(format!("Failed to deserialize state: {e}")))?;
        let metric = self.analyzer.compute_metric_from_state(&state)?;
        Ok((self.analyzer.metric_key(), metric.into()))
    }

    fn name(&self) -> &str {
        self.analyzer.name()
    }

    fn metric_key(&self) -> String {
        self.analyzer.metric_key()
    }
}

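/// Orchestrates incremental analysis: computes per-partition analyzer states,
/// persists them through a [`StateStore`], and merges saved states to produce
/// metrics without re-scanning the underlying data.
///
/// A minimal usage sketch; `MemoryStateStore` and `RowCountAnalyzer` are
/// illustrative stand-ins for concrete [`StateStore`] and [`Analyzer`]
/// implementations:
///
/// ```ignore
/// let runner = IncrementalAnalysisRunner::new(Box::new(MemoryStateStore::new()))
///     .add_analyzer(RowCountAnalyzer::new());
/// let context = runner.analyze_partition(&ctx, "2024-01-01").await?;
/// ```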
pub struct IncrementalAnalysisRunner {
    state_store: Box<dyn StateStore>,
    analyzers: Vec<Box<dyn ErasedAnalyzer>>,
    config: IncrementalConfig,
}

impl IncrementalAnalysisRunner {
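    /// Creates a runner with the default [`IncrementalConfig`].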
    pub fn new(state_store: Box<dyn StateStore>) -> Self {
        Self {
            state_store,
            analyzers: Vec::new(),
            config: IncrementalConfig::default(),
        }
    }

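    /// Creates a runner with an explicit [`IncrementalConfig`].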
    pub fn with_config(state_store: Box<dyn StateStore>, config: IncrementalConfig) -> Self {
        Self {
            state_store,
            analyzers: Vec::new(),
            config,
        }
    }

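    /// Registers an analyzer. The analyzer's state must be serde-serializable
    /// so it can be persisted between runs.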
    pub fn add_analyzer<A>(mut self, analyzer: A) -> Self
    where
        A: Analyzer + 'static,
        A::State: Serialize + for<'de> Deserialize<'de>,
        A::Metric: Into<MetricValue>,
    {
        let erased = Box::new(ErasedAnalyzerImpl {
            analyzer: Arc::new(analyzer),
        });
        self.analyzers.push(erased);
        self
    }

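    /// Analyzes a single partition from scratch: computes each analyzer's
    /// state, derives metrics from those states, and persists the resulting
    /// state map under `partition`.
    ///
    /// With `fail_fast` disabled, per-analyzer failures are recorded on the
    /// returned [`AnalyzerContext`] and the remaining analyzers still run.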
    #[instrument(skip(self, ctx))]
    pub async fn analyze_partition(
        &self,
        ctx: &SessionContext,
        partition: &str,
    ) -> AnalyzerResult<AnalyzerContext> {
        info!(
            partition = %partition,
            analyzers = self.analyzers.len(),
            "Starting partition analysis"
        );

        let mut state_map = StateMap::new();
        let mut context = AnalyzerContext::new();
        context.metadata_mut().record_start();

        for analyzer in &self.analyzers {
            debug!(
                analyzer = analyzer.name(),
                partition = %partition,
                "Computing analyzer state"
            );

            match analyzer.compute_state(ctx).await {
                Ok(state) => {
                    if !state.is_empty() || self.config.save_empty_states {
                        state_map.insert(analyzer.metric_key(), state.clone());
                    }

                    match analyzer.compute_metric(&state) {
                        Ok((key, metric)) => {
                            context.store_metric(&key, metric);
                        }
                        Err(e) => {
                            warn!(
                                analyzer = analyzer.name(),
                                error = %e,
                                "Failed to compute metric from state"
                            );
                            if self.config.fail_fast {
                                return Err(e);
                            }
                            context.record_error(analyzer.name(), e);
                        }
                    }
                }
                Err(e) => {
                    warn!(
                        analyzer = analyzer.name(),
                        partition = %partition,
                        error = %e,
                        "Failed to compute state"
                    );
                    if self.config.fail_fast {
                        return Err(e);
                    }
                    context.record_error(analyzer.name(), e);
                }
            }
        }

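        // Persist the per-analyzer states so later incremental runs can merge against them.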
        self.state_store.save_state(partition, state_map).await?;

        context.metadata_mut().record_end();
        info!(
            partition = %partition,
            metrics = context.all_metrics().len(),
            "Completed partition analysis"
        );

        Ok(context)
    }

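    /// Incrementally updates a partition: computes fresh analyzer states,
    /// merges each with any previously saved state for the same key, persists
    /// the merged states, and derives metrics from them.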
    #[instrument(skip(self, ctx))]
    pub async fn analyze_incremental(
        &self,
        ctx: &SessionContext,
        partition: &str,
    ) -> AnalyzerResult<AnalyzerContext> {
        info!(
            partition = %partition,
            "Starting incremental analysis"
        );

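        // Load any previously saved states for this partition.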
        let existing_state = self.state_store.load_state(partition).await?;

        let mut merged_state_map = StateMap::new();
        let mut context = AnalyzerContext::new();
        context.metadata_mut().record_start();

        for analyzer in &self.analyzers {
            let key = analyzer.metric_key();
            debug!(
                analyzer = analyzer.name(),
                partition = %partition,
                "Processing incremental update"
            );

            let new_state = match analyzer.compute_state(ctx).await {
                Ok(state) => state,
                Err(e) => {
                    warn!(
                        analyzer = analyzer.name(),
                        error = %e,
                        "Failed to compute new state"
                    );
                    if self.config.fail_fast {
                        return Err(e);
                    }
                    context.record_error(analyzer.name(), e);
                    continue;
                }
            };

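            // Merge the fresh state with the previously saved state for this key, if one exists.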
            let final_state = if let Some(existing) = existing_state.get(&key) {
                match analyzer.merge_states(vec![existing.clone(), new_state]) {
                    Ok(merged) => merged,
                    Err(e) => {
                        warn!(
                            analyzer = analyzer.name(),
                            error = %e,
                            "Failed to merge states"
                        );
                        if self.config.fail_fast {
                            return Err(e);
                        }
                        context.record_error(analyzer.name(), e);
                        continue;
                    }
                }
            } else {
                new_state
            };

            if !final_state.is_empty() || self.config.save_empty_states {
                merged_state_map.insert(key.clone(), final_state.clone());
            }

            match analyzer.compute_metric(&final_state) {
                Ok((metric_key, metric)) => {
                    context.store_metric(&metric_key, metric);
                }
                Err(e) => {
                    warn!(
                        analyzer = analyzer.name(),
                        error = %e,
                        "Failed to compute metric"
                    );
                    if self.config.fail_fast {
                        return Err(e);
                    }
                    context.record_error(analyzer.name(), e);
                }
            }
        }

        self.state_store
            .save_state(partition, merged_state_map)
            .await?;

        context.metadata_mut().record_end();
        info!(
            partition = %partition,
            metrics = context.all_metrics().len(),
            "Completed incremental analysis"
        );

        Ok(context)
    }

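    /// Computes metrics across multiple partitions by merging their saved
    /// states, without re-reading any raw data. States are loaded from the
    /// store in batches of at most `max_merge_batch_size` partitions.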
    #[instrument(skip(self))]
    pub async fn analyze_partitions(
        &self,
        partitions: &[String],
    ) -> AnalyzerResult<AnalyzerContext> {
        info!(
            partitions = partitions.len(),
            "Analyzing multiple partitions"
        );

        if partitions.is_empty() {
            return Ok(AnalyzerContext::new());
        }

        let mut context = AnalyzerContext::new();
        context.metadata_mut().record_start();

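        // Saved states from every partition, grouped by analyzer key.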
        let mut all_analyzer_states: HashMap<String, Vec<Vec<u8>>> = HashMap::new();

        for batch in partitions.chunks(self.config.max_merge_batch_size) {
            debug!(batch_size = batch.len(), "Processing partition batch");

            let partition_states = self.state_store.load_states_batch(batch).await?;

            for (_partition, state_map) in partition_states {
                for (analyzer_key, state_data) in state_map {
                    all_analyzer_states
                        .entry(analyzer_key)
                        .or_default()
                        .push(state_data);
                }
            }
        }

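        // Merge each analyzer's collected states and compute a single metric from the result.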
        for analyzer in &self.analyzers {
            let key = analyzer.metric_key();

            if let Some(states) = all_analyzer_states.get(&key) {
                if states.is_empty() {
                    continue;
                }

                debug!(
                    analyzer = analyzer.name(),
                    states = states.len(),
                    "Merging all analyzer states"
                );

                match analyzer.merge_states(states.clone()) {
                    Ok(merged_state) => {
                        match analyzer.compute_metric(&merged_state) {
                            Ok((metric_key, metric)) => {
                                context.store_metric(&metric_key, metric);
                            }
                            Err(e) => {
                                warn!(
                                    analyzer = analyzer.name(),
                                    error = %e,
                                    "Failed to compute metric from merged state"
                                );
                                if self.config.fail_fast {
                                    return Err(e);
                                }
                                context.record_error(analyzer.name(), e);
                            }
                        }
                    }
                    Err(e) => {
                        warn!(
                            analyzer = analyzer.name(),
                            error = %e,
                            "Failed to merge states"
                        );
                        if self.config.fail_fast {
                            return Err(e);
                        }
                        context.record_error(analyzer.name(), e);
                    }
                }
            }
        }

        context.metadata_mut().record_end();
        info!(
            partitions = partitions.len(),
            metrics = context.all_metrics().len(),
            "Completed multi-partition analysis"
        );

        Ok(context)
    }

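    /// Returns the number of registered analyzers.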
    pub fn analyzer_count(&self) -> usize {
        self.analyzers.len()
    }

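    /// Lists all partitions known to the underlying state store.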
    pub async fn list_partitions(&self) -> AnalyzerResult<Vec<String>> {
        self.state_store.list_partitions().await
    }

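    /// Deletes the stored state for `partition`.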
    pub async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()> {
        self.state_store.delete_partition(partition).await
    }
}