1use ccstat_core::error::{CcstatError, Result};
36use ccstat_core::memory_pool::MemoryPool;
37use ccstat_core::string_pool::{InternedModel, InternedSession};
38use ccstat_core::types::{ModelName, RawJsonlEntry, SessionId, UsageEntry};
39use futures::StreamExt;
40use futures::stream::Stream;
41use indicatif::{ProgressBar, ProgressStyle};
42use rayon::prelude::*;
43use std::collections::HashSet;
44use std::path::PathBuf;
45use std::sync::{Arc, Mutex};
46use tokio::io::{AsyncBufReadExt, BufReader};
47use tokio::sync::mpsc;
48use tracing::{debug, info, trace};
49
/// Discovers Claude data directories and streams [`UsageEntry`] records from the JSONL
/// files found beneath them.
///
/// Construct with [`DataLoader::new`] and adjust behaviour with the `with_*` builder
/// methods.
#[derive(Debug)]
pub struct DataLoader {
    /// Root directories that are walked recursively for `*.jsonl` files.
    claude_paths: Vec<PathBuf>,
    /// Whether the loading methods draw an `indicatif` progress bar.
    show_progress: bool,
    /// Whether model names and session IDs are routed through the string interners.
    use_interning: bool,
    /// Whether the parallel loader copies each line into a `MemoryPool` before parsing it.
    use_arena: bool,
}
64
65impl DataLoader {
66 pub async fn new() -> Result<Self> {
75 let paths = Self::discover_claude_paths().await?;
76 if paths.is_empty() {
77 return Err(CcstatError::NoClaudeDirectory);
78 }
79
80 debug!("Discovered {} Claude data directories", paths.len());
81 Ok(Self {
82 claude_paths: paths,
83 show_progress: false,
84 use_interning: false,
85 use_arena: false,
86 })
87 }
88
89 async fn discover_claude_paths() -> Result<Vec<PathBuf>> {
91 let mut paths = Vec::new();
92
93 if let Some(home) = dirs::home_dir() {
95 let claude_path = home.join(".claude");
96 if claude_path.exists() {
97 paths.push(claude_path);
98 }
99 }
100
101 #[cfg(target_os = "macos")]
103 {
104 if let Some(home) = dirs::home_dir() {
106 let claude_path = home.join("Library/Application Support/Claude");
107 if claude_path.exists() {
108 paths.push(claude_path);
109 }
110 }
111 }
112
113 #[cfg(target_os = "linux")]
114 {
115 if let Some(config_dir) = dirs::config_dir() {
117 let claude_path = config_dir.join("Claude");
118 if claude_path.exists() {
119 paths.push(claude_path);
120 }
121 }
122
123 if let Some(home) = dirs::home_dir() {
124 let claude_path = home.join(".config/Claude");
125 if claude_path.exists() {
126 paths.push(claude_path);
127 }
128 }
129 }
130
131 #[cfg(target_os = "windows")]
132 {
133 if let Some(app_data) = dirs::data_dir() {
135 let claude_path = app_data.join("Claude");
136 if claude_path.exists() {
137 paths.push(claude_path);
138 }
139 }
140 }
141
142 if let Ok(custom_path) = std::env::var("CLAUDE_DATA_PATH") {
144 let path = PathBuf::from(custom_path);
145 if path.exists() {
146 paths.push(path);
147 }
148 }
149
150 Ok(paths)
151 }
152
153 async fn find_jsonl_files_with_filter<F>(&self, filter: F) -> Result<Vec<PathBuf>>
166 where
167 F: Fn(&std::path::Path) -> bool + Send + Sync + 'static + Clone,
168 {
169 let mut jsonl_files = Vec::new();
170
171 for base_path in &self.claude_paths {
172 let path_clone = base_path.clone();
173 let filter_clone = filter.clone();
174 let files = tokio::task::spawn_blocking(move || {
175 use walkdir::WalkDir;
176 let mut files = Vec::new();
177
178 for entry in WalkDir::new(path_clone).into_iter().filter_map(|e| e.ok()) {
179 let path = entry.path();
180 if path.extension().and_then(|s| s.to_str()) == Some("jsonl")
181 && filter_clone(path)
182 {
183 files.push(path.to_path_buf());
184 }
185 }
186 files
187 })
188 .await
189 .map_err(|e| CcstatError::Io(std::io::Error::other(e.to_string())))?;
190
191 jsonl_files.extend(files);
192 }
193
194 Ok(jsonl_files)
195 }
196
197 pub async fn find_jsonl_files(&self) -> Result<Vec<PathBuf>> {
205 let files = self.find_jsonl_files_with_filter(|_| true).await?;
206 info!("Found {} JSONL files to process", files.len());
207 Ok(files)
208 }
209
210 pub async fn find_recent_jsonl_files(
223 &self,
224 since: chrono::DateTime<chrono::Utc>,
225 ) -> Result<Vec<PathBuf>> {
226 let since_std = std::time::SystemTime::from(since);
227
228 let files = self
229 .find_jsonl_files_with_filter(move |path| {
230 if let Ok(metadata) = path.metadata()
232 && let Ok(modified) = metadata.modified()
233 {
234 modified >= since_std
235 } else {
236 false
237 }
238 })
239 .await?;
240
241 info!(
242 "Found {} recent JSONL files to process (since {})",
243 files.len(),
244 since
245 );
246 Ok(files)
247 }
248
249 pub fn with_progress(mut self, show_progress: bool) -> Self {
251 self.show_progress = show_progress;
252 self
253 }
254
255 pub fn with_interning(mut self, use_interning: bool) -> Self {
257 self.use_interning = use_interning;
258 self
259 }
260
261 pub fn with_arena(mut self, use_arena: bool) -> Self {
263 self.use_arena = use_arena;
264 self
265 }
266
267 pub fn load_usage_entries_parallel(&self) -> impl Stream<Item = Result<UsageEntry>> + '_ {
276 async_stream::stream! {
277 let files = match self.find_jsonl_files().await {
278 Ok(files) => files,
279 Err(e) => {
280 yield Err(e);
281 return;
282 }
283 };
284
285 let num_files = files.len();
286 if num_files == 0 {
287 return;
288 }
289
290 let progress = if self.show_progress {
292 let pb = ProgressBar::new(num_files as u64);
293 pb.set_style(
294 ProgressStyle::default_bar()
295 .template("{msg} [{bar:40.cyan/blue}] {pos}/{len} files")
296 .unwrap()
297 .progress_chars("#>-"),
298 );
299 pb.set_message("Loading usage data (parallel)");
300 Some(Arc::new(pb))
301 } else {
302 None
303 };
304
305 let (tx, mut rx) = mpsc::channel::<Result<Vec<UsageEntry>>>(num_files);
307
308 let seen_entries = Arc::new(Mutex::new(HashSet::new()));
310
311 let files_clone = files.clone();
313 let progress_clone = progress.clone();
314 let seen_entries_clone = seen_entries.clone();
315 let use_interning = self.use_interning;
316 let use_arena = self.use_arena;
317
318 tokio::task::spawn_blocking(move || {
319 files_clone.par_iter().for_each(|file_path| {
320 let tx = tx.clone();
321 if let Some(ref pb) = progress_clone {
322 pb.inc(1);
323 }
324
325 let result = std::fs::read_to_string(file_path)
327 .map_err(CcstatError::Io)
328 .map(|content| {
329 let mut entries = Vec::new();
330 let mut local_duplicates = 0;
331
332 let mut process_line = |line: &str| {
334 match serde_json::from_str::<RawJsonlEntry>(line) {
335 Ok(raw_entry) => {
336 if let Some(dedup_key) = UsageEntry::dedup_key(&raw_entry) {
338 let mut seen = seen_entries_clone.lock().unwrap();
339 if seen.contains(&dedup_key) {
340 local_duplicates += 1;
341 trace!("Skipping duplicate entry with key: {}", dedup_key);
342 return;
343 }
344 seen.insert(dedup_key);
345 }
346
347 if let Some(mut entry) = UsageEntry::from_raw(raw_entry) {
348 if use_interning {
349 let interned_model = InternedModel::new(entry.model.as_str());
351 entry.model = ModelName::new(interned_model.as_str());
352 let interned_session = InternedSession::new(entry.session_id.as_str());
353 entry.session_id = SessionId::new(interned_session.as_str());
354 }
355 entries.push(entry);
356 }
357 }
358 Err(e) => {
359 trace!("Skipping non-usage entry in {}: {}", file_path.display(), e);
360 }
361 }
362 };
363
364 if use_arena {
365 let lines: Vec<String> = content.lines()
368 .filter(|line| !line.trim().is_empty())
369 .map(|s| s.to_string())
370 .collect();
371
372 let pool = MemoryPool::new();
373 for line in &lines {
374 let arena_line = pool.alloc_string(line);
375 process_line(arena_line);
376 }
377 } else {
378 for line in content.lines() {
380 if line.trim().is_empty() {
381 continue;
382 }
383 process_line(line);
384 }
385 }
386
387 if local_duplicates > 0 {
388 debug!("Skipped {} duplicate entries in {}", local_duplicates, file_path.display());
389 }
390
391 entries
392 });
393
394 let _ = tx.blocking_send(result);
395 });
396 });
397
398 while let Some(result) = rx.recv().await {
400 match result {
401 Ok(entries) => {
402 for entry in entries {
403 yield Ok(entry);
404 }
405 }
406 Err(e) => yield Err(e),
407 }
408 }
409
410 let final_seen_count = seen_entries.lock().unwrap().len();
412 if final_seen_count > 0 {
413 info!("Processed {} unique entries after deduplication", final_seen_count);
414 }
415
416 if let Some(pb) = progress {
417 pb.finish_with_message("Loading complete (parallel)");
418 }
419 }
420 }
421
422 fn process_jsonl_files(
436 &self,
437 files: Vec<PathBuf>,
438 progress_message: &str,
439 ) -> impl Stream<Item = Result<UsageEntry>> + '_ {
440 let progress_msg = progress_message.to_string();
441 async_stream::stream! {
442 let progress = if self.show_progress {
444 let pb = ProgressBar::new(files.len() as u64);
445 pb.set_style(
446 ProgressStyle::default_bar()
447 .template("{msg} [{bar:40.cyan/blue}] {pos}/{len} files")
448 .unwrap()
449 .progress_chars("#>-"),
450 );
451 pb.set_message(progress_msg.clone());
452 Some(pb)
453 } else {
454 None
455 };
456
457 let mut seen_entries = HashSet::new();
459 let mut total_duplicates = 0;
460
461 for (idx, file_path) in files.into_iter().enumerate() {
462 if let Some(ref pb) = progress {
463 pb.set_position(idx as u64);
464 }
465
466 let entries = self.parse_jsonl_stream(file_path, progress.as_ref(), &mut seen_entries);
467 tokio::pin!(entries);
468 while let Some(result) = entries.next().await {
469 match &result {
470 Ok(_) => yield result,
471 Err(e) => {
472 if let CcstatError::DuplicateEntry = e {
473 total_duplicates += 1;
474 } else {
475 yield result;
476 }
477 }
478 }
479 }
480 }
481
482 if total_duplicates > 0 {
483 info!("Skipped {} duplicate entries", total_duplicates);
484 }
485
486 if let Some(pb) = progress {
487 pb.finish_with_message("Loading complete");
488 }
489 }
490 }
491
492 pub fn load_recent_usage_entries(
506 &self,
507 since: chrono::DateTime<chrono::Utc>,
508 ) -> impl Stream<Item = Result<UsageEntry>> + '_ {
509 async_stream::stream! {
510 let files = match self.find_recent_jsonl_files(since).await {
511 Ok(files) => files,
512 Err(e) => {
513 yield Err(e);
514 return;
515 }
516 };
517
518 if files.is_empty() {
519 debug!("No recent files found since {}", since);
520 return;
521 }
522
523 let entries = self.process_jsonl_files(files, "Loading recent data");
524 tokio::pin!(entries);
525 while let Some(result) = entries.next().await {
526 yield result;
527 }
528 }
529 }
530
531 fn parse_jsonl_stream<'a>(
533 &'a self,
534 path: PathBuf,
535 _progress: Option<&'a ProgressBar>,
536 seen_entries: &'a mut HashSet<String>,
537 ) -> impl Stream<Item = Result<UsageEntry>> + 'a {
538 async_stream::stream! {
539 let _file_size = match tokio::fs::metadata(&path).await {
541 Ok(metadata) => metadata.len(),
542 Err(_) => 0,
543 };
544
545 let file = match tokio::fs::File::open(&path).await {
546 Ok(f) => f,
547 Err(e) => {
548 yield Err(e.into());
549 return;
550 }
551 };
552
553 let reader = BufReader::new(file);
554 let mut lines = reader.lines();
555 let mut line_number = 0;
556 let mut file_duplicates = 0;
557
558 while let Ok(Some(line)) = lines.next_line().await {
559 line_number += 1;
560
561 if line.trim().is_empty() {
562 continue;
563 }
564
565 match serde_json::from_str::<RawJsonlEntry>(&line) {
566 Ok(raw_entry) => {
567 if let Some(dedup_key) = UsageEntry::dedup_key(&raw_entry) {
569 if seen_entries.contains(&dedup_key) {
570 file_duplicates += 1;
571 trace!("Skipping duplicate entry with key: {}", dedup_key);
572 yield Err(CcstatError::DuplicateEntry);
573 continue;
574 }
575 seen_entries.insert(dedup_key);
576 }
577
578 if let Some(entry) = self.convert_entry(raw_entry) {
579 yield Ok(entry);
580 }
581 },
583 Err(e) => {
584 trace!(
585 "Skipping non-usage entry at line {} in {}: {}",
586 line_number,
587 path.display(),
588 e
589 );
590 }
592 }
593 }
594
595 if file_duplicates > 0 {
596 debug!("Skipped {} duplicate entries in {}", file_duplicates, path.display());
597 }
598 }
599 }
600
601 pub fn paths(&self) -> &[PathBuf] {
606 &self.claude_paths
607 }
608
609 fn convert_entry(&self, raw: RawJsonlEntry) -> Option<UsageEntry> {
611 UsageEntry::from_raw(raw).map(|mut entry| {
612 if self.use_interning {
613 let interned_model = InternedModel::new(entry.model.as_str());
615 entry.model = ModelName::new(interned_model.as_str());
616
617 let interned_session = InternedSession::new(entry.session_id.as_str());
619 entry.session_id = SessionId::new(interned_session.as_str());
620 }
621 entry
622 })
623 }
624}
625
#[cfg(test)]
mod tests {
    //! Tests for `DataLoader`.
    //!
    //! Fixture files written through `tokio::fs::File` are flushed before being read back,
    //! because tokio performs file writes on a background thread: `write_all` can return
    //! before the bytes have reached the file.

