Skip to main content

ccstat_provider_claude/
data_loader.rs

1//! Data loader module for discovering and parsing JSONL files
2//!
3//! This module handles platform-specific discovery of Claude usage data files
4//! and provides streaming access to parse large JSONL files efficiently.
5//!
6//! # Platform Support
7//!
8//! The data loader automatically discovers Claude data directories on:
9//! - macOS: `~/Library/Application Support/Claude`
10//! - Linux: `~/.config/Claude` or `$XDG_CONFIG_HOME/Claude`
11//! - Windows: `%APPDATA%\Claude`
12//!
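//! `~/.claude` is also checked on every platform. If the `CLAUDE_DATA_PATH`
//! environment variable names an existing directory, that directory is searched
//! in addition to the locations above.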
14//!
15//! # Examples
16//!
17//! ```no_run
18//! use ccstat_provider_claude::data_loader::DataLoader;
19//! use futures::StreamExt;
20//!
21//! # async fn example() -> ccstat_core::Result<()> {
22//! let data_loader = DataLoader::new().await?;
23//!
24//! // Stream usage entries
25//! let entries = data_loader.load_usage_entries_parallel();
26//! tokio::pin!(entries);
27//! while let Some(result) = entries.next().await {
28//!     let entry = result?;
29//!     println!("Session: {}, Tokens: {}", entry.session_id, entry.tokens.total());
30//! }
31//! # Ok(())
32//! # }
33//! ```
34
35use ccstat_core::error::{CcstatError, Result};
36use ccstat_core::memory_pool::MemoryPool;
37use ccstat_core::string_pool::{InternedModel, InternedSession};
38use ccstat_core::types::{ModelName, RawJsonlEntry, SessionId, UsageEntry};
39use futures::StreamExt;
40use futures::stream::Stream;
41use indicatif::{ProgressBar, ProgressStyle};
42use rayon::prelude::*;
43use std::collections::HashSet;
44use std::path::PathBuf;
45use std::sync::{Arc, Mutex};
46use tokio::io::{AsyncBufReadExt, BufReader};
47use tokio::sync::mpsc;
48use tracing::{debug, info, trace};
49
/// Data loader for discovering and streaming JSONL files
///
/// A `DataLoader` holds the Claude data directories it will search together
/// with the flags that control loading: progress reporting, string interning
/// and arena allocation.
///
/// `Debug` is derived so that a loader can be inspected in logs and in test
/// failure output.
#[derive(Debug)]
pub struct DataLoader {
    /// Discovered Claude data paths
    claude_paths: Vec<PathBuf>,
    /// Whether to show progress bars
    show_progress: bool,
    /// Whether to use string interning for memory optimization
    use_interning: bool,
    /// Whether to use arena allocation for parsing
    use_arena: bool,
}
64
65impl DataLoader {
66    /// Create a new DataLoader by discovering Claude paths
67    ///
68    /// This method automatically searches for Claude data directories
69    /// in platform-specific locations.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if no Claude data directories are found
74    pub async fn new() -> Result<Self> {
75        let paths = Self::discover_claude_paths().await?;
76        if paths.is_empty() {
77            return Err(CcstatError::NoClaudeDirectory);
78        }
79
80        debug!("Discovered {} Claude data directories", paths.len());
81        Ok(Self {
82            claude_paths: paths,
83            show_progress: false,
84            use_interning: false,
85            use_arena: false,
86        })
87    }
88
89    /// Discover Claude data directories on the system
90    async fn discover_claude_paths() -> Result<Vec<PathBuf>> {
91        let mut paths = Vec::new();
92
93        // Check ~/.claude first (common location for Claude Code)
94        if let Some(home) = dirs::home_dir() {
95            let claude_path = home.join(".claude");
96            if claude_path.exists() {
97                paths.push(claude_path);
98            }
99        }
100
101        // Platform-specific path discovery
102        #[cfg(target_os = "macos")]
103        {
104            // macOS paths
105            if let Some(home) = dirs::home_dir() {
106                let claude_path = home.join("Library/Application Support/Claude");
107                if claude_path.exists() {
108                    paths.push(claude_path);
109                }
110            }
111        }
112
113        #[cfg(target_os = "linux")]
114        {
115            // Linux paths
116            if let Some(config_dir) = dirs::config_dir() {
117                let claude_path = config_dir.join("Claude");
118                if claude_path.exists() {
119                    paths.push(claude_path);
120                }
121            }
122
123            if let Some(home) = dirs::home_dir() {
124                let claude_path = home.join(".config/Claude");
125                if claude_path.exists() {
126                    paths.push(claude_path);
127                }
128            }
129        }
130
131        #[cfg(target_os = "windows")]
132        {
133            // Windows paths
134            if let Some(app_data) = dirs::data_dir() {
135                let claude_path = app_data.join("Claude");
136                if claude_path.exists() {
137                    paths.push(claude_path);
138                }
139            }
140        }
141
142        // Check environment variable override
143        if let Ok(custom_path) = std::env::var("CLAUDE_DATA_PATH") {
144            let path = PathBuf::from(custom_path);
145            if path.exists() {
146                paths.push(path);
147            }
148        }
149
150        Ok(paths)
151    }
152
153    /// Helper to find JSONL files with optional filtering
154    ///
155    /// This internal method handles the common logic for finding JSONL files,
156    /// with an optional filter function for additional criteria like modification time.
157    ///
158    /// # Arguments
159    ///
160    /// * `filter` - Optional filter closure to apply additional criteria to files
161    ///
162    /// # Returns
163    ///
164    /// A vector of paths to JSONL files that match the filter criteria
165    async fn find_jsonl_files_with_filter<F>(&self, filter: F) -> Result<Vec<PathBuf>>
166    where
167        F: Fn(&std::path::Path) -> bool + Send + Sync + 'static + Clone,
168    {
169        let mut jsonl_files = Vec::new();
170
171        for base_path in &self.claude_paths {
172            let path_clone = base_path.clone();
173            let filter_clone = filter.clone();
174            let files = tokio::task::spawn_blocking(move || {
175                use walkdir::WalkDir;
176                let mut files = Vec::new();
177
178                for entry in WalkDir::new(path_clone).into_iter().filter_map(|e| e.ok()) {
179                    let path = entry.path();
180                    if path.extension().and_then(|s| s.to_str()) == Some("jsonl")
181                        && filter_clone(path)
182                    {
183                        files.push(path.to_path_buf());
184                    }
185                }
186                files
187            })
188            .await
189            .map_err(|e| CcstatError::Io(std::io::Error::other(e.to_string())))?;
190
191            jsonl_files.extend(files);
192        }
193
194        Ok(jsonl_files)
195    }
196
197    /// Find all JSONL files in the discovered directories
198    ///
199    /// Recursively searches for `.jsonl` files in all discovered Claude directories.
200    ///
201    /// # Returns
202    ///
203    /// A vector of paths to JSONL files found
204    pub async fn find_jsonl_files(&self) -> Result<Vec<PathBuf>> {
205        let files = self.find_jsonl_files_with_filter(|_| true).await?;
206        info!("Found {} JSONL files to process", files.len());
207        Ok(files)
208    }
209
210    /// Find JSONL files modified since a given date
211    ///
212    /// This is useful for performance optimization when you only need recent data,
213    /// such as for statusline generation which only needs today's data.
214    ///
215    /// # Arguments
216    ///
217    /// * `since` - Only include files modified after this time
218    ///
219    /// # Returns
220    ///
221    /// A vector of paths to JSONL files modified since the given date
222    pub async fn find_recent_jsonl_files(
223        &self,
224        since: chrono::DateTime<chrono::Utc>,
225    ) -> Result<Vec<PathBuf>> {
226        let since_std = std::time::SystemTime::from(since);
227
228        let files = self
229            .find_jsonl_files_with_filter(move |path| {
230                // Check modification time
231                if let Ok(metadata) = path.metadata()
232                    && let Ok(modified) = metadata.modified()
233                {
234                    modified >= since_std
235                } else {
236                    false
237                }
238            })
239            .await?;
240
241        info!(
242            "Found {} recent JSONL files to process (since {})",
243            files.len(),
244            since
245        );
246        Ok(files)
247    }
248
249    /// Enable or disable progress bars
250    pub fn with_progress(mut self, show_progress: bool) -> Self {
251        self.show_progress = show_progress;
252        self
253    }
254
255    /// Enable string interning for memory optimization
256    pub fn with_interning(mut self, use_interning: bool) -> Self {
257        self.use_interning = use_interning;
258        self
259    }
260
261    /// Enable arena allocation for parsing
262    pub fn with_arena(mut self, use_arena: bool) -> Self {
263        self.use_arena = use_arena;
264        self
265    }
266
267    /// Load usage entries in parallel for better performance
268    ///
269    /// This method uses Rayon to process multiple JSONL files concurrently,
270    /// providing significant performance improvements for large datasets.
271    ///
272    /// # Returns
273    ///
274    /// An async stream of `Result<UsageEntry>` items processed in parallel
275    pub fn load_usage_entries_parallel(&self) -> impl Stream<Item = Result<UsageEntry>> + '_ {
276        async_stream::stream! {
277            let files = match self.find_jsonl_files().await {
278                Ok(files) => files,
279                Err(e) => {
280                    yield Err(e);
281                    return;
282                }
283            };
284
285            let num_files = files.len();
286            if num_files == 0 {
287                return;
288            }
289
290            // Create progress bar if enabled
291            let progress = if self.show_progress {
292                let pb = ProgressBar::new(num_files as u64);
293                pb.set_style(
294                    ProgressStyle::default_bar()
295                        .template("{msg} [{bar:40.cyan/blue}] {pos}/{len} files")
296                        .unwrap()
297                        .progress_chars("#>-"),
298                );
299                pb.set_message("Loading usage data (parallel)");
300                Some(Arc::new(pb))
301            } else {
302                None
303            };
304
305            // Use channel to collect results from parallel processing
306            let (tx, mut rx) = mpsc::channel::<Result<Vec<UsageEntry>>>(num_files);
307
308            // Shared deduplication set
309            let seen_entries = Arc::new(Mutex::new(HashSet::new()));
310
311            // Process files in parallel using Rayon
312            let files_clone = files.clone();
313            let progress_clone = progress.clone();
314            let seen_entries_clone = seen_entries.clone();
315            let use_interning = self.use_interning;
316            let use_arena = self.use_arena;
317
318            tokio::task::spawn_blocking(move || {
319                files_clone.par_iter().for_each(|file_path| {
320                    let tx = tx.clone();
321                    if let Some(ref pb) = progress_clone {
322                        pb.inc(1);
323                    }
324
325                    // Read file synchronously in the thread pool
326                    let result = std::fs::read_to_string(file_path)
327                        .map_err(CcstatError::Io)
328                        .map(|content| {
329                            let mut entries = Vec::new();
330                            let mut local_duplicates = 0;
331
332                            // Common logic for processing a line
333                            let mut process_line = |line: &str| {
334                                match serde_json::from_str::<RawJsonlEntry>(line) {
335                                    Ok(raw_entry) => {
336                                        // Check for deduplication key
337                                        if let Some(dedup_key) = UsageEntry::dedup_key(&raw_entry) {
338                                            let mut seen = seen_entries_clone.lock().unwrap();
339                                            if seen.contains(&dedup_key) {
340                                                local_duplicates += 1;
341                                                trace!("Skipping duplicate entry with key: {}", dedup_key);
342                                                return;
343                                            }
344                                            seen.insert(dedup_key);
345                                        }
346
347                                        if let Some(mut entry) = UsageEntry::from_raw(raw_entry) {
348                                            if use_interning {
349                                                // Apply string interning
350                                                let interned_model = InternedModel::new(entry.model.as_str());
351                                                entry.model = ModelName::new(interned_model.as_str());
352                                                let interned_session = InternedSession::new(entry.session_id.as_str());
353                                                entry.session_id = SessionId::new(interned_session.as_str());
354                                            }
355                                            entries.push(entry);
356                                        }
357                                    }
358                                    Err(e) => {
359                                        trace!("Skipping non-usage entry in {}: {}", file_path.display(), e);
360                                    }
361                                }
362                            };
363
364                            if use_arena {
365                                // TODO: Consider processing in chunks to reduce memory usage for large files
366                                // Currently collecting all lines into memory for arena allocation
367                                let lines: Vec<String> = content.lines()
368                                    .filter(|line| !line.trim().is_empty())
369                                    .map(|s| s.to_string())
370                                    .collect();
371
372                                let pool = MemoryPool::new();
373                                for line in &lines {
374                                    let arena_line = pool.alloc_string(line);
375                                    process_line(arena_line);
376                                }
377                            } else {
378                                // Normal processing without arena
379                                for line in content.lines() {
380                                    if line.trim().is_empty() {
381                                        continue;
382                                    }
383                                    process_line(line);
384                                }
385                            }
386
387                            if local_duplicates > 0 {
388                                debug!("Skipped {} duplicate entries in {}", local_duplicates, file_path.display());
389                            }
390
391                            entries
392                        });
393
394                    let _ = tx.blocking_send(result);
395                });
396            });
397
398            // Yield all results
399            while let Some(result) = rx.recv().await {
400                match result {
401                    Ok(entries) => {
402                        for entry in entries {
403                            yield Ok(entry);
404                        }
405                    }
406                    Err(e) => yield Err(e),
407                }
408            }
409
410            // Log deduplication stats
411            let final_seen_count = seen_entries.lock().unwrap().len();
412            if final_seen_count > 0 {
413                info!("Processed {} unique entries after deduplication", final_seen_count);
414            }
415
416            if let Some(pb) = progress {
417                pb.finish_with_message("Loading complete (parallel)");
418            }
419        }
420    }
421
422    /// Helper method to process JSONL files as a stream
423    ///
424    /// This internal method handles the common logic for loading and processing
425    /// JSONL files, used by both load_usage_entries and load_recent_usage_entries.
426    ///
427    /// # Arguments
428    ///
429    /// * `files` - Vector of paths to JSONL files to process
430    /// * `progress_message` - Message to display in the progress bar
431    ///
432    /// # Returns
433    ///
434    /// An async stream of `Result<UsageEntry>` items
435    fn process_jsonl_files(
436        &self,
437        files: Vec<PathBuf>,
438        progress_message: &str,
439    ) -> impl Stream<Item = Result<UsageEntry>> + '_ {
440        let progress_msg = progress_message.to_string();
441        async_stream::stream! {
442            // Create progress bar if enabled
443            let progress = if self.show_progress {
444                let pb = ProgressBar::new(files.len() as u64);
445                pb.set_style(
446                    ProgressStyle::default_bar()
447                        .template("{msg} [{bar:40.cyan/blue}] {pos}/{len} files")
448                        .unwrap()
449                        .progress_chars("#>-"),
450                );
451                pb.set_message(progress_msg.clone());
452                Some(pb)
453            } else {
454                None
455            };
456
457            // Deduplication set
458            let mut seen_entries = HashSet::new();
459            let mut total_duplicates = 0;
460
461            for (idx, file_path) in files.into_iter().enumerate() {
462                if let Some(ref pb) = progress {
463                    pb.set_position(idx as u64);
464                }
465
466                let entries = self.parse_jsonl_stream(file_path, progress.as_ref(), &mut seen_entries);
467                tokio::pin!(entries);
468                while let Some(result) = entries.next().await {
469                    match &result {
470                        Ok(_) => yield result,
471                        Err(e) => {
472                            if let CcstatError::DuplicateEntry = e {
473                                total_duplicates += 1;
474                            } else {
475                                yield result;
476                            }
477                        }
478                    }
479                }
480            }
481
482            if total_duplicates > 0 {
483                info!("Skipped {} duplicate entries", total_duplicates);
484            }
485
486            if let Some(pb) = progress {
487                pb.finish_with_message("Loading complete");
488            }
489        }
490    }
491
492    /// Load recent usage entries as an async stream
493    ///
494    /// This method provides a stream of usage entries parsed from JSONL files
495    /// modified since the given date. It's optimized for scenarios where you
496    /// only need recent data, such as statusline generation.
497    ///
498    /// # Arguments
499    ///
500    /// * `since` - Only load entries from files modified after this time
501    ///
502    /// # Returns
503    ///
504    /// An async stream of `Result<UsageEntry>` items from recent files
505    pub fn load_recent_usage_entries(
506        &self,
507        since: chrono::DateTime<chrono::Utc>,
508    ) -> impl Stream<Item = Result<UsageEntry>> + '_ {
509        async_stream::stream! {
510            let files = match self.find_recent_jsonl_files(since).await {
511                Ok(files) => files,
512                Err(e) => {
513                    yield Err(e);
514                    return;
515                }
516            };
517
518            if files.is_empty() {
519                debug!("No recent files found since {}", since);
520                return;
521            }
522
523            let entries = self.process_jsonl_files(files, "Loading recent data");
524            tokio::pin!(entries);
525            while let Some(result) = entries.next().await {
526                yield result;
527            }
528        }
529    }
530
531    /// Parse a single JSONL file as a stream
532    fn parse_jsonl_stream<'a>(
533        &'a self,
534        path: PathBuf,
535        _progress: Option<&'a ProgressBar>,
536        seen_entries: &'a mut HashSet<String>,
537    ) -> impl Stream<Item = Result<UsageEntry>> + 'a {
538        async_stream::stream! {
539            // Get file size for progress tracking
540            let _file_size = match tokio::fs::metadata(&path).await {
541                Ok(metadata) => metadata.len(),
542                Err(_) => 0,
543            };
544
545            let file = match tokio::fs::File::open(&path).await {
546                Ok(f) => f,
547                Err(e) => {
548                    yield Err(e.into());
549                    return;
550                }
551            };
552
553            let reader = BufReader::new(file);
554            let mut lines = reader.lines();
555            let mut line_number = 0;
556            let mut file_duplicates = 0;
557
558            while let Ok(Some(line)) = lines.next_line().await {
559                line_number += 1;
560
561                if line.trim().is_empty() {
562                    continue;
563                }
564
565                match serde_json::from_str::<RawJsonlEntry>(&line) {
566                    Ok(raw_entry) => {
567                        // Check for deduplication key
568                        if let Some(dedup_key) = UsageEntry::dedup_key(&raw_entry) {
569                            if seen_entries.contains(&dedup_key) {
570                                file_duplicates += 1;
571                                trace!("Skipping duplicate entry with key: {}", dedup_key);
572                                yield Err(CcstatError::DuplicateEntry);
573                                continue;
574                            }
575                            seen_entries.insert(dedup_key);
576                        }
577
578                        if let Some(entry) = self.convert_entry(raw_entry) {
579                            yield Ok(entry);
580                        }
581                        // Skip non-assistant entries silently
582                    },
583                    Err(e) => {
584                        trace!(
585                            "Skipping non-usage entry at line {} in {}: {}",
586                            line_number,
587                            path.display(),
588                            e
589                        );
590                        // Continue processing other lines
591                    }
592                }
593            }
594
595            if file_duplicates > 0 {
596                debug!("Skipped {} duplicate entries in {}", file_duplicates, path.display());
597            }
598        }
599    }
600
601    /// Get the discovered Claude paths
602    ///
603    /// Returns a slice of all discovered Claude data directories.
604    /// Useful for debugging or displaying where data is being loaded from.
605    pub fn paths(&self) -> &[PathBuf] {
606        &self.claude_paths
607    }
608
609    /// Convert raw entry to UsageEntry with optional string interning
610    fn convert_entry(&self, raw: RawJsonlEntry) -> Option<UsageEntry> {
611        UsageEntry::from_raw(raw).map(|mut entry| {
612            if self.use_interning {
613                // Intern the model name
614                let interned_model = InternedModel::new(entry.model.as_str());
615                entry.model = ModelName::new(interned_model.as_str());
616
617                // Intern the session ID
618                let interned_session = InternedSession::new(entry.session_id.as_str());
619                entry.session_id = SessionId::new(interned_session.as_str());
620            }
621            entry
622        })
623    }
624}
625
626#[cfg(test)]
627mod tests {
628    use super::*;
629    use crate::test_utils::{ENV_MUTEX, EnvVarGuard};
630    use tempfile::TempDir;
631    use tokio::io::AsyncWriteExt;
632
633    #[tokio::test]
634    async fn test_jsonl_parsing() {
635        let temp_dir = TempDir::new().unwrap();
636        let jsonl_path = temp_dir.path().join("test.jsonl");
637
638        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
639        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50,"cache_creation_input_tokens":10,"cache_read_input_tokens":5}},"cwd":"/home/user/project-a"}"#).await.unwrap();
640        file.write_all(b"\n").await.unwrap();
641        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T01:00:00Z","type":"assistant","message":{"model":"claude-3-sonnet","usage":{"input_tokens":200,"output_tokens":100,"cache_creation_input_tokens":20,"cache_read_input_tokens":10}}}"#).await.unwrap();
642
643        let loader = DataLoader {
644            claude_paths: vec![],
645            show_progress: false,
646            use_interning: false,
647            use_arena: false,
648        };
649        let mut seen = HashSet::new();
650        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
651        tokio::pin!(stream);
652
653        let entry1 = stream.next().await.unwrap().unwrap();
654        assert_eq!(entry1.session_id.as_str(), "test1");
655        assert_eq!(entry1.tokens.input_tokens, 100);
656        assert_eq!(entry1.project, Some("project-a".to_string()));
657
658        let entry2 = stream.next().await.unwrap().unwrap();
659        assert_eq!(entry2.session_id.as_str(), "test2");
660        assert_eq!(entry2.tokens.input_tokens, 200);
661        assert_eq!(entry2.project, None);
662    }
663
664    #[tokio::test]
665    async fn test_parallel_loading() {
666        let temp_dir = TempDir::new().unwrap();
667
668        // Create multiple JSONL files
669        for i in 0..3 {
670            let content = format!(
671                r#"{{"sessionId":"test{i}","timestamp":"2024-01-01T0{i}:00:00Z","type":"assistant","message":{{"model":"claude-3-opus","usage":{{"input_tokens":100,"output_tokens":50,"cache_creation_input_tokens":10,"cache_read_input_tokens":5}}}},"cost_usd":0.1}}"#
672            );
673            let file_path = temp_dir.path().join(format!("test{i}.jsonl"));
674            let mut file = tokio::fs::File::create(&file_path).await.unwrap();
675            file.write_all(content.as_bytes()).await.unwrap();
676        }
677
678        let loader = DataLoader {
679            claude_paths: vec![temp_dir.path().to_path_buf()],
680            show_progress: false,
681            use_interning: false,
682            use_arena: false,
683        };
684
685        // Test parallel loading
686        let entries: Vec<_> = loader
687            .load_usage_entries_parallel()
688            .collect::<Vec<_>>()
689            .await
690            .into_iter()
691            .collect::<Result<Vec<_>>>()
692            .unwrap();
693
694        assert_eq!(entries.len(), 3);
695
696        // Verify all entries are loaded (order may vary due to parallel processing)
697        let session_ids: Vec<_> = entries.iter().map(|e| e.session_id.as_str()).collect();
698        assert!(session_ids.contains(&"test0"));
699        assert!(session_ids.contains(&"test1"));
700        assert!(session_ids.contains(&"test2"));
701    }
702
703    #[tokio::test]
704    async fn test_discover_claude_paths_with_env_override() {
705        let _lock = ENV_MUTEX.lock().await;
706
707        let temp_dir = TempDir::new().unwrap();
708        let custom_path = temp_dir.path().to_path_buf();
709
710        // Use RAII guard for safe environment variable manipulation
711        let mut env_guard = EnvVarGuard::new();
712        env_guard.set("CLAUDE_DATA_PATH", custom_path.to_str().unwrap());
713
714        let paths = DataLoader::discover_claude_paths().await.unwrap();
715        assert!(paths.contains(&custom_path));
716
717        // Environment variables will be automatically restored when env_guard drops
718    }
719
720    #[tokio::test]
721    async fn test_find_jsonl_files() {
722        let temp_dir = TempDir::new().unwrap();
723
724        // Create a subdirectory with JSONL files
725        let sub_dir = temp_dir.path().join("subdir");
726        tokio::fs::create_dir(&sub_dir).await.unwrap();
727
728        // Create JSONL files at different levels
729        tokio::fs::write(temp_dir.path().join("test1.jsonl"), "")
730            .await
731            .unwrap();
732        tokio::fs::write(sub_dir.join("test2.jsonl"), "")
733            .await
734            .unwrap();
735        tokio::fs::write(temp_dir.path().join("not_jsonl.txt"), "")
736            .await
737            .unwrap();
738
739        let loader = DataLoader {
740            claude_paths: vec![temp_dir.path().to_path_buf()],
741            show_progress: false,
742            use_interning: false,
743            use_arena: false,
744        };
745
746        let files = loader.find_jsonl_files().await.unwrap();
747        assert_eq!(files.len(), 2);
748
749        // Check that only JSONL files are found
750        for file in &files {
751            assert_eq!(file.extension().and_then(|s| s.to_str()), Some("jsonl"));
752        }
753    }
754
755    #[tokio::test]
756    async fn test_find_recent_jsonl_files() {
757        let temp_dir = TempDir::new().unwrap();
758
759        // Create old and new JSONL files
760        let old_file = temp_dir.path().join("old.jsonl");
761        let new_file = temp_dir.path().join("new.jsonl");
762
763        tokio::fs::write(&old_file, "").await.unwrap();
764        tokio::fs::write(&new_file, "").await.unwrap();
765
766        // Set the old file's modification time to 2 days ago
767        let two_days_ago = chrono::Utc::now() - chrono::Duration::days(2);
768        let old_time =
769            filetime::FileTime::from_system_time(std::time::SystemTime::from(two_days_ago));
770        filetime::set_file_mtime(&old_file, old_time).unwrap();
771
772        // For test purposes, we'll use the current time minus 1 hour as the filter
773        let one_hour_ago = chrono::Utc::now() - chrono::Duration::hours(1);
774
775        let loader = DataLoader {
776            claude_paths: vec![temp_dir.path().to_path_buf()],
777            show_progress: false,
778            use_interning: false,
779            use_arena: false,
780        };
781
782        // This should find the new file but not the old one
783        let files = loader.find_recent_jsonl_files(one_hour_ago).await.unwrap();
784        assert_eq!(files.len(), 1); // Only the new file should be found
785    }
786
787    #[tokio::test]
788    async fn test_deduplication() {
789        let temp_dir = TempDir::new().unwrap();
790        let jsonl_path = temp_dir.path().join("test.jsonl");
791
792        // Create a file with duplicate entries
793        let duplicate_content = r#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"id":"msg_123","model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}},"requestId":"req_456"}"#;
794
795        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
796        file.write_all(duplicate_content.as_bytes()).await.unwrap();
797        file.write_all(b"\n").await.unwrap();
798        file.write_all(duplicate_content.as_bytes()).await.unwrap(); // Same entry again
799        file.write_all(b"\n").await.unwrap();
800        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T01:00:00Z","type":"assistant","message":{"id":"msg_789","model":"claude-3-opus","usage":{"input_tokens":200,"output_tokens":100}},"requestId":"req_012"}"#).await.unwrap();
801
802        // Run the test in two parts: first collect entries, then check deduplication separately
803        let loader = DataLoader {
804            claude_paths: vec![],
805            show_progress: false,
806            use_interning: false,
807            use_arena: false,
808        };
809        let mut seen = HashSet::new();
810        let stream = loader.parse_jsonl_stream(jsonl_path.clone(), None, &mut seen);
811        tokio::pin!(stream);
812
813        let mut entries = Vec::new();
814        let mut error_count = 0;
815        while let Some(result) = stream.next().await {
816            match result {
817                Ok(entry) => entries.push(entry),
818                Err(CcstatError::DuplicateEntry) => error_count += 1,
819                Err(e) => panic!("Unexpected error in stream: {:?}", e),
820            }
821        }
822
823        // Should have only 2 unique entries (test1 appears twice, so 1 duplicate error)
824        assert_eq!(entries.len(), 2);
825        assert_eq!(error_count, 1); // One duplicate was found
826
827        // Verify the deduplication worked by checking unique session IDs
828        let session_ids: HashSet<_> = entries.iter().map(|e| e.session_id.as_str()).collect();
829        assert_eq!(session_ids.len(), 2);
830    }
831
832    #[tokio::test]
833    async fn test_empty_file_handling() {
834        let temp_dir = TempDir::new().unwrap();
835        let jsonl_path = temp_dir.path().join("empty.jsonl");
836
837        // Create an empty file
838        tokio::fs::write(&jsonl_path, "").await.unwrap();
839
840        let loader = DataLoader {
841            claude_paths: vec![],
842            show_progress: false,
843            use_interning: false,
844            use_arena: false,
845        };
846        let mut seen = HashSet::new();
847        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
848        tokio::pin!(stream);
849
850        let mut count = 0;
851        while stream.next().await.is_some() {
852            count += 1;
853        }
854
855        assert_eq!(count, 0);
856    }
857
858    #[tokio::test]
859    async fn test_malformed_json_handling() {
860        let temp_dir = TempDir::new().unwrap();
861        let jsonl_path = temp_dir.path().join("malformed.jsonl");
862
863        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
864        file.write_all(b"not valid json\n").await.unwrap();
865        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
866        file.write_all(b"\n{broken json").await.unwrap();
867
868        let loader = DataLoader {
869            claude_paths: vec![],
870            show_progress: false,
871            use_interning: false,
872            use_arena: false,
873        };
874        let mut seen = HashSet::new();
875        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
876        tokio::pin!(stream);
877
878        let mut valid_entries = Vec::new();
879        while let Some(result) = stream.next().await {
880            if let Ok(entry) = result {
881                valid_entries.push(entry);
882            }
883        }
884
885        // Should have parsed only the valid entry
886        assert_eq!(valid_entries.len(), 1);
887        assert_eq!(valid_entries[0].session_id.as_str(), "test1");
888    }
889
890    #[tokio::test]
891    async fn test_non_assistant_entries_filtered() {
892        let temp_dir = TempDir::new().unwrap();
893        let jsonl_path = temp_dir.path().join("mixed.jsonl");
894
895        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
896        // User type entry - should be filtered
897        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"user","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
898        file.write_all(b"\n").await.unwrap();
899        // Assistant type entry - should be included
900        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
901        file.write_all(b"\n").await.unwrap();
902        // System type entry - should be filtered
903        file.write_all(br#"{"sessionId":"test3","timestamp":"2024-01-01T00:00:00Z","type":"system","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
904
905        let loader = DataLoader {
906            claude_paths: vec![],
907            show_progress: false,
908            use_interning: false,
909            use_arena: false,
910        };
911        let mut seen = HashSet::new();
912        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
913        tokio::pin!(stream);
914
915        let mut entries = Vec::new();
916        while let Some(result) = stream.next().await {
917            if let Ok(entry) = result {
918                entries.push(entry);
919            }
920        }
921
922        // Should only have the assistant entry
923        assert_eq!(entries.len(), 1);
924        assert_eq!(entries[0].session_id.as_str(), "test2");
925    }
926
927    #[tokio::test]
928    async fn test_with_progress_flag() {
929        let temp_dir = TempDir::new().unwrap();
930
931        let loader = DataLoader {
932            claude_paths: vec![temp_dir.path().to_path_buf()],
933            show_progress: false,
934            use_interning: false,
935            use_arena: false,
936        };
937
938        let loader_with_progress = loader.with_progress(true);
939        assert!(loader_with_progress.show_progress);
940
941        let loader_without_progress = loader_with_progress.with_progress(false);
942        assert!(!loader_without_progress.show_progress);
943    }
944
945    #[tokio::test]
946    async fn test_with_interning_flag() {
947        let temp_dir = TempDir::new().unwrap();
948
949        let loader = DataLoader {
950            claude_paths: vec![temp_dir.path().to_path_buf()],
951            show_progress: false,
952            use_interning: false,
953            use_arena: false,
954        };
955
956        let loader_with_interning = loader.with_interning(true);
957        assert!(loader_with_interning.use_interning);
958    }
959
960    #[tokio::test]
961    async fn test_with_arena_flag() {
962        let temp_dir = TempDir::new().unwrap();
963
964        let loader = DataLoader {
965            claude_paths: vec![temp_dir.path().to_path_buf()],
966            show_progress: false,
967            use_interning: false,
968            use_arena: false,
969        };
970
971        let loader_with_arena = loader.with_arena(true);
972        assert!(loader_with_arena.use_arena);
973    }
974
975    #[tokio::test]
976    async fn test_string_interning_functionality() {
977        let temp_dir = TempDir::new().unwrap();
978        let jsonl_path = temp_dir.path().join("test.jsonl");
979
980        // Create test data with repeated model names and session IDs
981        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
982        file.write_all(br#"{"sessionId":"test-session","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
983        file.write_all(b"\n").await.unwrap();
984        file.write_all(br#"{"sessionId":"test-session","timestamp":"2024-01-01T01:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":200,"output_tokens":100}}}"#).await.unwrap();
985
986        // Test with interning enabled
987        let loader_with_interning = DataLoader {
988            claude_paths: vec![temp_dir.path().to_path_buf()],
989            show_progress: false,
990            use_interning: true,
991            use_arena: false,
992        };
993
994        let entries: Vec<_> = loader_with_interning
995            .load_usage_entries_parallel()
996            .collect::<Vec<_>>()
997            .await
998            .into_iter()
999            .collect::<Result<Vec<_>>>()
1000            .unwrap();
1001
1002        assert_eq!(entries.len(), 2);
1003        // Both entries should have the same model and session ID
1004        assert_eq!(entries[0].model.as_str(), "claude-3-opus");
1005        assert_eq!(entries[1].model.as_str(), "claude-3-opus");
1006        assert_eq!(entries[0].session_id.as_str(), "test-session");
1007        assert_eq!(entries[1].session_id.as_str(), "test-session");
1008    }
1009
1010    #[tokio::test]
1011    async fn test_arena_allocation_functionality() {
1012        let temp_dir = TempDir::new().unwrap();
1013        let jsonl_path = temp_dir.path().join("test.jsonl");
1014
1015        // Create test data
1016        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
1017        file.write_all(br#"{"sessionId":"arena-test","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-sonnet","usage":{"input_tokens":150,"output_tokens":75}}}"#).await.unwrap();
1018
1019        // Test with arena allocation enabled
1020        let loader_with_arena = DataLoader {
1021            claude_paths: vec![temp_dir.path().to_path_buf()],
1022            show_progress: false,
1023            use_interning: false,
1024            use_arena: true,
1025        };
1026
1027        let entries: Vec<_> = loader_with_arena
1028            .load_usage_entries_parallel()
1029            .collect::<Vec<_>>()
1030            .await
1031            .into_iter()
1032            .collect::<Result<Vec<_>>>()
1033            .unwrap();
1034
1035        assert_eq!(entries.len(), 1);
1036        assert_eq!(entries[0].session_id.as_str(), "arena-test");
1037        assert_eq!(entries[0].model.as_str(), "claude-3-sonnet");
1038        assert_eq!(entries[0].tokens.input_tokens, 150);
1039    }
1040
1041    #[tokio::test]
1042    async fn test_interning_and_arena_together() {
1043        let temp_dir = TempDir::new().unwrap();
1044        let jsonl_path = temp_dir.path().join("test.jsonl");
1045
1046        // Create test data with repeated values
1047        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
1048        for i in 0..3 {
1049            let line = format!(
1050                r#"{{"sessionId":"combined-test","timestamp":"2024-01-01T0{}:00:00Z","type":"assistant","message":{{"model":"claude-3-opus","usage":{{"input_tokens":{},"output_tokens":50}}}}}}"#,
1051                i,
1052                (i + 1) * 100
1053            );
1054            file.write_all(line.as_bytes()).await.unwrap();
1055            file.write_all(b"\n").await.unwrap();
1056        }
1057
1058        // Test with both interning and arena enabled
1059        let loader = DataLoader {
1060            claude_paths: vec![temp_dir.path().to_path_buf()],
1061            show_progress: false,
1062            use_interning: true,
1063            use_arena: true,
1064        };
1065
1066        let entries: Vec<_> = loader
1067            .load_usage_entries_parallel()
1068            .collect::<Vec<_>>()
1069            .await
1070            .into_iter()
1071            .collect::<Result<Vec<_>>>()
1072            .unwrap();
1073
1074        assert_eq!(entries.len(), 3);
1075        // All entries should have the same interned values
1076        for entry in &entries {
1077            assert_eq!(entry.model.as_str(), "claude-3-opus");
1078            assert_eq!(entry.session_id.as_str(), "combined-test");
1079        }
1080    }
1081
1082    #[tokio::test]
1083    async fn test_get_paths() {
1084        let temp_dir = TempDir::new().unwrap();
1085        let path = temp_dir.path().to_path_buf();
1086
1087        let loader = DataLoader {
1088            claude_paths: vec![path.clone()],
1089            show_progress: false,
1090            use_interning: false,
1091            use_arena: false,
1092        };
1093
1094        let paths = loader.paths();
1095        assert_eq!(paths.len(), 1);
1096        assert_eq!(paths[0], path);
1097    }
1098
1099    #[tokio::test]
1100    async fn test_empty_lines_ignored() {
1101        let temp_dir = TempDir::new().unwrap();
1102        let jsonl_path = temp_dir.path().join("with_empty.jsonl");
1103
1104        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
1105        file.write_all(b"\n\n").await.unwrap(); // Empty lines
1106        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
1107        file.write_all(b"\n\n\n").await.unwrap(); // More empty lines
1108        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":200,"output_tokens":100}}}"#).await.unwrap();
1109        file.write_all(b"\n").await.unwrap();
1110
1111        let loader = DataLoader {
1112            claude_paths: vec![],
1113            show_progress: false,
1114            use_interning: false,
1115            use_arena: false,
1116        };
1117        let mut seen = HashSet::new();
1118        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
1119        tokio::pin!(stream);
1120
1121        let mut entries = Vec::new();
1122        while let Some(result) = stream.next().await {
1123            if let Ok(entry) = result {
1124                entries.push(entry);
1125            }
1126        }
1127
1128        assert_eq!(entries.len(), 2);
1129    }
1130
1131    #[tokio::test]
1132    async fn test_api_error_messages_filtered() {
1133        let temp_dir = TempDir::new().unwrap();
1134        let jsonl_path = temp_dir.path().join("with_errors.jsonl");
1135
1136        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
1137        // Entry with API error flag - should be filtered
1138        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}},"isApiErrorMessage":true}"#).await.unwrap();
1139        file.write_all(b"\n").await.unwrap();
1140        // Normal entry - should be included
1141        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
1142
1143        let loader = DataLoader {
1144            claude_paths: vec![],
1145            show_progress: false,
1146            use_interning: false,
1147            use_arena: false,
1148        };
1149        let mut seen = HashSet::new();
1150        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
1151        tokio::pin!(stream);
1152
1153        let mut entries = Vec::new();
1154        while let Some(result) = stream.next().await {
1155            if let Ok(entry) = result {
1156                entries.push(entry);
1157            }
1158        }
1159
1160        // Should only have the non-error entry
1161        assert_eq!(entries.len(), 1);
1162        assert_eq!(entries[0].session_id.as_str(), "test2");
1163    }
1164
1165    #[tokio::test]
1166    async fn test_load_recent_usage_entries() {
1167        let temp_dir = TempDir::new().unwrap();
1168        let jsonl_path = temp_dir.path().join("recent.jsonl");
1169
1170        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
1171        file.write_all(br#"{"sessionId":"recent1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
1172
1173        let loader = DataLoader {
1174            claude_paths: vec![temp_dir.path().to_path_buf()],
1175            show_progress: false,
1176            use_interning: false,
1177            use_arena: false,
1178        };
1179
1180        // Load entries from files modified in the last hour
1181        let one_hour_ago = chrono::Utc::now() - chrono::Duration::hours(1);
1182        let entries: Vec<_> = loader
1183            .load_recent_usage_entries(one_hour_ago)
1184            .collect::<Vec<_>>()
1185            .await
1186            .into_iter()
1187            .collect::<Result<Vec<_>>>()
1188            .unwrap();
1189
1190        assert_eq!(entries.len(), 1);
1191        assert_eq!(entries[0].session_id.as_str(), "recent1");
1192    }
1193
1194    #[tokio::test]
1195    async fn test_no_claude_directory_error() {
1196        let _lock = ENV_MUTEX.lock().await;
1197
1198        // Use RAII guard for safe environment variable manipulation
1199        let mut env_guard = EnvVarGuard::new();
1200        env_guard.set("HOME", "/nonexistent");
1201        env_guard.remove("CLAUDE_DATA_PATH");
1202
1203        let result = DataLoader::new().await;
1204        assert!(result.is_err());
1205
1206        // Environment variables will be automatically restored when env_guard drops
1207    }
1208}