use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildEvent {
pub ts: DateTime<Utc>,
pub crate_name: String,
#[serde(default)]
pub version: String,
pub result: EventResult,
pub elapsed_ms: u64,
#[serde(default)]
pub compile_time_ms: u64,
pub size: u64,
#[serde(default)]
pub cache_key: String,
#[serde(default)]
pub schema: u32,
#[serde(default)]
pub key_ms: u64,
#[serde(default)]
pub key_hash_hits: u64,
#[serde(default)]
pub key_hash_misses: u64,
#[serde(default)]
pub key_hash_bytes: u64,
#[serde(default)]
pub lookup_ms: u64,
#[serde(default)]
pub restore_ms: u64,
#[serde(default)]
pub store_ms: u64,
#[serde(default)]
pub store_output_blobs: u32,
#[serde(default)]
pub store_duplicate_blobs: u32,
#[serde(default)]
pub store_new_blobs: u32,
#[serde(default)]
pub compiler_runs: u32,
#[serde(default)]
pub preprocessor_runs: u32,
#[serde(default)]
pub probe_runs: u32,
#[serde(default)]
pub reflinked_bytes: u64,
#[serde(default)]
pub hardlinked_bytes: u64,
#[serde(default)]
pub copied_bytes: u64,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub passthrough_reason: String,
#[serde(default, skip_serializing_if = "is_false")]
pub fallback: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exit_code: Option<i32>,
}
fn is_false(value: &bool) -> bool {
!*value
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventResult {
LocalHit,
PrefetchHit,
RemoteHit,
Dup,
Miss,
Error,
Passthrough,
Skipped,
}
impl std::fmt::Display for EventResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EventResult::LocalHit => write!(f, "local_hit"),
EventResult::PrefetchHit => write!(f, "prefetch_hit"),
EventResult::RemoteHit => write!(f, "remote_hit"),
EventResult::Dup => write!(f, "dup"),
EventResult::Miss => write!(f, "miss"),
EventResult::Error => write!(f, "error"),
EventResult::Passthrough => write!(f, "passthrough"),
EventResult::Skipped => write!(f, "skipped"),
}
}
}
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildSummaryEvent {
pub ts: DateTime<Utc>,
pub schema: u32,
pub warming_wait_ms: u64,
pub prefetch_duration_ms: u64,
pub prefetch_requested: usize,
pub prefetch_downloaded: usize,
pub shards_matched: usize,
pub shards_total: usize,
}
pub fn log_event(event_log_path: &Path, event: &BuildEvent) -> Result<()> {
if let Some(parent) = event_log_path.parent() {
fs::create_dir_all(parent)?;
}
let lock = open_log_lock(event_log_path).context("opening event log lock")?;
lock.lock().context("locking event log")?;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(event_log_path)
.context("opening event log")?;
let line = serde_json::to_string(event).context("serializing event")?;
let mut bytes = line.into_bytes();
bytes.push(b'\n');
file.write_all(&bytes).context("writing event to log")?;
lock.unlock().context("unlocking event log")?;
Ok(())
}
pub fn read_events(event_log_path: &Path) -> Result<Vec<BuildEvent>> {
if !event_log_path.exists() {
return Ok(Vec::new());
}
let lock = open_log_lock(event_log_path).context("opening event log lock")?;
lock.lock_shared().context("locking event log for read")?;
let file = File::open(event_log_path).context("opening event log")?;
let reader = BufReader::new(&file);
let mut events = Vec::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<BuildEvent>(&line) {
Ok(event) => events.push(event),
Err(e) => {
tracing::debug!("skipping invalid event line: {}", e);
}
}
}
lock.unlock().context("unlocking event log")?;
Ok(events)
}
pub fn read_events_since(event_log_path: &Path, since: DateTime<Utc>) -> Result<Vec<BuildEvent>> {
let all = read_events(event_log_path)?;
Ok(all.into_iter().filter(|e| e.ts >= since).collect())
}
pub struct EventTailer {
path: PathBuf,
position: u64,
file: Option<File>,
}
impl EventTailer {
pub fn new(path: PathBuf) -> Self {
let file = File::open(&path).ok();
let position = file
.as_ref()
.and_then(|file| file.metadata().ok())
.map(|m| m.len())
.unwrap_or(0);
EventTailer {
path,
position,
file,
}
}
pub fn from_start(path: PathBuf) -> Self {
let file = File::open(&path).ok();
EventTailer {
path,
position: 0,
file,
}
}
pub fn poll(&mut self) -> Result<Vec<BuildEvent>> {
if self.file.is_none() {
match File::open(&self.path) {
Ok(file) => self.file = Some(file),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(e.into()),
}
}
let file = self.file.as_mut().unwrap();
let file_len = file.metadata()?.len();
if file_len < self.position {
self.position = 0;
}
if file_len <= self.position {
return Ok(Vec::new());
}
file.seek(SeekFrom::Start(self.position))?;
let reader = BufReader::new(file);
let mut events = Vec::new();
let mut bytes_read = 0u64;
for line in reader.lines() {
let line = line?;
bytes_read += line.len() as u64 + 1; if line.trim().is_empty() {
continue;
}
if let Ok(event) = serde_json::from_str::<BuildEvent>(&line) {
events.push(event);
}
}
self.position += bytes_read;
Ok(events)
}
}
pub fn rotate_if_needed(event_log_path: &Path, max_size: u64, keep_lines: usize) -> Result<()> {
if !event_log_path.exists() {
return Ok(());
}
let meta = fs::metadata(event_log_path)?;
if meta.len() <= max_size {
return Ok(());
}
let content = fs::read_to_string(event_log_path)?;
let lines: Vec<&str> = content.lines().collect();
let keep_from = lines.len().saturating_sub(keep_lines);
let kept: Vec<&str> = lines[keep_from..].to_vec();
fs::write(event_log_path, kept.join("\n") + "\n")?;
tracing::info!(
"rotated event log: kept {} of {} lines",
kept.len(),
lines.len()
);
Ok(())
}
use crate::daemon::TransferEvent;
pub fn log_transfer(transfer_log_path: &Path, event: &TransferEvent) -> Result<()> {
if let Some(parent) = transfer_log_path.parent() {
fs::create_dir_all(parent)?;
}
let lock = open_log_lock(transfer_log_path).context("opening transfer log lock")?;
lock.lock().context("locking transfer log")?;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(transfer_log_path)
.context("opening transfer log")?;
let line = serde_json::to_string(event).context("serializing transfer event")?;
let mut bytes = line.into_bytes();
bytes.push(b'\n');
file.write_all(&bytes)
.context("writing transfer event to log")?;
lock.unlock().context("unlocking transfer log")?;
Ok(())
}
pub fn read_transfers(transfer_log_path: &Path) -> Result<Vec<TransferEvent>> {
if !transfer_log_path.exists() {
return Ok(Vec::new());
}
let lock = open_log_lock(transfer_log_path).context("opening transfer log lock")?;
lock.lock_shared()
.context("locking transfer log for read")?;
let file = File::open(transfer_log_path).context("opening transfer log")?;
let reader = BufReader::new(&file);
let mut events = Vec::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<TransferEvent>(&line) {
Ok(event) => events.push(event),
Err(e) => {
tracing::debug!("skipping invalid transfer line: {}", e);
}
}
}
lock.unlock().context("unlocking transfer log")?;
Ok(events)
}
fn open_log_lock(log_path: &Path) -> Result<File> {
OpenOptions::new()
.create(true)
.truncate(false)
.read(true)
.write(true)
.open(sidecar_lock_path(log_path))
.context("opening log lock")
}
fn sidecar_lock_path(log_path: &Path) -> PathBuf {
let mut path = log_path.as_os_str().to_owned();
path.push(".lock");
PathBuf::from(path)
}
pub fn read_transfers_since(transfer_log_path: &Path, since_ts: u64) -> Result<Vec<TransferEvent>> {
let all = read_transfers(transfer_log_path)?;
Ok(all
.into_iter()
.filter(|e| e.timestamp >= since_ts)
.collect())
}
pub fn rotate_transfers_if_needed(
transfer_log_path: &Path,
max_size: u64,
keep_lines: usize,
) -> Result<()> {
if !transfer_log_path.exists() {
return Ok(());
}
let meta = fs::metadata(transfer_log_path)?;
if meta.len() <= max_size {
return Ok(());
}
let content = fs::read_to_string(transfer_log_path)?;
let lines: Vec<&str> = content.lines().collect();
let keep_from = lines.len().saturating_sub(keep_lines);
let kept: Vec<&str> = lines[keep_from..].to_vec();
fs::write(transfer_log_path, kept.join("\n") + "\n")?;
tracing::info!(
"rotated transfer log: kept {} of {} lines",
kept.len(),
lines.len()
);
Ok(())
}
#[allow(dead_code)]
pub fn clear_events(event_log_path: &Path) -> Result<()> {
if event_log_path.exists() {
fs::write(event_log_path, "")?;
}
Ok(())
}
pub struct EventStats {
#[allow(dead_code)]
pub total: usize,
pub local_hits: usize,
pub prefetch_hits: usize,
pub remote_hits: usize,
pub dups: usize,
pub misses: usize,
pub errors: usize,
pub total_size: u64,
pub total_elapsed_ms: u64,
pub hit_elapsed_ms: u64,
pub miss_elapsed_ms: u64,
pub hit_compile_time_ms: u64,
pub miss_compile_time_ms: u64,
pub total_key_ms: u64,
pub total_lookup_ms: u64,
pub total_restore_ms: u64,
pub total_store_ms: u64,
pub store_output_blobs: u32,
pub store_duplicate_blobs: u32,
pub store_new_blobs: u32,
pub reflinked_bytes: u64,
pub hardlinked_bytes: u64,
pub copied_bytes: u64,
}
pub fn compute_stats(events: &[BuildEvent]) -> EventStats {
let mut stats = EventStats {
total: events.len(),
local_hits: 0,
prefetch_hits: 0,
remote_hits: 0,
dups: 0,
misses: 0,
errors: 0,
total_size: 0,
total_elapsed_ms: 0,
hit_elapsed_ms: 0,
miss_elapsed_ms: 0,
hit_compile_time_ms: 0,
miss_compile_time_ms: 0,
total_key_ms: 0,
total_lookup_ms: 0,
total_restore_ms: 0,
total_store_ms: 0,
store_output_blobs: 0,
store_duplicate_blobs: 0,
store_new_blobs: 0,
reflinked_bytes: 0,
hardlinked_bytes: 0,
copied_bytes: 0,
};
for event in events {
match event.result {
EventResult::LocalHit => {
stats.local_hits += 1;
stats.hit_elapsed_ms += event.elapsed_ms;
stats.hit_compile_time_ms += event.compile_time_ms;
}
EventResult::PrefetchHit => {
stats.prefetch_hits += 1;
stats.hit_elapsed_ms += event.elapsed_ms;
stats.hit_compile_time_ms += event.compile_time_ms;
}
EventResult::RemoteHit => {
stats.remote_hits += 1;
stats.hit_elapsed_ms += event.elapsed_ms;
stats.hit_compile_time_ms += event.compile_time_ms;
}
EventResult::Dup => {
stats.dups += 1;
stats.miss_elapsed_ms += event.elapsed_ms;
stats.miss_compile_time_ms += if event.compile_time_ms > 0 {
event.compile_time_ms
} else {
event.elapsed_ms
};
}
EventResult::Miss => {
stats.misses += 1;
stats.miss_elapsed_ms += event.elapsed_ms;
stats.miss_compile_time_ms += if event.compile_time_ms > 0 {
event.compile_time_ms
} else {
event.elapsed_ms
};
}
EventResult::Error => stats.errors += 1,
EventResult::Passthrough | EventResult::Skipped => continue,
}
stats.total_size += event.size;
stats.total_elapsed_ms += event.elapsed_ms;
stats.total_key_ms += event.key_ms;
stats.total_lookup_ms += event.lookup_ms;
stats.total_restore_ms += event.restore_ms;
stats.total_store_ms += event.store_ms;
stats.store_output_blobs += event.store_output_blobs;
stats.store_duplicate_blobs += event.store_duplicate_blobs;
stats.store_new_blobs += event.store_new_blobs;
stats.reflinked_bytes += event.reflinked_bytes;
stats.hardlinked_bytes += event.hardlinked_bytes;
stats.copied_bytes += event.copied_bytes;
}
stats
}
#[cfg(test)]
mod tests {
use super::*;
fn test_event(
crate_name: &str,
result: EventResult,
elapsed_ms: u64,
compile_time_ms: u64,
size: u64,
cache_key: &str,
) -> BuildEvent {
BuildEvent {
ts: Utc::now(),
crate_name: crate_name.to_string(),
version: "0.0.0".to_string(),
result,
elapsed_ms,
compile_time_ms,
size,
cache_key: cache_key.to_string(),
schema: 8,
key_ms: 0,
key_hash_hits: 0,
key_hash_misses: 0,
key_hash_bytes: 0,
lookup_ms: 0,
restore_ms: 0,
store_ms: 0,
store_output_blobs: 0,
store_duplicate_blobs: 0,
store_new_blobs: 0,
compiler_runs: 0,
preprocessor_runs: 0,
probe_runs: 0,
reflinked_bytes: 0,
hardlinked_bytes: 0,
copied_bytes: 0,
passthrough_reason: String::new(),
fallback: false,
exit_code: None,
}
}
#[test]
fn test_log_and_read_events() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let event = BuildEvent {
ts: Utc::now(),
crate_name: "serde".to_string(),
version: "1.0.210".to_string(),
result: EventResult::LocalHit,
elapsed_ms: 2,
compile_time_ms: 250,
size: 3145728,
cache_key: "abc123".to_string(),
schema: 8,
key_ms: 0,
key_hash_hits: 0,
key_hash_misses: 0,
key_hash_bytes: 0,
lookup_ms: 0,
restore_ms: 0,
store_ms: 0,
store_output_blobs: 0,
store_duplicate_blobs: 0,
store_new_blobs: 0,
compiler_runs: 0,
preprocessor_runs: 0,
probe_runs: 0,
reflinked_bytes: 0,
hardlinked_bytes: 0,
copied_bytes: 0,
passthrough_reason: String::new(),
fallback: false,
exit_code: None,
};
log_event(&log_path, &event).unwrap();
log_event(&log_path, &event).unwrap();
let events = read_events(&log_path).unwrap();
assert_eq!(events.len(), 2);
assert_eq!(events[0].crate_name, "serde");
assert_eq!(events[0].result, EventResult::LocalHit);
}
#[test]
fn test_event_tailer() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let mut tailer = EventTailer::from_start(log_path.clone());
assert_eq!(tailer.poll().unwrap().len(), 0);
let event = test_event("tokio", EventResult::Miss, 5000, 4800, 8388608, "def456");
log_event(&log_path, &event).unwrap();
let new_events = tailer.poll().unwrap();
assert_eq!(new_events.len(), 1);
assert_eq!(tailer.poll().unwrap().len(), 0);
log_event(&log_path, &event).unwrap();
let new_events = tailer.poll().unwrap();
assert_eq!(new_events.len(), 1);
}
#[test]
fn test_event_rotation() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
for i in 0..100 {
let event = test_event(
&format!("crate_{i}"),
EventResult::LocalHit,
1,
25,
1024,
&format!("key_{i}"),
);
log_event(&log_path, &event).unwrap();
}
rotate_if_needed(&log_path, 100, 10).unwrap();
let events = read_events(&log_path).unwrap();
assert_eq!(events.len(), 10);
assert_eq!(events[0].crate_name, "crate_90");
}
#[test]
fn test_event_result_display() {
assert_eq!(EventResult::LocalHit.to_string(), "local_hit");
assert_eq!(EventResult::PrefetchHit.to_string(), "prefetch_hit");
assert_eq!(EventResult::RemoteHit.to_string(), "remote_hit");
assert_eq!(EventResult::Dup.to_string(), "dup");
assert_eq!(EventResult::Miss.to_string(), "miss");
assert_eq!(EventResult::Error.to_string(), "error");
assert_eq!(EventResult::Passthrough.to_string(), "passthrough");
assert_eq!(EventResult::Skipped.to_string(), "skipped");
}
#[test]
fn test_read_events_nonexistent_file() {
let events = read_events(Path::new("/nonexistent/events.jsonl")).unwrap();
assert!(events.is_empty());
}
#[test]
fn test_read_events_with_invalid_lines() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let event = test_event("valid", EventResult::Miss, 100, 90, 1024, "key");
log_event(&log_path, &event).unwrap();
use std::io::Write;
let mut f = OpenOptions::new().append(true).open(&log_path).unwrap();
writeln!(f, "this is not json").unwrap();
writeln!(f, "{{}}").unwrap();
let events = read_events(&log_path).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].crate_name, "valid");
}
#[test]
fn test_read_events_since() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let mut old_event = test_event("old", EventResult::Miss, 100, 80, 1024, "key1");
old_event.ts = Utc::now() - chrono::Duration::hours(2);
let new_event = test_event("new", EventResult::LocalHit, 10, 250, 512, "key2");
log_event(&log_path, &old_event).unwrap();
log_event(&log_path, &new_event).unwrap();
let since = Utc::now() - chrono::Duration::hours(1);
let events = read_events_since(&log_path, since).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].crate_name, "new");
}
#[test]
fn test_compute_stats() {
let events = vec![
test_event("a", EventResult::LocalHit, 10, 300, 100, "k1"),
test_event("b", EventResult::PrefetchHit, 5, 250, 150, "k1b"),
test_event("c", EventResult::RemoteHit, 50, 900, 200, "k2"),
test_event("dup", EventResult::Dup, 700, 650, 400, "kdup"),
test_event("d", EventResult::Miss, 1000, 950, 500, "k3"),
test_event("e", EventResult::Error, 5, 0, 0, "k4"),
test_event("f", EventResult::Skipped, 0, 0, 0, "k5"),
test_event("g", EventResult::Passthrough, 25, 0, 0, ""),
];
let stats = compute_stats(&events);
assert_eq!(stats.total, 8);
assert_eq!(stats.local_hits, 1);
assert_eq!(stats.prefetch_hits, 1);
assert_eq!(stats.remote_hits, 1);
assert_eq!(stats.dups, 1);
assert_eq!(stats.misses, 1);
assert_eq!(stats.errors, 1);
assert_eq!(stats.total_size, 1350);
assert_eq!(stats.total_elapsed_ms, 1770);
assert_eq!(stats.hit_elapsed_ms, 65);
assert_eq!(stats.miss_elapsed_ms, 1700);
assert_eq!(stats.hit_compile_time_ms, 1450);
assert_eq!(stats.miss_compile_time_ms, 1600);
}
#[test]
fn test_compute_stats_empty() {
let stats = compute_stats(&[]);
assert_eq!(stats.total, 0);
assert_eq!(stats.local_hits, 0);
}
#[test]
fn test_clear_events() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let event = test_event("test", EventResult::Miss, 100, 80, 1024, "key");
log_event(&log_path, &event).unwrap();
assert!(!read_events(&log_path).unwrap().is_empty());
clear_events(&log_path).unwrap();
assert!(read_events(&log_path).unwrap().is_empty());
}
#[test]
fn test_clear_events_nonexistent() {
clear_events(Path::new("/nonexistent/events.jsonl")).unwrap();
}
#[test]
fn test_rotate_skips_small_file() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let event = test_event("test", EventResult::Miss, 100, 80, 1024, "key");
log_event(&log_path, &event).unwrap();
let size_before = fs::metadata(&log_path).unwrap().len();
rotate_if_needed(&log_path, 1_000_000, 10).unwrap();
let size_after = fs::metadata(&log_path).unwrap().len();
assert_eq!(size_before, size_after);
}
#[test]
fn test_rotate_nonexistent() {
rotate_if_needed(Path::new("/nonexistent/events.jsonl"), 100, 10).unwrap();
}
#[test]
fn test_event_tailer_handles_truncation() {
let dir = tempfile::tempdir().unwrap();
let log_path = dir.path().join("events.jsonl");
let event = test_event("test", EventResult::Miss, 100, 80, 1024, "key");
for _ in 0..10 {
log_event(&log_path, &event).unwrap();
}
let mut tailer = EventTailer::from_start(log_path.clone());
assert_eq!(tailer.poll().unwrap().len(), 10);
fs::write(&log_path, "").unwrap();
log_event(&log_path, &event).unwrap();
let events = tailer.poll().unwrap();
assert_eq!(events.len(), 1);
}
}