term_guard/analyzers/incremental/
runner.rs

1//! Incremental analysis runner for efficient partition-based computation.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use datafusion::prelude::*;
7use serde::{Deserialize, Serialize};
8use serde_json;
9use tracing::{debug, info, instrument, warn};
10
11use crate::analyzers::{Analyzer, AnalyzerContext, AnalyzerError, AnalyzerResult, MetricValue};
12
13use super::state_store::{StateMap, StateStore};
14
15/// Configuration for incremental analysis
16#[derive(Debug, Clone)]
17pub struct IncrementalConfig {
18    /// Whether to fail fast on first error
19    pub fail_fast: bool,
20    /// Whether to save empty states
21    pub save_empty_states: bool,
22    /// Maximum number of partitions to merge at once
23    pub max_merge_batch_size: usize,
24}
25
26impl Default for IncrementalConfig {
27    fn default() -> Self {
28        Self {
29            fail_fast: true,
30            save_empty_states: false,
31            max_merge_batch_size: 100,
32        }
33    }
34}
35
36/// Type-erased analyzer wrapper for dynamic dispatch
37trait ErasedAnalyzer: Send + Sync {
38    /// Computes state from data
39    fn compute_state<'a>(
40        &'a self,
41        ctx: &'a SessionContext,
42    ) -> futures::future::BoxFuture<'a, AnalyzerResult<Vec<u8>>>;
43
44    /// Merges serialized states
45    fn merge_states(&self, states: Vec<Vec<u8>>) -> AnalyzerResult<Vec<u8>>;
46
47    /// Computes metric from serialized state
48    fn compute_metric(&self, state: &[u8]) -> AnalyzerResult<(String, MetricValue)>;
49
50    /// Returns the analyzer name
51    fn name(&self) -> &str;
52
53    /// Returns the metric key
54    fn metric_key(&self) -> String;
55}
56
57/// Concrete implementation of ErasedAnalyzer for any Analyzer type
58struct ErasedAnalyzerImpl<A>
59where
60    A: Analyzer + 'static,
61{
62    analyzer: Arc<A>,
63}
64
65impl<A> ErasedAnalyzer for ErasedAnalyzerImpl<A>
66where
67    A: Analyzer + 'static,
68    A::State: Serialize + for<'de> Deserialize<'de>,
69    A::Metric: Into<MetricValue>,
70{
71    fn compute_state<'a>(
72        &'a self,
73        ctx: &'a SessionContext,
74    ) -> futures::future::BoxFuture<'a, AnalyzerResult<Vec<u8>>> {
75        Box::pin(async move {
76            let state = self.analyzer.compute_state_from_data(ctx).await?;
77            let serialized = serde_json::to_vec(&state)
78                .map_err(|e| AnalyzerError::Custom(format!("Failed to serialize state: {e}")))?;
79            Ok(serialized)
80        })
81    }
82
83    fn merge_states(&self, states: Vec<Vec<u8>>) -> AnalyzerResult<Vec<u8>> {
84        let mut deserialized_states = Vec::new();
85        for state_data in states {
86            let state: A::State = serde_json::from_slice(&state_data)
87                .map_err(|e| AnalyzerError::Custom(format!("Failed to deserialize state: {e}")))?;
88            deserialized_states.push(state);
89        }
90
91        let merged = self.analyzer.merge_states(deserialized_states)?;
92        let serialized = serde_json::to_vec(&merged)
93            .map_err(|e| AnalyzerError::Custom(format!("Failed to serialize merged state: {e}")))?;
94        Ok(serialized)
95    }
96
97    fn compute_metric(&self, state: &[u8]) -> AnalyzerResult<(String, MetricValue)> {
98        let state: A::State = serde_json::from_slice(state)
99            .map_err(|e| AnalyzerError::Custom(format!("Failed to deserialize state: {e}")))?;
100        let metric = self.analyzer.compute_metric_from_state(&state)?;
101        Ok((self.analyzer.metric_key(), metric.into()))
102    }
103
104    fn name(&self) -> &str {
105        self.analyzer.name()
106    }
107
108    fn metric_key(&self) -> String {
109        self.analyzer.metric_key()
110    }
111}
112
113/// Orchestrates incremental analysis across partitions.
114///
115/// The runner maintains a collection of analyzers and manages their state
116/// across data partitions, enabling efficient incremental computation.
117pub struct IncrementalAnalysisRunner {
118    state_store: Box<dyn StateStore>,
119    analyzers: Vec<Box<dyn ErasedAnalyzer>>,
120    config: IncrementalConfig,
121}
122
123impl IncrementalAnalysisRunner {
124    /// Creates a new incremental analysis runner
125    pub fn new(state_store: Box<dyn StateStore>) -> Self {
126        Self {
127            state_store,
128            analyzers: Vec::new(),
129            config: IncrementalConfig::default(),
130        }
131    }
132
133    /// Creates a new incremental analysis runner with custom config
134    pub fn with_config(state_store: Box<dyn StateStore>, config: IncrementalConfig) -> Self {
135        Self {
136            state_store,
137            analyzers: Vec::new(),
138            config,
139        }
140    }
141
142    /// Adds an analyzer to the runner
143    pub fn add_analyzer<A>(mut self, analyzer: A) -> Self
144    where
145        A: Analyzer + 'static,
146        A::State: Serialize + for<'de> Deserialize<'de>,
147        A::Metric: Into<MetricValue>,
148    {
149        let erased = Box::new(ErasedAnalyzerImpl {
150            analyzer: Arc::new(analyzer),
151        });
152        self.analyzers.push(erased);
153        self
154    }
155
156    /// Analyzes a single partition, computing and storing its state
157    ///
158    /// # Arguments
159    /// * `ctx` - DataFusion context with registered data  
160    /// * `partition` - Partition identifier
161    ///
162    /// # Returns
163    /// Analysis context with computed metrics
164    #[instrument(skip(self, ctx))]
165    pub async fn analyze_partition(
166        &self,
167        ctx: &SessionContext,
168        partition: &str,
169    ) -> AnalyzerResult<AnalyzerContext> {
170        info!(
171            partition = %partition,
172            analyzers = self.analyzers.len(),
173            "Starting partition analysis"
174        );
175
176        let mut state_map = StateMap::new();
177        let mut context = AnalyzerContext::new();
178        context.metadata_mut().record_start();
179
180        // Compute state for each analyzer
181        for analyzer in &self.analyzers {
182            debug!(
183                analyzer = analyzer.name(),
184                partition = %partition,
185                "Computing analyzer state"
186            );
187
188            match analyzer.compute_state(ctx).await {
189                Ok(state) => {
190                    // Only save non-empty states or if configured to save empty
191                    if !state.is_empty() || self.config.save_empty_states {
192                        state_map.insert(analyzer.metric_key(), state.clone());
193                    }
194
195                    // Compute metric from state
196                    match analyzer.compute_metric(&state) {
197                        Ok((key, metric)) => {
198                            context.store_metric(&key, metric);
199                        }
200                        Err(e) => {
201                            warn!(
202                                analyzer = analyzer.name(),
203                                error = %e,
204                                "Failed to compute metric from state"
205                            );
206                            if self.config.fail_fast {
207                                return Err(e);
208                            }
209                            context.record_error(analyzer.name(), e);
210                        }
211                    }
212                }
213                Err(e) => {
214                    warn!(
215                        analyzer = analyzer.name(),
216                        partition = %partition,
217                        error = %e,
218                        "Failed to compute state"
219                    );
220                    if self.config.fail_fast {
221                        return Err(e);
222                    }
223                    context.record_error(analyzer.name(), e);
224                }
225            }
226        }
227
228        // Save state to store
229        self.state_store.save_state(partition, state_map).await?;
230
231        context.metadata_mut().record_end();
232        info!(
233            partition = %partition,
234            metrics = context.all_metrics().len(),
235            "Completed partition analysis"
236        );
237
238        Ok(context)
239    }
240
241    /// Analyzes new data and merges with existing partition state
242    ///
243    /// This method loads the existing state for a partition, computes new state
244    /// from the provided data, merges them, and saves the updated state.
245    ///
246    /// # Arguments
247    /// * `ctx` - DataFusion context with new data
248    /// * `partition` - Partition identifier
249    #[instrument(skip(self, ctx))]
250    pub async fn analyze_incremental(
251        &self,
252        ctx: &SessionContext,
253        partition: &str,
254    ) -> AnalyzerResult<AnalyzerContext> {
255        info!(
256            partition = %partition,
257            "Starting incremental analysis"
258        );
259
260        // Load existing state
261        let existing_state = self.state_store.load_state(partition).await?;
262
263        let mut merged_state_map = StateMap::new();
264        let mut context = AnalyzerContext::new();
265        context.metadata_mut().record_start();
266
267        // Process each analyzer
268        for analyzer in &self.analyzers {
269            let key = analyzer.metric_key();
270            debug!(
271                analyzer = analyzer.name(),
272                partition = %partition,
273                "Processing incremental update"
274            );
275
276            // Compute new state from data
277            let new_state = match analyzer.compute_state(ctx).await {
278                Ok(state) => state,
279                Err(e) => {
280                    warn!(
281                        analyzer = analyzer.name(),
282                        error = %e,
283                        "Failed to compute new state"
284                    );
285                    if self.config.fail_fast {
286                        return Err(e);
287                    }
288                    context.record_error(analyzer.name(), e);
289                    continue;
290                }
291            };
292
293            // Merge with existing state if present
294            let final_state = if let Some(existing) = existing_state.get(&key) {
295                match analyzer.merge_states(vec![existing.clone(), new_state]) {
296                    Ok(merged) => merged,
297                    Err(e) => {
298                        warn!(
299                            analyzer = analyzer.name(),
300                            error = %e,
301                            "Failed to merge states"
302                        );
303                        if self.config.fail_fast {
304                            return Err(e);
305                        }
306                        context.record_error(analyzer.name(), e);
307                        continue;
308                    }
309                }
310            } else {
311                new_state
312            };
313
314            // Store merged state
315            if !final_state.is_empty() || self.config.save_empty_states {
316                merged_state_map.insert(key.clone(), final_state.clone());
317            }
318
319            // Compute metric from merged state
320            match analyzer.compute_metric(&final_state) {
321                Ok((metric_key, metric)) => {
322                    context.store_metric(&metric_key, metric);
323                }
324                Err(e) => {
325                    warn!(
326                        analyzer = analyzer.name(),
327                        error = %e,
328                        "Failed to compute metric"
329                    );
330                    if self.config.fail_fast {
331                        return Err(e);
332                    }
333                    context.record_error(analyzer.name(), e);
334                }
335            }
336        }
337
338        // Save updated state
339        self.state_store
340            .save_state(partition, merged_state_map)
341            .await?;
342
343        context.metadata_mut().record_end();
344        info!(
345            partition = %partition,
346            metrics = context.all_metrics().len(),
347            "Completed incremental analysis"
348        );
349
350        Ok(context)
351    }
352
353    /// Computes metrics over a range of partitions by merging their states.
354    ///
355    /// # Arguments
356    /// * `partitions` - List of partition identifiers to analyze
357    ///
358    /// # Returns
359    /// The merged analysis context with aggregate metrics
360    #[instrument(skip(self))]
361    pub async fn analyze_partitions(
362        &self,
363        partitions: &[String],
364    ) -> AnalyzerResult<AnalyzerContext> {
365        info!(
366            partitions = partitions.len(),
367            "Analyzing multiple partitions"
368        );
369
370        if partitions.is_empty() {
371            return Ok(AnalyzerContext::new());
372        }
373
374        let mut context = AnalyzerContext::new();
375        context.metadata_mut().record_start();
376
377        // Collect all states across all batches first
378        let mut all_analyzer_states: HashMap<String, Vec<Vec<u8>>> = HashMap::new();
379
380        // Process in batches to avoid memory issues
381        for batch in partitions.chunks(self.config.max_merge_batch_size) {
382            debug!(batch_size = batch.len(), "Processing partition batch");
383
384            // Load states for all partitions in batch
385            let partition_states = self.state_store.load_states_batch(batch).await?;
386
387            // Group states by analyzer
388            for (_partition, state_map) in partition_states {
389                for (analyzer_key, state_data) in state_map {
390                    all_analyzer_states
391                        .entry(analyzer_key)
392                        .or_default()
393                        .push(state_data);
394                }
395            }
396        }
397
398        // Now merge all collected states for each analyzer
399        for analyzer in &self.analyzers {
400            let key = analyzer.metric_key();
401
402            if let Some(states) = all_analyzer_states.get(&key) {
403                if states.is_empty() {
404                    continue;
405                }
406
407                debug!(
408                    analyzer = analyzer.name(),
409                    states = states.len(),
410                    "Merging all analyzer states"
411                );
412
413                match analyzer.merge_states(states.clone()) {
414                    Ok(merged_state) => {
415                        // Compute metric from merged state
416                        match analyzer.compute_metric(&merged_state) {
417                            Ok((metric_key, metric)) => {
418                                context.store_metric(&metric_key, metric);
419                            }
420                            Err(e) => {
421                                warn!(
422                                    analyzer = analyzer.name(),
423                                    error = %e,
424                                    "Failed to compute metric from merged state"
425                                );
426                                if self.config.fail_fast {
427                                    return Err(e);
428                                }
429                                context.record_error(analyzer.name(), e);
430                            }
431                        }
432                    }
433                    Err(e) => {
434                        warn!(
435                            analyzer = analyzer.name(),
436                            error = %e,
437                            "Failed to merge states"
438                        );
439                        if self.config.fail_fast {
440                            return Err(e);
441                        }
442                        context.record_error(analyzer.name(), e);
443                    }
444                }
445            }
446        }
447
448        context.metadata_mut().record_end();
449        info!(
450            partitions = partitions.len(),
451            metrics = context.all_metrics().len(),
452            "Completed multi-partition analysis"
453        );
454
455        Ok(context)
456    }
457
458    /// Returns the number of analyzers configured
459    pub fn analyzer_count(&self) -> usize {
460        self.analyzers.len()
461    }
462
463    /// Lists all stored partitions
464    pub async fn list_partitions(&self) -> AnalyzerResult<Vec<String>> {
465        self.state_store.list_partitions().await
466    }
467
468    /// Deletes a partition's stored state
469    pub async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()> {
470        self.state_store.delete_partition(partition).await
471    }
472}