Skip to main content

linesmith_core/data_context/
jsonl.rs

1//! JSONL transcript aggregator — terminal fallback for the rate-limit
2//! data pipeline.
3//!
4//! Canonical spec: `docs/specs/jsonl-aggregation.md`. Ports the
5//! billing-block math from [`ryoppippi/ccusage`](https://github.com/ryoppippi/ccusage)'s
6//! `_session-blocks.ts` (MIT). Produces raw token counts and block
7//! boundaries only; mapping to [`UsageBucket`](super::UsageBucket)
8//! without tier detection is the orchestrator's problem.
9//!
10//! v0.1 exposes only the currently-active 5h block. Historical
11//! blocks are deferred per spec §Open questions — extending
12//! `JsonlAggregate` with `completed_blocks` is a non-breaking change
13//! under `#[non_exhaustive]`.
14
15use std::collections::HashSet;
16use std::fs;
17use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
18use std::path::{Path, PathBuf};
19
20use chrono::{DateTime, Duration, DurationRound, Utc};
21use serde::Deserialize;
22
/// Length of one billing block, in hours. Mirrors ccusage's
/// `DEFAULT_SESSION_DURATION_HOURS` from `_session-blocks.ts`.
const BLOCK_DURATION_HOURS: i64 = 5;
/// Width of the rolling usage window, in days (spec §7-day window
/// math).
const WINDOW_DAYS: i64 = 7;
28
29// --- Public types -------------------------------------------------------
30
31/// Output of the aggregator. `five_hour` is `None` when no entry
32/// falls within the last [`BLOCK_DURATION_HOURS`] hours; `seven_day`
33/// is always present (zero-valued on an empty transcript).
34#[derive(Debug, Clone)]
35#[non_exhaustive]
36pub struct JsonlAggregate {
37    pub five_hour: Option<FiveHourBlock>,
38    pub seven_day: SevenDayWindow,
39    pub source_paths: Vec<PathBuf>,
40}
41
42/// Active 5-hour billing block. `start` is the UTC-floor-to-hour of
43/// the block's first entry; `actual_last_activity` lets the caller
44/// distinguish a block where the user stopped typing 10 seconds ago
45/// from one where they stopped 4 hours ago. The block's end time is
46/// a derivation from `start` — see [`Self::end`].
47#[derive(Debug, Clone)]
48pub struct FiveHourBlock {
49    pub start: DateTime<Utc>,
50    pub actual_last_activity: DateTime<Utc>,
51    pub token_counts: TokenCounts,
52    pub models: Vec<String>,
53    pub usage_limit_reset: Option<DateTime<Utc>>,
54}
55
56impl FiveHourBlock {
57    /// Nominal close of the block: `start + BLOCK_DURATION_HOURS`.
58    /// Derived rather than stored so the invariant can't drift from
59    /// `start` after construction.
60    #[must_use]
61    pub fn end(&self) -> DateTime<Utc> {
62        self.start + Duration::hours(BLOCK_DURATION_HOURS)
63    }
64}
65
66/// Rolling 7-day window. `window_start` is `now - 7d` at the time
67/// the aggregator ran.
68#[derive(Debug, Clone)]
69pub struct SevenDayWindow {
70    pub window_start: DateTime<Utc>,
71    pub token_counts: TokenCounts,
72}
73
/// Token totals, split by category, summed over transcript entries.
///
/// # Invariants
///
/// - Writes happen only through the private `accumulate` method,
///   which adds with saturation, so a counter pins at `u64::MAX`
///   instead of wrapping.
/// - Fields are `pub(crate)` so code inside the crate can read them
///   directly. Other crates go through [`Self::total`],
///   [`Self::input`], [`Self::output`], [`Self::cache_creation`]
///   and [`Self::cache_read`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TokenCounts {
    pub(crate) input: u64,
    pub(crate) output: u64,
    pub(crate) cache_creation: u64,
    pub(crate) cache_read: u64,
}
92
93impl TokenCounts {
94    /// Test / fixture constructor. Not exposed to runtime callers —
95    /// production `TokenCounts` values come from the aggregator's
96    /// `accumulate` loop, which preserves the saturating invariant.
97    #[cfg(test)]
98    #[must_use]
99    pub(crate) fn from_parts(
100        input: u64,
101        output: u64,
102        cache_creation: u64,
103        cache_read: u64,
104    ) -> Self {
105        Self {
106            input,
107            output,
108            cache_creation,
109            cache_read,
110        }
111    }
112
113    #[must_use]
114    pub fn input(&self) -> u64 {
115        self.input
116    }
117
118    #[must_use]
119    pub fn output(&self) -> u64 {
120        self.output
121    }
122
123    #[must_use]
124    pub fn cache_creation(&self) -> u64 {
125        self.cache_creation
126    }
127
128    #[must_use]
129    pub fn cache_read(&self) -> u64 {
130        self.cache_read
131    }
132
133    /// Saturating sum across all four categories. Saturating to
134    /// match the spec's open-question note: `u64` overflow is
135    /// practically unreachable, but wrap-on-overflow is surprising.
136    #[must_use]
137    pub fn total(&self) -> u64 {
138        self.input
139            .saturating_add(self.output)
140            .saturating_add(self.cache_creation)
141            .saturating_add(self.cache_read)
142    }
143
144    fn accumulate(&mut self, other: UsageCounts) {
145        self.input = self.input.saturating_add(other.input_tokens);
146        self.output = self.output.saturating_add(other.output_tokens);
147        self.cache_creation = self.cache_creation.saturating_add(other.cache_creation);
148        self.cache_read = self.cache_read.saturating_add(other.cache_read);
149    }
150}
151
152// --- JsonlError ---------------------------------------------------------
153
154#[derive(Debug)]
155#[non_exhaustive]
156pub enum JsonlError {
157    /// No project-root directory exists in any cascade path.
158    DirectoryMissing,
159    /// Project roots exist but yielded zero parseable records.
160    NoEntries,
161    /// Filesystem error opening or traversing a path.
162    IoError { path: PathBuf, cause: io::Error },
163    /// Reserved for fail-fast callers. Production aggregation logs
164    /// per-line parse failures and continues rather than surfacing
165    /// this variant.
166    ParseError {
167        path: PathBuf,
168        line: u64,
169        cause: serde_json::Error,
170    },
171}
172
173impl JsonlError {
174    /// Short plugin-facing tag per `docs/specs/plugin-api.md` §ctx
175    /// shape. `ctx.jsonl` is reserved (not plugin-accessible) in
176    /// v0.1 but the tag stays useful for `UsageError::Jsonl`
177    /// delegation.
178    #[must_use]
179    pub fn code(&self) -> &'static str {
180        match self {
181            Self::DirectoryMissing => "DirectoryMissing",
182            Self::NoEntries => "NoEntries",
183            Self::IoError { .. } => "IoError",
184            Self::ParseError { .. } => "ParseError",
185        }
186    }
187}
188
189impl std::fmt::Display for JsonlError {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        match self {
192            Self::DirectoryMissing => f.write_str("no Claude Code project directory found"),
193            Self::NoEntries => f.write_str("Claude Code project directory has no JSONL entries"),
194            Self::IoError { path, cause } => write!(
195                f,
196                "failed to read JSONL path {}: {}",
197                path.display(),
198                cause.kind()
199            ),
200            Self::ParseError { path, line, cause } => write!(
201                f,
202                "JSONL parse failed in {} at line {}: {}",
203                path.display(),
204                line,
205                cause
206            ),
207        }
208    }
209}
210
211impl std::error::Error for JsonlError {
212    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
213        match self {
214            Self::IoError { cause, .. } => Some(cause),
215            Self::ParseError { cause, .. } => Some(cause),
216            _ => None,
217        }
218    }
219}
220
221// --- Per-line record schema --------------------------------------------
222
223/// Serde view over a single JSONL line. Only the fields the
224/// aggregator consumes are named; unknown keys (including
225/// `costUSD` and `version` until we need them) are dropped per
226/// ADR-0009.
227#[derive(Debug, Deserialize)]
228pub(crate) struct UsageEntry {
229    timestamp: DateTime<Utc>,
230    message: MessageFields,
231    #[serde(default, rename = "usageLimitResetTime")]
232    usage_limit_reset_time: Option<DateTime<Utc>>,
233}
234
235#[derive(Debug, Deserialize, Default)]
236struct MessageFields {
237    #[serde(default)]
238    usage: Option<UsageCounts>,
239    #[serde(default)]
240    model: Option<String>,
241    #[serde(default)]
242    id: Option<String>,
243}
244
245#[derive(Debug, Deserialize, Default, Clone, Copy)]
246struct UsageCounts {
247    #[serde(default)]
248    input_tokens: u64,
249    #[serde(default)]
250    output_tokens: u64,
251    #[serde(default, rename = "cache_creation_input_tokens")]
252    cache_creation: u64,
253    #[serde(default, rename = "cache_read_input_tokens")]
254    cache_read: u64,
255}
256
257// --- Project-root discovery --------------------------------------------
258
/// The environment values `project_roots` depends on, captured as
/// plain data. Tests build this directly instead of mutating the
/// (thread-unsafe) process environment — the same approach as
/// `credentials::FileCascadeEnv`.
#[derive(Debug, Clone, Default)]
struct DiscoveryEnv {
    /// Value of `CLAUDE_CONFIG_DIR`, if set and non-empty.
    claude_config_dir: Option<PathBuf>,
    /// Value of `XDG_CONFIG_HOME`, if set and non-empty.
    xdg_config_home: Option<PathBuf>,
    /// Value of `HOME`, if set and non-empty.
    home: Option<PathBuf>,
}
268
269impl DiscoveryEnv {
270    fn from_process_env() -> Self {
271        fn non_empty(key: &str) -> Option<PathBuf> {
272            std::env::var_os(key)
273                .filter(|v| !v.is_empty())
274                .map(PathBuf::from)
275        }
276        Self {
277            claude_config_dir: non_empty("CLAUDE_CONFIG_DIR"),
278            xdg_config_home: non_empty("XDG_CONFIG_HOME"),
279            home: non_empty("HOME"),
280        }
281    }
282}
283
284fn project_roots(env: &DiscoveryEnv) -> Vec<PathBuf> {
285    let mut out = Vec::with_capacity(3);
286    if let Some(dir) = &env.claude_config_dir {
287        out.push(dir.join("projects"));
288    }
289    // XDG candidate is emitted whenever an XDG root is derivable —
290    // either `$XDG_CONFIG_HOME` directly or `$HOME/.config`. A
291    // HOME-less CI/service environment with only `$XDG_CONFIG_HOME`
292    // set still gets its XDG path probed. Same pattern as
293    // `credentials::file_cascade_candidates`.
294    let xdg_root = env
295        .xdg_config_home
296        .clone()
297        .or_else(|| env.home.as_ref().map(|h| h.join(".config")));
298    if let Some(xdg_root) = xdg_root {
299        out.push(xdg_root.join("claude").join("projects"));
300    }
301    // Legacy `~/.claude/projects/` requires `$HOME`.
302    if let Some(home) = &env.home {
303        out.push(home.join(".claude").join("projects"));
304    }
305    out
306}
307
308// --- JsonlTailer --------------------------------------------------------
309
/// Incremental, byte-offset-based reader for one JSONL file.
///
/// Each read opens the file, consumes what is new, and closes it
/// again; no handle is kept between calls. Truncation is detected by
/// comparing the file's current size with the state remembered from
/// the previous call, in which case reading restarts from the top.
pub(crate) struct JsonlTailer {
    /// File being tailed.
    path: PathBuf,
    /// Byte offset just past the last complete line consumed.
    last_offset: u64,
    /// File size observed by the previous call.
    last_size: u64,
}
319
320impl JsonlTailer {
321    #[must_use]
322    pub(crate) fn new(path: PathBuf) -> Self {
323        Self {
324            path,
325            last_offset: 0,
326            last_size: 0,
327        }
328    }
329
330    /// Read any new complete lines since the last call. Malformed
331    /// lines are silently skipped (the offset advances past them so
332    /// repeat invocations don't re-encounter). Returns `Ok(vec![])`
333    /// when the file doesn't exist yet — a fresh install scenario.
334    pub(crate) fn read_new(&mut self) -> Result<Vec<UsageEntry>, JsonlError> {
335        let metadata = match fs::metadata(&self.path) {
336            Ok(m) => m,
337            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
338            Err(cause) => {
339                return Err(JsonlError::IoError {
340                    path: self.path.clone(),
341                    cause,
342                })
343            }
344        };
345
346        let size = metadata.len();
347        if size < self.last_size {
348            self.last_offset = 0;
349        }
350        self.last_size = size;
351
352        if self.last_offset >= size {
353            return Ok(Vec::new());
354        }
355
356        let mut file = fs::File::open(&self.path).map_err(|cause| JsonlError::IoError {
357            path: self.path.clone(),
358            cause,
359        })?;
360        file.seek(SeekFrom::Start(self.last_offset))
361            .map_err(|cause| JsonlError::IoError {
362                path: self.path.clone(),
363                cause,
364            })?;
365
366        let mut reader = BufReader::new(file);
367        let mut entries = Vec::new();
368        let mut buf: Vec<u8> = Vec::new();
369        loop {
370            buf.clear();
371            // Byte-level read: a non-UTF-8 line becomes a per-line
372            // skip (lossy convert + serde reject), not a whole-file
373            // abort the way `read_line(&mut String)` would be.
374            let read = reader
375                .read_until(b'\n', &mut buf)
376                .map_err(|cause| JsonlError::IoError {
377                    path: self.path.clone(),
378                    cause,
379                })?;
380            if read == 0 {
381                break;
382            }
383            if buf.last() != Some(&b'\n') {
384                // Partial trailing line: don't advance past it.
385                break;
386            }
387            self.last_offset += read as u64;
388            let line = match buf.strip_suffix(b"\n") {
389                Some(rest) => rest.strip_suffix(b"\r").unwrap_or(rest),
390                None => &buf[..],
391            };
392            let text = String::from_utf8_lossy(line);
393            if let Ok(entry) = serde_json::from_str::<UsageEntry>(&text) {
394                entries.push(entry);
395            }
396        }
397
398        Ok(entries)
399    }
400}
401
402// --- Aggregation entry point -------------------------------------------
403
404/// Discover project roots, scan every `*.jsonl` under them, dedupe,
405/// aggregate. Memoization is the caller's responsibility; each call
406/// re-scans from offset zero.
407pub fn aggregate_jsonl() -> Result<JsonlAggregate, JsonlError> {
408    aggregate_jsonl_with(&DiscoveryEnv::from_process_env())
409}
410
411fn aggregate_jsonl_with(env: &DiscoveryEnv) -> Result<JsonlAggregate, JsonlError> {
412    let candidate_roots = project_roots(env);
413    let existing_roots: Vec<PathBuf> = candidate_roots.into_iter().filter(|r| r.exists()).collect();
414    if existing_roots.is_empty() {
415        return Err(JsonlError::DirectoryMissing);
416    }
417
418    let mut all_entries: Vec<UsageEntry> = Vec::new();
419    let mut source_paths: Vec<PathBuf> = Vec::new();
420    let mut seen_ids: HashSet<String> = HashSet::new();
421
422    for root in &existing_roots {
423        collect_from_root(root, &mut all_entries, &mut source_paths, &mut seen_ids)?;
424    }
425
426    if all_entries.is_empty() {
427        return Err(JsonlError::NoEntries);
428    }
429
430    all_entries.sort_by_key(|e| e.timestamp);
431    Ok(build_aggregate(&all_entries, source_paths))
432}
433
434/// Recurse one level into each `projects/{workspace}/` subdir and
435/// pick up `*.jsonl` files. Dedup on `message.id`; missing-id
436/// entries are always kept.
437fn collect_from_root(
438    root: &Path,
439    entries: &mut Vec<UsageEntry>,
440    source_paths: &mut Vec<PathBuf>,
441    seen_ids: &mut HashSet<String>,
442) -> Result<(), JsonlError> {
443    let top = match fs::read_dir(root) {
444        Ok(iter) => iter,
445        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
446        Err(cause) => {
447            return Err(JsonlError::IoError {
448                path: root.to_path_buf(),
449                cause,
450            })
451        }
452    };
453    for project in top {
454        let project = match project {
455            Ok(entry) => entry,
456            Err(cause) => {
457                crate::lsm_warn!(
458                    "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
459                    root.display(),
460                    cause.kind(),
461                );
462                continue;
463            }
464        };
465        let project_path = project.path();
466        if !project_path.is_dir() {
467            continue;
468        }
469        let session_iter = match fs::read_dir(&project_path) {
470            Ok(iter) => iter,
471            Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
472            Err(cause) => {
473                // EACCES / EIO on a specific workspace dir — the
474                // top-level project-root fix in resolve_usage_default
475                // only catches root-level failures. Without this warn,
476                // a stale/unreadable workspace silently poisons the
477                // JSONL fallback and users see the endpoint-path error
478                // with no diagnostic trail.
479                crate::lsm_warn!(
480                    "jsonl: read_dir {} failed: {} ({cause}); skipping workspace",
481                    project_path.display(),
482                    cause.kind(),
483                );
484                continue;
485            }
486        };
487        for session in session_iter {
488            let session = match session {
489                Ok(entry) => entry,
490                Err(cause) => {
491                    crate::lsm_warn!(
492                        "jsonl: dirent iteration under {} failed: {} ({cause}); skipping",
493                        project_path.display(),
494                        cause.kind(),
495                    );
496                    continue;
497                }
498            };
499            let session_path = session.path();
500            if session_path.extension().is_none_or(|ext| ext != "jsonl") {
501                continue;
502            }
503            let mut tailer = JsonlTailer::new(session_path.clone());
504            let file_entries = match tailer.read_new() {
505                Ok(entries) => entries,
506                Err(JsonlError::IoError { path, cause }) => {
507                    crate::lsm_warn!(
508                        "jsonl: tailer read {} failed: {} ({cause}); skipping file",
509                        path.display(),
510                        cause.kind(),
511                    );
512                    continue;
513                }
514                Err(other) => {
515                    crate::lsm_warn!(
516                        "jsonl: tailer read {} failed: {other}; skipping file",
517                        session_path.display(),
518                    );
519                    continue;
520                }
521            };
522            source_paths.push(session_path);
523            for entry in file_entries {
524                if let Some(id) = &entry.message.id {
525                    if !seen_ids.insert(id.clone()) {
526                        continue;
527                    }
528                }
529                entries.push(entry);
530            }
531        }
532    }
533    Ok(())
534}
535
536fn build_aggregate(entries: &[UsageEntry], source_paths: Vec<PathBuf>) -> JsonlAggregate {
537    let now = Utc::now();
538    let window_start = now - Duration::days(WINDOW_DAYS);
539
540    let five_hour = compute_active_block(entries, now);
541
542    let mut seven_day_counts = TokenCounts::default();
543    for entry in entries {
544        // Spec §7-day window math: `[now - 7d, now]`. Clock skew
545        // can produce future-dated entries — exclude them so a
546        // misconfigured machine can't inflate the 7d totals until
547        // wall-clock catches up.
548        if entry.timestamp >= window_start && entry.timestamp <= now {
549            if let Some(usage) = entry.message.usage {
550                seven_day_counts.accumulate(usage);
551            }
552        }
553    }
554
555    JsonlAggregate {
556        five_hour,
557        seven_day: SevenDayWindow {
558            window_start,
559            token_counts: seven_day_counts,
560        },
561        source_paths,
562    }
563}
564
565/// Walk entries chronologically, rolling into blocks whenever the
566/// gap from the previous entry exceeds `BLOCK_DURATION_HOURS`.
567/// Returns the latest block only if it's still active (last activity
568/// within `BLOCK_DURATION_HOURS` of `now`).
569///
570/// Future-dated entries (clock skew) are deliberately NOT filtered
571/// here — their tokens still count so a user with a slightly-fast
572/// clock doesn't lose their current session under JSONL fallback.
573/// The cascade's [`build_jsonl_usage`](super::cascade::build_jsonl_usage)
574/// clamps `block.start` to `now`'s hour-floor before surfacing the
575/// window to segments, which neutralizes skewed `ends_at` without
576/// corrupting the token totals.
577fn compute_active_block(entries: &[UsageEntry], now: DateTime<Utc>) -> Option<FiveHourBlock> {
578    let block_duration = Duration::hours(BLOCK_DURATION_HOURS);
579    let mut current: Option<FiveHourBlock> = None;
580    for entry in entries {
581        match &mut current {
582            None => current = Some(start_block(entry)),
583            Some(block) => {
584                let gap = entry.timestamp - block.actual_last_activity;
585                if gap > block_duration {
586                    current = Some(start_block(entry));
587                } else {
588                    extend_block(block, entry);
589                }
590            }
591        }
592    }
593    let block = current?;
594    if now - block.actual_last_activity > block_duration {
595        None
596    } else {
597        Some(block)
598    }
599}
600
601fn start_block(entry: &UsageEntry) -> FiveHourBlock {
602    let mut block = FiveHourBlock {
603        start: floor_to_hour(entry.timestamp),
604        actual_last_activity: entry.timestamp,
605        token_counts: TokenCounts::default(),
606        models: Vec::new(),
607        usage_limit_reset: None,
608    };
609    extend_block(&mut block, entry);
610    block
611}
612
613fn extend_block(block: &mut FiveHourBlock, entry: &UsageEntry) {
614    if let Some(usage) = entry.message.usage {
615        block.token_counts.accumulate(usage);
616    }
617    if let Some(model) = &entry.message.model {
618        if !block.models.iter().any(|m| m == model) {
619            block.models.push(model.clone());
620        }
621    }
622    if let Some(reset) = entry.usage_limit_reset_time {
623        block.usage_limit_reset = Some(reset);
624    }
625    block.actual_last_activity = entry.timestamp;
626}
627
628pub(super) fn floor_to_hour(ts: DateTime<Utc>) -> DateTime<Utc> {
629    // `duration_trunc(hours(1))` only errors on zero / overflow
630    // durations, which can't arise from a 1-hour grain.
631    ts.duration_trunc(Duration::hours(1))
632        .expect("1-hour grain never overflows DateTime<Utc>")
633}
634
635// --- Tests --------------------------------------------------------------
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use chrono::TimeZone;
641    use tempfile::TempDir;
642
643    fn env_from(claude: Option<&Path>, xdg: Option<&Path>, home: Option<&Path>) -> DiscoveryEnv {
644        DiscoveryEnv {
645            claude_config_dir: claude.map(Path::to_path_buf),
646            xdg_config_home: xdg.map(Path::to_path_buf),
647            home: home.map(Path::to_path_buf),
648        }
649    }
650
651    fn write_jsonl(dir: &Path, workspace: &str, session: &str, lines: &[&str]) -> PathBuf {
652        let target = dir.join(workspace);
653        fs::create_dir_all(&target).unwrap();
654        let path = target.join(session);
655        fs::write(&path, lines.join("\n") + "\n").unwrap();
656        path
657    }
658
659    fn record(ts: &str, input: u64, output: u64, id: Option<&str>) -> String {
660        let id_part = id.map_or(String::new(), |i| format!(r#","id":"{i}""#));
661        format!(
662            r#"{{"timestamp":"{ts}","message":{{"usage":{{"input_tokens":{input},"output_tokens":{output}}},"model":"claude-opus-4-7"{id_part}}}}}"#
663        )
664    }
665
666    // --- project_roots cascade ----------------------------------------
667
668    #[test]
669    fn project_roots_includes_env_dir_when_set() {
670        let tmp = TempDir::new().unwrap();
671        let env = env_from(Some(tmp.path()), None, Some(tmp.path()));
672        let roots = project_roots(&env);
673        assert!(roots[0].ends_with("projects"));
674        assert!(roots[0].starts_with(tmp.path()));
675    }
676
677    #[test]
678    fn project_roots_omits_env_dir_when_unset() {
679        let tmp = TempDir::new().unwrap();
680        let env = env_from(None, None, Some(tmp.path()));
681        let roots = project_roots(&env);
682        for r in &roots {
683            assert!(!r
684                .parent()
685                .unwrap()
686                .ends_with(tmp.path().file_name().unwrap()));
687        }
688    }
689
690    #[test]
691    fn project_roots_falls_back_to_home_when_xdg_unset() {
692        let tmp = TempDir::new().unwrap();
693        let env = env_from(None, None, Some(tmp.path()));
694        let roots = project_roots(&env);
695        assert!(roots
696            .iter()
697            .any(|r| r.starts_with(tmp.path().join(".config"))));
698        assert!(roots
699            .iter()
700            .any(|r| r.starts_with(tmp.path().join(".claude"))));
701    }
702
703    // --- aggregate_jsonl_with: dir-level outcomes ---------------------
704
705    #[test]
706    fn aggregate_returns_directory_missing_when_no_roots_exist() {
707        let tmp = TempDir::new().unwrap();
708        let env = env_from(None, None, Some(&tmp.path().join("nonexistent")));
709        let err = aggregate_jsonl_with(&env).unwrap_err();
710        assert!(matches!(err, JsonlError::DirectoryMissing));
711    }
712
713    #[test]
714    fn aggregate_returns_no_entries_when_roots_empty() {
715        let tmp = TempDir::new().unwrap();
716        fs::create_dir_all(tmp.path().join(".claude").join("projects")).unwrap();
717        let env = env_from(None, None, Some(tmp.path()));
718        let err = aggregate_jsonl_with(&env).unwrap_err();
719        assert!(matches!(err, JsonlError::NoEntries));
720    }
721
722    // --- 5h block math ------------------------------------------------
723
724    #[test]
725    fn active_block_computed_from_recent_entries() {
726        // Two entries within 5h of now.
727        let now = Utc::now();
728        let e1 = UsageEntry {
729            timestamp: now - Duration::hours(1),
730            message: MessageFields {
731                usage: Some(UsageCounts {
732                    input_tokens: 100,
733                    output_tokens: 50,
734                    cache_creation: 0,
735                    cache_read: 0,
736                }),
737                model: Some("claude-opus-4-7".into()),
738                id: Some("msg_1".into()),
739            },
740            usage_limit_reset_time: None,
741        };
742        let block = compute_active_block(&[e1], now).expect("active");
743        assert_eq!(block.token_counts.input, 100);
744        assert_eq!(block.models, vec!["claude-opus-4-7"]);
745    }
746
747    #[test]
748    fn no_active_block_when_last_entry_is_older_than_window() {
749        let now = Utc::now();
750        let e1 = UsageEntry {
751            timestamp: now - Duration::hours(10),
752            message: MessageFields::default(),
753            usage_limit_reset_time: None,
754        };
755        assert!(compute_active_block(&[e1], now).is_none());
756    }
757
758    #[test]
759    fn new_block_starts_on_gap_exceeding_window() {
760        // e1 at T-8h, e2 at T-1h. Gap is 7h > 5h, so a new block
761        // starts at e2. Only the newer block is kept.
762        let now = Utc::now();
763        let e1 = UsageEntry {
764            timestamp: now - Duration::hours(8),
765            message: MessageFields {
766                usage: Some(UsageCounts {
767                    input_tokens: 999,
768                    ..UsageCounts::default()
769                }),
770                ..MessageFields::default()
771            },
772            usage_limit_reset_time: None,
773        };
774        let e2 = UsageEntry {
775            timestamp: now - Duration::hours(1),
776            message: MessageFields {
777                usage: Some(UsageCounts {
778                    input_tokens: 10,
779                    ..UsageCounts::default()
780                }),
781                ..MessageFields::default()
782            },
783            usage_limit_reset_time: None,
784        };
785        let block = compute_active_block(&[e1, e2], now).expect("active");
786        // Only e2's tokens count — e1 was rolled into a closed block.
787        assert_eq!(block.token_counts.input, 10);
788    }
789
790    #[test]
791    fn usage_limit_reset_picks_most_recent() {
792        let now = Utc::now();
793        let earlier_reset = now + Duration::hours(1);
794        let later_reset = now + Duration::hours(2);
795        let e1 = UsageEntry {
796            timestamp: now - Duration::minutes(90),
797            message: MessageFields::default(),
798            usage_limit_reset_time: Some(earlier_reset),
799        };
800        let e2 = UsageEntry {
801            timestamp: now - Duration::minutes(30),
802            message: MessageFields::default(),
803            usage_limit_reset_time: Some(later_reset),
804        };
805        let block = compute_active_block(&[e1, e2], now).expect("active");
806        assert_eq!(block.usage_limit_reset, Some(later_reset));
807    }
808
809    // --- Per-line record parsing --------------------------------------
810
811    #[test]
812    fn parses_full_record_shape() {
813        let line = r#"{"timestamp":"2026-04-20T14:23:47Z","message":{"id":"msg_1","model":"claude-opus-4-7","usage":{"input_tokens":1842,"output_tokens":631,"cache_creation_input_tokens":0,"cache_read_input_tokens":48122}},"costUSD":0.0421,"version":"1.0.85","usageLimitResetTime":"2026-04-20T19:00:00Z"}"#;
814        let entry: UsageEntry = serde_json::from_str(line).expect("parse");
815        let u = entry.message.usage.unwrap();
816        assert_eq!(u.input_tokens, 1842);
817        assert_eq!(u.cache_read, 48122);
818        assert_eq!(entry.message.id.as_deref(), Some("msg_1"));
819        assert!(entry.usage_limit_reset_time.is_some());
820    }
821
822    #[test]
823    fn parses_sparse_record_shape() {
824        // Missing model, id, cost, reset — just timestamp + usage.
825        let line = r#"{"timestamp":"2026-04-20T14:23:47Z","message":{"usage":{"input_tokens":100,"output_tokens":50}}}"#;
826        let entry: UsageEntry = serde_json::from_str(line).expect("parse");
827        assert_eq!(entry.message.usage.unwrap().input_tokens, 100);
828        assert!(entry.message.id.is_none());
829    }
830
831    #[test]
832    fn unknown_fields_are_dropped() {
833        // Future schema extension shouldn't break parsing.
834        let line = r#"{"timestamp":"2026-04-20T14:23:47Z","message":{},"futureField":"ignored","anotherThing":{"nested":true}}"#;
835        serde_json::from_str::<UsageEntry>(line).expect("parse");
836    }
837
838    // --- JsonlTailer -------------------------------------------------
839
840    #[test]
841    fn tailer_reads_all_lines_on_first_call() {
842        let tmp = TempDir::new().unwrap();
843        let path = tmp.path().join("t.jsonl");
844        let lines = [
845            record("2026-04-20T00:00:00Z", 1, 1, Some("a")),
846            record("2026-04-20T00:01:00Z", 2, 2, Some("b")),
847            record("2026-04-20T00:02:00Z", 3, 3, Some("c")),
848        ];
849        fs::write(&path, lines.join("\n") + "\n").unwrap();
850        let mut tailer = JsonlTailer::new(path);
851        let entries = tailer.read_new().expect("ok");
852        assert_eq!(entries.len(), 3);
853    }
854
855    #[test]
856    fn tailer_only_reads_new_lines_on_second_call() {
857        let tmp = TempDir::new().unwrap();
858        let path = tmp.path().join("t.jsonl");
859        fs::write(
860            &path,
861            record("2026-04-20T00:00:00Z", 1, 1, Some("a")) + "\n",
862        )
863        .unwrap();
864        let mut tailer = JsonlTailer::new(path.clone());
865        let first = tailer.read_new().expect("ok");
866        assert_eq!(first.len(), 1);
867
868        let existing = fs::read_to_string(&path).unwrap();
869        let new_line = record("2026-04-20T00:01:00Z", 2, 2, Some("b"));
870        fs::write(&path, format!("{existing}{new_line}\n")).unwrap();
871        let second = tailer.read_new().expect("ok");
872        assert_eq!(second.len(), 1);
873        assert_eq!(second[0].message.id.as_deref(), Some("b"));
874    }
875
876    #[test]
877    fn tailer_returns_empty_for_missing_file() {
878        let tmp = TempDir::new().unwrap();
879        let mut tailer = JsonlTailer::new(tmp.path().join("nonexistent.jsonl"));
880        let entries = tailer.read_new().expect("ok");
881        assert!(entries.is_empty());
882    }
883
884    #[test]
885    fn tailer_resets_on_truncation() {
886        let tmp = TempDir::new().unwrap();
887        let path = tmp.path().join("t.jsonl");
888        let initial = [
889            record("2026-04-20T00:00:00Z", 1, 1, Some("a")),
890            record("2026-04-20T00:01:00Z", 2, 2, Some("b")),
891        ];
892        fs::write(&path, initial.join("\n") + "\n").unwrap();
893        let mut tailer = JsonlTailer::new(path.clone());
894        tailer.read_new().expect("first");
895
896        let new_line = record("2026-04-20T00:02:00Z", 3, 3, Some("c"));
897        fs::write(&path, new_line + "\n").unwrap();
898        let after = tailer.read_new().expect("after truncate");
899        assert_eq!(after.len(), 1);
900        assert_eq!(after[0].message.id.as_deref(), Some("c"));
901    }
902
903    #[test]
904    fn tailer_skips_partial_trailing_line() {
905        let tmp = TempDir::new().unwrap();
906        let path = tmp.path().join("t.jsonl");
907        let complete = record("2026-04-20T00:00:00Z", 1, 1, Some("a"));
908        fs::write(
909            &path,
910            format!("{complete}\n{}", r#"{"timestamp":"2026-04-20T00:01:00Z""#),
911        )
912        .unwrap();
913        let mut tailer = JsonlTailer::new(path);
914        let entries = tailer.read_new().expect("ok");
915        // Partial line must not advance the offset — it would be
916        // re-read on the next poll once the tail completes.
917        assert_eq!(entries.len(), 1);
918    }
919
920    #[test]
921    fn tailer_skips_non_utf8_line_and_keeps_later_valid_lines() {
922        // Per spec §Edge cases, non-UTF-8 is a malformed-line class
923        // — the tailer must skip just the bad line, NOT abandon the
924        // rest of the file. Pre-fix, `read_line(&mut String)` raised
925        // `InvalidData` and the file was dropped entirely.
926        let tmp = TempDir::new().unwrap();
927        let path = tmp.path().join("t.jsonl");
928        let good_before = record("2026-04-20T00:00:00Z", 1, 1, Some("before"));
929        let good_after = record("2026-04-20T00:02:00Z", 3, 3, Some("after"));
930        let mut bytes = Vec::new();
931        bytes.extend_from_slice(good_before.as_bytes());
932        bytes.push(b'\n');
933        bytes.extend_from_slice(&[0xFF, 0xFE, 0xFD, b'\n']);
934        bytes.extend_from_slice(good_after.as_bytes());
935        bytes.push(b'\n');
936        fs::write(&path, &bytes).unwrap();
937        let mut tailer = JsonlTailer::new(path);
938        let entries = tailer.read_new().expect("ok");
939        assert_eq!(entries.len(), 2);
940    }
941
942    #[test]
943    fn tailer_skips_malformed_lines_and_advances_past_them() {
944        let tmp = TempDir::new().unwrap();
945        let path = tmp.path().join("t.jsonl");
946        let good = record("2026-04-20T00:00:00Z", 1, 1, Some("a"));
947        let bad = "{ this is not json }";
948        let good2 = record("2026-04-20T00:01:00Z", 2, 2, Some("b"));
949        fs::write(&path, format!("{good}\n{bad}\n{good2}\n")).unwrap();
950        let mut tailer = JsonlTailer::new(path);
951        let entries = tailer.read_new().expect("ok");
952        assert_eq!(entries.len(), 2);
953    }
954
955    // --- Dedup semantics ----------------------------------------------
956
957    #[test]
958    fn aggregate_dedupes_on_message_id() {
959        let tmp = TempDir::new().unwrap();
960        let home = tmp.path();
961        let projects = home.join(".claude").join("projects");
962        let now = Utc::now();
963        let ts = now
964            .duration_trunc(Duration::minutes(1))
965            .unwrap()
966            .to_rfc3339();
967        let line = record(&ts, 100, 50, Some("dup-1"));
968        write_jsonl(&projects, "-proj-a", "sess1.jsonl", &[&line]);
969        write_jsonl(&projects, "-proj-a", "sess2.jsonl", &[&line]);
970
971        let env = env_from(None, None, Some(home));
972        let agg = aggregate_jsonl_with(&env).expect("aggregate");
973        // Dedupe merges the shared id across sessions → 100, not 200.
974        assert_eq!(agg.seven_day.token_counts.input, 100);
975    }
976
977    #[test]
978    fn aggregate_keeps_missing_id_entries_individually() {
979        let tmp = TempDir::new().unwrap();
980        let home = tmp.path();
981        let projects = home.join(".claude").join("projects");
982        let now = Utc::now();
983        let ts = now
984            .duration_trunc(Duration::minutes(1))
985            .unwrap()
986            .to_rfc3339();
987        let line = record(&ts, 100, 50, None);
988        write_jsonl(&projects, "-proj-a", "sess1.jsonl", &[&line, &line]);
989
990        let env = env_from(None, None, Some(home));
991        let agg = aggregate_jsonl_with(&env).expect("aggregate");
992        assert_eq!(agg.seven_day.token_counts.input, 200);
993    }
994
995    // --- Full aggregate integration -----------------------------------
996
997    #[test]
998    fn aggregate_happy_path_produces_active_block_and_7d_window() {
999        let tmp = TempDir::new().unwrap();
1000        let home = tmp.path();
1001        let projects = home.join(".claude").join("projects");
1002        let now = Utc::now();
1003        let recent_ts = now
1004            .duration_trunc(Duration::minutes(1))
1005            .unwrap()
1006            .to_rfc3339();
1007        let old_ts = (now - Duration::days(3))
1008            .duration_trunc(Duration::minutes(1))
1009            .unwrap()
1010            .to_rfc3339();
1011        let old_line = record(&old_ts, 500, 100, Some("old-1"));
1012        let recent_line = record(&recent_ts, 250, 50, Some("new-1"));
1013        write_jsonl(
1014            &projects,
1015            "-Users-alice-code-myrepo",
1016            "session.jsonl",
1017            &[&old_line, &recent_line],
1018        );
1019
1020        let env = env_from(None, None, Some(home));
1021        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1022
1023        // 7d window picks up both entries.
1024        assert_eq!(agg.seven_day.token_counts.input, 750);
1025        // Active block is anchored to the recent entry.
1026        let block = agg.five_hour.expect("active block");
1027        assert_eq!(block.token_counts.input, 250);
1028    }
1029
1030    #[test]
1031    fn aggregate_old_only_transcript_has_no_active_block() {
1032        let tmp = TempDir::new().unwrap();
1033        let home = tmp.path();
1034        let projects = home.join(".claude").join("projects");
1035        let old_ts = (Utc::now() - Duration::days(10))
1036            .duration_trunc(Duration::minutes(1))
1037            .unwrap()
1038            .to_rfc3339();
1039        let line = record(&old_ts, 100, 50, Some("old-1"));
1040        write_jsonl(&projects, "-proj-a", "session.jsonl", &[&line]);
1041
1042        let env = env_from(None, None, Some(home));
1043        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1044        assert!(agg.five_hour.is_none());
1045        // 7d window also excludes the >7d entry.
1046        assert_eq!(agg.seven_day.token_counts.input, 0);
1047    }
1048
1049    // --- TokenCounts saturating arithmetic ---------------------------
1050
1051    #[test]
1052    fn token_counts_total_saturates_on_overflow() {
1053        let counts = TokenCounts::from_parts(u64::MAX - 5, 10, 0, 0);
1054        assert_eq!(counts.total(), u64::MAX);
1055    }
1056
1057    #[test]
1058    fn token_counts_from_parts_pins_positional_argument_order() {
1059        // Regression guard: every in-crate test calls this constructor
1060        // with specific values in specific positions (e.g. the cascade
1061        // `jsonl_ok` fixture uses 1_000_000 input + 200_000 output).
1062        // If the argument order is ever reshuffled, the `total()`
1063        // assertions downstream still pass (sums are order-invariant)
1064        // so a typed-field check here is the only structural guard.
1065        let t = TokenCounts::from_parts(1, 2, 3, 4);
1066        assert_eq!(t.input(), 1);
1067        assert_eq!(t.output(), 2);
1068        assert_eq!(t.cache_creation(), 3);
1069        assert_eq!(t.cache_read(), 4);
1070        assert_eq!(t.total(), 10);
1071    }
1072
1073    // --- Error taxonomy -----------------------------------------------
1074
1075    #[test]
1076    fn jsonl_error_code_taxonomy_is_unique() {
1077        let all: [(JsonlError, &str); 4] = [
1078            (JsonlError::DirectoryMissing, "DirectoryMissing"),
1079            (JsonlError::NoEntries, "NoEntries"),
1080            (
1081                JsonlError::IoError {
1082                    path: PathBuf::from("/x"),
1083                    cause: io::Error::other("x"),
1084                },
1085                "IoError",
1086            ),
1087            (
1088                JsonlError::ParseError {
1089                    path: PathBuf::from("/x"),
1090                    line: 1,
1091                    cause: serde_json::from_str::<i32>("x").unwrap_err(),
1092                },
1093                "ParseError",
1094            ),
1095        ];
1096        let codes: std::collections::HashSet<&'static str> =
1097            all.iter().map(|(e, _)| e.code()).collect();
1098        assert_eq!(codes.len(), 4);
1099        for (err, expected) in &all {
1100            assert_eq!(err.code(), *expected);
1101        }
1102    }
1103
1104    // --- floor_to_hour edge case --------------------------------------
1105
1106    #[test]
1107    fn floor_to_hour_truncates_subhour_components() {
1108        let ts = Utc.with_ymd_and_hms(2026, 4, 20, 14, 37, 52).unwrap();
1109        let floored = floor_to_hour(ts);
1110        assert_eq!(
1111            floored,
1112            Utc.with_ymd_and_hms(2026, 4, 20, 14, 0, 0).unwrap()
1113        );
1114    }
1115
1116    // --- FiveHourBlock::end derivation --------------------------------
1117
1118    #[test]
1119    fn five_hour_block_end_derives_from_start() {
1120        let now = Utc::now();
1121        let e = UsageEntry {
1122            timestamp: now - Duration::minutes(30),
1123            message: MessageFields::default(),
1124            usage_limit_reset_time: None,
1125        };
1126        let block = compute_active_block(&[e], now).expect("active");
1127        assert_eq!(
1128            block.end(),
1129            block.start + Duration::hours(BLOCK_DURATION_HOURS)
1130        );
1131    }
1132
1133    // --- Block-boundary math (off-by-one guard) -----------------------
1134
1135    #[test]
1136    fn entries_exactly_5h_apart_stay_in_same_block() {
1137        // Spec: gap > 5h opens a new block. A gap of exactly 5h
1138        // (not strictly greater) must keep both entries in the
1139        // single rolling block. Guards against a `>=` refactor.
1140        let now = Utc::now();
1141        let e1 = UsageEntry {
1142            timestamp: now - Duration::hours(5),
1143            message: MessageFields {
1144                usage: Some(UsageCounts {
1145                    input_tokens: 100,
1146                    ..UsageCounts::default()
1147                }),
1148                ..MessageFields::default()
1149            },
1150            usage_limit_reset_time: None,
1151        };
1152        let e2 = UsageEntry {
1153            timestamp: now,
1154            message: MessageFields {
1155                usage: Some(UsageCounts {
1156                    input_tokens: 50,
1157                    ..UsageCounts::default()
1158                }),
1159                ..MessageFields::default()
1160            },
1161            usage_limit_reset_time: None,
1162        };
1163        let block = compute_active_block(&[e1, e2], now).expect("active");
1164        // Both entries rolled into one block → 150 total input.
1165        assert_eq!(block.token_counts.input, 150);
1166    }
1167
1168    #[test]
1169    fn gap_of_5h_plus_one_ns_opens_new_block() {
1170        // Strict `>` — one nanosecond past the boundary flips.
1171        let now = Utc::now();
1172        let e1 = UsageEntry {
1173            timestamp: now - Duration::hours(5) - Duration::nanoseconds(1),
1174            message: MessageFields {
1175                usage: Some(UsageCounts {
1176                    input_tokens: 999,
1177                    ..UsageCounts::default()
1178                }),
1179                ..MessageFields::default()
1180            },
1181            usage_limit_reset_time: None,
1182        };
1183        let e2 = UsageEntry {
1184            timestamp: now,
1185            message: MessageFields {
1186                usage: Some(UsageCounts {
1187                    input_tokens: 7,
1188                    ..UsageCounts::default()
1189                }),
1190                ..MessageFields::default()
1191            },
1192            usage_limit_reset_time: None,
1193        };
1194        let block = compute_active_block(&[e1, e2], now).expect("active");
1195        // Only the newer block survives; 999 was rolled into a
1196        // closed block and discarded in v0.1.
1197        assert_eq!(block.token_counts.input, 7);
1198    }
1199
1200    // --- 7-day window boundary ---------------------------------------
1201
1202    #[test]
1203    fn entry_at_exactly_7d_boundary_is_included() {
1204        let tmp = TempDir::new().unwrap();
1205        let home = tmp.path();
1206        let projects = home.join(".claude").join("projects");
1207        // Write timestamp that `build_aggregate`'s `window_start`
1208        // check (`e.timestamp >= window_start`) will include.
1209        // Use `now - 7d + 10s` so the spec's `>=` boundary is
1210        // pinned without depending on instant-perfect math.
1211        let near_boundary = (Utc::now() - Duration::days(7) + Duration::seconds(10))
1212            .duration_trunc(Duration::seconds(1))
1213            .unwrap()
1214            .to_rfc3339();
1215        let line = record(&near_boundary, 42, 0, Some("boundary"));
1216        write_jsonl(&projects, "-proj", "sess.jsonl", &[&line]);
1217        let env = env_from(None, None, Some(home));
1218        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1219        assert_eq!(agg.seven_day.token_counts.input, 42);
1220    }
1221
1222    #[test]
1223    fn entry_older_than_7d_excluded_from_window() {
1224        let tmp = TempDir::new().unwrap();
1225        let home = tmp.path();
1226        let projects = home.join(".claude").join("projects");
1227        // `now - 8d` is definitively outside the window.
1228        let old = (Utc::now() - Duration::days(8))
1229            .duration_trunc(Duration::seconds(1))
1230            .unwrap()
1231            .to_rfc3339();
1232        let line = record(&old, 1000, 0, Some("way-old"));
1233        write_jsonl(&projects, "-proj", "sess.jsonl", &[&line]);
1234        let env = env_from(None, None, Some(home));
1235        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1236        assert_eq!(agg.seven_day.token_counts.input, 0);
1237    }
1238
1239    // --- Cross-root dedup --------------------------------------------
1240
1241    #[test]
1242    fn aggregate_dedupes_across_cascade_roots() {
1243        // Spec: the same message.id written to two different roots
1244        // (one after a user migrates ~/.claude → CLAUDE_CONFIG_DIR,
1245        // say) collapses to a single entry. Regression guard for
1246        // a per-root HashMap without a global merge step.
1247        let tmp = TempDir::new().unwrap();
1248        let env_dir = tmp.path().join("env-dir");
1249        let home = tmp.path().join("home");
1250        let env_projects = env_dir.join("projects");
1251        let legacy_projects = home.join(".claude").join("projects");
1252        let ts = Utc::now()
1253            .duration_trunc(Duration::minutes(1))
1254            .unwrap()
1255            .to_rfc3339();
1256        let line = record(&ts, 100, 50, Some("shared-msg"));
1257        write_jsonl(&env_projects, "-proj", "sess-env.jsonl", &[&line]);
1258        write_jsonl(&legacy_projects, "-proj", "sess-legacy.jsonl", &[&line]);
1259        let env = env_from(Some(&env_dir), None, Some(&home));
1260        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1261        // If dedup is scoped per-root, this would show 200 tokens.
1262        assert_eq!(agg.seven_day.token_counts.input, 100);
1263    }
1264
1265    // --- Offset monotonicity invariant -------------------------------
1266
1267    #[test]
1268    fn tailer_offset_monotonically_advances_on_repeat_reads() {
1269        // Invariant from spec §Testing strategy: new_offset >=
1270        // old_offset and new_offset <= file_size. Exercise several
1271        // append cycles.
1272        let tmp = TempDir::new().unwrap();
1273        let path = tmp.path().join("t.jsonl");
1274        fs::write(
1275            &path,
1276            record("2026-04-20T00:00:00Z", 1, 1, Some("a")) + "\n",
1277        )
1278        .unwrap();
1279        let mut tailer = JsonlTailer::new(path.clone());
1280        tailer.read_new().expect("first");
1281        let after_first = tailer.last_offset;
1282        assert_eq!(after_first, tailer.last_size);
1283
1284        let existing = fs::read_to_string(&path).unwrap();
1285        let new_line = record("2026-04-20T00:01:00Z", 2, 2, Some("b"));
1286        fs::write(&path, format!("{existing}{new_line}\n")).unwrap();
1287        tailer.read_new().expect("second");
1288        let after_second = tailer.last_offset;
1289        assert!(after_second > after_first, "offset must advance");
1290        assert_eq!(after_second, tailer.last_size);
1291
1292        tailer.read_new().expect("third");
1293        assert_eq!(tailer.last_offset, after_second);
1294    }
1295
1296    // --- Models dedup within a block ---------------------------------
1297
1298    #[test]
1299    fn block_models_dedupes_within_block() {
1300        let now = Utc::now();
1301        fn mk(ts: DateTime<Utc>, model: &str) -> UsageEntry {
1302            UsageEntry {
1303                timestamp: ts,
1304                message: MessageFields {
1305                    model: Some(model.to_string()),
1306                    ..MessageFields::default()
1307                },
1308                usage_limit_reset_time: None,
1309            }
1310        }
1311        let entries = [
1312            mk(now - Duration::minutes(30), "claude-opus-4-7"),
1313            mk(now - Duration::minutes(20), "claude-sonnet-4-6"),
1314            mk(now - Duration::minutes(10), "claude-opus-4-7"),
1315        ];
1316        let block = compute_active_block(&entries, now).expect("active");
1317        assert_eq!(block.models.len(), 2);
1318    }
1319
1320    // --- usage_limit_reset merge -------------------------------------
1321
1322    // --- XDG-without-HOME cascade ------------------------------------
1323
1324    #[test]
1325    fn project_roots_includes_xdg_when_home_unset() {
1326        // Service/CI environments can set $XDG_CONFIG_HOME without
1327        // $HOME; the XDG projects root must not be gated on home.
1328        // Matches the credentials::file_cascade_candidates pattern.
1329        let tmp = TempDir::new().unwrap();
1330        let xdg = tmp.path().join("xdg");
1331        let env = env_from(None, Some(&xdg), None);
1332        let roots = project_roots(&env);
1333        assert!(
1334            roots
1335                .iter()
1336                .any(|r| r == &xdg.join("claude").join("projects")),
1337            "XDG candidate must be present with HOME unset + XDG set",
1338        );
1339        assert!(
1340            !roots.iter().any(|r| r.ends_with(".claude/projects")),
1341            "Legacy ~/.claude requires HOME",
1342        );
1343    }
1344
1345    #[test]
1346    fn aggregate_reads_xdg_projects_when_home_unset() {
1347        let tmp = TempDir::new().unwrap();
1348        let xdg = tmp.path().join("xdg");
1349        let ts = Utc::now()
1350            .duration_trunc(Duration::minutes(1))
1351            .unwrap()
1352            .to_rfc3339();
1353        let line = record(&ts, 77, 33, Some("xdg-only"));
1354        write_jsonl(
1355            &xdg.join("claude").join("projects"),
1356            "-proj",
1357            "sess.jsonl",
1358            &[&line],
1359        );
1360        let env = env_from(None, Some(&xdg), None);
1361        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1362        assert_eq!(agg.seven_day.token_counts.input, 77);
1363    }
1364
1365    // --- Future-dated entry excluded from 7d window ------------------
1366
1367    #[test]
1368    fn seven_day_window_excludes_future_timestamps() {
1369        // Clock skew (backup restore, NTP glitch, VM resume) can
1370        // stamp entries in the future. They must NOT inflate the
1371        // 7d totals — spec says window is `[now - 7d, now]`.
1372        let tmp = TempDir::new().unwrap();
1373        let home = tmp.path();
1374        let projects = home.join(".claude").join("projects");
1375        let future = (Utc::now() + Duration::hours(2))
1376            .duration_trunc(Duration::seconds(1))
1377            .unwrap()
1378            .to_rfc3339();
1379        let future_line = record(&future, 500, 0, Some("future-1"));
1380        let past = Utc::now()
1381            .duration_trunc(Duration::seconds(1))
1382            .unwrap()
1383            .to_rfc3339();
1384        let past_line = record(&past, 10, 0, Some("past-1"));
1385        write_jsonl(
1386            &projects,
1387            "-proj",
1388            "sess.jsonl",
1389            &[&future_line, &past_line],
1390        );
1391        let env = env_from(None, None, Some(home));
1392        let agg = aggregate_jsonl_with(&env).expect("aggregate");
1393        // Only the present entry contributes; future one excluded.
1394        // Assert across categories so a half-applied fix that only
1395        // guards `input_tokens` (leaving `output_tokens` to bleed
1396        // through) trips this test.
1397        assert_eq!(agg.seven_day.token_counts.input, 10);
1398        assert_eq!(agg.seven_day.token_counts.output, 0);
1399        assert_eq!(agg.seven_day.token_counts.cache_creation, 0);
1400        assert_eq!(agg.seven_day.token_counts.cache_read, 0);
1401    }
1402
1403    #[test]
1404    fn claude_config_dir_only_no_home_no_xdg() {
1405        // Pure env-dir cascade: CLAUDE_CONFIG_DIR set, both HOME and
1406        // XDG_CONFIG_HOME unset. Must produce exactly one candidate.
1407        let tmp = TempDir::new().unwrap();
1408        let env = env_from(Some(tmp.path()), None, None);
1409        let roots = project_roots(&env);
1410        assert_eq!(roots.len(), 1);
1411        assert_eq!(roots[0], tmp.path().join("projects"));
1412    }
1413
1414    #[test]
1415    fn future_timestamp_inside_5h_block_is_counted_as_mild_skew() {
1416        // Pins the intended behavior for mild clock skew: an entry
1417        // stamped in the near future still anchors the active block
1418        // (the `now - actual_last_activity > 5h` liveness check is
1419        // false for negative durations). Users with slightly-fast
1420        // clocks shouldn't have their current session's tokens
1421        // disappear. Pathological far-future skew is a separate
1422        // hardening question, out of scope here.
1423        let now = Utc::now();
1424        let future_entry = UsageEntry {
1425            timestamp: now + Duration::minutes(10),
1426            message: MessageFields {
1427                usage: Some(UsageCounts {
1428                    input_tokens: 42,
1429                    ..UsageCounts::default()
1430                }),
1431                ..MessageFields::default()
1432            },
1433            usage_limit_reset_time: None,
1434        };
1435        let block = compute_active_block(&[future_entry], now).expect("active");
1436        assert_eq!(block.token_counts.input, 42);
1437    }
1438
1439    #[test]
1440    fn usage_limit_reset_keeps_some_over_later_none() {
1441        // `extend_block` only overwrites when the incoming entry
1442        // carries `Some(_)`. A later entry with `None` must not
1443        // clear the earlier reset hint.
1444        let now = Utc::now();
1445        let reset = now + Duration::hours(1);
1446        let e1 = UsageEntry {
1447            timestamp: now - Duration::minutes(30),
1448            message: MessageFields::default(),
1449            usage_limit_reset_time: Some(reset),
1450        };
1451        let e2 = UsageEntry {
1452            timestamp: now - Duration::minutes(10),
1453            message: MessageFields::default(),
1454            usage_limit_reset_time: None,
1455        };
1456        let block = compute_active_block(&[e1, e2], now).expect("active");
1457        assert_eq!(block.usage_limit_reset, Some(reset));
1458    }
1459}