use std::collections::HashSet;
use std::fs;
use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use jiff::{SignedDuration, Timestamp};
use serde::Deserialize;
/// Length of one usage block, in hours. Used both for a block's nominal end
/// and as the idle-gap threshold that starts a new block.
const BLOCK_DURATION_HOURS: i64 = 5;
/// Length of the trailing aggregation window, in days, counted back from "now".
const WINDOW_DAYS: i64 = 7;
/// Result of one aggregation pass over the discovered JSONL session files.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct JsonlAggregate {
/// The block whose last activity is recent enough to count as active at
/// aggregation time, or `None` when there is no such block.
pub five_hour: Option<FiveHourBlock>,
/// Token totals for the trailing window of `WINDOW_DAYS` days.
pub seven_day: SevenDayWindow,
/// Every `.jsonl` file that was read without an I/O error, in traversal order.
pub source_paths: Vec<PathBuf>,
}
/// A run of usage entries grouped into one block.
#[derive(Debug, Clone)]
pub struct FiveHourBlock {
/// Timestamp of the block's first entry, floored to the hour.
pub start: Timestamp,
/// Timestamp of the entry most recently folded into the block.
pub actual_last_activity: Timestamp,
/// Sum of the usage counters of every entry in the block.
pub token_counts: TokenCounts,
/// Distinct model names seen in the block, in first-seen order.
pub models: Vec<String>,
/// The most recent `usageLimitResetTime` carried by an entry in the block, if any.
pub usage_limit_reset: Option<Timestamp>,
}
impl FiveHourBlock {
    /// Returns the nominal end of the block: `start` plus
    /// `BLOCK_DURATION_HOURS` hours.
    #[must_use]
    pub fn end(&self) -> Timestamp {
        let length = SignedDuration::from_hours(BLOCK_DURATION_HOURS);
        self.start + length
    }
}
/// Token totals over the trailing `WINDOW_DAYS`-day window.
#[derive(Debug, Clone)]
pub struct SevenDayWindow {
/// Inclusive lower bound of the window (aggregation time minus `WINDOW_DAYS` days).
pub window_start: Timestamp,
/// Sum of the usage counters of entries whose timestamp lies in the window.
pub token_counts: TokenCounts,
}
/// Running totals of the four token counters carried by usage entries.
/// All arithmetic on these fields in this file saturates instead of wrapping.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TokenCounts {
/// Accumulated `input_tokens`.
pub(crate) input: u64,
/// Accumulated `output_tokens`.
pub(crate) output: u64,
/// Accumulated `cache_creation_input_tokens`.
pub(crate) cache_creation: u64,
/// Accumulated `cache_read_input_tokens`.
pub(crate) cache_read: u64,
}
impl TokenCounts {
#[cfg(test)]
#[must_use]
pub(crate) fn from_parts(
input: u64,
output: u64,
cache_creation: u64,
cache_read: u64,
) -> Self {
Self {
input,
output,
cache_creation,
cache_read,
}
}
#[must_use]
pub fn input(&self) -> u64 {
self.input
}
#[must_use]
pub fn output(&self) -> u64 {
self.output
}
#[must_use]
pub fn cache_creation(&self) -> u64 {
self.cache_creation
}
#[must_use]
pub fn cache_read(&self) -> u64 {
self.cache_read
}
#[must_use]
pub fn total(&self) -> u64 {
self.input
.saturating_add(self.output)
.saturating_add(self.cache_creation)
.saturating_add(self.cache_read)
}
fn accumulate(&mut self, other: UsageCounts) {
self.input = self.input.saturating_add(other.input_tokens);
self.output = self.output.saturating_add(other.output_tokens);
self.cache_creation = self.cache_creation.saturating_add(other.cache_creation);
self.cache_read = self.cache_read.saturating_add(other.cache_read);
}
}
/// Failures reported by JSONL discovery and reading.
#[derive(Debug)]
#[non_exhaustive]
pub enum JsonlError {
/// None of the candidate project roots exists on disk.
DirectoryMissing,
/// At least one root exists, but no usage entry could be collected from it.
NoEntries,
/// A filesystem operation on `path` failed with `cause`.
IoError { path: PathBuf, cause: io::Error },
/// A line of `path` failed to deserialize.
/// NOTE(review): nothing in this part of the file constructs this variant;
/// the tailer skips undecodable lines instead — confirm where it is produced.
ParseError {
path: PathBuf,
line: u64,
cause: serde_json::Error,
},
}
impl JsonlError {
    /// Returns a fixed identifier for the variant, independent of any payload.
    /// The identifier is the variant's own name.
    #[must_use]
    pub fn code(&self) -> &'static str {
        match *self {
            Self::ParseError { .. } => "ParseError",
            Self::IoError { .. } => "IoError",
            Self::NoEntries => "NoEntries",
            Self::DirectoryMissing => "DirectoryMissing",
        }
    }
}
impl std::fmt::Display for JsonlError {
    /// Writes a one-line, human-readable description of the error.
    ///
    /// For `IoError` only the `io::ErrorKind` is shown; the full cause stays
    /// reachable through `Error::source`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IoError { path, cause } => {
                let kind = cause.kind();
                write!(f, "failed to read JSONL path {}: {}", path.display(), kind)
            }
            Self::ParseError { path, line, cause } => {
                let location = path.display();
                write!(
                    f,
                    "JSONL parse failed in {} at line {}: {}",
                    location, line, cause
                )
            }
            Self::NoEntries => f.write_str("Claude Code project directory has no JSONL entries"),
            Self::DirectoryMissing => f.write_str("no Claude Code project directory found"),
        }
    }
}
impl std::error::Error for JsonlError {
    /// Exposes the wrapped I/O or JSON error as the source; the payload-free
    /// variants have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::DirectoryMissing | Self::NoEntries => None,
            Self::IoError { cause, .. } => Some(cause),
            Self::ParseError { cause, .. } => Some(cause),
        }
    }
}
/// One deserialized JSONL line. `timestamp` and `message` are required keys:
/// a line lacking either fails to deserialize.
#[derive(Debug, Deserialize)]
pub(crate) struct UsageEntry {
/// When the entry was recorded; used for ordering and for window/block membership.
timestamp: Timestamp,
/// The nested `message` object (usage counters, model name, message id).
message: MessageFields,
/// Optional reset instant, read from the `usageLimitResetTime` key.
#[serde(default, rename = "usageLimitResetTime")]
usage_limit_reset_time: Option<Timestamp>,
}
/// The subset of the `message` object this module reads; every key is optional.
#[derive(Debug, Deserialize, Default)]
struct MessageFields {
/// Token counters for this message, when present.
#[serde(default)]
usage: Option<UsageCounts>,
/// Model name, when present.
#[serde(default)]
model: Option<String>,
/// Message id, when present; entries sharing an id are collected only once.
#[serde(default)]
id: Option<String>,
}
/// Token counters of a single message. Any missing key deserializes as `0`.
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct UsageCounts {
/// Read from the `input_tokens` key.
#[serde(default)]
input_tokens: u64,
/// Read from the `output_tokens` key.
#[serde(default)]
output_tokens: u64,
/// Read from the `cache_creation_input_tokens` key.
#[serde(default, rename = "cache_creation_input_tokens")]
cache_creation: u64,
/// Read from the `cache_read_input_tokens` key.
#[serde(default, rename = "cache_read_input_tokens")]
cache_read: u64,
}
/// Snapshot of the environment values that drive project-root discovery.
/// Holding them in a struct lets discovery run against explicit values
/// instead of the live process environment.
#[derive(Debug, Clone, Default)]
struct DiscoveryEnv {
/// Value of `CLAUDE_CONFIG_DIR`, if set and non-empty.
claude_config_dir: Option<PathBuf>,
/// Value of `XDG_CONFIG_HOME`, if set and non-empty.
xdg_config_home: Option<PathBuf>,
/// Value of `HOME`, if set and non-empty.
home: Option<PathBuf>,
}
impl DiscoveryEnv {
    /// Captures `CLAUDE_CONFIG_DIR`, `XDG_CONFIG_HOME` and `HOME` from the
    /// process environment. A variable that is unset or set to the empty
    /// string is recorded as `None`.
    fn from_process_env() -> Self {
        let lookup = |key: &str| -> Option<PathBuf> {
            match std::env::var_os(key) {
                Some(value) if !value.is_empty() => Some(PathBuf::from(value)),
                _ => None,
            }
        };
        let claude_config_dir = lookup("CLAUDE_CONFIG_DIR");
        let xdg_config_home = lookup("XDG_CONFIG_HOME");
        let home = lookup("HOME");
        Self {
            claude_config_dir,
            xdg_config_home,
            home,
        }
    }
}
/// Lists the candidate `projects` directories, in precedence order:
///
/// 1. `<CLAUDE_CONFIG_DIR>/projects`
/// 2. `<XDG_CONFIG_HOME>/claude/projects`, falling back to
///    `<HOME>/.config/claude/projects` when `XDG_CONFIG_HOME` is absent
/// 3. `<HOME>/.claude/projects`
///
/// A candidate is skipped when its source value is absent. Each distinct path
/// appears once: if two sources resolve to the same directory (for example
/// `CLAUDE_CONFIG_DIR` pointing at `<HOME>/.claude`), only the
/// highest-precedence occurrence is kept, so the directory is not scanned
/// twice. Paths are compared as written; symlinks are not resolved.
///
/// No filesystem access happens here; existence is checked by the caller.
fn project_roots(env: &DiscoveryEnv) -> Vec<PathBuf> {
    /// Appends `candidate` unless an equal path is already listed.
    fn push_unique(out: &mut Vec<PathBuf>, candidate: PathBuf) {
        if !out.contains(&candidate) {
            out.push(candidate);
        }
    }

    let mut out = Vec::with_capacity(3);
    if let Some(dir) = &env.claude_config_dir {
        push_unique(&mut out, dir.join("projects"));
    }
    let xdg_root = env
        .xdg_config_home
        .clone()
        .or_else(|| env.home.as_ref().map(|h| h.join(".config")));
    if let Some(xdg_root) = xdg_root {
        push_unique(&mut out, xdg_root.join("claude").join("projects"));
    }
    if let Some(home) = &env.home {
        push_unique(&mut out, home.join(".claude").join("projects"));
    }
    out
}
/// Incremental reader for one JSONL file: remembers how far it has read so a
/// later call only returns lines appended since.
pub(crate) struct JsonlTailer {
/// File being tailed.
path: PathBuf,
/// Byte offset just past the last newline-terminated line already consumed.
last_offset: u64,
/// File size observed by the previous read; a smaller size on the next read
/// is treated as truncation.
last_size: u64,
}
impl JsonlTailer {
#[must_use]
pub(crate) fn new(path: PathBuf) -> Self {
Self {
path,
last_offset: 0,
last_size: 0,
}
}
pub(crate) fn read_new(&mut self) -> Result<Vec<UsageEntry>, JsonlError> {
let metadata = match fs::metadata(&self.path) {
Ok(m) => m,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(cause) => {
return Err(JsonlError::IoError {
path: self.path.clone(),
cause,
})
}
};
let size = metadata.len();
if size < self.last_size {
self.last_offset = 0;
}
self.last_size = size;
if self.last_offset >= size {
return Ok(Vec::new());
}
let mut file = fs::File::open(&self.path).map_err(|cause| JsonlError::IoError {
path: self.path.clone(),
cause,
})?;
file.seek(SeekFrom::Start(self.last_offset))
.map_err(|cause| JsonlError::IoError {
path: self.path.clone(),
cause,
})?;
let mut reader = BufReader::new(file);
let mut entries = Vec::new();
let mut buf: Vec<u8> = Vec::new();
loop {
buf.clear();
let read = reader
.read_until(b'\n', &mut buf)
.map_err(|cause| JsonlError::IoError {
path: self.path.clone(),
cause,
})?;
if read == 0 {
break;
}
if buf.last() != Some(&b'\n') {
break;
}
self.last_offset += read as u64;
let line = match buf.strip_suffix(b"\n") {
Some(rest) => rest.strip_suffix(b"\r").unwrap_or(rest),
None => &buf[..],
};
let text = String::from_utf8_lossy(line);
if let Ok(entry) = serde_json::from_str::<UsageEntry>(&text) {
entries.push(entry);
}
}
Ok(entries)
}
}
/// Aggregates usage from the JSONL files found via the process environment.
///
/// # Errors
///
/// Propagates any error from `aggregate_jsonl_with`:
/// [`JsonlError::DirectoryMissing`] when no candidate root exists,
/// [`JsonlError::NoEntries`] when nothing could be collected, or
/// [`JsonlError::IoError`] when a root directory cannot be listed.
pub fn aggregate_jsonl() -> Result<JsonlAggregate, JsonlError> {
    let env = DiscoveryEnv::from_process_env();
    aggregate_jsonl_with(&env)
}
/// Aggregates usage from the project roots derived from `env`.
///
/// Roots that do not exist are dropped. Every remaining root is scanned, the
/// collected entries are sorted by timestamp (stable sort) and handed to
/// `build_aggregate`.
///
/// # Errors
///
/// * [`JsonlError::DirectoryMissing`] when none of the candidate roots exists.
/// * [`JsonlError::NoEntries`] when scanning produced no entries.
/// * Any error returned by `collect_from_root`.
fn aggregate_jsonl_with(env: &DiscoveryEnv) -> Result<JsonlAggregate, JsonlError> {
    let mut roots = project_roots(env);
    roots.retain(|root| root.exists());
    if roots.is_empty() {
        return Err(JsonlError::DirectoryMissing);
    }

    let mut collected: Vec<UsageEntry> = Vec::new();
    let mut source_paths: Vec<PathBuf> = Vec::new();
    // Shared across roots so a message id is collected only once overall.
    let mut seen_ids: HashSet<String> = HashSet::new();
    for root in &roots {
        collect_from_root(root, &mut collected, &mut source_paths, &mut seen_ids)?;
    }

    if collected.is_empty() {
        return Err(JsonlError::NoEntries);
    }
    collected.sort_by_key(|entry| entry.timestamp);
    Ok(build_aggregate(&collected, source_paths))
}
/// Scans `root/<project>/*.jsonl` and appends what it finds to the output
/// collections.
///
/// * `entries` receives every entry whose `message.id` is absent or not yet
///   in `seen_ids`; ids are recorded in `seen_ids` as they are accepted.
/// * `source_paths` receives every `.jsonl` file that was read without an I/O
///   error, whether or not it contributed entries.
///
/// Only a failure to list `root` itself is returned as an error (a missing
/// `root` is not an error). Failures below that level are logged with
/// `lsm_warn!` and the affected directory entry, project or file is skipped.
fn collect_from_root(
    root: &Path,
    entries: &mut Vec<UsageEntry>,
    source_paths: &mut Vec<PathBuf>,
    seen_ids: &mut HashSet<String>,
) -> Result<(), JsonlError> {
    /// Reads every `.jsonl` file directly inside one project directory.
    fn scan_project(
        project_path: &Path,
        entries: &mut Vec<UsageEntry>,
        source_paths: &mut Vec<PathBuf>,
        seen_ids: &mut HashSet<String>,
    ) {
        let session_iter = match fs::read_dir(project_path) {
            Ok(iter) => iter,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return,
            Err(cause) => {
                crate::lsm_warn!(
                    "jsonl: read_dir {} failed: {} ({cause}); skipping workspace",
                    project_path.display(),
                    cause.kind(),
                );
                return;
            }
        };
        for session in session_iter {
            let session_path = match session {
                Ok(entry) => entry.path(),
                Err(cause) => {
                    crate::lsm_warn!(
                        "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
                        project_path.display(),
                        cause.kind(),
                    );
                    continue;
                }
            };
            let is_jsonl = session_path.extension().is_some_and(|ext| ext == "jsonl");
            if !is_jsonl {
                continue;
            }
            // A fresh tailer starts at offset 0, so the whole file is read.
            let file_entries = match JsonlTailer::new(session_path.clone()).read_new() {
                Ok(found) => found,
                Err(JsonlError::IoError { path, cause }) => {
                    crate::lsm_warn!(
                        "jsonl: tailer read {} failed: {} ({cause}); skipping file",
                        path.display(),
                        cause.kind(),
                    );
                    continue;
                }
                Err(other) => {
                    crate::lsm_warn!(
                        "jsonl: tailer read {} failed: {other}; skipping file",
                        session_path.display(),
                    );
                    continue;
                }
            };
            source_paths.push(session_path);
            // Keep entries without an id, and the first entry seen for each id.
            entries.extend(file_entries.into_iter().filter(|entry| {
                match &entry.message.id {
                    Some(id) => seen_ids.insert(id.clone()),
                    None => true,
                }
            }));
        }
    }

    let top = match fs::read_dir(root) {
        Ok(iter) => iter,
        Err(cause) => {
            if cause.kind() == io::ErrorKind::NotFound {
                return Ok(());
            }
            return Err(JsonlError::IoError {
                path: root.to_path_buf(),
                cause,
            });
        }
    };
    for project in top {
        let project_path = match project {
            Ok(entry) => entry.path(),
            Err(cause) => {
                crate::lsm_warn!(
                    "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
                    root.display(),
                    cause.kind(),
                );
                continue;
            }
        };
        if project_path.is_dir() {
            scan_project(&project_path, entries, source_paths, seen_ids);
        }
    }
    Ok(())
}
/// Builds the aggregate from `entries` as of the current wall-clock time.
///
/// The seven-day totals sum the usage of every entry whose timestamp lies in
/// the inclusive range `[now - WINDOW_DAYS days, now]`; entries without usage
/// counters contribute nothing. The active block is computed by
/// `compute_active_block` against the same `now`. `source_paths` is passed
/// through unchanged.
fn build_aggregate(entries: &[UsageEntry], source_paths: Vec<PathBuf>) -> JsonlAggregate {
    let now = Timestamp::now();
    let window_start = now - SignedDuration::from_hours(WINDOW_DAYS * 24);
    let five_hour = compute_active_block(entries, now);

    let window = window_start..=now;
    let mut token_counts = TokenCounts::default();
    entries
        .iter()
        .filter(|entry| window.contains(&entry.timestamp))
        .filter_map(|entry| entry.message.usage)
        .for_each(|usage| token_counts.accumulate(usage));

    let seven_day = SevenDayWindow {
        window_start,
        token_counts,
    };
    JsonlAggregate {
        five_hour,
        seven_day,
        source_paths,
    }
}
/// Walks `entries` in slice order and returns the last block if it is still
/// active at `now`.
///
/// An entry joins the current block when it arrives no more than
/// `BLOCK_DURATION_HOURS` after that block's last activity; otherwise it
/// opens a new block. The final block is returned only when `now` is no more
/// than `BLOCK_DURATION_HOURS` past its last activity.
///
/// The gap computation assumes `entries` is in ascending timestamp order; the
/// caller in this file sorts before calling.
///
/// NOTE(review): rollover depends only on the idle gap, never on the time
/// elapsed since `block.start`. A steady stream of entries therefore keeps
/// extending one block, and `actual_last_activity` can end up later than
/// `FiveHourBlock::end()`. Confirm that this is the intended semantics.
fn compute_active_block(entries: &[UsageEntry], now: Timestamp) -> Option<FiveHourBlock> {
    let block_duration = SignedDuration::from_hours(BLOCK_DURATION_HOURS);
    let mut current: Option<FiveHourBlock> = None;
    for entry in entries {
        let continues = current.as_ref().is_some_and(|block| {
            entry.timestamp.duration_since(block.actual_last_activity) <= block_duration
        });
        match current.as_mut() {
            Some(block) if continues => extend_block(block, entry),
            _ => current = Some(start_block(entry)),
        }
    }
    current.filter(|block| now.duration_since(block.actual_last_activity) <= block_duration)
}
/// Opens a new block from `entry`: the start is the entry's timestamp floored
/// to the hour (3600 s), and the entry itself is folded in as the block's
/// first member.
fn start_block(entry: &UsageEntry) -> FiveHourBlock {
    let opened_at = entry.timestamp;
    let mut fresh = FiveHourBlock {
        start: floor_to_grain(opened_at, 3600),
        actual_last_activity: opened_at,
        token_counts: TokenCounts::default(),
        models: Vec::new(),
        usage_limit_reset: None,
    };
    extend_block(&mut fresh, entry);
    fresh
}
/// Folds `entry` into `block`:
///
/// * usage counters are added to the block totals (saturating);
/// * the model name is appended if the block has not seen it yet;
/// * a reset time carried by the entry replaces the stored one, while an
///   entry without one leaves it untouched;
/// * the block's last activity becomes the entry's timestamp.
fn extend_block(block: &mut FiveHourBlock, entry: &UsageEntry) {
    let message = &entry.message;

    block.actual_last_activity = entry.timestamp;
    block.usage_limit_reset = entry.usage_limit_reset_time.or(block.usage_limit_reset);

    match &message.model {
        Some(model) if !block.models.contains(model) => block.models.push(model.clone()),
        _ => {}
    }

    if let Some(usage) = message.usage {
        block.token_counts.accumulate(usage);
    }
}
/// Rounds `ts` down to the nearest multiple of `grain_secs` seconds since the
/// Unix epoch, discarding any sub-second part.
///
/// Euclidean remainder is used, so timestamps before the epoch also round
/// towards negative infinity rather than towards zero.
///
/// `ts` is returned unchanged when `grain_secs` is `0` (a zero grain has no
/// multiples to round to; previously this panicked with a division by zero)
/// or when the floored value cannot be represented as a `Timestamp`.
pub(super) fn floor_to_grain(ts: Timestamp, grain_secs: i64) -> Timestamp {
    let secs = ts.as_second();
    // `checked_rem_euclid` yields `None` for a zero divisor instead of panicking.
    let Some(remainder) = secs.checked_rem_euclid(grain_secs) else {
        return ts;
    };
    Timestamp::from_second(secs - remainder).unwrap_or(ts)
}
// Unit tests are declared out of line in the `tests` submodule and are only
// compiled for test builds.
#[cfg(test)]
mod tests;