Skip to main content

linesmith_core/data_context/jsonl/
mod.rs

1//! JSONL transcript aggregator — terminal fallback for the rate-limit
2//! data pipeline.
3//!
4//! Canonical spec: `docs/specs/jsonl-aggregation.md`. Ports the
5//! billing-block math from [`ryoppippi/ccusage`](https://github.com/ryoppippi/ccusage)'s
6//! `_session-blocks.ts` (MIT). Produces raw token counts and block
7//! boundaries only; mapping to [`UsageBucket`](super::UsageBucket)
8//! without tier detection is the orchestrator's problem.
9//!
10//! v0.1 exposes only the currently-active 5h block. Historical
11//! blocks are deferred per spec §Open questions — extending
12//! `JsonlAggregate` with `completed_blocks` is a non-breaking change
13//! under `#[non_exhaustive]`.
14
15use std::collections::HashSet;
16use std::fs;
17use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
18use std::path::{Path, PathBuf};
19
20use jiff::{SignedDuration, Timestamp};
21use serde::Deserialize;
22
/// Length of one billing block, in hours. Kept in lockstep with
/// ccusage's `DEFAULT_SESSION_DURATION_HOURS` (`_session-blocks.ts`).
const BLOCK_DURATION_HOURS: i64 = 5;

/// Width of the rolling usage window, in days (spec §7-day window math).
const WINDOW_DAYS: i64 = 7;
28
29// --- Public types -------------------------------------------------------
30
31/// Output of the aggregator. `five_hour` is `None` when no entry
32/// falls within the last [`BLOCK_DURATION_HOURS`] hours; `seven_day`
33/// is always present (zero-valued on an empty transcript).
34#[derive(Debug, Clone)]
35#[non_exhaustive]
36pub struct JsonlAggregate {
37    pub five_hour: Option<FiveHourBlock>,
38    pub seven_day: SevenDayWindow,
39    pub source_paths: Vec<PathBuf>,
40}
41
42/// Active 5-hour billing block. `start` is the UTC-floor-to-hour of
43/// the block's first entry; `actual_last_activity` lets the caller
44/// distinguish a block where the user stopped typing 10 seconds ago
45/// from one where they stopped 4 hours ago. The block's end time is
46/// a derivation from `start` — see [`Self::end`].
47#[derive(Debug, Clone)]
48pub struct FiveHourBlock {
49    pub start: Timestamp,
50    pub actual_last_activity: Timestamp,
51    pub token_counts: TokenCounts,
52    pub models: Vec<String>,
53    /// `usageLimitResetTime` from the most recent entry that carried
54    /// one. Verified absent across the surveyed Claude Code corpus
55    /// (lsm-ghpj, 2026-05-16); the field is deserialized defensively
56    /// but segments do not consume it — `rate_limit_5h_reset` uses
57    /// `block.end()` per ADR-0013.
58    pub usage_limit_reset: Option<Timestamp>,
59}
60
61impl FiveHourBlock {
62    /// Nominal close of the block: `start + BLOCK_DURATION_HOURS`.
63    /// Derived rather than stored so the invariant can't drift from
64    /// `start` after construction.
65    #[must_use]
66    pub fn end(&self) -> Timestamp {
67        self.start + SignedDuration::from_hours(BLOCK_DURATION_HOURS)
68    }
69}
70
71/// Rolling 7-day window. `window_start` is `now - 7d` at the time
72/// the aggregator ran.
73#[derive(Debug, Clone)]
74pub struct SevenDayWindow {
75    pub window_start: Timestamp,
76    pub token_counts: TokenCounts,
77}
78
79/// Per-category token counts aggregated from the transcript.
80///
81/// # Invariants
82///
83/// - All mutations go through [`Self::accumulate`] so additions
84///   saturate at `u64::MAX` rather than wrapping. Fields are
85///   `pub(crate)` so in-crate code can read them; writes are
86///   funnelled through the one private path that preserves the
87///   saturating discipline. External crates read via [`Self::total`]
88///   / [`Self::input`] / [`Self::output`] / [`Self::cache_creation`]
89///   / [`Self::cache_read`].
90#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
91pub struct TokenCounts {
92    pub(crate) input: u64,
93    pub(crate) output: u64,
94    pub(crate) cache_creation: u64,
95    pub(crate) cache_read: u64,
96}
97
98impl TokenCounts {
99    /// Test / fixture constructor. Not exposed to runtime callers —
100    /// production `TokenCounts` values come from the aggregator's
101    /// `accumulate` loop, which preserves the saturating invariant.
102    #[cfg(test)]
103    #[must_use]
104    pub(crate) fn from_parts(
105        input: u64,
106        output: u64,
107        cache_creation: u64,
108        cache_read: u64,
109    ) -> Self {
110        Self {
111            input,
112            output,
113            cache_creation,
114            cache_read,
115        }
116    }
117
118    #[must_use]
119    pub fn input(&self) -> u64 {
120        self.input
121    }
122
123    #[must_use]
124    pub fn output(&self) -> u64 {
125        self.output
126    }
127
128    #[must_use]
129    pub fn cache_creation(&self) -> u64 {
130        self.cache_creation
131    }
132
133    #[must_use]
134    pub fn cache_read(&self) -> u64 {
135        self.cache_read
136    }
137
138    /// Saturating sum across all four categories. Saturating to
139    /// match the spec's open-question note: `u64` overflow is
140    /// practically unreachable, but wrap-on-overflow is surprising.
141    #[must_use]
142    pub fn total(&self) -> u64 {
143        self.input
144            .saturating_add(self.output)
145            .saturating_add(self.cache_creation)
146            .saturating_add(self.cache_read)
147    }
148
149    fn accumulate(&mut self, other: UsageCounts) {
150        self.input = self.input.saturating_add(other.input_tokens);
151        self.output = self.output.saturating_add(other.output_tokens);
152        self.cache_creation = self.cache_creation.saturating_add(other.cache_creation);
153        self.cache_read = self.cache_read.saturating_add(other.cache_read);
154    }
155}
156
157// --- JsonlError ---------------------------------------------------------
158
159#[derive(Debug)]
160#[non_exhaustive]
161pub enum JsonlError {
162    /// No project-root directory exists in any cascade path.
163    DirectoryMissing,
164    /// Project roots exist but yielded zero parseable records.
165    NoEntries,
166    /// Filesystem error opening or traversing a path.
167    IoError { path: PathBuf, cause: io::Error },
168    /// Reserved for fail-fast callers. Production aggregation logs
169    /// per-line parse failures and continues rather than surfacing
170    /// this variant.
171    ParseError {
172        path: PathBuf,
173        line: u64,
174        cause: serde_json::Error,
175    },
176}
177
178impl JsonlError {
179    /// Short plugin-facing tag per `docs/specs/plugin-api.md` §ctx
180    /// shape. `ctx.jsonl` is reserved (not plugin-accessible) in
181    /// v0.1 but the tag stays useful for `UsageError::Jsonl`
182    /// delegation.
183    #[must_use]
184    pub fn code(&self) -> &'static str {
185        match self {
186            Self::DirectoryMissing => "DirectoryMissing",
187            Self::NoEntries => "NoEntries",
188            Self::IoError { .. } => "IoError",
189            Self::ParseError { .. } => "ParseError",
190        }
191    }
192}
193
194impl std::fmt::Display for JsonlError {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        match self {
197            Self::DirectoryMissing => f.write_str("no Claude Code project directory found"),
198            Self::NoEntries => f.write_str("Claude Code project directory has no JSONL entries"),
199            Self::IoError { path, cause } => write!(
200                f,
201                "failed to read JSONL path {}: {}",
202                path.display(),
203                cause.kind()
204            ),
205            Self::ParseError { path, line, cause } => write!(
206                f,
207                "JSONL parse failed in {} at line {}: {}",
208                path.display(),
209                line,
210                cause
211            ),
212        }
213    }
214}
215
216impl std::error::Error for JsonlError {
217    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218        match self {
219            Self::IoError { cause, .. } => Some(cause),
220            Self::ParseError { cause, .. } => Some(cause),
221            _ => None,
222        }
223    }
224}
225
226// --- Per-line record schema --------------------------------------------
227
228/// Serde view over a single JSONL line. Only the fields the
229/// aggregator consumes are named; unknown keys (including
230/// `costUSD` and `version` until we need them) are dropped per
231/// ADR-0009.
232#[derive(Debug, Deserialize)]
233pub(crate) struct UsageEntry {
234    timestamp: Timestamp,
235    message: MessageFields,
236    #[serde(default, rename = "usageLimitResetTime")]
237    usage_limit_reset_time: Option<Timestamp>,
238}
239
240#[derive(Debug, Deserialize, Default)]
241struct MessageFields {
242    #[serde(default)]
243    usage: Option<UsageCounts>,
244    #[serde(default)]
245    model: Option<String>,
246    #[serde(default)]
247    id: Option<String>,
248}
249
250#[derive(Debug, Deserialize, Default, Clone, Copy)]
251struct UsageCounts {
252    #[serde(default)]
253    input_tokens: u64,
254    #[serde(default)]
255    output_tokens: u64,
256    #[serde(default, rename = "cache_creation_input_tokens")]
257    cache_creation: u64,
258    #[serde(default, rename = "cache_read_input_tokens")]
259    cache_read: u64,
260}
261
262// --- Project-root discovery --------------------------------------------
263
/// Environment inputs consumed by [`project_roots`].
///
/// Tests build this struct directly instead of mutating the process
/// environment, which is not thread-safe to modify. This is the same
/// approach as `credentials::FileCascadeEnv`.
#[derive(Debug, Clone, Default)]
struct DiscoveryEnv {
    claude_config_dir: Option<PathBuf>,
    xdg_config_home: Option<PathBuf>,
    home: Option<PathBuf>,
}

