1crate::ix!();
3
4#[async_trait]
5impl<T> LocateBatchFiles for T
6where
7 T: BatchWorkspaceInterface + Send + Sync + 'static,
8{
9 type Error = BatchWorkspaceError;
10
11 async fn locate_batch_files(
12 self: Arc<Self>,
13 index: &BatchIndex
14 ) -> Result<Option<BatchFileTriple>, Self::Error> {
15 let core_str = match index {
19 BatchIndex::Usize(_) => r"\d+",
20 BatchIndex::Uuid(_) => r"[0-9A-Fa-f\-]{36}",
21 };
22
23 let pattern_str = format!(
24 "^batch_(?P<kind>input|output|error|metadata)_(?P<core>{core_str})(?P<suffix>.*)\\.jsonl$"
25 );
26
27 let pattern = Regex::new(&pattern_str)
28 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
29 trace!("locate_batch_files => using pattern: {}", pattern_str);
30
31 let mut input = None;
32 let mut output = None;
33 let mut error = None;
34 let mut metadata = None;
35
36 let mut entries = fs::read_dir(self.workdir()).await?;
37 while let Some(entry) = entries.next_entry().await? {
38 let path = entry.path();
39 if let Some(filename) = path.file_name().and_then(|s| s.to_str()) {
40 if let Some(caps) = pattern.captures(filename) {
41 debug!("locate_batch_files => matched filename: {}", filename);
42
43 let core_capture = &caps["core"];
45 let this_index = if let Ok(n) = core_capture.parse::<usize>() {
46 BatchIndex::Usize(n)
47 } else {
48 match BatchIndex::from_uuid_str(core_capture) {
49 Ok(u) => u,
50 Err(_) => {
51 trace!(
53 "Skipping filename='{}' because core='{}' is neither integer nor valid UUID",
54 filename,
55 core_capture
56 );
57 continue;
58 }
59 }
60 };
61
62 if this_index != *index {
65 trace!(
66 "Skipping filename='{}': the parsed index={:?} != requested={:?}",
67 filename,
68 this_index,
69 index
70 );
71 continue;
72 }
73
74 match &caps["kind"] {
76 "input" => {
77 if input.is_some() {
78 error!(
79 "Multiple input files found for index {:?} => old: {:?}, new: {:?}",
80 index,
81 input.as_ref().unwrap(),
82 path
83 );
84 return Err(io::Error::new(
85 io::ErrorKind::InvalidData,
86 "Multiple input files found"
87 ).into());
88 }
89 input = Some(path);
90 }
91 "output" => {
92 if output.is_some() {
93 error!(
94 "Multiple output files found for index {:?} => old: {:?}, new: {:?}",
95 index,
96 output.as_ref().unwrap(),
97 path
98 );
99 return Err(io::Error::new(
100 io::ErrorKind::InvalidData,
101 "Multiple output files found"
102 ).into());
103 }
104 output = Some(path);
105 }
106 "error" => {
107 if error.is_some() {
108 error!(
109 "Multiple error files found for index {:?} => old: {:?}, new: {:?}",
110 index,
111 error.as_ref().unwrap(),
112 path
113 );
114 return Err(io::Error::new(
115 io::ErrorKind::InvalidData,
116 "Multiple error files found"
117 ).into());
118 }
119 error = Some(path);
120 }
121 "metadata" => {
122 if metadata.is_some() {
123 error!(
124 "Multiple metadata files found for index {:?} => old: {:?}, new: {:?}",
125 index,
126 metadata.as_ref().unwrap(),
127 path
128 );
129 return Err(io::Error::new(
130 io::ErrorKind::InvalidData,
131 "Multiple metadata files found"
132 ).into());
133 }
134 metadata = Some(path);
135 }
136 unk => {
137 warn!("Ignoring unrecognized 'kind' capture='{}' in filename='{}'", unk, filename);
138 }
139 }
140 } else {
141 trace!("Filename '{}' did not match pattern => skipped", filename);
142 }
143 } else {
144 trace!("Skipping unreadable or non-UTF8 filename at path: {:?}", path);
145 }
146 }
147
148 if input.is_none() && output.is_none() && error.is_none() && metadata.is_none() {
150 debug!(
151 "No matching files found for index={:?} => returning None",
152 index
153 );
154 Ok(None)
155 } else {
156 debug!(
157 "Constructing BatchFileTriple => index={:?}, input={:?}, output={:?}, error={:?}, metadata={:?}",
158 index, input, output, error, metadata
159 );
160 Ok(Some(BatchFileTriple::new_direct(
161 index,
162 input,
163 output,
164 error,
165 metadata,
166 self.clone()
167 )))
168 }
169 }
170}
171
/// Tests for `locate_batch_files`: each test creates a fresh temporary
/// workspace, drops files with specific names into its working directory and
/// checks which of them end up in the returned triple.
#[cfg(test)]
mod locate_batch_files_exhaustive_tests {
    use super::*;

