// batch_mode_batch_workspace/gather_all_batch_files.rs
crate::ix!();

4#[async_trait]
5impl<T> GatherAllBatchTriples for T
6where
7 for<'async_trait> T: LocateBatchFiles + FindExistingBatchFileIndices + Send + Sync + 'async_trait,
8 BatchWorkspaceError: From<<T as LocateBatchFiles>::Error>,
9 BatchWorkspaceError: From<<T as FindExistingBatchFileIndices>::Error>,
10{
11 type Error = BatchWorkspaceError;
12 async fn gather_all_batch_triples(
13 self: Arc<Self>,
14 ) -> Result<Vec<BatchFileTriple>, Self::Error>
15 {
16 trace!("gathering all batch triples across known indices");
17
18 let indices = self.clone().find_existing_batch_file_indices().await?;
20 debug!("found batch indices: {:?}", indices);
21
22 let mut batch_files = Vec::new();
23
24 for index in indices {
25 if let Some(batch) = self.clone().locate_batch_files(&index).await? {
26 trace!("found a triple for index {:?}", index);
27 batch_files.push(batch);
28 }
29 }
30
31 batch_files.sort();
32 info!("final list of batch file triples: {:?}", batch_files);
33
34 Ok(batch_files)
35 }
36}
37
#[cfg(test)]
mod gather_all_batch_triples_exhaustive_tests {
    use super::*;

    /// Restores a directory's Unix permission bits when dropped.
    ///
    /// Some tests below strip permissions from directories inside a `TempDir`.
    /// If those bits are not put back, `TempDir`'s cleanup cannot descend into
    /// the directory and the temp tree leaks. A drop guard restores the bits
    /// even when an assertion panics.
    ///
    /// Declare the guard *after* the `TempDir` so that it drops first.
    #[cfg(unix)]
    struct RestorePermissions {
        path: std::path::PathBuf,
        mode: u32,
    }

    #[cfg(unix)]
    impl RestorePermissions {
        fn new(path: impl Into<std::path::PathBuf>, mode: u32) -> Self {
            Self { path: path.into(), mode }
        }
    }

    #[cfg(unix)]
    impl Drop for RestorePermissions {
        fn drop(&mut self) {
            use std::os::unix::fs::PermissionsExt;
            // Best effort: the only consequence of failing here is a leaked temp dir.
            let _ = std::fs::set_permissions(
                &self.path,
                std::fs::Permissions::from_mode(self.mode),
            );
        }
    }

    #[traced_test]
    async fn test_gather_all_batch_files_all_present() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        println!("BatchWorkspace directory: {:?}", workdir);

        let indices = vec![1, 2, 3];
        for index in &indices {
            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
            let output_path = workdir.join(format!("batch_output_{}.jsonl", index));
            let error_path = workdir.join(format!("batch_error_{}.jsonl", index));

            fs::write(&input_path, "input data").await?;
            fs::write(&output_path, "output data").await?;
            fs::write(&error_path, "error data").await?;

            // Sanity-check that the fixtures really landed on disk.
            fs::metadata(&input_path).await?;
            fs::metadata(&output_path).await?;
            fs::metadata(&error_path).await?;
        }

        let batch_files = workspace.gather_all_batch_triples().await?;

        pretty_assert_eq!(batch_files.len(), indices.len());

        // The result is sorted, so it lines up element-wise with `indices`.
        for (batch, expected) in batch_files.iter().zip(&indices) {
            pretty_assert_eq!(*batch.index(), BatchIndex::Usize(*expected));
            assert!(batch.input().is_some());
            assert!(batch.output().is_some());
            assert!(batch.error().is_some());
        }