    use super::*;
    use crate::test_utils::{ENV_MUTEX, EnvVarGuard};
    use tempfile::TempDir;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn test_jsonl_parsing() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("test.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50,"cache_creation_input_tokens":10,"cache_read_input_tokens":5}},"cwd":"/home/user/project-a"}"#).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T01:00:00Z","type":"assistant","message":{"model":"claude-3-sonnet","usage":{"input_tokens":200,"output_tokens":100,"cache_creation_input_tokens":20,"cache_read_input_tokens":10}}}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
        tokio::pin!(stream);

        let entry1 = stream.next().await.unwrap().unwrap();
        assert_eq!(entry1.session_id.as_str(), "test1");
        assert_eq!(entry1.tokens.input_tokens, 100);
        assert_eq!(entry1.project, Some("project-a".to_string()));

        let entry2 = stream.next().await.unwrap().unwrap();
        assert_eq!(entry2.session_id.as_str(), "test2");
        assert_eq!(entry2.tokens.input_tokens, 200);
        assert_eq!(entry2.project, None);
    }

    #[tokio::test]
    async fn test_parallel_loading() {
        let temp_dir = TempDir::new().unwrap();

        for i in 0..3 {
            let content = format!(
                r#"{{"sessionId":"test{i}","timestamp":"2024-01-01T0{i}:00:00Z","type":"assistant","message":{{"model":"claude-3-opus","usage":{{"input_tokens":100,"output_tokens":50,"cache_creation_input_tokens":10,"cache_read_input_tokens":5}}}},"cost_usd":0.1}}"#
            );
            let file_path = temp_dir.path().join(format!("test{i}.jsonl"));
            let mut file = tokio::fs::File::create(&file_path).await.unwrap();
            file.write_all(content.as_bytes()).await.unwrap();
            // Dropping a tokio File does not wait for an in-flight write; flush first.
            file.flush().await.unwrap();
        }

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let entries: Vec<_> = loader
            .load_usage_entries_parallel()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert_eq!(entries.len(), 3);

        let session_ids: Vec<_> = entries.iter().map(|e| e.session_id.as_str()).collect();
        assert!(session_ids.contains(&"test0"));
        assert!(session_ids.contains(&"test1"));
        assert!(session_ids.contains(&"test2"));
    }

