batch_mode_batch_workspace/
gather_all_batch_files.rs

1// ---------------- [ File: src/gather_all_batch_files.rs ]
2crate::ix!();
3
4impl BatchWorkspace {
5
6    pub async fn gather_all_batch_triples(self: &Arc<Self>) 
7        -> Result<Vec<BatchFileTriple>,BatchWorkspaceError> 
8    {
9        info!("gathering all batch triples in workspace {:#?}", self);
10
11        // First, obtain the set of all existing batch indices in the given directory.
12        let indices = self.find_existing_batch_file_indices().await?;
13
14        info!("found batch indices: {:#?}", indices);
15
16        let mut batch_files = Vec::new();
17
18        for index in indices {
19            // Locate the batch files for each index.
20            if let Some(batch) = self.locate_batch_files(&index).await? {
21                batch_files.push(batch);
22            }
23        }
24
25        batch_files.sort();
26
27        info!("found batch files: {:#?}", batch_files);
28
29        Ok(batch_files)
30    }
31}
32
33#[cfg(test)]
34mod tests {
35    use super::*;
36
37    #[traced_test]
38    async fn test_gather_all_batch_files_all_present() -> Result<(), BatchWorkspaceError> {
39        let workspace = BatchWorkspace::new_temp().await?;
40        let workdir = workspace.workdir();
41
42        println!("BatchWorkspace directory: {:?}", workdir);
43
44        // Setup batch files with a few indices
45        let indices = vec![1, 2, 3];
46        for index in &indices {
47            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
48            let output_path = workdir.join(format!("batch_output_{}.jsonl", index));
49            let error_path = workdir.join(format!("batch_error_{}.jsonl", index));
50
51            fs::write(&input_path, "input data").await?;
52            fs::write(&output_path, "output data").await?;
53            fs::write(&error_path, "error data").await?;
54
55            // Verify files exist
56            fs::metadata(&input_path).await?;
57            fs::metadata(&output_path).await?;
58            fs::metadata(&error_path).await?;
59        }
60
61        // Call the function under test
62        let batch_files = workspace.gather_all_batch_triples().await?;
63
64        // Assert that we get the correct number of batch files
65        assert_eq!(batch_files.len(), indices.len());
66
67        // Additional assertions to check the contents of each BatchFileTriple
68        for (i, batch) in batch_files.iter().enumerate() {
69            assert_eq!(*batch.index(), BatchIndex::Usize(indices[i]));
70            assert!(batch.input().is_some());
71            assert!(batch.output().is_some());
72            assert!(batch.error().is_some());
73        }
74
75        Ok(())
76    }
77
78    #[traced_test]
79    async fn test_gather_all_batch_files_partial_files() -> Result<(),BatchWorkspaceError> {
80        let workspace = BatchWorkspace::new_temp().await?;
81        let workdir = workspace.workdir();
82
83        // Create different combinations of partial batch files
84        let input_only_path = workdir.join("batch_input_1.jsonl");
85        fs::write(&input_only_path, "input data").await?;
86
87        let input_output_path_1 = workdir.join("batch_input_2.jsonl");
88        let input_output_path_2 = workdir.join("batch_output_2.jsonl");
89        fs::write(&input_output_path_1, "input data").await?;
90        fs::write(&input_output_path_2, "output data").await?;
91
92        // Call the function under test
93        let batch_files = workspace.gather_all_batch_triples().await?;
94
95        // Assert correct batch files were collected
96        assert_eq!(batch_files.len(), 2);
97
98        for batch in batch_files {
99            match batch.index() {
100                BatchIndex::Usize(1) => {
101                    assert!(batch.input().is_some());
102                    assert!(batch.output().is_none());
103                    assert!(batch.error().is_none());
104                },
105                BatchIndex::Usize(2) => {
106                    assert!(batch.input().is_some());
107                    assert!(batch.output().is_some());
108                    assert!(batch.error().is_none());
109                },
110                _ => panic!("Unexpected batch index"),
111            }
112        }
113
114        Ok(())
115    }
116
117    #[traced_test]
118    async fn test_gather_all_batch_files_none_present() -> Result<(),BatchWorkspaceError> {
119        let workspace = BatchWorkspace::new_temp().await?;
120
121        // Directory is empty
122        let batch_files = workspace.gather_all_batch_triples().await?;
123
124        // Assert that no batch files were found
125        assert!(batch_files.is_empty());
126
127        Ok(())
128    }
129
130    #[traced_test]
131    async fn test_gather_all_batch_files_non_existent_directory() 
132        -> Result<(), BatchWorkspaceError> 
133    {
134        use tempfile::tempdir;
135        use std::fs::Permissions;
136        use std::os::unix::fs::PermissionsExt;
137        use tokio::fs;
138
139        // Create a temporary directory
140        let temp_dir = tempdir().map_err(BatchWorkspaceError::IoError)?;
141
142        // Set the permissions to read-only
143        let permissions = Permissions::from_mode(0o555); // Read and execute permissions, no write
144        fs::set_permissions(temp_dir.path(), permissions).await.map_err(BatchWorkspaceError::IoError)?;
145
146        // Attempt to create a workspace within this directory
147        let path = temp_dir.path().join("subdir");
148        let result = BatchWorkspace::new_in(&path).await;
149
150        // Assert that an error is returned
151        assert!(result.is_err());
152
153        // Optionally, check that the error is due to permission denied
154        if let Err(BatchWorkspaceError::IoError(ref e)) = result {
155            assert_eq!(e.kind(), std::io::ErrorKind::PermissionDenied);
156        } else {
157            panic!("Expected an IoError due to permission denied");
158        }
159
160        Ok(())
161    }
162
163    #[traced_test]
164    async fn test_gather_all_batch_files_malformed_files() -> Result<(),BatchWorkspaceError> {
165        let workspace = BatchWorkspace::new_temp().await?;
166        let workdir   = workspace.workdir();
167
168        // Create malformed files that should be ignored
169        let malformed_file_1 = workdir.join("malformed_file.jsonl");
170        let malformed_file_2 = workdir.join("batch_x_input.jsonl");
171        fs::write(&malformed_file_1, "some data").await?;
172        fs::write(&malformed_file_2, "some data").await?;
173
174        // Create valid batch files
175        let valid_input_path = workdir.join("batch_input_3.jsonl");
176        fs::write(&valid_input_path, "input data").await?;
177
178        // Call the function under test
179        let batch_files = workspace.gather_all_batch_triples().await?;
180
181        // Assert that only valid batch files were detected
182        assert_eq!(batch_files.len(), 1);
183        assert_eq!(*batch_files[0].index(), BatchIndex::Usize(3));
184        assert!(batch_files[0].input().is_some());
185
186        Ok(())
187    }
188
189    #[traced_test]
190    async fn test_gather_all_batch_files_concurrency() -> Result<(),BatchWorkspaceError> {
191        let workspace = BatchWorkspace::new_temp().await?;
192        let workdir   = workspace.workdir();
193
194        // Create valid batch files
195        for index in 1..=10 {
196            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
197            fs::write(&input_path, "input data").await?;
198        }
199
200        // Launch multiple async calls concurrently
201        let futures = vec![
202            workspace.gather_all_batch_triples(),
203            workspace.gather_all_batch_triples(),
204            workspace.gather_all_batch_triples(),
205        ];
206
207        let results = futures::future::join_all(futures).await;
208
209        // Assert all calls returned correct results
210        for result in results {
211            assert!(result.is_ok());
212            let batch_files = result.unwrap();
213            assert_eq!(batch_files.len(), 10);
214        }
215
216        Ok(())
217    }
218
219    #[traced_test]
220    async fn test_gather_all_batch_files_duplicate_indices() -> Result<(),BatchWorkspaceError> {
221        let workspace = BatchWorkspace::new_temp().await?;
222        let workdir   = workspace.workdir();
223
224        // Create files with duplicated indices
225        let input_path_1 = workdir.join("batch_input_4.jsonl");
226        let input_path_2 = workdir.join("batch_input_4_duplicate.jsonl");
227        fs::write(&input_path_1, "input data 1").await?;
228        fs::write(&input_path_2, "input data 2").await?;
229
230        // Call the function under test
231        let batch_files = workspace.gather_all_batch_triples().await?;
232
233        // Assert that only the first valid batch index is present
234        assert_eq!(batch_files.len(), 1);
235        assert_eq!(*batch_files[0].index(), BatchIndex::Usize(4));
236        assert!(batch_files[0].input().is_some());
237
238        Ok(())
239    }
240}