Skip to main content

context_bar_core/
aggregate.rs

1//! Deterministic transforms — slice 2 of folding `usage_signal.py` into Rust
2//! (ROADMAP E1). Ported 1:1 from `empty_metrics`/`_add_metrics`,
3//! `split_logical_sessions`, `_empty_bucket`/`_accumulate`, `bucket_aggregates`,
4//! and `project_name_from_cwd`. Pure given (events, NOW, UTC offset) — pinned by
5//! a golden fixture generated from the Python (`tests/aggregate_golden.rs`).
6//!
7//! Day/week/month bucketing uses a fixed UTC offset (seconds east of UTC). The
8//! Python uses the system local tz via `astimezone()`; for fixed-offset zones
9//! (e.g. Türkiye, permanent UTC+3) that is identical, and the golden pins it
10//! with `TZ=UTC` / offset 0. DST-aware per-timestamp offsets are a later
11//! refinement (would need a tz database); documented in COST_MODEL/ROADMAP.
12
13use time::{Date, OffsetDateTime, UtcOffset};
14
15use crate::usage_signal::{DailyInstance, NamedBucket, SessionRecord, TimeBucket};
16
/// Maximum span (seconds) of one logical session, measured from its *first*
/// turn: an event landing more than this after the session's opening turn
/// starts a new session. Matches Claude's 5h usage window, which resets 5h
/// after the first turn (not after a period of inactivity, despite the name).
pub const SESSION_IDLE_GAP: f64 = 5.0 * 60.0 * 60.0;
/// Length of the rolling 30-day reporting window, in seconds.
pub const WIN_30D: f64 = 30.0 * 24.0 * 3600.0;
22
/// Token counts and estimated cost attributed to a single turn.
///
/// `total` is the stats-view token total (fresh input + output) computed by the
/// parser; `cache_read`, `input`, `output` and `cache_creation` are the four
/// buckets the cost view consumes, and `cost` is the estimated cost.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct TurnMetrics {
    pub total: u64,
    pub cache_read: u64,
    pub input: u64,
    pub output: u64,
    pub cache_creation: u64,
    pub cost: f64,
}

impl TurnMetrics {
    /// Fold `other` into `self`, field by field.
    fn add(&mut self, other: &TurnMetrics) {
        let TurnMetrics {
            total,
            cache_read,
            input,
            output,
            cache_creation,
            cost,
        } = *other;
        self.total += total;
        self.cache_read += cache_read;
        self.input += input;
        self.output += output;
        self.cache_creation += cache_creation;
        self.cost += cost;
    }
}
45
46/// One transcript file's per-turn events plus its resolved model/cwd.
47#[derive(Clone, Debug, Default)]
48pub struct FileEvents {
49    pub model: Option<String>,
50    pub cwd: Option<String>,
51    /// (epoch seconds, metrics) — sorted by ts inside [`split_logical_sessions`].
52    pub events: Vec<(f64, TurnMetrics)>,
53}
54
/// One logical session produced by [`split_logical_sessions`] and consumed by
/// [`bucket_aggregates`]: token buckets and cost summed over the session's
/// turns, the timestamps of its first and last turn, and the source file's
/// model/cwd.
#[derive(Clone, Debug)]
pub struct Session {
    /// Sum of the per-turn `total` token counts.
    pub tokens: u64,
    /// Summed cache-read tokens.
    pub cache_read: u64,
    /// Summed input tokens.
    pub input: u64,
    /// Summed output tokens.
    pub output: u64,
    /// Summed cache-creation tokens.
    pub cache_creation: u64,
    /// Summed (unrounded) estimated cost.
    pub cost: f64,
    /// Epoch seconds of the session's final turn; drives day/week/month bucketing.
    pub last_ts: f64,
    /// Epoch seconds of the session's opening turn.
    pub first_ts: f64,
    /// Model of the transcript file the session came from, if known.
    pub model: Option<String>,
    /// Working directory of the transcript file the session came from, if known.
    pub cwd: Option<String>,
}
69
/// Mirrors `project_name_from_cwd`: the last path component of `cwd`, ignoring
/// trailing slashes.
///
/// Returns `—` when `cwd` is absent or empty. When the path has no non-empty
/// final component (e.g. `/`), the original string is returned unchanged.
pub fn project_name_from_cwd(cwd: Option<&str>) -> String {
    let Some(path) = cwd.filter(|p| !p.is_empty()) else {
        return "—".to_string();
    };
    match path.trim_end_matches('/').rsplit('/').next() {
        Some(leaf) if !leaf.is_empty() => leaf.to_string(),
        _ => path.to_string(),
    }
}
86
/// Final path component of `path` (trailing slashes ignored) with its last
/// `.extension` removed, approximating
/// `os.path.basename(path).rsplit(".", 1)[0]`.
///
/// Unlike that literal `rsplit`, a name whose only dot is leading
/// (e.g. `.hidden`) is returned whole rather than as an empty stem.
fn session_base_id(path: &str) -> String {
    let file_name = path.trim_end_matches('/').rsplit('/').next().unwrap_or(path);
    file_name
        .rsplit_once('.')
        .map(|(stem, _ext)| stem)
        .filter(|stem| !stem.is_empty())
        .unwrap_or(file_name)
        .to_string()
}
96
// Python's `round()` breaks ties to even ("banker's rounding"). Scaling and
// then rounding ties-to-even is intended to reproduce the Python output for
// the rounded cost/duration values.
fn round_half_even_scaled(x: f64, scale: f64) -> f64 {
    (x * scale).round_ties_even() / scale
}

