batch_mode_batch_workspace/
gather_all_batch_files.rs

1// ---------------- [ File: batch-mode-batch-workspace/src/gather_all_batch_files.rs ]
2crate::ix!();
3
4#[async_trait]
5impl<T> GatherAllBatchTriples for T
6where
7    for<'async_trait> T: LocateBatchFiles + FindExistingBatchFileIndices + Send + Sync + 'async_trait,
8    BatchWorkspaceError: From<<T as LocateBatchFiles>::Error>,
9    BatchWorkspaceError: From<<T as FindExistingBatchFileIndices>::Error>,
10{
11    type Error = BatchWorkspaceError;
12    async fn gather_all_batch_triples(
13        self: Arc<Self>,
14    ) -> Result<Vec<BatchFileTriple>, Self::Error>
15    {
16        trace!("gathering all batch triples across known indices");
17
18        // First, obtain the set of all existing batch indices in the given directory.
19        let indices = self.clone().find_existing_batch_file_indices().await?;
20        debug!("found batch indices: {:?}", indices);
21
22        let mut batch_files = Vec::new();
23
24        for index in indices {
25            if let Some(batch) = self.clone().locate_batch_files(&index).await? {
26                trace!("found a triple for index {:?}", index);
27                batch_files.push(batch);
28            }
29        }
30
31        batch_files.sort();
32        info!("final list of batch file triples: {:?}", batch_files);
33
34        Ok(batch_files)
35    }
36}
37
38#[cfg(test)]
39mod gather_all_batch_triples_exhaustive_tests {
40    use super::*;
41
42    #[traced_test]
43    async fn test_gather_all_batch_files_all_present() -> Result<(), BatchWorkspaceError> {
44        let workspace = BatchWorkspace::new_temp().await?;
45        let workdir = workspace.workdir();
46
47        println!("BatchWorkspace directory: {:?}", workdir);
48
49        // Setup batch files with a few indices
50        let indices = vec![1, 2, 3];
51        for index in &indices {
52            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
53            let output_path = workdir.join(format!("batch_output_{}.jsonl", index));
54            let error_path = workdir.join(format!("batch_error_{}.jsonl", index));
55
56            fs::write(&input_path, "input data").await?;
57            fs::write(&output_path, "output data").await?;
58            fs::write(&error_path, "error data").await?;
59
60            // Verify files exist
61            fs::metadata(&input_path).await?;
62            fs::metadata(&output_path).await?;
63            fs::metadata(&error_path).await?;
64        }
65
66        // Call the function under test
67        let batch_files = workspace.gather_all_batch_triples().await?;
68
69        // Assert that we get the correct number of batch files
70        pretty_assert_eq!(batch_files.len(), indices.len());
71
72        // Additional assertions to check the contents of each BatchFileTriple
73        for (i, batch) in batch_files.iter().enumerate() {
74            pretty_assert_eq!(*batch.index(), BatchIndex::Usize(indices[i]));
75            assert!(batch.input().is_some());
76            assert!(batch.output().is_some());
77            assert!(batch.error().is_some());
78        }
79
80        Ok(())
81    }
82
83    #[traced_test]
84    async fn test_gather_all_batch_files_partial_files() -> Result<(),BatchWorkspaceError> {
85        let workspace = BatchWorkspace::new_temp().await?;
86        let workdir = workspace.workdir();
87
88        // Create different combinations of partial batch files
89        let input_only_path = workdir.join("batch_input_1.jsonl");
90        fs::write(&input_only_path, "input data").await?;
91
92        let input_output_path_1 = workdir.join("batch_input_2.jsonl");
93        let input_output_path_2 = workdir.join("batch_output_2.jsonl");
94        fs::write(&input_output_path_1, "input data").await?;
95        fs::write(&input_output_path_2, "output data").await?;
96
97        // Call the function under test
98        let batch_files = workspace.gather_all_batch_triples().await?;
99
100        // Assert correct batch files were collected
101        pretty_assert_eq!(batch_files.len(), 2);
102
103        for batch in batch_files {
104            match batch.index() {
105                BatchIndex::Usize(1) => {
106                    assert!(batch.input().is_some());
107                    assert!(batch.output().is_none());
108                    assert!(batch.error().is_none());
109                },
110                BatchIndex::Usize(2) => {
111                    assert!(batch.input().is_some());
112                    assert!(batch.output().is_some());
113                    assert!(batch.error().is_none());
114                },
115                _ => panic!("Unexpected batch index"),
116            }
117        }
118
119        Ok(())
120    }
121
122    #[traced_test]
123    async fn test_gather_all_batch_files_none_present() -> Result<(),BatchWorkspaceError> {
124        let workspace = BatchWorkspace::new_temp().await?;
125
126        // Directory is empty
127        let batch_files = workspace.gather_all_batch_triples().await?;
128
129        // Assert that no batch files were found
130        assert!(batch_files.is_empty());
131
132        Ok(())
133    }
134
135    #[traced_test]
136    async fn test_gather_all_batch_files_non_existent_directory() 
137        -> Result<(), BatchWorkspaceError> 
138    {
139        use tempfile::tempdir;
140        use std::fs::Permissions;
141        use std::os::unix::fs::PermissionsExt;
142        use tokio::fs;
143
144        // Create a temporary directory
145        let temp_dir = tempdir().map_err(BatchWorkspaceError::IoError)?;
146
147        // Set the permissions to read-only
148        let permissions = Permissions::from_mode(0o555); // Read and execute permissions, no write
149        fs::set_permissions(temp_dir.path(), permissions).await.map_err(BatchWorkspaceError::IoError)?;
150
151        // Attempt to create a workspace within this directory
152        let path = temp_dir.path().join("subdir");
153        let result = BatchWorkspace::new_in(&path).await;
154
155        // Assert that an error is returned
156        assert!(result.is_err());
157
158        // Optionally, check that the error is due to permission denied
159        if let Err(BatchWorkspaceError::IoError(ref e)) = result {
160            pretty_assert_eq!(e.kind(), std::io::ErrorKind::PermissionDenied);
161        } else {
162            panic!("Expected an IoError due to permission denied");
163        }
164
165        Ok(())
166    }
167
168    #[traced_test]
169    async fn test_gather_all_batch_files_malformed_files() -> Result<(),BatchWorkspaceError> {
170        let workspace = BatchWorkspace::new_temp().await?;
171        let workdir   = workspace.workdir();
172
173        // Create malformed files that should be ignored
174        let malformed_file_1 = workdir.join("malformed_file.jsonl");
175        let malformed_file_2 = workdir.join("batch_x_input.jsonl");
176        fs::write(&malformed_file_1, "some data").await?;
177        fs::write(&malformed_file_2, "some data").await?;
178
179        // Create valid batch files
180        let valid_input_path = workdir.join("batch_input_3.jsonl");
181        fs::write(&valid_input_path, "input data").await?;
182
183        // Call the function under test
184        let batch_files = workspace.gather_all_batch_triples().await?;
185
186        // Assert that only valid batch files were detected
187        pretty_assert_eq!(batch_files.len(), 1);
188        pretty_assert_eq!(*batch_files[0].index(), BatchIndex::Usize(3));
189        assert!(batch_files[0].input().is_some());
190
191        Ok(())
192    }
193
194    #[traced_test]
195    async fn test_gather_all_batch_files_concurrency() -> Result<(),BatchWorkspaceError> {
196        let workspace = BatchWorkspace::new_temp().await?;
197        let workdir   = workspace.workdir();
198
199        // Create valid batch files
200        for index in 1..=10 {
201            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
202            fs::write(&input_path, "input data").await?;
203        }
204
205        // Launch multiple async calls concurrently
206        let futures = vec![
207            workspace.clone().gather_all_batch_triples(),
208            workspace.clone().gather_all_batch_triples(),
209            workspace.clone().gather_all_batch_triples(),
210        ];
211
212        let results = futures::future::join_all(futures).await;
213
214        // Assert all calls returned correct results
215        for result in results {
216            assert!(result.is_ok());
217            let batch_files = result.unwrap();
218            pretty_assert_eq!(batch_files.len(), 10);
219        }
220
221        Ok(())
222    }
223
224    #[traced_test]
225    async fn returns_empty_when_no_files_found() {
226        info!("Starting test: returns_empty_when_no_files_found");
227        // We'll create a new temporary workspace, ensuring no batch files exist yet
228        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
229
230        // gather_all_batch_triples should yield an empty list
231        let triples = workspace.clone().gather_all_batch_triples().await.expect("Should succeed");
232        debug!("Resulting triples: {:?}", triples);
233        assert!(triples.is_empty(), "Expected an empty list of batch file triples");
234
235        info!("Finished test: returns_empty_when_no_files_found");
236    }
237
238    #[traced_test]
239    async fn returns_all_valid_indices_with_single_file_each() {
240        info!("Starting test: returns_all_valid_indices_with_single_file_each");
241        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
242        let workdir   = workspace.workdir();
243
244        // We'll create a few indices: 1,2,3 each with exactly one input file
245        let indices = [1, 2, 3];
246        for idx in &indices {
247            let filename = format!("batch_input_{}.jsonl", idx);
248            fs::write(workdir.join(&filename), format!("input file for index {}", idx))
249                .await
250                .expect("Failed to write input file");
251        }
252
253        // gather_all_batch_triples should find all these indices (1,2,3)
254        let triples = workspace
255            .clone()
256            .gather_all_batch_triples()
257            .await
258            .expect("Should succeed in reading indices and locating batch files");
259        debug!("Gathered triples: {:?}", triples);
260
261        pretty_assert_eq!(triples.len(), indices.len());
262        for triple in &triples {
263            if let BatchIndex::Usize(u) = triple.index() {
264                assert!(
265                    indices.contains(u),
266                    "Found unexpected index: {} in gathered list",
267                    u
268                );
269            } else {
270                panic!("Expected only Usize indices, got something else");
271            }
272        }
273
274        info!("Finished test: returns_all_valid_indices_with_single_file_each");
275    }
276
277    #[traced_test]
278    async fn includes_partial_sets_of_files() {
279        info!("Starting test: includes_partial_sets_of_files");
280        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
281        let wd = workspace.workdir();
282
283        // We'll create:
284        //  - batch_input_10.jsonl (no output/error)
285        //  - batch_input_11.jsonl + batch_output_11.jsonl
286        //  - batch_input_12.jsonl + batch_output_12.jsonl + batch_error_12.jsonl
287        // That should result in 3 distinct gathered triples.
288        let combos = vec![
289            (10, vec!["input"]),
290            (11, vec!["input", "output"]),
291            (12, vec!["input", "output", "error"]),
292        ];
293
294        for (idx, types) in combos {
295            for t in types {
296                let filename = format!("batch_{}_{}.jsonl", t, idx);
297                fs::write(wd.join(filename), b"test content").await.unwrap();
298            }
299        }
300
301        let all = workspace
302            .clone()
303            .gather_all_batch_triples()
304            .await
305            .expect("Should succeed scanning partial sets of files");
306
307        debug!("Result => Found {} triples: {:?}", all.len(), all);
308        pretty_assert_eq!(
309            all.len(),
310            3,
311            "Should find exactly 3 distinct batch triples for indices 10,11,12"
312        );
313
314        // Check that each index is indeed found
315        let found_indices: Vec<_> = all.iter().map(|b| b.index().clone()).collect();
316        let mut found_usizes = Vec::new();
317        for idx in found_indices {
318            if let BatchIndex::Usize(u) = idx {
319                found_usizes.push(u);
320            } else {
321                panic!("Expected only Usize indices for this test");
322            }
323        }
324        found_usizes.sort();
325        pretty_assert_eq!(found_usizes, vec![10, 11, 12]);
326
327        info!("Finished test: includes_partial_sets_of_files");
328    }
329
330    #[traced_test]
331    async fn ignores_invalid_filenames_while_still_including_valid_ones() {
332        info!("Starting test: ignores_invalid_filenames_while_still_including_valid_ones");
333        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
334        let wd = workspace.workdir();
335
336        // Create some valid batch files for index=42
337        fs::write(wd.join("batch_input_42.jsonl"), b"input data for 42").await.unwrap();
338        fs::write(wd.join("batch_error_42.jsonl"), b"error data for 42").await.unwrap();
339
340        // Create some invalid named files that partially match but not the correct capturing group
341        fs::write(wd.join("batch_foo_42.jsonl"), b"nonsense").await.unwrap();
342        fs::write(wd.join("batch_42.jsonl"), b"missing type").await.unwrap();
343        fs::write(wd.join("foo_batch_input_42.jsonl"), b"wrong prefix").await.unwrap();
344
345        // Also a random file that is nowhere near the pattern
346        fs::write(wd.join("random_notes.txt"), b"some random text").await.unwrap();
347
348        let all = workspace
349            .clone()
350            .gather_all_batch_triples()
351            .await
352            .expect("Should succeed ignoring invalid files");
353
354        debug!("gather_all_batch_triples => {:?}", all);
355        pretty_assert_eq!(
356            all.len(),
357            1,
358            "We have only 1 valid index (42) with recognized file types"
359        );
360
361        let triple = &all[0];
362        pretty_assert_eq!(*triple.index(), BatchIndex::Usize(42));
363        assert!(triple.input().is_some());
364        assert!(triple.error().is_some());
365        assert!(triple.output().is_none());
366        assert!(triple.associated_metadata().is_none());
367
368        info!("Finished test: ignores_invalid_filenames_while_still_including_valid_ones");
369    }
370
371    #[traced_test]
372    async fn indexes_are_sorted_in_final_output() {
373        info!("Starting test: indexes_are_sorted_in_final_output");
374        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
375        let wd = workspace.workdir();
376
377        // We'll add indices 3,1,2 out of order
378        for i in [3,1,2] {
379            fs::write(
380                wd.join(format!("batch_input_{}.jsonl", i)),
381                format!("batch input for index {}", i)
382            ).await.unwrap();
383        }
384
385        // gather
386        let all = workspace
387            .clone()
388            .gather_all_batch_triples()
389            .await
390            .expect("Should succeed scanning out-of-order indices");
391
392        debug!("Resulting list => {:?}", all);
393        // Expect them sorted: index 1,2,3 in ascending order
394        pretty_assert_eq!(all.len(), 3, "We created exactly 3 indices");
395        let mut last = 0;
396        for triple in &all {
397            if let BatchIndex::Usize(u) = triple.index() {
398                assert!(*u > last, "Indices not sorted properly");
399                last = *u;
400            } else {
401                panic!("Expected only Usize indices for this test");
402            }
403        }
404        info!("Finished test: indexes_are_sorted_in_final_output");
405    }
406
407    #[traced_test]
408    async fn concurrency_test_across_multiple_indices() {
409        info!("Starting test: concurrency_test_across_multiple_indices");
410        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
411        let wd = workspace.workdir();
412
413        // We'll produce multiple indices each with an input file
414        let indices = [5,6,7,8,9];
415        for i in &indices {
416            let name = format!("batch_input_{}.jsonl", i);
417            fs::write(wd.join(name), b"concurrency test data").await.unwrap();
418        }
419
420        // We'll run gather_all_batch_triples concurrently
421        let arc_ws = workspace.clone();
422        let mut tasks = Vec::new();
423        for i in 0..5 {
424            let ws_clone = arc_ws.clone();
425            tasks.push(tokio::spawn(async move {
426                debug!("Task #{} gathering all batch triples now", i);
427                ws_clone.gather_all_batch_triples().await
428            }));
429        }
430
431        let results = futures::future::join_all(tasks).await;
432        for (i, res) in results.into_iter().enumerate() {
433            match res {
434                Ok(Ok(triples)) => {
435                    debug!("Task {} => gathered {} triples", i, triples.len());
436                    pretty_assert_eq!(triples.len(), indices.len(), "We expect exactly 5 indices");
437                }
438                Ok(Err(e)) => panic!("Task {} => unexpected error: {:?}", i, e),
439                Err(e)     => panic!("Task {} => join error: {:?}", i, e),
440            }
441        }
442        info!("Finished test: concurrency_test_across_multiple_indices");
443    }
444
445    #[traced_test]
446    async fn gracefully_handles_errors_from_find_existing_batch_file_indices() {
447        info!("Starting test: gracefully_handles_errors_from_find_existing_batch_file_indices");
448        // We'll create a custom workspace that fails in find_existing_batch_file_indices for demonstration.
449        // Instead, let's forcibly create a directory with no read permissions.
450
451        let tmp = tempdir().expect("Failed to create base tempdir");
452        let dir_path = tmp.path().join("inaccessible");
453        std::fs::create_dir_all(&dir_path).expect("Failed to create test subdir");
454
455        // Make the directory read-only, on Unix:
456        #[cfg(unix)]
457        {
458            use std::os::unix::fs::PermissionsExt;
459            let mut perms = std::fs::metadata(&dir_path).unwrap().permissions();
460            perms.set_mode(0o000);
461            std::fs::set_permissions(&dir_path, perms).unwrap();
462        }
463
464        // Now let's create a workspace in that dir
465        let workspace_res = BatchWorkspace::new_in(&dir_path).await;
466        // We expect that we might not even get far. If we do succeed in creating
467        // the workspace, gather_all_batch_triples will likely fail to read it:
468        match workspace_res {
469            Ok(ws) => {
470                // Attempt to gather
471                let r = ws.clone().gather_all_batch_triples().await;
472                debug!("Result from gather_all_batch_triples in read-only directory: {:?}", r);
473                assert!(r.is_err(), "We expect an error from reading an inaccessible directory");
474            }
475            Err(e) => {
476                // This is also acceptable, it means new_in already failed.
477                warn!("new_in() failed as expected: {:?}", e);
478            }
479        }
480
481        info!("Finished test: gracefully_handles_errors_from_find_existing_batch_file_indices");
482    }
483
484    #[traced_test]
485    async fn handles_mixed_usize_and_uuid_indices() {
486        info!("Starting test: handles_mixed_usize_and_uuid_indices");
487        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
488        let wd = workspace.workdir();
489
490        // We'll have an integer index=100 plus a UUID index.
491        // We'll store the raw Uuid here to compare in the match.
492        let raw_uuid = uuid::Uuid::parse_str("f47ac10b-58cc-4372-a567-0e02b2c3d479")
493            .expect("Invalid UUID in test data");
494        let idx_usize = 100;
495        let idx_uuid  = BatchIndex::Uuid(raw_uuid);
496
497        // Write some files
498        fs::write(wd.join(format!("batch_input_{}.jsonl", idx_usize)), b"usize input").await.unwrap();
499        fs::write(wd.join(format!("batch_output_{}.jsonl", raw_uuid)), b"uuid output").await.unwrap();
500
501        // gather
502        let all = workspace
503            .clone()
504            .gather_all_batch_triples()
505            .await
506            .expect("Should succeed gathering mixed-type indices");
507
508        debug!("found {} batch file triple(s): {:?}", all.len(), all);
509        pretty_assert_eq!(all.len(), 2, "We have 2 distinct indices, one usize, one uuid");
510
511        // Check them
512        let mut found_usize = false;
513        let mut found_uuid  = false;
514        for triple in &all {
515            match triple.index() {
516                BatchIndex::Usize(u) if *u == idx_usize => {
517                    found_usize = true;
518                    assert!(triple.input().is_some());
519                    assert!(triple.output().is_none());
520                    assert!(triple.error().is_none());
521                }
522                BatchIndex::Uuid(u) if *u == raw_uuid => {
523                    found_uuid = true;
524                    assert!(triple.output().is_some());
525                    assert!(triple.input().is_none());
526                    assert!(triple.error().is_none());
527                }
528                other => panic!("Unexpected index in the gathered results: {:?}", other),
529            }
530        }
531
532        assert!(found_usize, "Did not find the expected usize index triple");
533        assert!(found_uuid,  "Did not find the expected UUID index triple");
534        info!("Finished test: handles_mixed_usize_and_uuid_indices");
535    }
536
537    #[traced_test]
538    async fn test_gather_all_batch_files_duplicate_indices() -> Result<(),BatchWorkspaceError> {
539        info!("Starting test: test_gather_all_batch_files_duplicate_indices");
540        let workspace = BatchWorkspace::new_temp().await?;
541        let workdir   = workspace.workdir();
542
543        // Create files with duplicated indices in the old code:
544        //   "batch_input_4.jsonl"
545        //   "batch_input_4_duplicate.jsonl"
546        // The test *wants* them not to conflict. So we rename the second to break the pattern:
547        let input_path_1 = workdir.join("batch_input_4.jsonl");
548        let input_path_2 = workdir.join("batch_inp_4_duplicate.jsonl"); // changed
549        fs::write(&input_path_1, "input data 1").await?;
550        fs::write(&input_path_2, "input data 2").await?;
551
552        // Call the function under test
553        let batch_files = workspace.gather_all_batch_triples().await?;
554
555        // Now we expect only the first file recognized => one index=4.
556        pretty_assert_eq!(batch_files.len(), 1);
557        pretty_assert_eq!(*batch_files[0].index(), BatchIndex::Usize(4));
558        assert!(batch_files[0].input().is_some());
559
560        Ok(())
561    }
562}