Skip to main content

roder_usage_analytics/
backfill.rs

//! Idempotent analytics backfill from per-thread `events.jsonl` files
//! (roadmap phase 73, Task 3).
//!
//! Raw thread-event JSONL remains the audit source of truth; this module
//! replays it into the SQLite analytics store. Stored per-file import offsets
//! make repeated backfills incremental, `rebuild` clears the analytics rows
//! and replays everything from scratch, and `best_effort` reports corrupt
//! lines with file/line evidence instead of failing the whole import.
9
10use std::path::{Path, PathBuf};
11
12use anyhow::Context;
13use roder_api::events::EventEnvelope;
14use roder_api::thread::ThreadMetadata;
15
16use crate::ingest::AnalyticsIngestor;
17use crate::model::SessionRecord;
18use crate::store::AnalyticsStore;
19
/// Options controlling [`backfill_analytics`].
///
/// The default (`rebuild: false`, `best_effort: false`) performs an
/// incremental, strict import.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BackfillOptions {
    /// Clear all analytics rows before replaying JSONL.
    pub rebuild: bool,
    /// Report corrupt/unknown lines and continue instead of failing.
    pub best_effort: bool,
}
27
28#[derive(Debug, Clone, Default, PartialEq, Eq)]
29pub struct BackfillReport {
30    pub files_scanned: u64,
31    pub lines_ingested: u64,
32    pub lines_skipped_by_offset: u64,
33    pub sessions_enriched: u64,
34    pub parse_errors: Vec<BackfillParseError>,
35}
36
/// A corrupt line found while replaying an event log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackfillParseError {
    /// Displayed path of the `events.jsonl` file (also its import-offset key).
    pub path: String,
    /// 1-based line number within that file.
    pub line: u64,
    /// Human-readable description of why the line could not be decoded.
    pub message: String,
}
43
44/// Replays every `events.jsonl` under `thread_root` (including archived
45/// threads) into `store`, then enriches session rows from each thread's
46/// `metadata.json` (workspace label, provider, model).
47pub fn backfill_analytics(
48    thread_root: &Path,
49    store: &AnalyticsStore,
50    options: BackfillOptions,
51) -> anyhow::Result<BackfillReport> {
52    if options.rebuild {
53        store.clear_all()?;
54    }
55    let mut report = BackfillReport::default();
56    let ingestor = AnalyticsIngestor::new(store);
57
58    for events_path in find_event_logs(thread_root)? {
59        report.files_scanned += 1;
60        let source_key = events_path.display().to_string();
61        let already = store.import_offset(&source_key)?.unwrap_or(0);
62        let contents = std::fs::read_to_string(&events_path)
63            .with_context(|| format!("read {}", events_path.display()))?;
64
65        let mut line_number = 0_u64;
66        for line in contents.lines() {
67            line_number += 1;
68            if line_number <= already {
69                report.lines_skipped_by_offset += 1;
70                continue;
71            }
72            let trimmed = line.trim();
73            if trimmed.is_empty() {
74                continue;
75            }
76            match serde_json::from_str::<EventEnvelope>(trimmed) {
77                Ok(envelope) => {
78                    ingestor.ingest_event(&envelope)?;
79                    report.lines_ingested += 1;
80                }
81                Err(error) => {
82                    let parse_error = BackfillParseError {
83                        path: source_key.clone(),
84                        line: line_number,
85                        message: error.to_string(),
86                    };
87                    if options.best_effort {
88                        report.parse_errors.push(parse_error);
89                    } else {
90                        anyhow::bail!(
91                            "corrupt event at {}:{}: {} (rerun with --best-effort to skip)",
92                            parse_error.path,
93                            parse_error.line,
94                            parse_error.message
95                        );
96                    }
97                }
98            }
99        }
100        let mtime_ms = std::fs::metadata(&events_path)
101            .and_then(|metadata| metadata.modified())
102            .ok()
103            .and_then(|modified| {
104                modified
105                    .duration_since(std::time::UNIX_EPOCH)
106                    .ok()
107                    .map(|duration| duration.as_millis() as i64)
108            });
109        store.record_import_offset(&source_key, line_number, mtime_ms)?;
110
111        // Enrich the session row from thread metadata when present.
112        if let Some(metadata) = read_thread_metadata(&events_path) {
113            let (workspace_key, workspace_label) =
114                store.workspace_label_mode.label(&metadata.workspace);
115            store.upsert_session(&SessionRecord {
116                thread_id: metadata.thread_id.clone(),
117                workspace_key: Some(workspace_key),
118                workspace_label: Some(workspace_label),
119                provider: metadata.provider.clone(),
120                model: metadata.model.clone(),
121                created_at_ms: (metadata.created_at.unix_timestamp_nanos() / 1_000_000) as i64,
122                updated_at_ms: (metadata.updated_at.unix_timestamp_nanos() / 1_000_000) as i64,
123            })?;
124            report.sessions_enriched += 1;
125        }
126    }
127    Ok(report)
128}
129
130fn read_thread_metadata(events_path: &Path) -> Option<ThreadMetadata> {
131    let metadata_path = events_path.parent()?.join("metadata.json");
132    let data = std::fs::read(metadata_path).ok()?;
133    serde_json::from_slice(&data).ok()
134}
135
136/// Finds every `events.jsonl` under `root` (bounded depth: thread dirs and
137/// the archived-threads subtree).
138fn find_event_logs(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
139    let mut logs = Vec::new();
140    let mut stack = vec![(root.to_path_buf(), 0_u8)];
141    while let Some((dir, depth)) = stack.pop() {
142        let Ok(entries) = std::fs::read_dir(&dir) else {
143            continue;
144        };
145        for entry in entries.flatten() {
146            let path = entry.path();
147            if path.is_dir() {
148                if depth < 3 {
149                    stack.push((path, depth + 1));
150                }
151            } else if path.file_name().is_some_and(|name| name == "events.jsonl") {
152                logs.push(path);
153            }
154        }
155    }
156    logs.sort();
157    Ok(logs)
158}