1use std::collections::HashSet;
16use std::fs;
17use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
18use std::path::{Path, PathBuf};
19
20use jiff::{SignedDuration, Timestamp};
21use serde::Deserialize;
22
/// Length of one usage block, in hours. `FiveHourBlock::end` adds this to the
/// block start, and `compute_active_block` compares activity gaps against it.
const BLOCK_DURATION_HOURS: i64 = 5;
/// Number of days covered by the rolling usage window that `build_aggregate`
/// sums over (converted to hours there).
const WINDOW_DAYS: i64 = 7;
28
/// Combined usage figures produced by [`aggregate_jsonl`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct JsonlAggregate {
    /// The currently active five-hour block, or `None` when no block is active.
    pub five_hour: Option<FiveHourBlock>,
    /// Token totals for the rolling seven-day window ending at aggregation time.
    pub seven_day: SevenDayWindow,
    /// Every `.jsonl` session file that was read without an I/O error, in the
    /// order the files were visited.
    pub source_paths: Vec<PathBuf>,
}
41
42#[derive(Debug, Clone)]
48pub struct FiveHourBlock {
49 pub start: Timestamp,
50 pub actual_last_activity: Timestamp,
51 pub token_counts: TokenCounts,
52 pub models: Vec<String>,
53 pub usage_limit_reset: Option<Timestamp>,
59}
60
61impl FiveHourBlock {
62 #[must_use]
66 pub fn end(&self) -> Timestamp {
67 self.start + SignedDuration::from_hours(BLOCK_DURATION_HOURS)
68 }
69}
70
/// Token totals for the rolling window of [`WINDOW_DAYS`] days.
#[derive(Debug, Clone)]
pub struct SevenDayWindow {
    /// Inclusive lower bound of the window (aggregation time minus the window length).
    pub window_start: Timestamp,
    /// Sum of the usage of every entry whose timestamp lies inside the window.
    pub token_counts: TokenCounts,
}
78
79#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
91pub struct TokenCounts {
92 pub(crate) input: u64,
93 pub(crate) output: u64,
94 pub(crate) cache_creation: u64,
95 pub(crate) cache_read: u64,
96}
97
98impl TokenCounts {
99 #[cfg(test)]
103 #[must_use]
104 pub(crate) fn from_parts(
105 input: u64,
106 output: u64,
107 cache_creation: u64,
108 cache_read: u64,
109 ) -> Self {
110 Self {
111 input,
112 output,
113 cache_creation,
114 cache_read,
115 }
116 }
117
118 #[must_use]
119 pub fn input(&self) -> u64 {
120 self.input
121 }
122
123 #[must_use]
124 pub fn output(&self) -> u64 {
125 self.output
126 }
127
128 #[must_use]
129 pub fn cache_creation(&self) -> u64 {
130 self.cache_creation
131 }
132
133 #[must_use]
134 pub fn cache_read(&self) -> u64 {
135 self.cache_read
136 }
137
138 #[must_use]
142 pub fn total(&self) -> u64 {
143 self.input
144 .saturating_add(self.output)
145 .saturating_add(self.cache_creation)
146 .saturating_add(self.cache_read)
147 }
148
149 fn accumulate(&mut self, other: UsageCounts) {
150 self.input = self.input.saturating_add(other.input_tokens);
151 self.output = self.output.saturating_add(other.output_tokens);
152 self.cache_creation = self.cache_creation.saturating_add(other.cache_creation);
153 self.cache_read = self.cache_read.saturating_add(other.cache_read);
154 }
155}
156
157#[derive(Debug)]
160#[non_exhaustive]
161pub enum JsonlError {
162 DirectoryMissing,
164 NoEntries,
166 IoError { path: PathBuf, cause: io::Error },
168 ParseError {
172 path: PathBuf,
173 line: u64,
174 cause: serde_json::Error,
175 },
176}
177
178impl JsonlError {
179 #[must_use]
184 pub fn code(&self) -> &'static str {
185 match self {
186 Self::DirectoryMissing => "DirectoryMissing",
187 Self::NoEntries => "NoEntries",
188 Self::IoError { .. } => "IoError",
189 Self::ParseError { .. } => "ParseError",
190 }
191 }
192}
193
194impl std::fmt::Display for JsonlError {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 match self {
197 Self::DirectoryMissing => f.write_str("no Claude Code project directory found"),
198 Self::NoEntries => f.write_str("Claude Code project directory has no JSONL entries"),
199 Self::IoError { path, cause } => write!(
200 f,
201 "failed to read JSONL path {}: {}",
202 path.display(),
203 cause.kind()
204 ),
205 Self::ParseError { path, line, cause } => write!(
206 f,
207 "JSONL parse failed in {} at line {}: {}",
208 path.display(),
209 line,
210 cause
211 ),
212 }
213 }
214}
215
216impl std::error::Error for JsonlError {
217 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218 match self {
219 Self::IoError { cause, .. } => Some(cause),
220 Self::ParseError { cause, .. } => Some(cause),
221 _ => None,
222 }
223 }
224}
225
/// One JSONL line deserialized into the fields this module aggregates.
#[derive(Debug, Deserialize)]
pub(crate) struct UsageEntry {
    /// Required `timestamp` field; a line without it fails to deserialize.
    timestamp: Timestamp,
    /// Required `message` object holding the usage, model and message id.
    message: MessageFields,
    /// Optional `usageLimitResetTime` field; `None` when the line omits it.
    #[serde(default, rename = "usageLimitResetTime")]
    usage_limit_reset_time: Option<Timestamp>,
}
239
/// The parts of an entry's `message` object that are read here. Every field is
/// optional and defaults to `None` when missing.
#[derive(Debug, Deserialize, Default)]
struct MessageFields {
    /// Token usage reported for this message.
    #[serde(default)]
    usage: Option<UsageCounts>,
    /// Model name reported for this message.
    #[serde(default)]
    model: Option<String>,
    /// Message id; used to drop duplicate entries across files.
    #[serde(default)]
    id: Option<String>,
}
249
/// Token counts of a single message. Each counter defaults to zero when the
/// corresponding JSON key is absent.
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct UsageCounts {
    /// JSON key `input_tokens`.
    #[serde(default)]
    input_tokens: u64,
    /// JSON key `output_tokens`.
    #[serde(default)]
    output_tokens: u64,
    /// JSON key `cache_creation_input_tokens`.
    #[serde(default, rename = "cache_creation_input_tokens")]
    cache_creation: u64,
    /// JSON key `cache_read_input_tokens`.
    #[serde(default, rename = "cache_read_input_tokens")]
    cache_read: u64,
}
261
/// Snapshot of the environment variables that decide where project
/// directories are looked up. A variable that is unset or empty is `None`.
#[derive(Debug, Clone, Default)]
struct DiscoveryEnv {
    /// Value of `CLAUDE_CONFIG_DIR`.
    claude_config_dir: Option<PathBuf>,
    /// Value of `XDG_CONFIG_HOME`.
    xdg_config_home: Option<PathBuf>,
    /// Value of `HOME`.
    home: Option<PathBuf>,
}

