use anyhow::Result;
use aws_sdk_s3::Client as S3Client;
use std::collections::HashMap;
use std::path::Path;
use super::file_splitter::types::FileType;
use tracing::{error, info, warn};

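/// Discovers processable files in an S3 workspace bucket, from either a
/// single object key or a folder prefix.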
pub struct FileDiscovery {
    s3_client: S3Client,
    workspace_bucket: String,
}

impl FileDiscovery {
    pub fn new(s3_client: S3Client, workspace_bucket: String) -> Self {
        Self {
            s3_client,
            workspace_bucket,
        }
    }

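    /// Discovers processable files for an input spec: a spec ending in '/'
    /// is treated as a folder prefix and listed in full; anything else is
    /// treated as a single object key.
    ///
    /// A minimal usage sketch (marked `ignore` because it needs live AWS
    /// credentials; the bucket name here is illustrative):
    ///
    /// ```ignore
    /// let config = aws_config::defaults(aws_config::BehaviorVersion::latest()).load().await;
    /// let client = aws_sdk_s3::Client::new(&config);
    /// let discovery = FileDiscovery::new(client, "my-workspace-bucket".to_string());
    /// let files = discovery.discover_files("input-data/").await?;
    /// println!("{}", discovery.generate_processing_summary(&files).format_summary());
    /// ```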
    pub async fn discover_files(&self, input_spec: &str) -> Result<Vec<FileInfo>> {
        info!("=== FileDiscovery::discover_files DEBUG START ===");
        info!("Input spec: '{}'", input_spec);
        info!("Workspace bucket: '{}'", self.workspace_bucket);

        let result = if input_spec.ends_with('/') {
            info!("Detected folder input (ends with '/') - processing as folder");
            self.process_folder(input_spec).await
        } else {
            info!("Detected single file input (no trailing '/') - processing as single file");
            self.process_single_file(input_spec).await
        };

        match &result {
            Ok(files) => {
                info!("discover_files completed successfully with {} files", files.len());
            }
            Err(e) => {
                error!("discover_files failed: {}", e);
                error!("Error details: {:?}", e);
            }
        }

        info!("=== FileDiscovery::discover_files DEBUG END ===");
        result
    }

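    /// Validates a single object and wraps it in a `FileInfo`. Fails if the
    /// object is missing, inaccessible, or not processable.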
    async fn process_single_file(&self, file_key: &str) -> Result<Vec<FileInfo>> {
        info!("=== process_single_file DEBUG START ===");
        info!("Processing single file: '{}'", file_key);
        info!("Full S3 path: s3://{}/{}", self.workspace_bucket, file_key);

        info!("Attempting to get file metadata...");
        let metadata_result = self.get_file_metadata(file_key).await;

        let metadata = match metadata_result {
            Ok(meta) => {
                info!("Metadata retrieved successfully:");
                info!("  - Size: {} bytes", meta.size_bytes);
                info!("  - File type: {:?}", meta.file_type);
                info!("  - Is processable: {}", meta.is_processable);

                if !meta.is_processable {
                    error!("File '{}' is not processable! Reasons:", file_key);
                    error!("  - File type: {:?}", meta.file_type);
                    error!("  - Size: {} bytes", meta.size_bytes);
                    if meta.size_bytes == 0 {
                        error!("  - File is empty (0 bytes)");
                    }
                    if matches!(meta.file_type, FileType::Binary) {
                        error!("  - File is binary type");
                        if meta.size_bytes > 100 * 1024 * 1024 {
                            error!("  - File is too large (> 100MB)");
                        }
                    }
                    return Err(anyhow::anyhow!("File '{}' is not processable", file_key));
                }

                meta
            }
            Err(e) => {
                error!("Failed to get metadata for file '{}': {}", file_key, e);
                error!("This usually means:");
                error!("  1. File does not exist at s3://{}/{}", self.workspace_bucket, file_key);
                error!("  2. No permissions to access the file");
                error!("  3. S3 service is unavailable");
                error!("  4. Incorrect bucket name or file path");
                return Err(e.context(format!("Failed to get metadata for single file: {}", file_key)));
            }
        };

        let file_info = FileInfo {
            s3_key: file_key.to_string(),
            relative_path: file_key.to_string(),
            size_bytes: metadata.size_bytes,
            file_type: metadata.file_type,
        };

        info!("Single file processed successfully:");
        info!("  - S3 key: '{}'", file_info.s3_key);
        info!("  - Relative path: '{}'", file_info.relative_path);
        info!("  - Size: {} bytes", file_info.size_bytes);
        info!("  - Type: {:?}", file_info.file_type);
        info!("=== process_single_file DEBUG END (SUCCESS) ===");

        Ok(vec![file_info])
    }

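    /// Lists all objects under `folder_prefix` using paginated
    /// `ListObjectsV2` requests (up to 1000 keys per page, chained via
    /// continuation tokens), skipping directory markers ('/'-terminated
    /// keys) and files that are not processable.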
    async fn process_folder(&self, folder_prefix: &str) -> Result<Vec<FileInfo>> {
        info!("=== process_folder DEBUG START ===");
        info!("Processing folder: '{}'", folder_prefix);
        info!("Full S3 path: s3://{}/{}", self.workspace_bucket, folder_prefix);

        let mut files = Vec::new();
        let mut continuation_token: Option<String> = None;
        let mut request_count = 0;

        loop {
            request_count += 1;
            info!("Making S3 ListObjects request #{}", request_count);

            let mut request = self.s3_client
                .list_objects_v2()
                .bucket(&self.workspace_bucket)
                .prefix(folder_prefix)
                .max_keys(1000);

            if let Some(token) = &continuation_token {
                info!("Using continuation token: {}", token);
                request = request.continuation_token(token);
            }

            info!("Sending ListObjectsV2 request to S3...");
            let response_result = request.send().await;

            let response = match response_result {
                Ok(resp) => {
                    info!("S3 ListObjectsV2 request #{} successful!", request_count);
                    resp
                }
                Err(e) => {
                    error!("S3 ListObjectsV2 request failed: {}", e);
                    error!("Error details: {:?}", e);
                    error!("Possible causes:");
                    error!("  1. Folder/prefix '{}' does not exist in bucket '{}'", folder_prefix, self.workspace_bucket);
                    error!("  2. No list permissions for this bucket/prefix");
                    error!("  3. AWS credentials are invalid or expired");
                    error!("  4. Bucket '{}' does not exist", self.workspace_bucket);
                    error!("  5. Network connectivity issues");
                    return Err(anyhow::anyhow!("S3 ListObjectsV2 failed for folder '{}': {}", folder_prefix, e)
                        .context("Failed to list objects in S3"));
                }
            };

            let objects = response.contents();
            info!("Found {} objects in this batch", objects.len());

            if objects.is_empty() {
                warn!("No objects found in folder '{}' - this might be expected if the folder is empty", folder_prefix);
            }

            for (i, object) in objects.iter().enumerate() {
                let key = object.key().unwrap_or_default();
                info!("Processing object {}: '{}'", i + 1, key);

                if key.ends_with('/') {
                    info!("  Skipping directory: '{}'", key);
                    continue;
                }

                info!("  Creating metadata for file: '{}'", key);
                let metadata = FileMetadata::from_s3_object(object, key);
                info!("  File metadata: size={} bytes, type={:?}, processable={}",
                    metadata.size_bytes, metadata.file_type, metadata.is_processable);

                if metadata.is_processable {
                    let relative_path = key.strip_prefix(folder_prefix).unwrap_or(key).to_string();
                    let file_info = FileInfo {
                        s3_key: key.to_string(),
                        relative_path: relative_path.clone(),
                        size_bytes: metadata.size_bytes,
                        file_type: metadata.file_type,
                    };

                    info!("  Adding processable file: '{}' (relative: '{}')", key, relative_path);
                    files.push(file_info);
                } else {
                    info!("  Skipping non-processable file: '{}' (size={}, type={:?})",
                        key, metadata.size_bytes, metadata.file_type);
                }
            }

            continuation_token = response.next_continuation_token().map(|s| s.to_string());
            if let Some(ref token) = continuation_token {
                info!("More objects available, continuing with token: {}", token);
            } else {
                info!("No more objects to fetch, finishing folder processing");
                break;
            }
        }

        info!("Folder processing completed: {} processable files found", files.len());
        if files.is_empty() {
            warn!("No processable files found in folder '{}'", folder_prefix);
            warn!("This could mean:");
            warn!("  1. Folder is empty");
            warn!("  2. All files are binary/non-processable");
            warn!("  3. All files are empty (0 bytes)");
            warn!("  4. All files are too large (> 100MB for binary files)");
        }

        info!("=== process_folder DEBUG END ===");
        Ok(files)
    }

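    /// Retrieves size and content metadata for one object with a
    /// `HeadObject` request, then derives the file type from the key's
    /// extension and decides whether the file is processable.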
    async fn get_file_metadata(&self, file_key: &str) -> Result<FileMetadata> {
        info!("=== get_file_metadata DEBUG START ===");
        info!("Getting metadata for file: '{}'", file_key);
        info!("Bucket: '{}'", self.workspace_bucket);
        info!("Full S3 path: s3://{}/{}", self.workspace_bucket, file_key);

        info!("Sending HEAD request to S3...");
        let response_result = self.s3_client
            .head_object()
            .bucket(&self.workspace_bucket)
            .key(file_key)
            .send()
            .await;

        let response = match response_result {
            Ok(resp) => {
                info!("S3 HEAD request successful!");
                info!("Response metadata:");
                if let Some(content_length) = resp.content_length() {
                    info!("  - Content length: {} bytes", content_length);
                } else {
                    warn!("  - No content length in response");
                }
                if let Some(content_type) = resp.content_type() {
                    info!("  - Content type: {}", content_type);
                } else {
                    info!("  - No content type specified");
                }
                if let Some(last_modified) = resp.last_modified() {
                    info!("  - Last modified: {:?}", last_modified);
                }
                resp
            }
            Err(e) => {
                error!("S3 HEAD request failed for '{}': {}", file_key, e);
                error!("Error details: {:?}", e);
                error!("Possible causes:");
                error!("  1. File '{}' does not exist in bucket '{}'", file_key, self.workspace_bucket);
                error!("  2. No read permissions for this file/bucket");
                error!("  3. AWS credentials are invalid or expired");
                error!("  4. Network connectivity issues");
                error!("  5. S3 service is temporarily unavailable");
                return Err(anyhow::anyhow!("S3 HEAD request failed for '{}': {}", file_key, e)
                    .context(format!("Failed to get metadata for {}", file_key)));
            }
        };

        let size_bytes = response.content_length().unwrap_or(0) as usize;
        let path = Path::new(file_key);
        let file_type = FileType::from_extension(path);
        let is_processable = Self::is_file_processable(&file_type, size_bytes);

        info!("File metadata analysis:");
        info!("  - Raw size from S3: {} bytes", size_bytes);
        info!("  - File path for type detection: {:?}", path);
        info!("  - Detected file type: {:?}", file_type);
        info!("  - Is processable: {}", is_processable);

        if !is_processable {
            info!("File is not processable because:");
            if size_bytes == 0 {
                info!("  - File is empty (0 bytes)");
            }
            if matches!(file_type, FileType::Binary) {
                info!("  - File type is Binary (not text-based)");
                if size_bytes > 100 * 1024 * 1024 {
                    info!("  - Binary file is too large (> 100MB): {} bytes", size_bytes);
                }
            }
        }

        let metadata = FileMetadata {
            size_bytes,
            file_type,
            is_processable,
        };

        info!("=== get_file_metadata DEBUG END (SUCCESS) ===");
        Ok(metadata)
    }

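    /// A file is processable when it is non-empty and not binary. The
    /// explicit > 100MB check on binaries is subsumed by the final type
    /// check (binaries are rejected at any size) but is kept to mirror the
    /// logging above.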
    fn is_file_processable(file_type: &FileType, size_bytes: usize) -> bool {
        // Reject oversized binary files (> 100MB).
        if matches!(file_type, FileType::Binary) && size_bytes > 100 * 1024 * 1024 {
            return false;
        }

        // Reject empty files.
        if size_bytes == 0 {
            return false;
        }

        // Only text-based file types are processable.
        !matches!(file_type, FileType::Binary)
    }

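    /// Builds aggregate counts (per file type and per language category)
    /// plus total size over a set of discovered files.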
    pub fn generate_processing_summary(&self, files: &[FileInfo]) -> ProcessingSummary {
        let mut summary = ProcessingSummary {
            total_files: files.len(),
            total_size_bytes: files.iter().map(|f| f.size_bytes).sum(),
            files_by_type: HashMap::new(),
            files_by_category: HashMap::new(),
        };

        for file in files {
            *summary.files_by_type.entry(file.file_type).or_insert(0) += 1;

            let category = file.file_type.language_category();
            *summary.files_by_category.entry(category.to_string()).or_insert(0) += 1;
        }

        summary
    }
}

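/// A discovered file: its full S3 key, its path relative to the input
/// folder (identical to the key for single-file input), its size, and its
/// detected type.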
#[derive(Debug, Clone)]
pub struct FileInfo {
    pub s3_key: String,
    pub relative_path: String,
    pub size_bytes: usize,
    pub file_type: FileType,
}

#[derive(Debug, Clone)]
struct FileMetadata {
    size_bytes: usize,
    file_type: FileType,
    is_processable: bool,
}

impl FileMetadata {
    fn from_s3_object(object: &aws_sdk_s3::types::Object, key: &str) -> Self {
        let size_bytes = object.size().unwrap_or(0) as usize;
        let path = Path::new(key);
        let file_type = FileType::from_extension(path);
        let is_processable = FileDiscovery::is_file_processable(&file_type, size_bytes);

        Self {
            size_bytes,
            file_type,
            is_processable,
        }
    }
}

#[derive(Debug, Clone)]
pub struct ProcessingSummary {
    pub total_files: usize,
    pub total_size_bytes: usize,
    pub files_by_type: HashMap<FileType, usize>,
    pub files_by_category: HashMap<String, usize>,
}

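// Sample output of `format_summary()` (values and variant names are
// illustrative; actual `FileType` variants and category strings come from
// the file_splitter module):
//
//   Processing Summary:
//   - Total files: 3
//   - Total size: 0.25 MB
//
//   Files by category:
//   - Text: 3 files
//
//   Files by type:
//   - Text: 3 files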
impl ProcessingSummary {
    pub fn format_summary(&self) -> String {
        let mut summary = format!(
            "Processing Summary:\n- Total files: {}\n- Total size: {:.2} MB\n",
            self.total_files,
            self.total_size_bytes as f64 / (1024.0 * 1024.0)
        );

        summary.push_str("\nFiles by category:\n");
        for (category, count) in &self.files_by_category {
            summary.push_str(&format!("- {}: {} files\n", category, count));
        }

        summary.push_str("\nFiles by type:\n");
        for (file_type, count) in &self.files_by_type {
            summary.push_str(&format!("- {:?}: {} files\n", file_type, count));
        }

        summary
    }
}

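// NOTE: These are integration tests against a live S3 bucket; they require
// valid AWS credentials and a reachable TEST_WORKSPACE_BUCKET (or the
// default bucket below). When fixtures are missing, the tests log a
// message and skip their assertions instead of failing hard.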
#[cfg(test)]
mod file_discovery_tests {
    use crate::modules::file_discovery::{FileDiscovery, FileInfo};
    use crate::modules::file_splitter::types::FileType;
    use aws_config::BehaviorVersion;
    use aws_sdk_s3::Client as S3Client;
    use std::env;

    async fn create_test_s3_client() -> S3Client {
        let config = aws_config::defaults(BehaviorVersion::latest())
            .region("eu-west-2")
            .load()
            .await;
        S3Client::new(&config)
    }

    fn get_test_bucket() -> String {
        env::var("TEST_WORKSPACE_BUCKET")
            .unwrap_or_else(|_| "ai-workbench-6c9c43db-7fe6-42f1-8b11-8f82323f83f0-eu-west-2".to_string())
    }

    #[tokio::test]
    async fn test_single_file_discovery() {
        let s3_client = create_test_s3_client().await;
        let bucket = get_test_bucket();
        let discovery = FileDiscovery::new(s3_client, bucket);

        let test_file_key = "test-data/sample.txt";

        let result = discovery.discover_files(test_file_key).await;

        match result {
            Ok(files) => {
                assert_eq!(files.len(), 1);
                let file = &files[0];
                assert_eq!(file.s3_key, test_file_key);
                assert_eq!(file.relative_path, test_file_key);
                assert!(file.size_bytes > 0);
                println!("Discovered file: {} ({} bytes, type: {:?})",
                    file.s3_key, file.size_bytes, file.file_type);
            }
            Err(e) => {
                eprintln!("Test failed - file not found or error: {}", e);
                println!("Skipping test - no test file available");
            }
        }
    }

    #[tokio::test]
    async fn test_folder_discovery() {
        let s3_client = create_test_s3_client().await;
        let bucket = get_test_bucket();
        let discovery = FileDiscovery::new(s3_client, bucket);

        let test_folder = "text_files/";

        let result = discovery.discover_files(test_folder).await;

        match result {
            Ok(files) => {
                println!("Discovered {} files in folder '{}'", files.len(), test_folder);

                for file in &files {
                    assert!(file.s3_key.starts_with(test_folder));
                    assert!(!file.s3_key.ends_with('/'));
                    assert!(file.size_bytes > 0);
                    assert!(!file.relative_path.starts_with(test_folder));
                    println!("  - {} ({} bytes, type: {:?})",
                        file.relative_path, file.size_bytes, file.file_type);
                }

                let summary = discovery.generate_processing_summary(&files);
                println!("\n{}", summary.format_summary());
            }
            Err(e) => {
                eprintln!("Test failed - folder not found or error: {}", e);
                println!("Skipping test - no test folder available");
            }
        }
    }

    #[tokio::test]
    async fn test_file_type_detection() {
        let s3_client = create_test_s3_client().await;
        let bucket = get_test_bucket();
        let discovery = FileDiscovery::new(s3_client, bucket);

        let test_files = vec![
            "test-data/sample.txt",
            "test-data/data.csv",
            "test-data/config.json",
            "test-data/document.md",
            "test-data/script.py",
            "test-data/code.rs",
            "test-data/styles.css",
            "test-data/markup.html",
        ];

        for file_key in test_files {
            let result = discovery.discover_files(file_key).await;

            match result {
                Ok(files) if !files.is_empty() => {
                    let file = &files[0];
                    println!("File: {} -> Type: {:?}, Category: {}",
                        file_key, file.file_type, file.file_type.language_category());

                    match file_key.split('.').last() {
                        Some("txt") | Some("md") => assert_eq!(file.file_type, FileType::Text),
                        Some("csv") => assert_eq!(file.file_type, FileType::Csv),
                        Some("json") => assert_eq!(file.file_type, FileType::Json),
                        Some("py") => assert_eq!(file.file_type, FileType::Python),
                        Some("rs") => assert_eq!(file.file_type, FileType::Rust),
                        Some("css") => assert_eq!(file.file_type, FileType::Css),
                        Some("html") | Some("htm") => assert_eq!(file.file_type, FileType::Html),
                        _ => {}
                    }
                }
                Ok(_) => println!("File {} not found, skipping", file_key),
                Err(e) => println!("Error checking {}: {}", file_key, e),
            }
        }
    }

    #[tokio::test]
    async fn test_large_folder_discovery() {
        let s3_client = create_test_s3_client().await;
        let bucket = get_test_bucket();
        let discovery = FileDiscovery::new(s3_client, bucket);

        let large_folder = "large-dataset/";

        let result = discovery.discover_files(large_folder).await;

        match result {
            Ok(files) => {
                println!("Discovered {} files in large folder", files.len());

                if files.len() > 10 {
                    let total_size: usize = files.iter().map(|f| f.size_bytes).sum();
                    println!("Total size: {:.2} MB", total_size as f64 / (1024.0 * 1024.0));

                    let mut keys: Vec<_> = files.iter().map(|f| &f.s3_key).collect();
                    keys.sort();
                    keys.dedup();
                    assert_eq!(keys.len(), files.len(), "Found duplicate files in discovery");
                }

                let summary = discovery.generate_processing_summary(&files);
                println!("\n{}", summary.format_summary());
            }
            Err(e) => {
                println!("Large folder test skipped: {}", e);
            }
        }
    }

    #[tokio::test]
    async fn test_file_filtering() {
        let s3_client = create_test_s3_client().await;
        let bucket = get_test_bucket();
        let discovery = FileDiscovery::new(s3_client, bucket);

        let mixed_folder = "mixed-files/";

        let result = discovery.discover_files(mixed_folder).await;

        match result {
            Ok(files) => {
                println!("Found {} processable files in mixed folder", files.len());

                for file in &files {
                    assert!(file.size_bytes > 0, "File {} has zero size", file.s3_key);
                    assert!(!matches!(file.file_type, FileType::Binary),
                        "Binary file {} should have been filtered out", file.s3_key);

                    // Redundant with the assert above (binaries are filtered
                    // at any size), but kept to document the large-binary
                    // rule explicitly.
                    if file.size_bytes > 100 * 1024 * 1024 {
                        assert!(!matches!(file.file_type, FileType::Binary),
                            "Large binary file {} should have been filtered", file.s3_key);
                    }
                }
            }
            Err(e) => {
                println!("Mixed files test skipped: {}", e);
            }
        }
    }

    #[tokio::test]
    async fn test_processing_summary() {
        let s3_client = create_test_s3_client().await;
        let bucket = get_test_bucket();
        let discovery = FileDiscovery::new(s3_client, bucket);

        let test_folder = "test-data/";

        let result = discovery.discover_files(test_folder).await;

        match result {
            Ok(files) if !files.is_empty() => {
                let summary = discovery.generate_processing_summary(&files);

                assert_eq!(summary.total_files, files.len());
                let expected_total_size: usize = files.iter().map(|f| f.size_bytes).sum();
                assert_eq!(summary.total_size_bytes, expected_total_size);

                let mut manual_type_counts = std::collections::HashMap::new();
                for file in &files {
                    *manual_type_counts.entry(file.file_type).or_insert(0) += 1;
                }
                assert_eq!(summary.files_by_type, manual_type_counts);

                let mut manual_category_counts = std::collections::HashMap::new();
                for file in &files {
                    let category = file.file_type.language_category();
                    *manual_category_counts.entry(category.to_string()).or_insert(0) += 1;
                }
                assert_eq!(summary.files_by_category, manual_category_counts);

                let formatted = summary.format_summary();
                assert!(formatted.contains("Processing Summary:"));
                assert!(formatted.contains(&format!("Total files: {}", summary.total_files)));
                assert!(formatted.contains("Files by category:"));
                assert!(formatted.contains("Files by type:"));

                println!("Processing Summary Test Results:\n{}", formatted);
            }
            Ok(_) => println!("No files found for summary test"),
            Err(e) => println!("Summary test skipped: {}", e),
        }
    }

    #[test]
    fn test_file_info_structure() {
        let file_info = FileInfo {
            s3_key: "test/path/file.txt".to_string(),
            relative_path: "path/file.txt".to_string(),
            size_bytes: 1024,
            file_type: FileType::Text,
        };

        assert_eq!(file_info.s3_key, "test/path/file.txt");
        assert_eq!(file_info.relative_path, "path/file.txt");
        assert_eq!(file_info.size_bytes, 1024);
        assert_eq!(file_info.file_type, FileType::Text);

        let cloned = file_info.clone();
        assert_eq!(file_info.s3_key, cloned.s3_key);

        let debug_str = format!("{:?}", file_info);
        assert!(debug_str.contains("FileInfo"));
    }
}