use std::{
fs::{self, OpenOptions},
io::{self, Write},
path::{Path, PathBuf},
thread,
time::{Duration, Instant},
};
use serde::{Deserialize, Serialize};
pub const MAX_EVENTS: usize = 1000;
const LOCK_RETRY_DELAY: Duration = Duration::from_millis(5);
const LOCK_WAIT_TIMEOUT: Duration = Duration::from_millis(500);
const STALE_LOCK_AGE: Duration = Duration::from_secs(30);
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ActivityEvent {
pub ts_ms: i64,
#[serde(flatten)]
pub payload: ActivityPayload,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ActivityPayload {
RuleRecalled {
rule_id: String,
rule_title: String,
score: f32,
took_ms: u64,
},
RuleInjected {
rule_count: u32,
prompt_chars: u32,
intent_summary: String,
},
RuleReinforced {
rule_id: String,
rule_title: String,
prev_strength: f32,
new_strength: f32,
reason: String,
},
RetrievalEmbedding { hits: u32, took_ms: u64 },
EmbedCapReached { cap: u32, used: u32 },
EmbeddingFallback { reason: String },
}
fn now_ms() -> i64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
}
fn log_path() -> Option<PathBuf> {
crate::infra::paths::data_home()
.ok()
.map(|dir| dir.join("activity.jsonl"))
}
pub fn record(payload: ActivityPayload) {
let Some(path) = log_path() else {
return;
};
let event = ActivityEvent {
ts_ms: now_ms(),
payload,
};
let Ok(line) = serde_json::to_string(&event) else {
return;
};
let _ = append_with_cap(&path, &line);
}
fn append_with_cap(path: &Path, line: &str) -> io::Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let _lock = acquire_log_lock(path)?;
let existing = fs::read_to_string(path).unwrap_or_default();
if existing.lines().count() >= MAX_EVENTS {
let mut kept: Vec<&str> = existing.lines().collect();
let drop = kept.len().saturating_sub(MAX_EVENTS - 1);
kept.drain(..drop);
let mut out = kept.join("\n");
if !out.is_empty() {
out.push('\n');
}
out.push_str(line);
out.push('\n');
fs::write(path, out)?;
return Ok(());
}
let mut f = OpenOptions::new().create(true).append(true).open(path)?;
writeln!(f, "{line}")?;
Ok(())
}
struct ActivityLogLock {
path: PathBuf,
}
impl Drop for ActivityLogLock {
fn drop(&mut self) {
let _ = fs::remove_file(&self.path);
}
}
fn lock_path(path: &Path) -> PathBuf {
let mut lock = path.as_os_str().to_owned();
lock.push(".lock");
PathBuf::from(lock)
}
fn acquire_log_lock(path: &Path) -> io::Result<ActivityLogLock> {
let path = lock_path(path);
let started = Instant::now();
loop {
match OpenOptions::new().write(true).create_new(true).open(&path) {
Ok(mut file) => {
let _ = writeln!(file, "pid={}", std::process::id());
return Ok(ActivityLogLock { path });
}
Err(e)
if matches!(
e.kind(),
io::ErrorKind::AlreadyExists | io::ErrorKind::PermissionDenied
) =>
{
if e.kind() == io::ErrorKind::AlreadyExists && lock_is_stale(&path) {
let _ = fs::remove_file(&path);
continue;
}
if started.elapsed() >= LOCK_WAIT_TIMEOUT {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"timed out waiting for activity log lock",
));
}
thread::sleep(LOCK_RETRY_DELAY);
}
Err(e) => return Err(e),
}
}
}
fn lock_is_stale(path: &Path) -> bool {
fs::metadata(path)
.and_then(|meta| meta.modified())
.and_then(|modified| modified.elapsed().map_err(io::Error::other))
.is_ok_and(|age| age > STALE_LOCK_AGE)
}
fn parse_events(raw: &str) -> Vec<ActivityEvent> {
serde_json::Deserializer::from_str(raw)
.into_iter::<ActivityEvent>()
.filter_map(Result::ok)
.collect()
}
pub fn tail(n: usize) -> Vec<ActivityEvent> {
let Some(path) = log_path() else {
return Vec::new();
};
let Ok(raw) = fs::read_to_string(&path) else {
return Vec::new();
};
parse_events(&raw).into_iter().rev().take(n).collect()
}
pub fn record_to(path: &Path, payload: ActivityPayload) -> io::Result<()> {
let event = ActivityEvent {
ts_ms: now_ms(),
payload,
};
let line =
serde_json::to_string(&event).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
append_with_cap(path, &line)
}
pub fn tail_from(path: &Path, n: usize) -> Vec<ActivityEvent> {
let Ok(raw) = fs::read_to_string(path) else {
return Vec::new();
};
parse_events(&raw).into_iter().rev().take(n).collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn writer_caps_at_max_events() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("activity.jsonl");
for i in 0..=MAX_EVENTS {
record_to(
&path,
ActivityPayload::RuleRecalled {
rule_id: format!("r{i}"),
rule_title: "t".into(),
score: 0.1,
took_ms: 1,
},
)
.unwrap();
}
let events = tail_from(&path, MAX_EVENTS + 50);
assert_eq!(
events.len(),
MAX_EVENTS,
"file should be capped at {MAX_EVENTS} entries"
);
if let ActivityPayload::RuleRecalled { rule_id, .. } = &events[0].payload {
assert_eq!(rule_id, &format!("r{MAX_EVENTS}"));
} else {
panic!("unexpected payload kind on top");
}
}
#[test]
fn tail_returns_newest_first() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("activity.jsonl");
record_to(
&path,
ActivityPayload::RuleInjected {
rule_count: 1,
prompt_chars: 10,
intent_summary: "first".into(),
},
)
.unwrap();
record_to(
&path,
ActivityPayload::RuleInjected {
rule_count: 2,
prompt_chars: 20,
intent_summary: "second".into(),
},
)
.unwrap();
let events = tail_from(&path, 10);
assert_eq!(events.len(), 2);
if let ActivityPayload::RuleInjected { intent_summary, .. } = &events[0].payload {
assert_eq!(intent_summary, "second");
} else {
panic!("expected RuleInjected on top");
}
}
#[test]
fn embedding_fallback_round_trips_sanitized_reason() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("activity.jsonl");
record_to(
&path,
ActivityPayload::EmbeddingFallback {
reason: "network".into(),
},
)
.unwrap();
let events = tail_from(&path, 10);
assert_eq!(events.len(), 1);
if let ActivityPayload::EmbeddingFallback { reason } = &events[0].payload {
assert_eq!(reason, "network");
} else {
panic!("expected EmbeddingFallback on top");
}
}
#[test]
fn tail_recovers_concatenated_json_objects() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("activity.jsonl");
let first = ActivityEvent {
ts_ms: 1,
payload: ActivityPayload::RuleInjected {
rule_count: 1,
prompt_chars: 10,
intent_summary: "first".into(),
},
};
let second = ActivityEvent {
ts_ms: 2,
payload: ActivityPayload::RuleInjected {
rule_count: 2,
prompt_chars: 20,
intent_summary: "second".into(),
},
};
fs::write(
&path,
format!(
"{}{}",
serde_json::to_string(&first).unwrap(),
serde_json::to_string(&second).unwrap()
),
)
.unwrap();
let events = tail_from(&path, 10);
assert_eq!(events, vec![second, first]);
}
#[test]
fn concurrent_writes_remain_valid_jsonl() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("activity.jsonl");
let mut handles = Vec::new();
for worker in 0..8 {
let path = path.clone();
handles.push(thread::spawn(move || {
for i in 0..25 {
record_to(
&path,
ActivityPayload::RuleRecalled {
rule_id: format!("r-{worker}-{i}"),
rule_title: "title".into(),
score: 0.1,
took_ms: 1,
},
)
.unwrap();
}
}));
}
for handle in handles {
handle.join().unwrap();
}
let raw = fs::read_to_string(&path).unwrap();
assert_eq!(raw.lines().count(), 200);
for line in raw.lines() {
serde_json::from_str::<ActivityEvent>(line).unwrap();
}
}
}