    #[tokio::test]
    async fn test_discover_claude_paths_with_env_override() {
        let _lock = ENV_MUTEX.lock().await;

        let temp_dir = TempDir::new().unwrap();
        let custom_path = temp_dir.path().to_path_buf();

        let mut env_guard = EnvVarGuard::new();
        env_guard.set("CLAUDE_DATA_PATH", custom_path.to_str().unwrap());

        let paths = DataLoader::discover_claude_paths().await.unwrap();
        assert!(paths.contains(&custom_path));
    }

    #[tokio::test]
    async fn test_find_jsonl_files() {
        let temp_dir = TempDir::new().unwrap();

        let sub_dir = temp_dir.path().join("subdir");
        tokio::fs::create_dir(&sub_dir).await.unwrap();

        tokio::fs::write(temp_dir.path().join("test1.jsonl"), "")
            .await
            .unwrap();
        tokio::fs::write(sub_dir.join("test2.jsonl"), "")
            .await
            .unwrap();
        tokio::fs::write(temp_dir.path().join("not_jsonl.txt"), "")
            .await
            .unwrap();

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let files = loader.find_jsonl_files().await.unwrap();
        assert_eq!(files.len(), 2);

        for file in &files {
            assert_eq!(file.extension().and_then(|s| s.to_str()), Some("jsonl"));
        }
    }

