batch_mode_batch_workspace/
gather_all_batch_files.rs1crate::ix!();
3
4impl BatchWorkspace {
5
6 pub async fn gather_all_batch_triples(self: &Arc<Self>)
7 -> Result<Vec<BatchFileTriple>,BatchWorkspaceError>
8 {
9 info!("gathering all batch triples in workspace {:#?}", self);
10
11 let indices = self.find_existing_batch_file_indices().await?;
13
14 info!("found batch indices: {:#?}", indices);
15
16 let mut batch_files = Vec::new();
17
18 for index in indices {
19 if let Some(batch) = self.locate_batch_files(&index).await? {
21 batch_files.push(batch);
22 }
23 }
24
25 batch_files.sort();
26
27 info!("found batch files: {:#?}", batch_files);
28
29 Ok(batch_files)
30 }
31}
32
#[cfg(test)]
mod tests {
    use super::*;

    /// Every index has input, output and error files: each one must be
    /// gathered as a complete triple, in ascending index order.
    #[traced_test]
    async fn test_gather_all_batch_files_all_present() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        println!("BatchWorkspace directory: {:?}", workdir);

        let indices = vec![1, 2, 3];

        for index in &indices {
            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
            let output_path = workdir.join(format!("batch_output_{}.jsonl", index));
            let error_path = workdir.join(format!("batch_error_{}.jsonl", index));

            fs::write(&input_path, "input data").await?;
            fs::write(&output_path, "output data").await?;
            fs::write(&error_path, "error data").await?;

            // Sanity check that each file is visible before gathering.
            fs::metadata(&input_path).await?;
            fs::metadata(&output_path).await?;
            fs::metadata(&error_path).await?;
        }

        let batch_files = workspace.gather_all_batch_triples().await?;

        assert_eq!(batch_files.len(), indices.len());

        // `gather_all_batch_triples` sorts its output; this assumes the triple
        // ordering follows the batch index, so results line up with `indices`.
        for (batch, expected) in batch_files.iter().zip(&indices) {
            assert_eq!(*batch.index(), BatchIndex::Usize(*expected));
            assert!(batch.input().is_some());
            assert!(batch.output().is_some());
            assert!(batch.error().is_some());
        }

        Ok(())
    }

    /// Indices with only some of their files present are still gathered, with
    /// the missing parts reported as `None`.
    #[traced_test]
    async fn test_gather_all_batch_files_partial_files() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let input_only_path = workdir.join("batch_input_1.jsonl");
        fs::write(&input_only_path, "input data").await?;

        let input_output_path_1 = workdir.join("batch_input_2.jsonl");
        let input_output_path_2 = workdir.join("batch_output_2.jsonl");
        fs::write(&input_output_path_1, "input data").await?;
        fs::write(&input_output_path_2, "output data").await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        assert_eq!(batch_files.len(), 2);

        for batch in batch_files {
            match batch.index() {
                BatchIndex::Usize(1) => {
                    assert!(batch.input().is_some());
                    assert!(batch.output().is_none());
                    assert!(batch.error().is_none());
                }
                BatchIndex::Usize(2) => {
                    assert!(batch.input().is_some());
                    assert!(batch.output().is_some());
                    assert!(batch.error().is_none());
                }
                _ => panic!("Unexpected batch index"),
            }
        }

        Ok(())
    }

    /// An empty workspace yields no triples.
    #[traced_test]
    async fn test_gather_all_batch_files_none_present() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        assert!(batch_files.is_empty());

        Ok(())
    }

    /// Creating a workspace beneath a read-only directory must fail with
    /// `PermissionDenied`.
    ///
    /// Unix-only: the test relies on POSIX permission bits through
    /// `std::os::unix::fs::PermissionsExt`, which does not exist elsewhere.
    #[cfg(unix)]
    #[traced_test]
    async fn test_gather_all_batch_files_non_existent_directory()
        -> Result<(), BatchWorkspaceError>
    {
        use std::fs::Permissions;
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;
        use tokio::fs;

        let temp_dir = tempdir().map_err(BatchWorkspaceError::IoError)?;

        // Make the parent read-only so nothing can be created inside it.
        fs::set_permissions(temp_dir.path(), Permissions::from_mode(0o555))
            .await
            .map_err(BatchWorkspaceError::IoError)?;

        // Privileged users (e.g. root inside a CI container) bypass permission
        // bits, which would make the assertion below fail spuriously. If a probe
        // directory can still be created, skip instead of reporting a failure.
        let probe = temp_dir.path().join("probe");
        if fs::create_dir(&probe).await.is_ok() {
            let _ = fs::remove_dir(&probe).await;
            fs::set_permissions(temp_dir.path(), Permissions::from_mode(0o700))
                .await
                .map_err(BatchWorkspaceError::IoError)?;
            eprintln!("skipping: permission bits are not enforced for this user");
            return Ok(());
        }

        let path = temp_dir.path().join("subdir");
        let result = BatchWorkspace::new_in(&path).await;

        // Restore owner write access before asserting so the temporary
        // directory is left in a normal state for cleanup.
        fs::set_permissions(temp_dir.path(), Permissions::from_mode(0o700))
            .await
            .map_err(BatchWorkspaceError::IoError)?;

        assert!(result.is_err());

        if let Err(BatchWorkspaceError::IoError(ref e)) = result {
            assert_eq!(e.kind(), std::io::ErrorKind::PermissionDenied);
        } else {
            panic!("Expected an IoError due to permission denied");
        }

        Ok(())
    }

    /// Files whose names do not follow the batch naming scheme are ignored.
    #[traced_test]
    async fn test_gather_all_batch_files_malformed_files() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let malformed_file_1 = workdir.join("malformed_file.jsonl");
        let malformed_file_2 = workdir.join("batch_x_input.jsonl");
        fs::write(&malformed_file_1, "some data").await?;
        fs::write(&malformed_file_2, "some data").await?;

        let valid_input_path = workdir.join("batch_input_3.jsonl");
        fs::write(&valid_input_path, "input data").await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        assert_eq!(batch_files.len(), 1);
        assert_eq!(*batch_files[0].index(), BatchIndex::Usize(3));
        assert!(batch_files[0].input().is_some());

        Ok(())
    }

    /// Several gathers polled together over the same workspace all succeed and
    /// agree on the result size.
    #[traced_test]
    async fn test_gather_all_batch_files_concurrency() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        for index in 1..=10 {
            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
            fs::write(&input_path, "input data").await?;
        }

        let futures = vec![
            workspace.gather_all_batch_triples(),
            workspace.gather_all_batch_triples(),
            workspace.gather_all_batch_triples(),
        ];

        let results = futures::future::join_all(futures).await;

        for result in results {
            // `?` surfaces the actual error instead of a bare assertion failure.
            let batch_files = result?;
            assert_eq!(batch_files.len(), 10);
        }

        Ok(())
    }

    /// A file with extra text after the index must not produce a second
    /// triple for the same index.
    #[traced_test]
    async fn test_gather_all_batch_files_duplicate_indices() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let input_path_1 = workdir.join("batch_input_4.jsonl");
        let input_path_2 = workdir.join("batch_input_4_duplicate.jsonl");
        fs::write(&input_path_1, "input data 1").await?;
        fs::write(&input_path_2, "input data 2").await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        assert_eq!(batch_files.len(), 1);
        assert_eq!(*batch_files[0].index(), BatchIndex::Usize(4));
        assert!(batch_files[0].input().is_some());

        Ok(())
    }
}