// converge_analytics/batch.rs
1// Copyright 2024-2026 Reflective Labs
2
3//! Batch feature extraction utilities for temporal anomaly detection.
4//!
5//! Provides high-level functions that abstract Polars internals, returning
6//! plain Rust types. Consumers (spikes, applications) never see Polars
7//! `DataFrame` or `LazyFrame` — only [`TemporalFeatures`] and [`FeatureVector`].
8
9use anyhow::Result;
10use polars::prelude::*;
11use serde::{Deserialize, Serialize};
12
13use crate::engine::FeatureVector;
14
/// Temporal features extracted for a single entity (user, device, etc.).
///
/// Produced by [`extract_temporal_features`]; all delta fields are in seconds
/// (timestamps are assumed to be epoch seconds — see the extractor).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalFeatures {
    // Value of the configured entity column for this row.
    pub entity_id: String,
    // Total number of events observed for this entity.
    pub event_count: u32,
    // Mean inter-event gap in seconds (0 when the entity has a single event).
    pub mean_delta_s: f64,
    // Smallest inter-event gap in seconds (0 when the entity has a single event).
    pub min_delta_s: f64,
    // Sample standard deviation (ddof = 1) of inter-event gaps, in seconds.
    pub std_delta_s: f64,
    // Number of inter-event gaps strictly below the configured burst threshold.
    pub burst_score: u32,
    // Shannon entropy (natural log) of the entity's event-type distribution.
    pub type_entropy: f64,
    // Count of distinct values in the configured category column.
    pub unique_categories: u32,
    // Fraction of events whose hour-of-day falls in 00:00–06:00.
    pub night_ratio: f64,
}
28
/// Configuration for temporal feature extraction.
///
/// Maps the input file's column names onto the features computed by
/// [`extract_temporal_features`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalFeatureConfig {
    // Column identifying the entity to group by (e.g. "user_id").
    pub entity_column: String,
    // Column holding event timestamps — assumed integer epoch seconds
    // (the delta and hour-of-day arithmetic in the extractor depends on it).
    pub timestamp_column: String,
    // Column holding the event type, used for the entropy feature.
    pub type_column: String,
    // Column holding the category (e.g. "repo_id"), used for unique counts.
    pub category_column: String,
    // Gaps strictly below this many seconds count toward `burst_score`.
    pub burst_threshold_seconds: i64,
}
38
39impl Default for TemporalFeatureConfig {
40    fn default() -> Self {
41        Self {
42            entity_column: "user_id".into(),
43            timestamp_column: "timestamp".into(),
44            type_column: "event_type".into(),
45            category_column: "repo_id".into(),
46            burst_threshold_seconds: 60,
47        }
48    }
49}
50
/// Extract temporal features from a Parquet (or CSV) file.
///
/// Loads the file via [`load_events_lazy`], groups events by entity, and
/// computes per-entity temporal features. Returns plain Rust types — no
/// Polars dependency leaks to the caller.
///
/// # Features computed
///
/// - `event_count` — total events (non-null timestamps)
/// - `mean_delta_s` — average inter-event time (seconds)
/// - `min_delta_s` — minimum inter-event time
/// - `std_delta_s` — sample standard deviation (ddof = 1) of inter-event times
/// - `burst_score` — count of deltas strictly below the configured threshold
/// - `type_entropy` — Shannon entropy (natural log) of event type distribution
/// - `unique_categories` — number of distinct categories (repos, etc.)
/// - `night_ratio` — fraction of events during 00:00–06:00 UTC
///
/// # Errors
///
/// Returns an error if the file cannot be read or any Polars query fails.
///
/// NOTE(review): the timestamp column is assumed to hold integer epoch
/// *seconds* — the delta and hour-of-day arithmetic below depends on it.
pub fn extract_temporal_features(
    parquet_path: &str,
    config: &TemporalFeatureConfig,
) -> Result<Vec<TemporalFeatures>> {
    // Short aliases for the configured column names.
    let ec = &config.entity_column;
    let tc = &config.timestamp_column;
    let tyc = &config.type_column;
    let cc = &config.category_column;

    // Load events from Parquet or CSV (detect by extension).
    let events = load_events_lazy(parquet_path)?;

    // Aggregates: event count (non-null timestamps) and distinct categories.
    let aggregates = events
        .clone()
        .group_by([col(ec)])
        .agg([
            col(tc).count().alias("event_count"),
            col(cc).n_unique().alias("unique_categories"),
        ])
        .collect()?;

    // Temporal deltas: sort by entity+time, shift by one row *within* each
    // entity group (`over`), so delta = t[i] - t[i-1]. The first event of
    // each entity has a null delta and is filtered out before aggregating;
    // single-event entities therefore drop out here and are restored by the
    // left join + null-fill below.
    let with_deltas = events
        .clone()
        .sort([ec, tc], Default::default())
        .with_columns([(col(tc) - col(tc).shift(lit(1)).over([col(ec)])).alias("delta_s")])
        .filter(col("delta_s").is_not_null())
        .group_by([col(ec)])
        .agg([
            col("delta_s").mean().alias("mean_delta_s"),
            col("delta_s").min().alias("min_delta_s"),
            // Sample standard deviation (ddof = 1).
            col("delta_s").std(1).alias("std_delta_s"),
            // Booleans sum as 0/1: counts deltas below the burst threshold.
            col("delta_s")
                .lt(lit(config.burst_threshold_seconds))
                .sum()
                .alias("burst_score"),
        ])
        .collect()?;

    // Type entropy: -Σ p(t) * ln(p(t)).
    let entropy = compute_type_entropy_polars(&events, ec, tyc)?;

    // Night ratio: hour-of-day via (t mod 86400) / 3600 (integer division),
    // then the mean of the 0/1 "hour < 6" indicator.
    // NOTE(review): negative (pre-1970) timestamps would yield negative hours.
    let night = events
        .clone()
        .with_columns([((col(tc) % lit(86400)) / lit(3600)).alias("hour")])
        .group_by([col(ec)])
        .agg([col("hour")
            .lt(lit(6))
            .cast(DataType::Float64)
            .mean()
            .alias("night_ratio")])
        .collect()?;

    // Left-join every feature set onto the base aggregates so every entity is
    // kept, then replace the resulting nulls with zeros.
    let features = aggregates
        .lazy()
        .join(
            with_deltas.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .join(
            entropy.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .join(
            night.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .with_columns([
            col("mean_delta_s").fill_null(lit(0.0)),
            col("min_delta_s").fill_null(lit(0.0)),
            col("std_delta_s").fill_null(lit(0.0)),
            col("burst_score").fill_null(lit(0)),
            col("type_entropy").fill_null(lit(0.0)),
            col("night_ratio").fill_null(lit(0.0)),
        ])
        .collect()?;

    // Convert to Vec<TemporalFeatures>.
    dataframe_to_temporal_features(&features, ec)
}
156
/// Load events lazily from Parquet or CSV based on file extension.
///
/// Detection is by extension only (case-insensitive "csv"); anything else
/// falls through to the Parquet scanner.
fn load_events_lazy(path: &str) -> Result<LazyFrame> {
    if std::path::Path::new(path)
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("csv"))
    {
        // NOTE(review): this branch reads the whole CSV eagerly and only then
        // wraps it in a LazyFrame — so "lazily" holds only for Parquet.
        // Consider LazyCsvReader if large CSVs are expected.
        let pb = std::path::PathBuf::from(path);
        Ok(CsvReadOptions::default()
            .with_has_header(true)
            .try_into_reader_with_file_path(Some(pb))?
            .finish()?
            .lazy())
    } else {
        // True lazy scan: nothing is read until the caller collects.
        let pl_path = PlPath::from_str(path);
        Ok(LazyFrame::scan_parquet(pl_path, Default::default())?)
    }
}
174
175/// Compute Shannon entropy of event type distribution per entity.
176fn compute_type_entropy_polars(events: &LazyFrame, ec: &str, tyc: &str) -> Result<DataFrame> {
177    let counts = events
178        .clone()
179        .group_by([col(ec), col(tyc)])
180        .agg([col(ec).count().alias("type_count")])
181        .collect()?;
182
183    let totals = counts
184        .clone()
185        .lazy()
186        .group_by([col(ec)])
187        .agg([col("type_count").sum().alias("total_count")])
188        .collect()?;
189
190    let with_prob = counts
191        .lazy()
192        .join(
193            totals.lazy(),
194            [col(ec)],
195            [col(ec)],
196            JoinArgs::new(JoinType::Left),
197        )
198        .with_columns([(col("type_count").cast(DataType::Float64)
199            / col("total_count").cast(DataType::Float64))
200        .alias("prob")])
201        .with_columns([
202            (col("prob") * col("prob").log(lit(std::f64::consts::E)) * lit(-1.0))
203                .alias("entropy_contrib"),
204        ])
205        .group_by([col(ec)])
206        .agg([col("entropy_contrib").sum().alias("type_entropy")])
207        .collect()?;
208
209    Ok(with_prob)
210}
211
212/// Convert a joined feature DataFrame to `Vec<TemporalFeatures>`.
213fn dataframe_to_temporal_features(df: &DataFrame, ec: &str) -> Result<Vec<TemporalFeatures>> {
214    let ids = df.column(ec)?.str()?;
215    let counts = df.column("event_count")?.cast(&DataType::UInt32)?;
216    let counts = counts.u32()?;
217    let mean_d = df.column("mean_delta_s")?.cast(&DataType::Float64)?;
218    let mean_d = mean_d.f64()?;
219    let min_d = df.column("min_delta_s")?.cast(&DataType::Float64)?;
220    let min_d = min_d.f64()?;
221    let std_d = df.column("std_delta_s")?.cast(&DataType::Float64)?;
222    let std_d = std_d.f64()?;
223    let burst = df.column("burst_score")?.cast(&DataType::UInt32)?;
224    let burst = burst.u32()?;
225    let entropy = df.column("type_entropy")?.cast(&DataType::Float64)?;
226    let entropy = entropy.f64()?;
227    let uniq = df.column("unique_categories")?.cast(&DataType::UInt32)?;
228    let uniq = uniq.u32()?;
229    let night = df.column("night_ratio")?.cast(&DataType::Float64)?;
230    let night = night.f64()?;
231
232    let mut result = Vec::with_capacity(df.height());
233    for i in 0..df.height() {
234        result.push(TemporalFeatures {
235            entity_id: ids.get(i).unwrap_or("?").to_string(),
236            event_count: counts.get(i).unwrap_or(0),
237            mean_delta_s: mean_d.get(i).unwrap_or(0.0),
238            min_delta_s: min_d.get(i).unwrap_or(0.0),
239            std_delta_s: std_d.get(i).unwrap_or(0.0),
240            burst_score: burst.get(i).unwrap_or(0),
241            type_entropy: entropy.get(i).unwrap_or(0.0),
242            unique_categories: uniq.get(i).unwrap_or(0),
243            night_ratio: night.get(i).unwrap_or(0.0),
244        });
245    }
246
247    Ok(result)
248}
249
/// Compute z-scores for a set of values: `(x - mean) / std`.
///
/// Pure Rust — no Polars dependency. Uses the population standard deviation
/// (divisor `n`), clamped to a tiny epsilon so constant inputs yield zeros
/// instead of NaN. An empty slice maps to an empty vector.
pub fn z_scores(values: &[f64]) -> Vec<f64> {
    if values.is_empty() {
        return Vec::new();
    }
    let count = values.len() as f64;
    let mean = values.iter().sum::<f64>() / count;
    let variance = values
        .iter()
        .map(|&v| {
            let d = v - mean;
            d * d
        })
        .sum::<f64>()
        / count;
    // Guard against division by zero for constant inputs.
    let denom = variance.sqrt().max(1e-10);
    values.iter().map(|&v| (v - mean) / denom).collect()
}
263
264/// Convert [`TemporalFeatures`] to a [`FeatureVector`] suitable for ML inference.
265///
266/// Returns an [n_entities, 8] matrix with columns:
267/// `[event_count, mean_delta_s, min_delta_s, std_delta_s, burst_score, type_entropy, unique_categories, night_ratio]`
268pub fn temporal_to_feature_vector(features: &[TemporalFeatures]) -> Result<FeatureVector> {
269    let n = features.len();
270    let cols = 8;
271    let mut data = Vec::with_capacity(n * cols);
272    for f in features {
273        data.push(f.event_count as f32);
274        data.push(f.mean_delta_s as f32);
275        data.push(f.min_delta_s as f32);
276        data.push(f.std_delta_s as f32);
277        data.push(f.burst_score as f32);
278        data.push(f.type_entropy as f32);
279        data.push(f.unique_categories as f32);
280        data.push(f.night_ratio as f32);
281    }
282    FeatureVector::new(data, [n, cols])
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    // Convenience constructor so shape tests stay compact.
    fn sample(
        id: &str,
        event_count: u32,
        mean_delta_s: f64,
        min_delta_s: f64,
        std_delta_s: f64,
        burst_score: u32,
        type_entropy: f64,
        unique_categories: u32,
        night_ratio: f64,
    ) -> TemporalFeatures {
        TemporalFeatures {
            entity_id: id.to_string(),
            event_count,
            mean_delta_s,
            min_delta_s,
            std_delta_s,
            burst_score,
            type_entropy,
            unique_categories,
            night_ratio,
        }
    }

    #[test]
    fn z_scores_centers_and_scales() {
        let scored = z_scores(&[1.0, 2.0, 3.0, 4.0, 5.0]);
        let n = scored.len() as f64;
        // Standardized values must average to ~0 …
        let avg: f64 = scored.iter().sum::<f64>() / n;
        assert!(avg.abs() < 1e-10);
        // … with population standard deviation ~1.
        let sd: f64 = (scored.iter().map(|z| z * z).sum::<f64>() / n).sqrt();
        assert!((sd - 1.0).abs() < 1e-10);
    }

    #[test]
    fn z_scores_empty_returns_empty() {
        assert!(z_scores(&[]).is_empty());
    }

    #[test]
    fn temporal_to_feature_vector_shape() {
        let features = vec![
            sample("a", 10, 100.0, 5.0, 50.0, 3, 1.5, 5, 0.1),
            sample("b", 20, 200.0, 10.0, 80.0, 0, 1.8, 10, 0.0),
        ];
        let fv = temporal_to_feature_vector(&features).unwrap();
        assert_eq!(fv.rows(), 2);
        assert_eq!(fv.cols(), 8);
    }
}