    /// Input, output and error files for a numeric index are all picked up.
    #[traced_test]
    async fn test_locate_batch_files_usize() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        fs::write(workdir.join("batch_input_4.jsonl"), b"test").await?;
        fs::write(workdir.join("batch_output_4.jsonl"), b"test").await?;
        fs::write(workdir.join("batch_error_4.jsonl"), b"test").await?;

        let batch_files = workspace
            .clone()
            .locate_batch_files(&BatchIndex::Usize(4))
            .await?
            .expect("Expected files for index 4 to be found");
        pretty_assert_eq!(*batch_files.input(), Some(workdir.join("batch_input_4.jsonl")));
        pretty_assert_eq!(*batch_files.output(), Some(workdir.join("batch_output_4.jsonl")));
        pretty_assert_eq!(*batch_files.error(), Some(workdir.join("batch_error_4.jsonl")));

        Ok(())
    }

    /// Input and output files for a UUID index are picked up; the absent
    /// error file is reported as `None`.
    #[traced_test]
    async fn test_locate_batch_files_uuid() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        let uuid = "550e8400-e29b-41d4-a716-446655440000";
        fs::write(workdir.join(format!("batch_input_{}.jsonl", uuid)), b"test").await?;
        fs::write(workdir.join(format!("batch_output_{}.jsonl", uuid)), b"test").await?;

        let batch_files = workspace
            .clone()
            .locate_batch_files(&BatchIndex::from_uuid_str(uuid)?)
            .await?
            .expect("Expected files for the uuid index to be found");
        pretty_assert_eq!(
            *batch_files.input(),
            Some(workdir.join(format!("batch_input_{}.jsonl", uuid)))
        );
        pretty_assert_eq!(
            *batch_files.output(),
            Some(workdir.join(format!("batch_output_{}.jsonl", uuid)))
        );
        pretty_assert_eq!(*batch_files.error(), None);

        Ok(())
    }

    /// An empty workspace yields `Ok(None)`.
    #[traced_test]
    async fn test_locate_batch_files_no_files() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;

        let batch_files = workspace.locate_batch_files(&BatchIndex::Usize(4)).await?;
        assert!(batch_files.is_none());

        Ok(())
    }

    /// Same scenario as above, with a different index and explicit logging.
    #[traced_test]
    async fn returns_none_when_no_files_present_for_index() {
        info!("Starting test: returns_none_when_no_files_present_for_index");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(9999);

        debug!("Invoking locate_batch_files with empty workspace and index=9999");
        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple_option = result.expect("Should not error out if no files found");
        assert!(triple_option.is_none(), "No files => we expect None");
        info!("Finished test: returns_none_when_no_files_present_for_index");
    }

    /// A lone input file produces a triple with only `input` populated.
    #[traced_test]
    async fn locates_single_input_file() {
        info!("Starting test: locates_single_input_file");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(10);
        let filename = format!("batch_input_{}.jsonl", 10);

        let path = workspace.workdir().join(&filename);
        let content = b"some content for input";
        fs::write(&path, content).await.expect("Failed to write input file");

        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Locating single input file should succeed")
            .expect("Expected to find a triple with the input file");
        pretty_assert_eq!(*triple.index(), index, "Index should match");
        pretty_assert_eq!(*triple.input(), Some(path.clone()));
        assert!(triple.output().is_none(), "No output file");
        assert!(triple.error().is_none(), "No error file");
        assert!(triple.associated_metadata().is_none(), "No metadata file");

        info!("Finished test: locates_single_input_file");
    }

    /// A lone output file produces a triple with only `output` populated.
    #[traced_test]
    async fn locates_single_output_file() {
        info!("Starting test: locates_single_output_file");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(11);
        let filename = format!("batch_output_{}.jsonl", 11);

        let path = workspace.workdir().join(&filename);
        let content = b"some output data";
        fs::write(&path, content).await.expect("Failed to write output file");

        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Locating single output file should succeed")
            .expect("Should find a triple with the output file only");
        pretty_assert_eq!(*triple.index(), index);
        assert!(triple.input().is_none());
        pretty_assert_eq!(*triple.output(), Some(path.clone()));
        assert!(triple.error().is_none());
        assert!(triple.associated_metadata().is_none());

        info!("Finished test: locates_single_output_file");
    }

    /// A lone error file produces a triple with only `error` populated.
    #[traced_test]
    async fn locates_single_error_file() {
        info!("Starting test: locates_single_error_file");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(12);
        let filename = format!("batch_error_{}.jsonl", 12);

        let path = workspace.workdir().join(&filename);
        fs::write(&path, b"some error data").await.expect("Failed to write error file");

        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Locating single error file should succeed")
            .expect("Should find a triple with the error file only");
        pretty_assert_eq!(*triple.index(), index);
        assert!(triple.input().is_none());
        assert!(triple.output().is_none());
        pretty_assert_eq!(*triple.error(), Some(path.clone()));
        assert!(triple.associated_metadata().is_none());

        info!("Finished test: locates_single_error_file");
    }

    /// A lone metadata file produces a triple with only the metadata slot
    /// populated.
    #[traced_test]
    async fn locates_single_metadata_file() {
        info!("Starting test: locates_single_metadata_file");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(13);
        let filename = format!("batch_metadata_{}.jsonl", 13);

        let path = workspace.workdir().join(&filename);
        fs::write(&path, b"some metadata info").await.expect("Failed to write metadata file");

        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Locating single metadata file should succeed")
            .expect("Should find a triple with the metadata file only");
        pretty_assert_eq!(*triple.index(), index);
        assert!(triple.input().is_none());
        assert!(triple.output().is_none());
        assert!(triple.error().is_none());
        pretty_assert_eq!(*triple.associated_metadata(), Some(path.clone()));

        info!("Finished test: locates_single_metadata_file");
    }

    /// Input + output without error/metadata still yields a triple.
    #[traced_test]
    async fn finds_partial_set_of_files() {
        info!("Starting test: finds_partial_set_of_files");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(30);

        let input_path = workspace.workdir().join(format!("batch_input_{}.jsonl", 30));
        let output_path = workspace.workdir().join(format!("batch_output_{}.jsonl", 30));
        fs::write(&input_path, b"input data").await.expect("Failed to write input file");
        fs::write(&output_path, b"output data").await.expect("Failed to write output file");

        let result = workspace.clone().locate_batch_files(&index).await;

        let triple = result
            .expect("Should succeed with partial set of files")
            .expect("Expect Some(...)");
        pretty_assert_eq!(*triple.index(), index);
        pretty_assert_eq!(*triple.input(), Some(input_path));
        pretty_assert_eq!(*triple.output(), Some(output_path));
        assert!(triple.error().is_none());
        assert!(triple.associated_metadata().is_none());

        info!("Finished test: finds_partial_set_of_files");
    }

    /// A file whose kind is not one of the four recognised ones
    /// (`batch_foo_44.jsonl`) is ignored; the valid input next to it is found.
    #[traced_test]
    async fn ignores_unrecognized_filenames() {
        info!("Starting test: ignores_unrecognized_filenames");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(44);

        let path = workspace.workdir().join("batch_foo_44.jsonl");
        fs::write(&path, b"unknown type").await.expect("Failed to write unknown file");

        let valid_input = workspace.workdir().join("batch_input_44.jsonl");
        fs::write(&valid_input, b"some input").await.expect("Failed to write input file");

        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Unrecognized filenames must not cause an error")
            .expect("The valid input file should be found");
        pretty_assert_eq!(*triple.index(), index);
        pretty_assert_eq!(*triple.input(), Some(valid_input));
        assert!(triple.output().is_none());
        assert!(triple.error().is_none());
        assert!(triple.associated_metadata().is_none());
        info!("Finished test: ignores_unrecognized_filenames");
    }

    /// A UUID-indexed output file is located and reported under the same index.
    #[traced_test]
    async fn locates_uuid_based_index_files() {
        info!("Starting test: locates_uuid_based_index_files");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
        let index = BatchIndex::from_uuid_str(uuid_str).expect("Failed to create batch index from uuid");

        let file_name = format!("batch_output_{}.jsonl", uuid_str);
        let path = workspace.workdir().join(&file_name);
        fs::write(&path, b"uuid output data").await.expect("Failed to write uuid-based file");

        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Locating uuid-based files should succeed")
            .expect("The uuid-based output file should be found");
        pretty_assert_eq!(*triple.index(), index);
        assert!(triple.input().is_none());
        pretty_assert_eq!(*triple.output(), Some(path.clone()));
        assert!(triple.error().is_none());
        assert!(triple.associated_metadata().is_none());

        info!("Finished test: locates_uuid_based_index_files");
    }

    /// Five spawned tasks scanning the same workspace all find the input file.
    #[traced_test]
    async fn concurrent_locate_batch_files() {
        info!("Starting test: concurrent_locate_batch_files");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(88);

        let input_name = format!("batch_input_{}.jsonl", 88);
        let input_path = workspace.workdir().join(&input_name);
        fs::write(&input_path, b"concurrent test input data").await.expect("Failed to write input file");

        let arc_ws = workspace.clone();
        let mut tasks = Vec::new();
        for i in 0..5 {
            let ws_clone = arc_ws.clone();
            let index_clone = index.clone();
            tasks.push(tokio::spawn(async move {
                trace!("Task {} locating files for index=88", i);
                ws_clone.locate_batch_files(&index_clone).await
            }));
        }

        let results = futures::future::join_all(tasks).await;
        for (i, res) in results.into_iter().enumerate() {
            match res {
                Ok(Ok(Some(triple))) => {
                    debug!("Task {} => triple found with input: {:?}", i, triple.input());
                    pretty_assert_eq!(*triple.index(), index, "Index must match");
                }
                Ok(Ok(None)) => panic!("Task {} => unexpected None, we have an input file!", i),
                Ok(Err(e)) => panic!("Task {} => unexpected error: {:?}", i, e),
                Err(e) => panic!("Task {} => join error: {:?}", i, e),
            }
        }

        info!("Finished test: concurrent_locate_batch_files");
    }

    /// A file whose name is not valid UTF-8 is skipped without failing the
    /// scan. Only compiled on non-macOS Unix targets.
    #[cfg(all(unix, not(target_os = "macos")))]
    #[traced_test]
    async fn gracefully_skips_non_utf8_filenames() {
        info!("Starting test: gracefully_skips_non_utf8_filenames");
        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        use std::os::unix::ffi::OsStrExt;
        // 0xFF is never valid in UTF-8, so `to_str()` on this name yields `None`.
        let invalid_name = std::ffi::OsStr::from_bytes(b"batch_input_31\xFF.jsonl");
        let path = wd.join(invalid_name);
        let _ = std::fs::File::create(&path)
            .expect("Failed to create non-UTF8 file on non-macOS Unix");

        let valid_file = wd.join("batch_input_31.jsonl");
        fs::write(&valid_file, b"input data").await.expect("Failed to write valid input file");

        let result = workspace.clone().locate_batch_files(&BatchIndex::Usize(31)).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Should succeed, ignoring the non-UTF8 named file if any")
            .expect("The valid input file should be found");
        pretty_assert_eq!(*triple.index(), BatchIndex::Usize(31));
        pretty_assert_eq!(*triple.input(), Some(valid_file));

        info!("Finished test: gracefully_skips_non_utf8_filenames");
    }

    /// `batch_inp_4_duplicate.jsonl` does not match the pattern (its kind is
    /// `inp`, not `input`), so only the valid input file is reported.
    #[traced_test]
    async fn test_locate_batch_files_ignores_invalid_files() -> Result<(), BatchWorkspaceError> {
        let workspace = BatchWorkspace::new_temp().await?;
        let workdir = workspace.workdir();

        fs::write(workdir.join("batch_input_4.jsonl"), b"test").await?;
        fs::write(workdir.join("batch_inp_4_duplicate.jsonl"), b"test").await?;

        let batch_files = workspace
            .clone()
            .locate_batch_files(&BatchIndex::Usize(4))
            .await?
            .expect("Expected to find the valid batch input file");

        pretty_assert_eq!(*batch_files.input(), Some(workdir.join("batch_input_4.jsonl")));
        assert!(batch_files.output().is_none());
        assert!(batch_files.error().is_none());

        Ok(())
    }

    /// Despite its name, this checks the *non-failing* case: the second file
    /// uses the prefix `batch_inp_`, which the pattern does not recognise, so
    /// it is ignored and the call succeeds with the single valid input.
    /// Genuine duplicates are covered by
    /// `errors_when_same_kind_matches_twice_for_index`.
    #[traced_test]
    async fn fails_if_multiple_input_files_found() {
        info!("Starting revised test: fails_if_multiple_input_files_found");

        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(20);

        let valid_path = workspace.workdir().join("batch_input_20.jsonl");
        fs::write(&valid_path, b"first input").await.expect("Failed to write first input file");

        let extra_path = workspace.workdir().join("batch_inp_20_extra.jsonl");
        fs::write(&extra_path, b"second input").await.expect("Failed to write second input file");

        debug!("Invoking locate_batch_files for index=20");
        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Should succeed (the 'extra' file is ignored).")
            .expect("We expect at least the valid file to be recognized.");
        pretty_assert_eq!(*triple.index(), index);
        pretty_assert_eq!(*triple.input(), Some(valid_path.clone()));
        assert!(triple.output().is_none());
        assert!(triple.error().is_none());

        info!("Finished revised test: fails_if_multiple_input_files_found => no error for extra file");
    }

    /// Despite its name, this checks the *non-failing* case: `batch_out_…`
    /// is not a recognised kind, so it is ignored.
    #[traced_test]
    async fn fails_if_multiple_output_files_found() {
        info!("Starting revised test: fails_if_multiple_output_files_found");

        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(21);

        let file1 = workspace.workdir().join("batch_output_21.jsonl");
        fs::write(&file1, b"output file #1").await.expect("Failed to write output file #1");

        let file2 = workspace.workdir().join("batch_out_21_extra.jsonl");
        fs::write(&file2, b"output file #2").await.expect("Failed to write output file #2");

        debug!("Invoking locate_batch_files for index=21");
        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Should succeed (the 'extra' file is ignored).")
            .expect("We expect at least the valid file to be recognized.");
        pretty_assert_eq!(*triple.index(), index);
        pretty_assert_eq!(*triple.output(), Some(file1.clone()));
        assert!(triple.input().is_none());
        assert!(triple.error().is_none());

        info!("Finished revised test: fails_if_multiple_output_files_found => no error for extra file");
    }

    /// Despite its name, this checks the *non-failing* case: `batch_err_…`
    /// is not a recognised kind, so it is ignored.
    #[traced_test]
    async fn fails_if_multiple_error_files_found() {
        info!("Starting revised test: fails_if_multiple_error_files_found");

        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(22);

        let err1 = workspace.workdir().join("batch_error_22.jsonl");
        fs::write(&err1, b"error file #1").await.expect("Failed to write error file #1");

        let err2 = workspace.workdir().join("batch_err_22_extra.jsonl");
        fs::write(&err2, b"error file #2").await.expect("Failed to write error file #2");

        debug!("Invoking locate_batch_files for index=22");
        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Should succeed (the 'extra' file is ignored).")
            .expect("We expect at least the valid file to be recognized.");
        pretty_assert_eq!(*triple.index(), index);
        pretty_assert_eq!(*triple.error(), Some(err1.clone()));
        assert!(triple.input().is_none());
        assert!(triple.output().is_none());

        info!("Finished revised test: fails_if_multiple_error_files_found => no error for extra file");
    }

    /// Despite its name, this checks the *non-failing* case: `batch_meta_…`
    /// is not a recognised kind, so it is ignored.
    #[traced_test]
    async fn fails_if_multiple_metadata_files_found() {
        info!("Starting revised test: fails_if_multiple_metadata_files_found");

        let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
        let index = BatchIndex::Usize(23);

        let path_valid = workspace.workdir().join("batch_metadata_23.jsonl");
        fs::write(&path_valid, b"metadata #1").await.expect("Failed to write metadata file #1");

        let path_extra = workspace.workdir().join("batch_meta_23_extra.jsonl");
        fs::write(&path_extra, b"metadata #2").await.expect("Failed to write metadata file #2");

        debug!("Invoking locate_batch_files for index=23");
        let result = workspace.clone().locate_batch_files(&index).await;
        debug!("Result: {:?}", result);

        let triple = result
            .expect("Should succeed (the 'extra' file is ignored).")
            .expect("We expect at least the valid file to be recognized.");
        pretty_assert_eq!(*triple.index(), index);
        pretty_assert_eq!(*triple.associated_metadata(), Some(path_valid.clone()));

        info!("Finished revised test: fails_if_multiple_metadata_files_found => no error for extra file");
    }

    /// Covers the real duplicate path: `batch_<kind>_24.jsonl` and
    /// `batch_<kind>_24_extra.jsonl` both match the pattern for index 24 with
    /// the same kind (the second via the free-form suffix), so the lookup
    /// must return an error regardless of directory iteration order.
    #[traced_test]
    async fn errors_when_same_kind_matches_twice_for_index() {
        info!("Starting test: errors_when_same_kind_matches_twice_for_index");

        for kind in ["input", "output", "error", "metadata"] {
            let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
            let index = BatchIndex::Usize(24);

            let plain = workspace.workdir().join(format!("batch_{}_24.jsonl", kind));
            fs::write(&plain, b"first").await.expect("Failed to write first file");

            let suffixed = workspace.workdir().join(format!("batch_{}_24_extra.jsonl", kind));
            fs::write(&suffixed, b"second").await.expect("Failed to write second file");

            debug!("Invoking locate_batch_files for index=24, kind={}", kind);
            let result = workspace.clone().locate_batch_files(&index).await;
            debug!("Result: {:?}", result);

            assert!(
                result.is_err(),
                "Two '{}' files for the same index must be rejected",
                kind
            );
        }

        info!("Finished test: errors_when_same_kind_matches_twice_for_index");
    }
}