    #[tokio::test]
    async fn test_find_recent_jsonl_files() {
        let temp_dir = TempDir::new().unwrap();

        let old_file = temp_dir.path().join("old.jsonl");
        let new_file = temp_dir.path().join("new.jsonl");

        tokio::fs::write(&old_file, "").await.unwrap();
        tokio::fs::write(&new_file, "").await.unwrap();

        let two_days_ago = chrono::Utc::now() - chrono::Duration::days(2);
        let old_time =
            filetime::FileTime::from_system_time(std::time::SystemTime::from(two_days_ago));
        filetime::set_file_mtime(&old_file, old_time).unwrap();

        let one_hour_ago = chrono::Utc::now() - chrono::Duration::hours(1);

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let files = loader.find_recent_jsonl_files(one_hour_ago).await.unwrap();
        assert_eq!(files.len(), 1);
        // Make sure the surviving file is the recent one, not the back-dated one.
        assert_eq!(
            files[0].file_name().and_then(|name| name.to_str()),
            Some("new.jsonl")
        );
    }

    #[tokio::test]
    async fn test_deduplication() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("test.jsonl");

        let duplicate_content = r#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"id":"msg_123","model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}},"requestId":"req_456"}"#;

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(duplicate_content.as_bytes()).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(duplicate_content.as_bytes()).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T01:00:00Z","type":"assistant","message":{"id":"msg_789","model":"claude-3-opus","usage":{"input_tokens":200,"output_tokens":100}},"requestId":"req_012"}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path.clone(), None, &mut seen);
        tokio::pin!(stream);