impl DiscoveryEnv {
    /// Snapshot `CLAUDE_CONFIG_DIR`, `XDG_CONFIG_HOME` and `HOME` from the
    /// process environment. A variable that is set but empty is treated
    /// as unset.
    fn from_process_env() -> Self {
        let read = |key: &str| -> Option<PathBuf> {
            let value = std::env::var_os(key)?;
            if value.is_empty() {
                None
            } else {
                Some(PathBuf::from(value))
            }
        };
        Self {
            claude_config_dir: read("CLAUDE_CONFIG_DIR"),
            xdg_config_home: read("XDG_CONFIG_HOME"),
            home: read("HOME"),
        }
    }
}

/// Candidate `projects/` directories, in priority order.
///
/// 1. `$CLAUDE_CONFIG_DIR/projects`, when set.
/// 2. `<xdg>/claude/projects`, where `<xdg>` is `$XDG_CONFIG_HOME`, or
///    `$HOME/.config` when that is unset. A HOME-less CI/service
///    environment with only `$XDG_CONFIG_HOME` still gets this probe,
///    mirroring `credentials::file_cascade_candidates`.
/// 3. `$HOME/.claude/projects`, the legacy location, which needs `$HOME`.
///
/// Paths are only constructed here; existence is checked by the caller.
fn project_roots(env: &DiscoveryEnv) -> Vec<PathBuf> {
    let explicit = env
        .claude_config_dir
        .as_ref()
        .map(|dir| dir.join("projects"));

    let xdg_base = match (&env.xdg_config_home, &env.home) {
        (Some(xdg), _) => Some(xdg.clone()),
        (None, Some(home)) => Some(home.join(".config")),
        (None, None) => None,
    };
    let xdg = xdg_base.map(|base| base.join("claude").join("projects"));

    let legacy = env
        .home
        .as_ref()
        .map(|home| home.join(".claude").join("projects"));

    explicit.into_iter().chain(xdg).chain(legacy).collect()
}
312
313// --- JsonlTailer --------------------------------------------------------
314
315/// Byte-offset incremental reader for a single JSONL file. Opens +
316/// reads + closes per call; does NOT hold a file handle across
317/// invocations. Detects truncation via `size < last_size` and
318/// resets the offset when that happens.
319pub(crate) struct JsonlTailer {
320    path: PathBuf,
321    last_offset: u64,
322    last_size: u64,
323}
324
325impl JsonlTailer {
326    #[must_use]
327    pub(crate) fn new(path: PathBuf) -> Self {
328        Self {
329            path,
330            last_offset: 0,
331            last_size: 0,
332        }
333    }
334
335    /// Read any new complete lines since the last call. Malformed
336    /// lines are silently skipped (the offset advances past them so
337    /// repeat invocations don't re-encounter). Returns `Ok(vec![])`
338    /// when the file doesn't exist yet — a fresh install scenario.
339    pub(crate) fn read_new(&mut self) -> Result<Vec<UsageEntry>, JsonlError> {
340        let metadata = match fs::metadata(&self.path) {
341            Ok(m) => m,
342            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
343            Err(cause) => {
344                return Err(JsonlError::IoError {
345                    path: self.path.clone(),
346                    cause,
347                })
348            }
349        };
350
351        let size = metadata.len();
352        if size < self.last_size {
353            self.last_offset = 0;
354        }
355        self.last_size = size;
356
357        if self.last_offset >= size {
358            return Ok(Vec::new());
359        }
360
361        let mut file = fs::File::open(&self.path).map_err(|cause| JsonlError::IoError {
362            path: self.path.clone(),
363            cause,
364        })?;
365        file.seek(SeekFrom::Start(self.last_offset))
366            .map_err(|cause| JsonlError::IoError {
367                path: self.path.clone(),
368                cause,
369            })?;
370
371        let mut reader = BufReader::new(file);
372        let mut entries = Vec::new();
373        let mut buf: Vec<u8> = Vec::new();
374        loop {
375            buf.clear();
376            // Byte-level read: a non-UTF-8 line becomes a per-line
377            // skip (lossy convert + serde reject), not a whole-file
378            // abort the way `read_line(&mut String)` would be.
379            let read = reader
380                .read_until(b'\n', &mut buf)
381                .map_err(|cause| JsonlError::IoError {
382                    path: self.path.clone(),
383                    cause,
384                })?;
385            if read == 0 {
386                break;
387            }
388            if buf.last() != Some(&b'\n') {
389                // Partial trailing line: don't advance past it.
390                break;
391            }
392            self.last_offset += read as u64;
393            let line = match buf.strip_suffix(b"\n") {
394                Some(rest) => rest.strip_suffix(b"\r").unwrap_or(rest),
395                None => &buf[..],
396            };
397            let text = String::from_utf8_lossy(line);
398            if let Ok(entry) = serde_json::from_str::<UsageEntry>(&text) {
399                entries.push(entry);
400            }
401        }
402
403        Ok(entries)
404    }
405}
406
407// --- Aggregation entry point -------------------------------------------
408
409/// Discover project roots, scan every `*.jsonl` under them, dedupe,
410/// aggregate. Memoization is the caller's responsibility; each call
411/// re-scans from offset zero.
412pub fn aggregate_jsonl() -> Result<JsonlAggregate, JsonlError> {
413    aggregate_jsonl_with(&DiscoveryEnv::from_process_env())
414}
415
416fn aggregate_jsonl_with(env: &DiscoveryEnv) -> Result<JsonlAggregate, JsonlError> {
417    let candidate_roots = project_roots(env);
418    let existing_roots: Vec<PathBuf> = candidate_roots.into_iter().filter(|r| r.exists()).collect();
419    if existing_roots.is_empty() {
420        return Err(JsonlError::DirectoryMissing);
421    }
422
423    let mut all_entries: Vec<UsageEntry> = Vec::new();
424    let mut source_paths: Vec<PathBuf> = Vec::new();
425    let mut seen_ids: HashSet<String> = HashSet::new();
426
427    for root in &existing_roots {
428        collect_from_root(root, &mut all_entries, &mut source_paths, &mut seen_ids)?;
429    }
430
431    if all_entries.is_empty() {
432        return Err(JsonlError::NoEntries);
433    }
434
435    all_entries.sort_by_key(|e| e.timestamp);
436    Ok(build_aggregate(&all_entries, source_paths))
437}
438
439/// Recurse one level into each `projects/{workspace}/` subdir and
440/// pick up `*.jsonl` files. Dedup on `message.id`; missing-id
441/// entries are always kept.
442fn collect_from_root(
443    root: &Path,
444    entries: &mut Vec<UsageEntry>,
445    source_paths: &mut Vec<PathBuf>,
446    seen_ids: &mut HashSet<String>,
447) -> Result<(), JsonlError> {
448    let top = match fs::read_dir(root) {
449        Ok(iter) => iter,
450        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
451        Err(cause) => {
452            return Err(JsonlError::IoError {
453                path: root.to_path_buf(),
454                cause,
455            })
456        }
457    };
458    for project in top {
459        let project = match project {
460            Ok(entry) => entry,
461            Err(cause) => {
462                crate::lsm_warn!(
463                    "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
464                    root.display(),
465                    cause.kind(),
466                );
467                continue;
468            }
469        };
470        let project_path = project.path();
471        if !project_path.is_dir() {
472            continue;
473        }
474        let session_iter = match fs::read_dir(&project_path) {
475            Ok(iter) => iter,
476            Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
477            Err(cause) => {
478                // EACCES / EIO on a specific workspace dir — the
479                // top-level project-root fix in resolve_usage_default
480                // only catches root-level failures. Without this warn,
481                // a stale/unreadable workspace silently poisons the
482                // JSONL fallback and users see the endpoint-path error
483                // with no diagnostic trail.
484                crate::lsm_warn!(
485                    "jsonl: read_dir {} failed: {} ({cause}); skipping workspace",
486                    project_path.display(),
487                    cause.kind(),
488                );
489                continue;
490            }
491        };
492        for session in session_iter {
493            let session = match session {
494                Ok(entry) => entry,
495                Err(cause) => {
496                    crate::lsm_warn!(
497                        "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
498                        project_path.display(),
499                        cause.kind(),
500                    );
501                    continue;
502                }
503            };
504            let session_path = session.path();
505            if session_path.extension().is_none_or(|ext| ext != "jsonl") {
506                continue;
507            }
508            let mut tailer = JsonlTailer::new(session_path.clone());
509            let file_entries = match tailer.read_new() {
510                Ok(entries) => entries,
511                Err(JsonlError::IoError { path, cause }) => {
512                    crate::lsm_warn!(
513                        "jsonl: tailer read {} failed: {} ({cause}); skipping file",
514                        path.display(),
515                        cause.kind(),
516                    );
517                    continue;
518                }
519                Err(other) => {
520                    crate::lsm_warn!(
521                        "jsonl: tailer read {} failed: {other}; skipping file",
522                        session_path.display(),
523                    );
524                    continue;
525                }
526            };
527            source_paths.push(session_path);
528            for entry in file_entries {
529                if let Some(id) = &entry.message.id {
530                    if !seen_ids.insert(id.clone()) {
531                        continue;
532                    }
533                }
534                entries.push(entry);
535            }
536        }
537    }
538    Ok(())
539}
540
541fn build_aggregate(entries: &[UsageEntry], source_paths: Vec<PathBuf>) -> JsonlAggregate {
542    let now = Timestamp::now();
543    let window_start = now - SignedDuration::from_hours(WINDOW_DAYS * 24);
544
545    let five_hour = compute_active_block(entries, now);
546
547    let mut seven_day_counts = TokenCounts::default();
548    for entry in entries {
549        // Spec §7-day window math: `[now - 7d, now]`. Clock skew
550        // can produce future-dated entries — exclude them so a
551        // misconfigured machine can't inflate the 7d totals until
552        // wall-clock catches up.
553        if entry.timestamp >= window_start && entry.timestamp <= now {
554            if let Some(usage) = entry.message.usage {
555                seven_day_counts.accumulate(usage);
556            }
557        }
558    }
559
560    JsonlAggregate {
561        five_hour,
562        seven_day: SevenDayWindow {
563            window_start,
564            token_counts: seven_day_counts,
565        },
566        source_paths,
567    }
568}
569
570/// Walk entries chronologically, rolling into blocks whenever the
571/// gap from the previous entry exceeds `BLOCK_DURATION_HOURS`.
572/// Returns the latest block only if it's still active (last activity
573/// within `BLOCK_DURATION_HOURS` of `now`).
574///
575/// Future-dated entries (clock skew) are deliberately NOT filtered
576/// here — their tokens still count so a user with a slightly-fast
577/// clock doesn't lose their current session under JSONL fallback.
578/// The cascade's [`build_jsonl_usage`](super::cascade::build_jsonl_usage)
579/// clamps `block.start` to `now`'s hour-floor before surfacing the
580/// window to segments, which neutralizes skewed `ends_at` without
581/// corrupting the token totals.
582fn compute_active_block(entries: &[UsageEntry], now: Timestamp) -> Option<FiveHourBlock> {
583    let block_duration = SignedDuration::from_hours(BLOCK_DURATION_HOURS);
584    let mut current: Option<FiveHourBlock> = None;
585    for entry in entries {
586        match &mut current {
587            None => current = Some(start_block(entry)),
588            Some(block) => {
589                let gap = entry.timestamp.duration_since(block.actual_last_activity);
590                if gap > block_duration {
591                    current = Some(start_block(entry));
592                } else {
593                    extend_block(block, entry);
594                }
595            }
596        }
597    }
598    let block = current?;
599    if now.duration_since(block.actual_last_activity) > block_duration {
600        None
601    } else {
602        Some(block)
603    }
604}
605
606fn start_block(entry: &UsageEntry) -> FiveHourBlock {
607    let mut block = FiveHourBlock {
608        start: floor_to_grain(entry.timestamp, 3600),
609        actual_last_activity: entry.timestamp,
610        token_counts: TokenCounts::default(),
611        models: Vec::new(),
612        usage_limit_reset: None,
613    };
614    extend_block(&mut block, entry);
615    block
616}
617
618fn extend_block(block: &mut FiveHourBlock, entry: &UsageEntry) {
619    if let Some(usage) = entry.message.usage {
620        block.token_counts.accumulate(usage);
621    }
622    if let Some(model) = &entry.message.model {
623        if !block.models.iter().any(|m| m == model) {
624            block.models.push(model.clone());
625        }
626    }
627    if let Some(reset) = entry.usage_limit_reset_time {
628        block.usage_limit_reset = Some(reset);
629    }
630    block.actual_last_activity = entry.timestamp;
631}
632
633/// Floor a timestamp to a whole multiple of `grain_secs` seconds (UTC).
634/// Falls back to the input on overflow: `rem_euclid` always returns a
635/// non-negative remainder, so subtracting it pushes a near-`MIN`
636/// timestamp out of jiff's range. A crafted JSONL line with a
637/// `-009999-01-02T01:59:59Z` timestamp (= `Timestamp::MIN`) round-trips
638/// through serde, so an unconditional `expect` would panic on the
639/// aggregator hot path.
640pub(super) fn floor_to_grain(ts: Timestamp, grain_secs: i64) -> Timestamp {
641    let secs = ts.as_second();
642    let floored = secs - secs.rem_euclid(grain_secs);
643    Timestamp::from_second(floored).unwrap_or(ts)
644}
645
// Unit tests live in a separate module file next to this one (`tests.rs`
// or `tests/mod.rs`) and are compiled only for test builds.
#[cfg(test)]
mod tests;