impl DiscoveryEnv {
    /// Captures the three variables from the current process environment.
    fn from_process_env() -> Self {
        // An empty value is treated exactly like an unset variable.
        let lookup = |key: &str| -> Option<PathBuf> {
            match std::env::var_os(key) {
                Some(value) if !value.is_empty() => Some(PathBuf::from(value)),
                _ => None,
            }
        };
        let claude_config_dir = lookup("CLAUDE_CONFIG_DIR");
        let xdg_config_home = lookup("XDG_CONFIG_HOME");
        let home = lookup("HOME");
        Self {
            claude_config_dir,
            xdg_config_home,
            home,
        }
    }
}
288
289fn project_roots(env: &DiscoveryEnv) -> Vec<PathBuf> {
290 let mut out = Vec::with_capacity(3);
291 if let Some(dir) = &env.claude_config_dir {
292 out.push(dir.join("projects"));
293 }
294 let xdg_root = env
300 .xdg_config_home
301 .clone()
302 .or_else(|| env.home.as_ref().map(|h| h.join(".config")));
303 if let Some(xdg_root) = xdg_root {
304 out.push(xdg_root.join("claude").join("projects"));
305 }
306 if let Some(home) = &env.home {
308 out.push(home.join(".claude").join("projects"));
309 }
310 out
311}
312
313pub(crate) struct JsonlTailer {
320 path: PathBuf,
321 last_offset: u64,
322 last_size: u64,
323}
324
325impl JsonlTailer {
326 #[must_use]
327 pub(crate) fn new(path: PathBuf) -> Self {
328 Self {
329 path,
330 last_offset: 0,
331 last_size: 0,
332 }
333 }
334
335 pub(crate) fn read_new(&mut self) -> Result<Vec<UsageEntry>, JsonlError> {
340 let metadata = match fs::metadata(&self.path) {
341 Ok(m) => m,
342 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
343 Err(cause) => {
344 return Err(JsonlError::IoError {
345 path: self.path.clone(),
346 cause,
347 })
348 }
349 };
350
351 let size = metadata.len();
352 if size < self.last_size {
353 self.last_offset = 0;
354 }
355 self.last_size = size;
356
357 if self.last_offset >= size {
358 return Ok(Vec::new());
359 }
360
361 let mut file = fs::File::open(&self.path).map_err(|cause| JsonlError::IoError {
362 path: self.path.clone(),
363 cause,
364 })?;
365 file.seek(SeekFrom::Start(self.last_offset))
366 .map_err(|cause| JsonlError::IoError {
367 path: self.path.clone(),
368 cause,
369 })?;
370
371 let mut reader = BufReader::new(file);
372 let mut entries = Vec::new();
373 let mut buf: Vec<u8> = Vec::new();
374 loop {
375 buf.clear();
376 let read = reader
380 .read_until(b'\n', &mut buf)
381 .map_err(|cause| JsonlError::IoError {
382 path: self.path.clone(),
383 cause,
384 })?;
385 if read == 0 {
386 break;
387 }
388 if buf.last() != Some(&b'\n') {
389 break;
391 }
392 self.last_offset += read as u64;
393 let line = match buf.strip_suffix(b"\n") {
394 Some(rest) => rest.strip_suffix(b"\r").unwrap_or(rest),
395 None => &buf[..],
396 };
397 let text = String::from_utf8_lossy(line);
398 if let Ok(entry) = serde_json::from_str::<UsageEntry>(&text) {
399 entries.push(entry);
400 }
401 }
402
403 Ok(entries)
404 }
405}
406
407pub fn aggregate_jsonl() -> Result<JsonlAggregate, JsonlError> {
413 aggregate_jsonl_with(&DiscoveryEnv::from_process_env())
414}
415
416fn aggregate_jsonl_with(env: &DiscoveryEnv) -> Result<JsonlAggregate, JsonlError> {
417 let candidate_roots = project_roots(env);
418 let existing_roots: Vec<PathBuf> = candidate_roots.into_iter().filter(|r| r.exists()).collect();
419 if existing_roots.is_empty() {
420 return Err(JsonlError::DirectoryMissing);
421 }
422
423 let mut all_entries: Vec<UsageEntry> = Vec::new();
424 let mut source_paths: Vec<PathBuf> = Vec::new();
425 let mut seen_ids: HashSet<String> = HashSet::new();
426
427 for root in &existing_roots {
428 collect_from_root(root, &mut all_entries, &mut source_paths, &mut seen_ids)?;
429 }
430
431 if all_entries.is_empty() {
432 return Err(JsonlError::NoEntries);
433 }
434
435 all_entries.sort_by_key(|e| e.timestamp);
436 Ok(build_aggregate(&all_entries, source_paths))
437}
438
439fn collect_from_root(
443 root: &Path,
444 entries: &mut Vec<UsageEntry>,
445 source_paths: &mut Vec<PathBuf>,
446 seen_ids: &mut HashSet<String>,
447) -> Result<(), JsonlError> {
448 let top = match fs::read_dir(root) {
449 Ok(iter) => iter,
450 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
451 Err(cause) => {
452 return Err(JsonlError::IoError {
453 path: root.to_path_buf(),
454 cause,
455 })
456 }
457 };
458 for project in top {
459 let project = match project {
460 Ok(entry) => entry,
461 Err(cause) => {
462 crate::lsm_warn!(
463 "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
464 root.display(),
465 cause.kind(),
466 );
467 continue;
468 }
469 };
470 let project_path = project.path();
471 if !project_path.is_dir() {
472 continue;
473 }
474 let session_iter = match fs::read_dir(&project_path) {
475 Ok(iter) => iter,
476 Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
477 Err(cause) => {
478 crate::lsm_warn!(
485 "jsonl: read_dir {} failed: {} ({cause}); skipping workspace",
486 project_path.display(),
487 cause.kind(),
488 );
489 continue;
490 }
491 };
492 for session in session_iter {
493 let session = match session {
494 Ok(entry) => entry,
495 Err(cause) => {
496 crate::lsm_warn!(
497 "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
498 project_path.display(),
499 cause.kind(),
500 );
501 continue;
502 }
503 };
504 let session_path = session.path();
505 if session_path.extension().is_none_or(|ext| ext != "jsonl") {
506 continue;
507 }
508 let mut tailer = JsonlTailer::new(session_path.clone());
509 let file_entries = match tailer.read_new() {
510 Ok(entries) => entries,
511 Err(JsonlError::IoError { path, cause }) => {
512 crate::lsm_warn!(
513 "jsonl: tailer read {} failed: {} ({cause}); skipping file",
514 path.display(),
515 cause.kind(),
516 );
517 continue;
518 }
519 Err(other) => {
520 crate::lsm_warn!(
521 "jsonl: tailer read {} failed: {other}; skipping file",
522 session_path.display(),
523 );
524 continue;
525 }
526 };
527 source_paths.push(session_path);
528 for entry in file_entries {
529 if let Some(id) = &entry.message.id {
530 if !seen_ids.insert(id.clone()) {
531 continue;
532 }
533 }
534 entries.push(entry);
535 }
536 }
537 }
538 Ok(())
539}
540
541fn build_aggregate(entries: &[UsageEntry], source_paths: Vec<PathBuf>) -> JsonlAggregate {
542 let now = Timestamp::now();
543 let window_start = now - SignedDuration::from_hours(WINDOW_DAYS * 24);
544
545 let five_hour = compute_active_block(entries, now);
546
547 let mut seven_day_counts = TokenCounts::default();
548 for entry in entries {
549 if entry.timestamp >= window_start && entry.timestamp <= now {
554 if let Some(usage) = entry.message.usage {
555 seven_day_counts.accumulate(usage);
556 }
557 }
558 }
559
560 JsonlAggregate {
561 five_hour,
562 seven_day: SevenDayWindow {
563 window_start,
564 token_counts: seven_day_counts,
565 },
566 source_paths,
567 }
568}
569
570fn compute_active_block(entries: &[UsageEntry], now: Timestamp) -> Option<FiveHourBlock> {
583 let block_duration = SignedDuration::from_hours(BLOCK_DURATION_HOURS);
584 let mut current: Option<FiveHourBlock> = None;
585 for entry in entries {
586 match &mut current {
587 None => current = Some(start_block(entry)),
588 Some(block) => {
589 let gap = entry.timestamp.duration_since(block.actual_last_activity);
590 if gap > block_duration {
591 current = Some(start_block(entry));
592 } else {
593 extend_block(block, entry);
594 }
595 }
596 }
597 }
598 let block = current?;
599 if now.duration_since(block.actual_last_activity) > block_duration {
600 None
601 } else {
602 Some(block)
603 }
604}
605
606fn start_block(entry: &UsageEntry) -> FiveHourBlock {
607 let mut block = FiveHourBlock {
608 start: floor_to_grain(entry.timestamp, 3600),
609 actual_last_activity: entry.timestamp,
610 token_counts: TokenCounts::default(),
611 models: Vec::new(),
612 usage_limit_reset: None,
613 };
614 extend_block(&mut block, entry);
615 block
616}
617
618fn extend_block(block: &mut FiveHourBlock, entry: &UsageEntry) {
619 if let Some(usage) = entry.message.usage {
620 block.token_counts.accumulate(usage);
621 }
622 if let Some(model) = &entry.message.model {
623 if !block.models.iter().any(|m| m == model) {
624 block.models.push(model.clone());
625 }
626 }
627 if let Some(reset) = entry.usage_limit_reset_time {
628 block.usage_limit_reset = Some(reset);
629 }
630 block.actual_last_activity = entry.timestamp;
631}
632
633pub(super) fn floor_to_grain(ts: Timestamp, grain_secs: i64) -> Timestamp {
641 let secs = ts.as_second();
642 let floored = secs - secs.rem_euclid(grain_secs);
643 Timestamp::from_second(floored).unwrap_or(ts)
644}
645
646#[cfg(test)]
647mod tests;