roder_usage_analytics/
backfill.rs1use std::path::{Path, PathBuf};
11
12use anyhow::Context;
13use roder_api::events::EventEnvelope;
14use roder_api::thread::ThreadMetadata;
15
16use crate::ingest::AnalyticsIngestor;
17use crate::model::SessionRecord;
18use crate::store::AnalyticsStore;
19
/// Switches that control how [`backfill_analytics`] treats existing data and
/// unreadable input.
///
/// Both flags default to `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct BackfillOptions {
    /// When `true`, `store.clear_all()` is called before any log is scanned.
    pub rebuild: bool,
    /// When `true`, lines that fail to parse are collected in
    /// [`BackfillReport::parse_errors`] instead of aborting the run.
    pub best_effort: bool,
}
27
28#[derive(Debug, Clone, Default, PartialEq, Eq)]
29pub struct BackfillReport {
30 pub files_scanned: u64,
31 pub lines_ingested: u64,
32 pub lines_skipped_by_offset: u64,
33 pub sessions_enriched: u64,
34 pub parse_errors: Vec<BackfillParseError>,
35}
36
/// One event-log line that could not be parsed during a backfill.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackfillParseError {
    /// Display form of the event log's path (the same string used as the
    /// import-offset key).
    pub path: String,
    /// 1-based line number within that file.
    pub line: u64,
    /// Text of the parser error.
    pub message: String,
}
43
44pub fn backfill_analytics(
48 thread_root: &Path,
49 store: &AnalyticsStore,
50 options: BackfillOptions,
51) -> anyhow::Result<BackfillReport> {
52 if options.rebuild {
53 store.clear_all()?;
54 }
55 let mut report = BackfillReport::default();
56 let ingestor = AnalyticsIngestor::new(store);
57
58 for events_path in find_event_logs(thread_root)? {
59 report.files_scanned += 1;
60 let source_key = events_path.display().to_string();
61 let already = store.import_offset(&source_key)?.unwrap_or(0);
62 let contents = std::fs::read_to_string(&events_path)
63 .with_context(|| format!("read {}", events_path.display()))?;
64
65 let mut line_number = 0_u64;
66 for line in contents.lines() {
67 line_number += 1;
68 if line_number <= already {
69 report.lines_skipped_by_offset += 1;
70 continue;
71 }
72 let trimmed = line.trim();
73 if trimmed.is_empty() {
74 continue;
75 }
76 match serde_json::from_str::<EventEnvelope>(trimmed) {
77 Ok(envelope) => {
78 ingestor.ingest_event(&envelope)?;
79 report.lines_ingested += 1;
80 }
81 Err(error) => {
82 let parse_error = BackfillParseError {
83 path: source_key.clone(),
84 line: line_number,
85 message: error.to_string(),
86 };
87 if options.best_effort {
88 report.parse_errors.push(parse_error);
89 } else {
90 anyhow::bail!(
91 "corrupt event at {}:{}: {} (rerun with --best-effort to skip)",
92 parse_error.path,
93 parse_error.line,
94 parse_error.message
95 );
96 }
97 }
98 }
99 }
100 let mtime_ms = std::fs::metadata(&events_path)
101 .and_then(|metadata| metadata.modified())
102 .ok()
103 .and_then(|modified| {
104 modified
105 .duration_since(std::time::UNIX_EPOCH)
106 .ok()
107 .map(|duration| duration.as_millis() as i64)
108 });
109 store.record_import_offset(&source_key, line_number, mtime_ms)?;
110
111 if let Some(metadata) = read_thread_metadata(&events_path) {
113 let (workspace_key, workspace_label) =
114 store.workspace_label_mode.label(&metadata.workspace);
115 store.upsert_session(&SessionRecord {
116 thread_id: metadata.thread_id.clone(),
117 workspace_key: Some(workspace_key),
118 workspace_label: Some(workspace_label),
119 provider: metadata.provider.clone(),
120 model: metadata.model.clone(),
121 created_at_ms: (metadata.created_at.unix_timestamp_nanos() / 1_000_000) as i64,
122 updated_at_ms: (metadata.updated_at.unix_timestamp_nanos() / 1_000_000) as i64,
123 })?;
124 report.sessions_enriched += 1;
125 }
126 }
127 Ok(report)
128}
129
130fn read_thread_metadata(events_path: &Path) -> Option<ThreadMetadata> {
131 let metadata_path = events_path.parent()?.join("metadata.json");
132 let data = std::fs::read(metadata_path).ok()?;
133 serde_json::from_slice(&data).ok()
134}
135
136fn find_event_logs(root: &Path) -> anyhow::Result<Vec<PathBuf>> {
139 let mut logs = Vec::new();
140 let mut stack = vec![(root.to_path_buf(), 0_u8)];
141 while let Some((dir, depth)) = stack.pop() {
142 let Ok(entries) = std::fs::read_dir(&dir) else {
143 continue;
144 };
145 for entry in entries.flatten() {
146 let path = entry.path();
147 if path.is_dir() {
148 if depth < 3 {
149 stack.push((path, depth + 1));
150 }
151 } else if path.file_name().is_some_and(|name| name == "events.jsonl") {
152 logs.push(path);
153 }
154 }
155 }
156 logs.sort();
157 Ok(logs)
158}