1use anyhow::Result;
10use polars::prelude::*;
11use serde::{Deserialize, Serialize};
12
13use crate::engine::FeatureVector;
14
/// Per-entity temporal behavior features derived from an event log.
///
/// Produced by `extract_temporal_features`. The `*_delta_s` fields describe
/// gaps between consecutive events of the same entity, in the units of the
/// timestamp column (presumably seconds — see the note on
/// `extract_temporal_features`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalFeatures {
    /// Value of the configured entity column (e.g. a user id).
    pub entity_id: String,
    /// Total number of events observed for this entity.
    pub event_count: u32,
    /// Mean gap between consecutive events.
    pub mean_delta_s: f64,
    /// Smallest gap between consecutive events.
    pub min_delta_s: f64,
    /// Sample (ddof = 1) standard deviation of the gaps.
    pub std_delta_s: f64,
    /// Number of gaps shorter than `TemporalFeatureConfig::burst_threshold_seconds`.
    pub burst_score: u32,
    /// Shannon entropy (natural log) of the entity's event-type distribution.
    pub type_entropy: f64,
    /// Count of distinct values in the configured category column.
    pub unique_categories: u32,
    /// Fraction of events whose hour-of-day (`timestamp % 86400 / 3600`) is < 6.
    pub night_ratio: f64,
}
28
/// Column names and thresholds driving `extract_temporal_features`.
///
/// See the `Default` impl for the out-of-the-box column names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalFeatureConfig {
    /// Column identifying the entity events are grouped by.
    pub entity_column: String,
    /// Event timestamp column.
    /// NOTE(review): downstream math assumes numeric epoch seconds — confirm
    /// against the producer of the input files.
    pub timestamp_column: String,
    /// Event type column; feeds the `type_entropy` feature.
    pub type_column: String,
    /// Category column counted for `unique_categories`.
    pub category_column: String,
    /// Gaps shorter than this many seconds count toward `burst_score`.
    pub burst_threshold_seconds: i64,
}
38
39impl Default for TemporalFeatureConfig {
40 fn default() -> Self {
41 Self {
42 entity_column: "user_id".into(),
43 timestamp_column: "timestamp".into(),
44 type_column: "event_type".into(),
45 category_column: "repo_id".into(),
46 burst_threshold_seconds: 60,
47 }
48 }
49}
50
/// Extract per-entity temporal features from an on-disk event log.
///
/// One `TemporalFeatures` record is produced per distinct value of
/// `config.entity_column`. Computed signals:
/// - `event_count` / `unique_categories`: plain per-entity aggregates;
/// - `mean/min/std_delta_s`: statistics over gaps between consecutive
///   timestamps of the same entity (sorted by entity, then time); `std`
///   uses ddof = 1 (sample standard deviation);
/// - `burst_score`: number of gaps shorter than
///   `config.burst_threshold_seconds`;
/// - `type_entropy`: Shannon entropy (natural log) of the event-type
///   distribution (see `compute_type_entropy_polars`);
/// - `night_ratio`: fraction of events whose derived hour-of-day
///   (`timestamp % 86400 / 3600`) is below 6.
///
/// NOTE(review): the delta and night-ratio math assumes the timestamp
/// column holds numeric epoch seconds (with UTC day boundaries) — confirm
/// against the producer of the input file.
///
/// Entities with a single event have no gaps; the left joins leave those
/// columns null and they are zero-filled at the end.
///
/// # Errors
/// Propagates any Polars error from loading, aggregation, or joining.
pub fn extract_temporal_features(
    parquet_path: &str,
    config: &TemporalFeatureConfig,
) -> Result<Vec<TemporalFeatures>> {
    // Short aliases for the configured column names.
    let ec = &config.entity_column;
    let tc = &config.timestamp_column;
    let tyc = &config.type_column;
    let cc = &config.category_column;

    // Lazy plan over the file; each pipeline below clones the plan, not data.
    let events = load_events_lazy(parquet_path)?;

    // Base per-entity aggregates: event count and distinct-category count.
    let aggregates = events
        .clone()
        .group_by([col(ec)])
        .agg([
            col(tc).count().alias("event_count"),
            col(cc).n_unique().alias("unique_categories"),
        ])
        .collect()?;

    // Inter-event gap statistics. `shift(1).over(entity)` pairs each event
    // with its predecessor within the same entity; the first event of each
    // entity yields a null delta and is filtered out before aggregating.
    let with_deltas = events
        .clone()
        .sort([ec, tc], Default::default())
        .with_columns([(col(tc) - col(tc).shift(lit(1)).over([col(ec)])).alias("delta_s")])
        .filter(col("delta_s").is_not_null())
        .group_by([col(ec)])
        .agg([
            col("delta_s").mean().alias("mean_delta_s"),
            col("delta_s").min().alias("min_delta_s"),
            col("delta_s").std(1).alias("std_delta_s"),
            // Summing the boolean mask counts gaps under the burst threshold.
            col("delta_s")
                .lt(lit(config.burst_threshold_seconds))
                .sum()
                .alias("burst_score"),
        ])
        .collect()?;

    let entropy = compute_type_entropy_polars(&events, ec, tyc)?;

    // Share of events falling in the 00:00–05:59 window of each day.
    let night = events
        .clone()
        .with_columns([((col(tc) % lit(86400)) / lit(3600)).alias("hour")])
        .group_by([col(ec)])
        .agg([col("hour")
            .lt(lit(6))
            .cast(DataType::Float64)
            .mean()
            .alias("night_ratio")])
        .collect()?;

    // Left-join everything onto the base aggregates so every entity keeps a
    // row, then zero-fill columns that may be null (e.g. single-event
    // entities have no delta statistics).
    let features = aggregates
        .lazy()
        .join(
            with_deltas.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .join(
            entropy.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .join(
            night.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .with_columns([
            col("mean_delta_s").fill_null(lit(0.0)),
            col("min_delta_s").fill_null(lit(0.0)),
            col("std_delta_s").fill_null(lit(0.0)),
            col("burst_score").fill_null(lit(0)),
            col("type_entropy").fill_null(lit(0.0)),
            col("night_ratio").fill_null(lit(0.0)),
        ])
        .collect()?;

    dataframe_to_temporal_features(&features, ec)
}
156
/// Build a `LazyFrame` over the event log at `path`.
///
/// Files with a (case-insensitive) `.csv` extension are read eagerly with a
/// header row and then converted to a lazy frame; everything else is lazily
/// scanned as Parquet with default options.
///
/// # Errors
/// Returns any Polars error from opening or parsing the file.
fn load_events_lazy(path: &str) -> Result<LazyFrame> {
    if std::path::Path::new(path)
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("csv"))
    {
        // CSV has no lazy scanner configured here: read fully, then go lazy.
        let pb = std::path::PathBuf::from(path);
        Ok(CsvReadOptions::default()
            .with_has_header(true)
            .try_into_reader_with_file_path(Some(pb))?
            .finish()?
            .lazy())
    } else {
        let pl_path = PlPath::from_str(path);
        Ok(LazyFrame::scan_parquet(pl_path, Default::default())?)
    }
}
174
/// Shannon entropy (natural log) of the per-entity event-type distribution.
///
/// Three passes:
/// 1. count events per (entity, type);
/// 2. total events per entity;
/// 3. join the two, derive `p = type_count / total_count`, and sum
///    `-p * ln(p)` per entity into `type_entropy`.
///
/// An entity with a single event type gets entropy 0 (p = 1, ln 1 = 0).
///
/// # Errors
/// Propagates any Polars error from the aggregations or the join.
fn compute_type_entropy_polars(events: &LazyFrame, ec: &str, tyc: &str) -> Result<DataFrame> {
    // Event count per (entity, event type) pair.
    let counts = events
        .clone()
        .group_by([col(ec), col(tyc)])
        .agg([col(ec).count().alias("type_count")])
        .collect()?;

    // Total event count per entity (denominator of the probabilities).
    let totals = counts
        .clone()
        .lazy()
        .group_by([col(ec)])
        .agg([col("type_count").sum().alias("total_count")])
        .collect()?;

    // NOTE: despite the binding name, this is the final per-entity entropy
    // frame — the probability columns are intermediate steps of the chain.
    let with_prob = counts
        .lazy()
        .join(
            totals.lazy(),
            [col(ec)],
            [col(ec)],
            JoinArgs::new(JoinType::Left),
        )
        .with_columns([(col("type_count").cast(DataType::Float64)
            / col("total_count").cast(DataType::Float64))
        .alias("prob")])
        .with_columns([
            // -p * ln(p); base-e log makes this entropy in nats.
            (col("prob") * col("prob").log(lit(std::f64::consts::E)) * lit(-1.0))
                .alias("entropy_contrib"),
        ])
        .group_by([col(ec)])
        .agg([col("entropy_contrib").sum().alias("type_entropy")])
        .collect()?;

    Ok(with_prob)
}
211
/// Convert the joined feature `DataFrame` into typed `TemporalFeatures`
/// rows, keyed by the entity column `ec`.
///
/// Each column is cast to its target dtype up front so the chunked-array
/// accessors cannot fail on a compatible-but-different physical type; any
/// per-cell null falls back to 0 (or "?" for the entity id).
///
/// # Errors
/// Returns an error if an expected column is missing, a cast fails, or the
/// entity column is not a string column.
fn dataframe_to_temporal_features(df: &DataFrame, ec: &str) -> Result<Vec<TemporalFeatures>> {
    let ids = df.column(ec)?.str()?;
    let counts = df.column("event_count")?.cast(&DataType::UInt32)?;
    let counts = counts.u32()?;
    let mean_d = df.column("mean_delta_s")?.cast(&DataType::Float64)?;
    let mean_d = mean_d.f64()?;
    let min_d = df.column("min_delta_s")?.cast(&DataType::Float64)?;
    let min_d = min_d.f64()?;
    let std_d = df.column("std_delta_s")?.cast(&DataType::Float64)?;
    let std_d = std_d.f64()?;
    let burst = df.column("burst_score")?.cast(&DataType::UInt32)?;
    let burst = burst.u32()?;
    let entropy = df.column("type_entropy")?.cast(&DataType::Float64)?;
    let entropy = entropy.f64()?;
    let uniq = df.column("unique_categories")?.cast(&DataType::UInt32)?;
    let uniq = uniq.u32()?;
    let night = df.column("night_ratio")?.cast(&DataType::Float64)?;
    let night = night.f64()?;

    // Row-wise assembly; `get(i)` yields None for nulls, hence the defaults.
    let mut result = Vec::with_capacity(df.height());
    for i in 0..df.height() {
        result.push(TemporalFeatures {
            entity_id: ids.get(i).unwrap_or("?").to_string(),
            event_count: counts.get(i).unwrap_or(0),
            mean_delta_s: mean_d.get(i).unwrap_or(0.0),
            min_delta_s: min_d.get(i).unwrap_or(0.0),
            std_delta_s: std_d.get(i).unwrap_or(0.0),
            burst_score: burst.get(i).unwrap_or(0),
            type_entropy: entropy.get(i).unwrap_or(0.0),
            unique_categories: uniq.get(i).unwrap_or(0),
            night_ratio: night.get(i).unwrap_or(0.0),
        });
    }

    Ok(result)
}
249
/// Standardize `values` to zero mean and unit variance (z-scores).
///
/// Uses the population variance (divides by `len`, not `len - 1`). The
/// standard deviation is clamped to `1e-10`, so a constant input maps to
/// all zeros instead of dividing by zero. An empty slice yields an empty
/// vector.
pub fn z_scores(values: &[f64]) -> Vec<f64> {
    if values.is_empty() {
        return Vec::new();
    }
    let n = values.len() as f64;
    let mean = values.iter().copied().sum::<f64>() / n;
    // Population variance via a single fold over the residuals.
    let variance = values
        .iter()
        .fold(0.0, |acc, &v| acc + (v - mean) * (v - mean))
        / n;
    let sd = variance.sqrt().max(1e-10);
    values.iter().map(|&v| (v - mean) / sd).collect()
}
263
264pub fn temporal_to_feature_vector(features: &[TemporalFeatures]) -> Result<FeatureVector> {
269 let n = features.len();
270 let cols = 8;
271 let mut data = Vec::with_capacity(n * cols);
272 for f in features {
273 data.push(f.event_count as f32);
274 data.push(f.mean_delta_s as f32);
275 data.push(f.min_delta_s as f32);
276 data.push(f.std_delta_s as f32);
277 data.push(f.burst_score as f32);
278 data.push(f.type_entropy as f32);
279 data.push(f.unique_categories as f32);
280 data.push(f.night_ratio as f32);
281 }
282 FeatureVector::new(data, [n, cols])
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// z-scores of a linear ramp have zero mean and unit population std.
    #[test]
    fn z_scores_centers_and_scales() {
        let z = z_scores(&[1.0, 2.0, 3.0, 4.0, 5.0]);
        let n = z.len() as f64;
        let mean = z.iter().sum::<f64>() / n;
        assert!(mean.abs() < 1e-10);
        let rms = (z.iter().map(|v| v * v).sum::<f64>() / n).sqrt();
        assert!((rms - 1.0).abs() < 1e-10);
    }

    /// An empty input slice produces an empty output.
    #[test]
    fn z_scores_empty_returns_empty() {
        assert!(z_scores(&[]).is_empty());
    }

    /// Two feature records flatten into a 2x8 feature matrix.
    #[test]
    fn temporal_to_feature_vector_shape() {
        // Small fixture builder to keep the two records readable.
        let make = |id: &str, count, mean, min, std, burst, ent, uniq, night| TemporalFeatures {
            entity_id: id.to_string(),
            event_count: count,
            mean_delta_s: mean,
            min_delta_s: min,
            std_delta_s: std,
            burst_score: burst,
            type_entropy: ent,
            unique_categories: uniq,
            night_ratio: night,
        };
        let features = [
            make("a", 10, 100.0, 5.0, 50.0, 3, 1.5, 5, 0.1),
            make("b", 20, 200.0, 10.0, 80.0, 0, 1.8, 10, 0.0),
        ];
        let fv = temporal_to_feature_vector(&features).unwrap();
        assert_eq!(fv.rows(), 2);
        assert_eq!(fv.cols(), 8);
    }
}