        Ok(())
    }

    #[traced_test]
    async fn test_gather_all_batch_files_partial_files() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let input_only_path = workdir.join("batch_input_1.jsonl");
        fs::write(&input_only_path, "input data").await?;

        let input_output_path_1 = workdir.join("batch_input_2.jsonl");
        let input_output_path_2 = workdir.join("batch_output_2.jsonl");
        fs::write(&input_output_path_1, "input data").await?;
        fs::write(&input_output_path_2, "output data").await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        pretty_assert_eq!(batch_files.len(), 2);

        for batch in batch_files {
            match batch.index() {
                BatchIndex::Usize(1) => {
                    assert!(batch.input().is_some());
                    assert!(batch.output().is_none());
                    assert!(batch.error().is_none());
                }
                BatchIndex::Usize(2) => {
                    assert!(batch.input().is_some());
                    assert!(batch.output().is_some());
                    assert!(batch.error().is_none());
                }
                _ => panic!("Unexpected batch index"),
            }
        }

        Ok(())
    }

    #[traced_test]
    async fn test_gather_all_batch_files_none_present() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        assert!(batch_files.is_empty());

        Ok(())
    }

    /// Despite its name, this test checks that creating a workspace under a
    /// read-only parent directory fails with `PermissionDenied`. It does not
    /// call `gather_all_batch_triples`.
    ///
    /// It is Unix-only because it relies on POSIX permission bits. If the
    /// process can still write into the read-only directory (e.g. it runs as
    /// root), the test's premise does not hold, so it is skipped rather than
    /// failing.
    #[cfg(unix)]
    #[traced_test]
    async fn test_gather_all_batch_files_non_existent_directory(
    ) -> Result<(), BatchWorkspaceError> {
        use std::fs::Permissions;
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;
        use tokio::fs;

        let temp_dir = tempdir().map_err(BatchWorkspaceError::IoError)?;

        let permissions = Permissions::from_mode(0o555);
        fs::set_permissions(temp_dir.path(), permissions)
            .await
            .map_err(BatchWorkspaceError::IoError)?;
        // Declared after `temp_dir` so it drops first and cleanup can proceed.
        let _restore = RestorePermissions::new(temp_dir.path(), 0o755);

        // Privileged processes bypass permission bits; detect that and skip.
        let probe = temp_dir.path().join("permission_probe");
        if std::fs::create_dir(&probe).is_ok() {
            let _ = std::fs::remove_dir(&probe);
            warn!("permission bits are not enforced for this process; skipping");
            return Ok(());
        }

        let path = temp_dir.path().join("subdir");
        let result = BatchWorkspace::new_in(&path).await;

        assert!(result.is_err());

        if let Err(BatchWorkspaceError::IoError(ref e)) = result {
            pretty_assert_eq!(e.kind(), std::io::ErrorKind::PermissionDenied);
        } else {
            panic!("Expected an IoError due to permission denied");
        }

        Ok(())
    }

    #[traced_test]
    async fn test_gather_all_batch_files_malformed_files() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let malformed_file_1 = workdir.join("malformed_file.jsonl");
        let malformed_file_2 = workdir.join("batch_x_input.jsonl");
        fs::write(&malformed_file_1, "some data").await?;
        fs::write(&malformed_file_2, "some data").await?;

        let valid_input_path = workdir.join("batch_input_3.jsonl");
        fs::write(&valid_input_path, "input data").await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        pretty_assert_eq!(batch_files.len(), 1);
        pretty_assert_eq!(*batch_files[0].index(), BatchIndex::Usize(3));
        assert!(batch_files[0].input().is_some());

        Ok(())
    }

    #[traced_test]
    async fn test_gather_all_batch_files_concurrency() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        for index in 1..=10 {
            let input_path = workdir.join(format!("batch_input_{}.jsonl", index));
            fs::write(&input_path, "input data").await?;
        }

        let pending = vec![
            workspace.clone().gather_all_batch_triples(),
            workspace.clone().gather_all_batch_triples(),
            workspace.clone().gather_all_batch_triples(),
        ];

        let results = futures::future::join_all(pending).await;

        for result in results {
            // `?` surfaces the actual error instead of a bare `is_ok()` failure.
            let batch_files = result?;
            pretty_assert_eq!(batch_files.len(), 10);
        }

        Ok(())
    }

    #[traced_test]
    async fn returns_empty_when_no_files_found() {
        info!("Starting test: returns_empty_when_no_files_found");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");

        let triples = workspace.clone().gather_all_batch_triples().await.expect("Should succeed");
        debug!("Resulting triples: {:?}", triples);
        assert!(triples.is_empty(), "Expected an empty list of batch file triples");

        info!("Finished test: returns_empty_when_no_files_found");
    }

    #[traced_test]
    async fn returns_all_valid_indices_with_single_file_each() {
        info!("Starting test: returns_all_valid_indices_with_single_file_each");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let workdir = workspace.workdir();

        let indices = [1, 2, 3];
        for idx in &indices {
            let filename = format!("batch_input_{}.jsonl", idx);
            fs::write(workdir.join(&filename), format!("input file for index {}", idx))
                .await
                .expect("Failed to write input file");
        }

        let triples = workspace
            .clone()
            .gather_all_batch_triples()
            .await
            .expect("Should succeed in reading indices and locating batch files");
        debug!("Gathered triples: {:?}", triples);

        pretty_assert_eq!(triples.len(), indices.len());
        for triple in &triples {
            if let BatchIndex::Usize(u) = triple.index() {
                assert!(
                    indices.contains(u),
                    "Found unexpected index: {} in gathered list",
                    u
                );
            } else {
                panic!("Expected only Usize indices, got something else");
            }
        }

        info!("Finished test: returns_all_valid_indices_with_single_file_each");
    }

    #[traced_test]
    async fn includes_partial_sets_of_files() {
        info!("Starting test: includes_partial_sets_of_files");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        let combos = vec![
            (10, vec!["input"]),
            (11, vec!["input", "output"]),
            (12, vec!["input", "output", "error"]),
        ];

        for (idx, types) in combos {
            for t in types {
                let filename = format!("batch_{}_{}.jsonl", t, idx);
                fs::write(wd.join(filename), b"test content").await.unwrap();
            }
        }

        let all = workspace
            .clone()
            .gather_all_batch_triples()
            .await
            .expect("Should succeed scanning partial sets of files");

        debug!("Result => Found {} triples: {:?}", all.len(), all);
        pretty_assert_eq!(
            all.len(),
            3,
            "Should find exactly 3 distinct batch triples for indices 10,11,12"
        );

        let mut found_usizes: Vec<usize> = all
            .iter()
            .map(|b| match b.index() {
                BatchIndex::Usize(u) => *u,
                _ => panic!("Expected only Usize indices for this test"),
            })
            .collect();
        found_usizes.sort();
        pretty_assert_eq!(found_usizes, vec![10, 11, 12]);

        info!("Finished test: includes_partial_sets_of_files");
    }

    #[traced_test]
    async fn ignores_invalid_filenames_while_still_including_valid_ones() {
        info!("Starting test: ignores_invalid_filenames_while_still_including_valid_ones");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        fs::write(wd.join("batch_input_42.jsonl"), b"input data for 42").await.unwrap();
        fs::write(wd.join("batch_error_42.jsonl"), b"error data for 42").await.unwrap();

        fs::write(wd.join("batch_foo_42.jsonl"), b"nonsense").await.unwrap();
        fs::write(wd.join("batch_42.jsonl"), b"missing type").await.unwrap();
        fs::write(wd.join("foo_batch_input_42.jsonl"), b"wrong prefix").await.unwrap();

        fs::write(wd.join("random_notes.txt"), b"some random text").await.unwrap();

        let all = workspace
            .clone()
            .gather_all_batch_triples()
            .await
            .expect("Should succeed ignoring invalid files");

        debug!("gather_all_batch_triples => {:?}", all);
        pretty_assert_eq!(
            all.len(),
            1,
            "We have only 1 valid index (42) with recognized file types"
        );

        let triple = &all[0];
        pretty_assert_eq!(*triple.index(), BatchIndex::Usize(42));
        assert!(triple.input().is_some());
        assert!(triple.error().is_some());
        assert!(triple.output().is_none());
        assert!(triple.associated_metadata().is_none());

        info!("Finished test: ignores_invalid_filenames_while_still_including_valid_ones");
    }

    #[traced_test]
    async fn indexes_are_sorted_in_final_output() {
        info!("Starting test: indexes_are_sorted_in_final_output");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        // Files are created out of order on purpose.
        for i in [3, 1, 2] {
            fs::write(
                wd.join(format!("batch_input_{}.jsonl", i)),
                format!("batch input for index {}", i),
            )
            .await
            .unwrap();
        }

        let all = workspace
            .clone()
            .gather_all_batch_triples()
            .await
            .expect("Should succeed scanning out-of-order indices");

        debug!("Resulting list => {:?}", all);
        pretty_assert_eq!(all.len(), 3, "We created exactly 3 indices");
        let mut last = 0;
        for triple in &all {
            if let BatchIndex::Usize(u) = triple.index() {
                assert!(*u > last, "Indices not sorted properly");
                last = *u;
            } else {
                panic!("Expected only Usize indices for this test");
            }
        }
        info!("Finished test: indexes_are_sorted_in_final_output");
    }

    #[traced_test]
    async fn concurrency_test_across_multiple_indices() {
        info!("Starting test: concurrency_test_across_multiple_indices");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        let indices = [5, 6, 7, 8, 9];
        for i in &indices {
            let name = format!("batch_input_{}.jsonl", i);
            fs::write(wd.join(name), b"concurrency test data").await.unwrap();
        }

        let mut tasks = Vec::new();
        for i in 0..5 {
            let ws_clone = workspace.clone();
            tasks.push(tokio::spawn(async move {
                debug!("Task #{} gathering all batch triples now", i);
                ws_clone.gather_all_batch_triples().await
            }));
        }

        let results = futures::future::join_all(tasks).await;
        for (i, res) in results.into_iter().enumerate() {
            match res {
                Ok(Ok(triples)) => {
                    debug!("Task {} => gathered {} triples", i, triples.len());
                    pretty_assert_eq!(triples.len(), indices.len(), "We expect exactly 5 indices");
                }
                Ok(Err(e)) => panic!("Task {} => unexpected error: {:?}", i, e),
                Err(e) => panic!("Task {} => join error: {:?}", i, e),
            }
        }
        info!("Finished test: concurrency_test_across_multiple_indices");
    }

    /// Makes the workspace directory unreadable and checks that the failure
    /// surfaces as an error. The error may come from `new_in` itself or from a
    /// later `gather_all_batch_triples` call.
    ///
    /// This test is Unix-only. On other platforms there is no `chmod`
    /// equivalent here, so the directory stays readable and the assertion would
    /// fail spuriously. It is also skipped when permission bits are not
    /// enforced (e.g. when running as root).
    #[cfg(unix)]
    #[traced_test]
    async fn gracefully_handles_errors_from_find_existing_batch_file_indices() {
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;

        info!("Starting test: gracefully_handles_errors_from_find_existing_batch_file_indices");
        let tmp = tempdir().expect("Failed to create base tempdir");
        let dir_path = tmp.path().join("inaccessible");
        std::fs::create_dir_all(&dir_path).expect("Failed to create test subdir");

        let mut perms = std::fs::metadata(&dir_path).unwrap().permissions();
        perms.set_mode(0o000);
        std::fs::set_permissions(&dir_path, perms).unwrap();
        // If the bits are not restored, `tmp`'s cleanup cannot list
        // `inaccessible` and the temp tree leaks. The guard is declared after
        // `tmp` so that it drops first.
        let _restore = RestorePermissions::new(dir_path.clone(), 0o755);

        if std::fs::read_dir(&dir_path).is_ok() {
            warn!("permission bits are not enforced for this process; skipping");
            return;
        }

        let workspace_res = BatchWorkspace::new_in(&dir_path).await;
        match workspace_res {
            Ok(ws) => {
                let r = ws.clone().gather_all_batch_triples().await;
                debug!("Result from gather_all_batch_triples in read-only directory: {:?}", r);
                assert!(r.is_err(), "We expect an error from reading an inaccessible directory");
            }
            Err(e) => {
                warn!("new_in() failed as expected: {:?}", e);
            }
        }

        info!("Finished test: gracefully_handles_errors_from_find_existing_batch_file_indices");
    }

    #[traced_test]
    async fn handles_mixed_usize_and_uuid_indices() {
        info!("Starting test: handles_mixed_usize_and_uuid_indices");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        let raw_uuid = uuid::Uuid::parse_str("f47ac10b-58cc-4372-a567-0e02b2c3d479")
            .expect("Invalid UUID in test data");
        let idx_usize = 100;

        fs::write(wd.join(format!("batch_input_{}.jsonl", idx_usize)), b"usize input").await.unwrap();
        fs::write(wd.join(format!("batch_output_{}.jsonl", raw_uuid)), b"uuid output").await.unwrap();

        let all = workspace
            .clone()
            .gather_all_batch_triples()
            .await
            .expect("Should succeed gathering mixed-type indices");

        debug!("found {} batch file triple(s): {:?}", all.len(), all);
        pretty_assert_eq!(all.len(), 2, "We have 2 distinct indices, one usize, one uuid");

        let mut found_usize = false;
        let mut found_uuid = false;
        for triple in &all {
            match triple.index() {
                BatchIndex::Usize(u) if *u == idx_usize => {
                    found_usize = true;
                    assert!(triple.input().is_some());
                    assert!(triple.output().is_none());
                    assert!(triple.error().is_none());
                }
                BatchIndex::Uuid(u) if *u == raw_uuid => {
                    found_uuid = true;
                    assert!(triple.output().is_some());
                    assert!(triple.input().is_none());
                    assert!(triple.error().is_none());
                }
                other => panic!("Unexpected index in the gathered results: {:?}", other),
            }
        }

        assert!(found_usize, "Did not find the expected usize index triple");
        assert!(found_uuid, "Did not find the expected UUID index triple");
        info!("Finished test: handles_mixed_usize_and_uuid_indices");
    }

    /// Despite the name, the second fixture (`batch_inp_4_duplicate.jsonl`)
    /// does not follow the `batch_<kind>_<index>.jsonl` naming used by the
    /// other fixtures in this module. This test therefore checks that such a
    /// near-miss filename is ignored, leaving exactly one triple for index 4.
    #[traced_test]
    async fn test_gather_all_batch_files_duplicate_indices() -> Result<(), BatchWorkspaceError> {
        info!("Starting test: test_gather_all_batch_files_duplicate_indices");
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let input_path_1 = workdir.join("batch_input_4.jsonl");
        let input_path_2 = workdir.join("batch_inp_4_duplicate.jsonl");
        fs::write(&input_path_1, "input data 1").await?;
        fs::write(&input_path_2, "input data 2").await?;

        let batch_files = workspace.gather_all_batch_triples().await?;

        pretty_assert_eq!(batch_files.len(), 1);
        pretty_assert_eq!(*batch_files[0].index(), BatchIndex::Usize(4));
        assert!(batch_files[0].input().is_some());

        Ok(())
    }
}