/// Round to 6 decimal places, ties to even.
fn round6(x: f64) -> f64 {
    round_half_even_scaled(x, 1e6)
}

/// Round to 1 decimal place, ties to even.
fn round1(x: f64) -> f64 {
    round_half_even_scaled(x, 10.0)
}
106
107/// Format an epoch-seconds value as a UTC ISO8601 string ending in `Z`.
108/// Subsecond is emitted as 6 digits only when nonzero (mirrors Python's
109/// `datetime.fromtimestamp(ts, tz=utc).isoformat().replace("+00:00","Z")`).
110pub fn iso_utc(ts: f64) -> String {
111    let whole = ts.floor() as i64;
112    let micros = ((ts - ts.floor()) * 1_000_000.0).round() as i64;
113    let (whole, micros) = if micros >= 1_000_000 {
114        (whole + 1, 0)
115    } else {
116        (whole, micros)
117    };
118    let dt = OffsetDateTime::from_unix_timestamp(whole)
119        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
120        .to_offset(UtcOffset::UTC);
121    let base = format!(
122        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}",
123        dt.year(),
124        u8::from(dt.month()),
125        dt.day(),
126        dt.hour(),
127        dt.minute(),
128        dt.second(),
129    );
130    if micros == 0 {
131        format!("{base}Z")
132    } else {
133        format!("{base}.{micros:06}Z")
134    }
135}
136
137/// Parse an ISO-8601 / RFC-3339 timestamp to epoch seconds (mirrors Python
138/// `datetime.fromisoformat(...).timestamp()` for the `Z`/offset forms used in
139/// transcripts). Returns `None` on empty/unparseable input.
140pub fn parse_iso(value: Option<&str>) -> Option<f64> {
141    let s = value?;
142    if s.is_empty() {
143        return None;
144    }
145    time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
146        .ok()
147        .map(|dt| dt.unix_timestamp_nanos() as f64 / 1e9)
148}
149
150/// Local civil date/labels for an epoch second at a fixed UTC offset.
151fn local_dt(ts: f64, offset: UtcOffset) -> OffsetDateTime {
152    OffsetDateTime::from_unix_timestamp(ts.floor() as i64)
153        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
154        .to_offset(offset)
155}
156
157fn day_key(dt: OffsetDateTime) -> String {
158    format!("{:04}-{:02}-{:02}", dt.year(), u8::from(dt.month()), dt.day())
159}
160
161fn week_key(dt: OffsetDateTime) -> String {
162    let (iso_year, week, _) = dt.to_iso_week_date();
163    format!("{iso_year}-W{week:02}")
164}
165
166fn month_key(dt: OffsetDateTime) -> String {
167    format!("{:04}-{:02}", dt.year(), u8::from(dt.month()))
168}
169
170/// Insertion-ordered string-keyed accumulator, so a later stable sort
171/// reproduces Python's dict-insertion-order tie-breaking exactly.
172#[derive(Default)]
173struct OrderedBuckets {
174    order: Vec<String>,
175    idx: std::collections::HashMap<String, usize>,
176    buckets: Vec<Bucket>,
177}
178
179#[derive(Clone, Default)]
180struct Bucket {
181    tokens: u64,
182    sessions: u64,
183    cache_read: u64,
184    input: u64,
185    output: u64,
186    cache_creation: u64,
187    cost: f64,
188}
189
190impl Bucket {
191    fn accumulate(&mut self, s: &Session) {
192        self.tokens += s.tokens;
193        self.sessions += 1;
194        self.cache_read += s.cache_read;
195        self.input += s.input;
196        self.output += s.output;
197        self.cache_creation += s.cache_creation;
198        self.cost += s.cost;
199    }
200}
201
202impl OrderedBuckets {
203    fn entry(&mut self, key: &str) -> &mut Bucket {
204        if let Some(&i) = self.idx.get(key) {
205            return &mut self.buckets[i];
206        }
207        let i = self.buckets.len();
208        self.idx.insert(key.to_string(), i);
209        self.order.push(key.to_string());
210        self.buckets.push(Bucket::default());
211        &mut self.buckets[i]
212    }
213}
214
215/// Result of [`bucket_aggregates`] — mirrors the snapshot's aggregate fields.
216#[derive(Clone, Debug, Default)]
217pub struct Buckets {
218    pub total_tokens_30d: u64,
219    pub total_sessions_30d: u64,
220    pub total_cost_30d: f64,
221    pub total_input_30d: u64,
222    pub total_output_30d: u64,
223    pub cost_today: f64,
224    pub max_session_minutes: f64,
225    pub by_day: Vec<TimeBucket>,
226    pub by_week: Vec<TimeBucket>,
227    pub by_month: Vec<TimeBucket>,
228    pub by_model: Vec<NamedBucket>,
229    pub by_project: Vec<NamedBucket>,
230    pub by_day_project: Vec<DailyInstance>,
231}
232
233/// Split each file's events into logical sessions on the 5h idle gap, returning
234/// `(sessions, recent)` — sessions feed [`bucket_aggregates`], recent feeds
235/// `recent_sessions`. `files` is iterated in its existing order (use a
236/// `BTreeMap` for determinism). Mirrors `split_logical_sessions`.
237pub fn split_logical_sessions(
238    files: &std::collections::BTreeMap<String, FileEvents>,
239) -> (Vec<Session>, Vec<SessionRecord>) {
240    let mut sessions = Vec::new();
241    let mut recent = Vec::new();
242
243    for (path, fe) in files {
244        let mut events = fe.events.clone();
245        events.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
246        if events.is_empty() {
247            continue;
248        }
249        // Split into chunks: a new chunk starts when an event is more than
250        // SESSION_IDLE_GAP after the current chunk's FIRST turn.
251        let mut chunks: Vec<Vec<(f64, TurnMetrics)>> = Vec::new();
252        let mut cur: Vec<(f64, TurnMetrics)> = vec![events[0]];
253        let mut session_start = events[0].0;
254        for &nxt in &events[1..] {
255            if nxt.0 - session_start > SESSION_IDLE_GAP {
256                chunks.push(std::mem::take(&mut cur));
257                cur = vec![nxt];
258                session_start = nxt.0;
259            } else {
260                cur.push(nxt);
261            }
262        }
263        chunks.push(cur);
264
265        let base_id = session_base_id(path);
266        let multi = chunks.len() > 1;
267        for (i, chunk) in chunks.iter().enumerate() {
268            let first_ts = chunk[0].0;
269            let last_ts = chunk[chunk.len() - 1].0;
270            let mut agg = TurnMetrics::default();
271            for (_, m) in chunk {
272                agg.add(m);
273            }
274            sessions.push(Session {
275                tokens: agg.total,
276                cache_read: agg.cache_read,
277                input: agg.input,
278                output: agg.output,
279                cache_creation: agg.cache_creation,
280                cost: agg.cost,
281                last_ts,
282                first_ts,
283                model: fe.model.clone(),
284                cwd: fe.cwd.clone(),
285            });
286            recent.push(SessionRecord {
287                id: if multi {
288                    format!("{base_id}#{}", i + 1)
289                } else {
290                    base_id.clone()
291                },
292                started_at: iso_utc(first_ts),
293                ended_at: iso_utc(last_ts),
294                duration_minutes: round1((last_ts - first_ts) / 60.0),
295                tokens: agg.total,
296                cache_read: agg.cache_read,
297                input: agg.input,
298                output: agg.output,
299                cache_creation: agg.cache_creation,
300                cost: round6(agg.cost),
301                model: fe.model.clone().unwrap_or_else(|| "—".to_string()),
302                project: project_name_from_cwd(fe.cwd.as_deref()),
303            });
304        }
305    }
306    (sessions, recent)
307}
308
309#[allow(clippy::too_many_arguments)]
310fn time_bucket(date: String, b: &Bucket) -> TimeBucket {
311    TimeBucket {
312        date,
313        tokens: b.tokens,
314        sessions: b.sessions,
315        input: b.input,
316        output: b.output,
317        cache_creation: b.cache_creation,
318        cache_read: b.cache_read,
319        cost: round6(b.cost),
320    }
321}
322
323fn named_bucket(model: String, b: &Bucket) -> NamedBucket {
324    NamedBucket {
325        model,
326        tokens: b.tokens,
327        sessions: b.sessions,
328        input: b.input,
329        output: b.output,
330        cache_creation: b.cache_creation,
331        cache_read: b.cache_read,
332        cost: round6(b.cost),
333    }
334}
335
336/// Stable top-N by tokens descending (ties keep insertion order).
337fn take_by_tokens(buckets: &OrderedBuckets, n: usize) -> Vec<(String, Bucket)> {
338    let mut items: Vec<(String, Bucket)> = buckets
339        .order
340        .iter()
341        .enumerate()
342        .map(|(i, k)| (k.clone(), buckets.buckets[i].clone()))
343        .collect();
344    items.sort_by(|a, b| b.1.tokens.cmp(&a.1.tokens));
345    items.truncate(n);
346    items
347}
348
349/// Stable top-N by key string descending (for by_week / by_month).
350fn take_by_key(buckets: &OrderedBuckets, n: usize) -> Vec<(String, Bucket)> {
351    let mut items: Vec<(String, Bucket)> = buckets
352        .order
353        .iter()
354        .enumerate()
355        .map(|(i, k)| (k.clone(), buckets.buckets[i].clone()))
356        .collect();
357    items.sort_by(|a, b| b.0.cmp(&a.0));
358    items.truncate(n);
359    items
360}
361
362/// Roll sessions into day/week/month/model/project buckets + the day×project
363/// cross-tab + 30d totals. `now` is epoch seconds; `offset` is the fixed local
364/// UTC offset. Mirrors `bucket_aggregates` (days=365, weeks=52, months=24,
365/// instance_days=30, instance_rows=200).
366pub fn bucket_aggregates(sessions: &[Session], now: f64, offset: UtcOffset) -> Buckets {
367    let mut by_day = OrderedBuckets::default();
368    let mut by_week = OrderedBuckets::default();
369    let mut by_month = OrderedBuckets::default();
370    let mut by_model = OrderedBuckets::default();
371    let mut by_project = OrderedBuckets::default();
372    // (day, project) -> (bucket, ordered models). Keep insertion order.
373    let mut dp_order: Vec<(String, String)> = Vec::new();
374    let mut dp_idx: std::collections::HashMap<(String, String), usize> = Default::default();
375    let mut dp_buckets: Vec<Bucket> = Vec::new();
376    let mut dp_models: Vec<Vec<String>> = Vec::new();
377
378    let mut total30: u64 = 0;
379    let mut sessions30: u64 = 0;
380    let mut cost30: f64 = 0.0;
381    let mut input30: u64 = 0;
382    let mut output30: u64 = 0;
383    let cutoff30 = now - WIN_30D;
384    let today_key = day_key(local_dt(now, offset));
385
386    for s in sessions {
387        let ts = s.last_ts;
388        let dt = local_dt(ts, offset);
389        let day = day_key(dt);
390        let week = week_key(dt);
391        let month = month_key(dt);
392        let proj = project_name_from_cwd(s.cwd.as_deref());
393
394        by_day.entry(&day).accumulate(s);
395        by_week.entry(&week).accumulate(s);
396        by_month.entry(&month).accumulate(s);
397        if let Some(model) = &s.model {
398            if !model.is_empty() {
399                by_model.entry(model).accumulate(s);
400            }
401        }
402        by_project.entry(&proj).accumulate(s);
403
404        // Per (day × project) cross-tab, scoped to the recent 30-day window.
405        if now - ts <= 30.0 * 86400.0 {
406            let key = (day.clone(), proj.clone());
407            let i = match dp_idx.get(&key) {
408                Some(&i) => i,
409                None => {
410                    let i = dp_buckets.len();
411                    dp_idx.insert(key.clone(), i);
412                    dp_order.push(key.clone());
413                    dp_buckets.push(Bucket::default());
414                    dp_models.push(Vec::new());
415                    i
416                }
417            };
418            dp_buckets[i].accumulate(s);
419            if let Some(model) = &s.model {
420                if !model.is_empty() && !dp_models[i].contains(model) {
421                    dp_models[i].push(model.clone());
422                }
423            }
424        }
425
426        if ts >= cutoff30 {
427            total30 += s.tokens;
428            sessions30 += 1;
429            cost30 += s.cost;
430            input30 += s.input;
431            output30 += s.output;
432        }
433    }
434
435    // by_day: pad every calendar day in the 365-day window, newest first.
436    let today_local = local_dt(now, offset);
437    let today_date = today_local.date();
438    let mut padded_day = Vec::with_capacity(365);
439    for i in 0..365i64 {
440        let d: Date = today_date.saturating_sub(time::Duration::days(i));
441        let key = format!("{:04}-{:02}-{:02}", d.year(), u8::from(d.month()), d.day());
442        match by_day.idx.get(&key) {
443            Some(&j) => padded_day.push(time_bucket(key.clone(), &by_day.buckets[j])),
444            None => padded_day.push(time_bucket(key.clone(), &Bucket::default())),
445        }
446    }
447
448    let by_week_out = take_by_key(&by_week, 52)
449        .into_iter()
450        .map(|(k, b)| time_bucket(k, &b))
451        .collect();
452    let by_month_out = take_by_key(&by_month, 24)
453        .into_iter()
454        .map(|(k, b)| time_bucket(k, &b))
455        .collect();
456    let by_model_out = take_by_tokens(&by_model, 20)
457        .into_iter()
458        .map(|(k, b)| named_bucket(k, &b))
459        .collect();
460    let by_project_out = take_by_tokens(&by_project, 20)
461        .into_iter()
462        .map(|(k, b)| named_bucket(k, &b))
463        .collect();
464
465    // by_day_project: newest day first, within a day by cost desc; cap 200.
466    let mut instances: Vec<DailyInstance> = dp_order
467        .iter()
468        .enumerate()
469        .map(|(i, (day, proj))| {
470            let b = &dp_buckets[i];
471            let mut models = dp_models[i].clone();
472            models.sort();
473            DailyInstance {
474                date: day.clone(),
475                project: proj.clone(),
476                models,
477                tokens: b.tokens,
478                sessions: b.sessions,
479                input: b.input,
480                output: b.output,
481                cache_creation: b.cache_creation,
482                cache_read: b.cache_read,
483                cost: round6(b.cost),
484            }
485        })
486        .collect();
487    // Python: sort(key=(date, cost), reverse=True) — a single stable sort with a
488    // composite key, descending on both.
489    instances.sort_by(|a, b| {
490        b.date
491            .cmp(&a.date)
492            .then(b.cost.partial_cmp(&a.cost).unwrap_or(std::cmp::Ordering::Equal))
493    });
494    instances.truncate(200);
495
496    // Longest single session across all history (minutes).
497    let mut max_session_minutes = 0.0f64;
498    for s in sessions {
499        let dur = (s.last_ts - s.first_ts) / 60.0;
500        if dur > max_session_minutes {
501            max_session_minutes = dur;
502        }
503    }
504
505    let cost_today = by_day
506        .idx
507        .get(&today_key)
508        .map(|&j| by_day.buckets[j].cost)
509        .unwrap_or(0.0);
510
511    Buckets {
512        total_tokens_30d: total30,
513        total_sessions_30d: sessions30,
514        total_cost_30d: round6(cost30),
515        total_input_30d: input30,
516        total_output_30d: output30,
517        cost_today: round6(cost_today),
518        max_session_minutes: round1(max_session_minutes),
519        by_day: padded_day,
520        by_week: by_week_out,
521        by_month: by_month_out,
522        by_model: by_model_out,
523        by_project: by_project_out,
524        by_day_project: instances,
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn project_name_basename_rules() {
        assert_eq!(project_name_from_cwd(None), "—");
        assert_eq!(project_name_from_cwd(Some("")), "—");
        assert_eq!(project_name_from_cwd(Some("/a/b/c")), "c");
        assert_eq!(project_name_from_cwd(Some("/a/b/c/")), "c");
        assert_eq!(project_name_from_cwd(Some("/")), "/");
    }

    #[test]
    fn session_id_strips_extension() {
        assert_eq!(session_base_id("/x/y/abc.jsonl"), "abc");
        assert_eq!(session_base_id("/x/y/a.b.jsonl"), "a.b");
        assert_eq!(session_base_id("noext"), "noext");
    }

    #[test]
    fn idle_gap_splits_from_first_turn() {
        let mut files = std::collections::BTreeMap::new();
        let m = TurnMetrics { total: 10, input: 5, output: 5, ..Default::default() };
        files.insert(
            "/p/s.jsonl".to_string(),
            FileEvents {
                model: Some("claude-opus-4-8".into()),
                cwd: Some("/home/proj".into()),
                // t=0, t=1h (same session, <5h from start), t=6h (new session).
                events: vec![(0.0, m), (3600.0, m), (6.0 * 3600.0, m)],
            },
        );
        let (sessions, recent) = split_logical_sessions(&files);
        assert_eq!(sessions.len(), 2);
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].id, "s#1");
        assert_eq!(recent[1].id, "s#2");
        assert_eq!(recent[1].started_at, "1970-01-01T06:00:00Z");
        assert_eq!(sessions[0].tokens, 20); // first two turns
        assert_eq!(sessions[1].tokens, 10);
    }

    #[test]
    fn turn_exactly_at_window_edge_stays_in_session() {
        let mut files = std::collections::BTreeMap::new();
        let m = TurnMetrics { total: 1, ..Default::default() };
        files.insert(
            "/p/edge.jsonl".to_string(),
            FileEvents {
                model: None,
                cwd: None,
                // Deliberately unsorted; the split must sort by timestamp.
                events: vec![(SESSION_IDLE_GAP, m), (0.0, m)],
            },
        );
        let (sessions, recent) = split_logical_sessions(&files);
        assert_eq!(sessions.len(), 1);
        assert_eq!(recent[0].id, "edge");
        assert_eq!(recent[0].started_at, "1970-01-01T00:00:00Z");
        assert_eq!(recent[0].ended_at, "1970-01-01T05:00:00Z");
        assert_eq!(recent[0].duration_minutes, 300.0);
        assert_eq!(recent[0].model, "—");
        assert_eq!(recent[0].project, "—");
    }

    #[test]
    fn files_without_events_are_skipped() {
        let mut files = std::collections::BTreeMap::new();
        files.insert("/p/empty.jsonl".to_string(), FileEvents::default());
        let (sessions, recent) = split_logical_sessions(&files);
        assert!(sessions.is_empty());
        assert!(recent.is_empty());
    }

    #[test]
    fn iso_utc_formats_whole_fractional_and_carried_seconds() {
        assert_eq!(iso_utc(0.0), "1970-01-01T00:00:00Z");
        assert_eq!(iso_utc(1.25), "1970-01-01T00:00:01.250000Z");
        assert_eq!(iso_utc(86_400.0), "1970-01-02T00:00:00Z");
        // Rounding the fraction up to a full second carries into the seconds.
        assert_eq!(iso_utc(1.9999999), "1970-01-01T00:00:02Z");
    }

    #[test]
    fn parse_iso_handles_missing_invalid_and_offset_input() {
        assert_eq!(parse_iso(None), None);
        assert_eq!(parse_iso(Some("")), None);
        assert_eq!(parse_iso(Some("not a timestamp")), None);
        assert_eq!(parse_iso(Some("1970-01-01T00:00:01.25Z")), Some(1.25));
        assert_eq!(parse_iso(Some("1970-01-02T03:00:00+03:00")), Some(86_400.0));
    }

    #[test]
    fn bucket_aggregates_single_recent_session() {
        let now = 10.0 * 86_400.0 + 3_600.0; // 1970-01-11T01:00:00Z
        let session = Session {
            tokens: 100,
            cache_read: 0,
            input: 60,
            output: 40,
            cache_creation: 0,
            cost: 0.5,
            last_ts: now - 60.0,
            first_ts: now - 660.0,
            model: Some("claude-opus-4-8".into()),
            cwd: Some("/home/proj".into()),
        };
        let b = bucket_aggregates(&[session], now, UtcOffset::UTC);
        assert_eq!(b.total_tokens_30d, 100);
        assert_eq!(b.total_sessions_30d, 1);
        assert_eq!(b.total_input_30d, 60);
        assert_eq!(b.total_output_30d, 40);
        assert_eq!(b.total_cost_30d, 0.5);
        assert_eq!(b.cost_today, 0.5);
        assert_eq!(b.max_session_minutes, 10.0);
        assert_eq!(b.by_day.len(), 365);
        assert_eq!(b.by_day[0].date, "1970-01-11");
        assert_eq!(b.by_day[0].tokens, 100);
        assert_eq!(b.by_day[1].date, "1970-01-10");
        assert_eq!(b.by_day[1].tokens, 0);
        assert_eq!(b.by_week[0].date, "1970-W02");
        assert_eq!(b.by_month[0].date, "1970-01");
        assert_eq!(b.by_model[0].model, "claude-opus-4-8");
        assert_eq!(b.by_project[0].model, "proj");
        assert_eq!(b.by_day_project.len(), 1);
        assert_eq!(b.by_day_project[0].project, "proj");
        assert_eq!(b.by_day_project[0].models, vec!["claude-opus-4-8".to_string()]);
    }
}