        let mut entries = Vec::new();
        let mut error_count = 0;
        while let Some(result) = stream.next().await {
            match result {
                Ok(entry) => entries.push(entry),
                Err(CcstatError::DuplicateEntry) => error_count += 1,
                Err(e) => panic!("Unexpected error in stream: {:?}", e),
            }
        }

        assert_eq!(entries.len(), 2);
        assert_eq!(error_count, 1);

        let session_ids: HashSet<_> = entries.iter().map(|e| e.session_id.as_str()).collect();
        assert_eq!(session_ids.len(), 2);
    }

    #[tokio::test]
    async fn test_empty_file_handling() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("empty.jsonl");

        tokio::fs::write(&jsonl_path, "").await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
        tokio::pin!(stream);

        let mut count = 0;
        while stream.next().await.is_some() {
            count += 1;
        }

        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_malformed_json_handling() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("malformed.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(b"not valid json\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.write_all(b"\n{broken json").await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
        tokio::pin!(stream);

        let mut valid_entries = Vec::new();
        while let Some(result) = stream.next().await {
            if let Ok(entry) = result {
                valid_entries.push(entry);
            }
        }

        assert_eq!(valid_entries.len(), 1);
        assert_eq!(valid_entries[0].session_id.as_str(), "test1");
    }

    #[tokio::test]
    async fn test_non_assistant_entries_filtered() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("mixed.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"user","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test3","timestamp":"2024-01-01T00:00:00Z","type":"system","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
        tokio::pin!(stream);

        let mut entries = Vec::new();
        while let Some(result) = stream.next().await {
            if let Ok(entry) = result {
                entries.push(entry);
            }
        }

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].session_id.as_str(), "test2");
    }

    #[tokio::test]
    async fn test_with_progress_flag() {
        let temp_dir = TempDir::new().unwrap();

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let loader_with_progress = loader.with_progress(true);
        assert!(loader_with_progress.show_progress);

        let loader_without_progress = loader_with_progress.with_progress(false);
        assert!(!loader_without_progress.show_progress);
    }

    #[tokio::test]
    async fn test_with_interning_flag() {
        let temp_dir = TempDir::new().unwrap();

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let loader_with_interning = loader.with_interning(true);
        assert!(loader_with_interning.use_interning);
    }

    #[tokio::test]
    async fn test_with_arena_flag() {
        let temp_dir = TempDir::new().unwrap();

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let loader_with_arena = loader.with_arena(true);
        assert!(loader_with_arena.use_arena);
    }

    #[tokio::test]
    async fn test_string_interning_functionality() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("test.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(br#"{"sessionId":"test-session","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test-session","timestamp":"2024-01-01T01:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":200,"output_tokens":100}}}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader_with_interning = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: true,
            use_arena: false,
        };

        let entries: Vec<_> = loader_with_interning
            .load_usage_entries_parallel()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].model.as_str(), "claude-3-opus");
        assert_eq!(entries[1].model.as_str(), "claude-3-opus");
        assert_eq!(entries[0].session_id.as_str(), "test-session");
        assert_eq!(entries[1].session_id.as_str(), "test-session");
    }

    #[tokio::test]
    async fn test_arena_allocation_functionality() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("test.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(br#"{"sessionId":"arena-test","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-sonnet","usage":{"input_tokens":150,"output_tokens":75}}}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader_with_arena = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: true,
        };

        let entries: Vec<_> = loader_with_arena
            .load_usage_entries_parallel()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].session_id.as_str(), "arena-test");
        assert_eq!(entries[0].model.as_str(), "claude-3-sonnet");
        assert_eq!(entries[0].tokens.input_tokens, 150);
    }

    #[tokio::test]
    async fn test_interning_and_arena_together() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("test.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        for i in 0..3 {
            let line = format!(
                r#"{{"sessionId":"combined-test","timestamp":"2024-01-01T0{}:00:00Z","type":"assistant","message":{{"model":"claude-3-opus","usage":{{"input_tokens":{},"output_tokens":50}}}}}}"#,
                i,
                (i + 1) * 100
            );
            file.write_all(line.as_bytes()).await.unwrap();
            file.write_all(b"\n").await.unwrap();
        }
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: true,
            use_arena: true,
        };

        let entries: Vec<_> = loader
            .load_usage_entries_parallel()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert_eq!(entries.len(), 3);
        for entry in &entries {
            assert_eq!(entry.model.as_str(), "claude-3-opus");
            assert_eq!(entry.session_id.as_str(), "combined-test");
        }
    }

    #[tokio::test]
    async fn test_get_paths() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_path_buf();

        let loader = DataLoader {
            claude_paths: vec![path.clone()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let paths = loader.paths();
        assert_eq!(paths.len(), 1);
        assert_eq!(paths[0], path);
    }

    #[tokio::test]
    async fn test_empty_lines_ignored() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("with_empty.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(b"\n\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.write_all(b"\n\n\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":200,"output_tokens":100}}}"#).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
        tokio::pin!(stream);

        let mut entries = Vec::new();
        while let Some(result) = stream.next().await {
            if let Ok(entry) = result {
                entries.push(entry);
            }
        }

        assert_eq!(entries.len(), 2);
    }

    #[tokio::test]
    async fn test_api_error_messages_filtered() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("with_errors.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(br#"{"sessionId":"test1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}},"isApiErrorMessage":true}"#).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        file.write_all(br#"{"sessionId":"test2","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };
        let mut seen = HashSet::new();
        let stream = loader.parse_jsonl_stream(jsonl_path, None, &mut seen);
        tokio::pin!(stream);

        let mut entries = Vec::new();
        while let Some(result) = stream.next().await {
            if let Ok(entry) = result {
                entries.push(entry);
            }
        }

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].session_id.as_str(), "test2");
    }

    #[tokio::test]
    async fn test_load_recent_usage_entries() {
        let temp_dir = TempDir::new().unwrap();
        let jsonl_path = temp_dir.path().join("recent.jsonl");

        let mut file = tokio::fs::File::create(&jsonl_path).await.unwrap();
        file.write_all(br#"{"sessionId":"recent1","timestamp":"2024-01-01T00:00:00Z","type":"assistant","message":{"model":"claude-3-opus","usage":{"input_tokens":100,"output_tokens":50}}}"#).await.unwrap();
        file.flush().await.unwrap();

        let loader = DataLoader {
            claude_paths: vec![temp_dir.path().to_path_buf()],
            show_progress: false,
            use_interning: false,
            use_arena: false,
        };

        let one_hour_ago = chrono::Utc::now() - chrono::Duration::hours(1);
        let entries: Vec<_> = loader
            .load_recent_usage_entries(one_hour_ago)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].session_id.as_str(), "recent1");
    }

    #[tokio::test]
    async fn test_no_claude_directory_error() {
        let _lock = ENV_MUTEX.lock().await;

        let mut env_guard = EnvVarGuard::new();
        env_guard.set("HOME", "/nonexistent");
        env_guard.remove("CLAUDE_DATA_PATH");
        // On Linux the loader also probes `dirs::config_dir()`, which presumably honours
        // XDG_CONFIG_HOME before falling back to $HOME/.config; clear it so a developer's
        // real config directory cannot make this test pass a path check by accident.
        env_guard.remove("XDG_CONFIG_HOME");

        let result = DataLoader::new().await;
        assert!(result.is_err());
    }
}