batch_mode_batch_workspace/
locate_batch_files.rs1crate::ix!();
3
4impl BatchWorkspace {
5
6 pub async fn locate_batch_files(
9 self: &Arc<Self>,
10 index: &BatchIndex
11
12 ) -> Result<Option<BatchFileTriple>,BatchWorkspaceError> {
13
14 let file_pattern = index.file_pattern();
16
17 let mut input = None;
18 let mut output = None;
19 let mut error = None;
20 let mut associated_metadata = None;
21
22 let mut entries = fs::read_dir(self.workdir()).await?;
23
24 while let Some(entry) = entries.next_entry().await? {
25
26 let path = entry.path();
27
28 let filename = match path.file_name().and_then(|name| name.to_str()) {
30 Some(name) => name,
31 None => {
32 continue; }
34 };
35
36 let captures = match file_pattern.captures(filename) {
38 Some(captures) => captures,
39 None => {
40 continue; }
43 };
44
45 let file_type = captures.get(1).map(|m| m.as_str());
47
48 match file_type {
49 Some("input") => {
50 if input.is_some() {
51 return Err(io::Error::new(io::ErrorKind::InvalidData, "Multiple input files found").into());
52 }
53 debug!("Found input file: {:?}", path);
54 input = Some(path);
55 }
56 Some("output") => {
57 if output.is_some() {
58 return Err(io::Error::new(io::ErrorKind::InvalidData, "Multiple output files found").into());
59 }
60 debug!("Found output file: {:?}", path);
61 output = Some(path);
62 }
63 Some("error") => {
64 if error.is_some() {
65 return Err(io::Error::new(io::ErrorKind::InvalidData, "Multiple error files found").into());
66 }
67 debug!("Found error file: {:?}", path);
68 error = Some(path);
69 }
70 Some("metadata") => {
71 if associated_metadata.is_some() {
72 return Err(io::Error::new(io::ErrorKind::InvalidData, "Multiple associated_metadata files found").into());
73 }
74 debug!("Found associated_metadata file: {:?}", path);
75 associated_metadata = Some(path);
76 }
77
78 _ => {
79 continue;
80 }
81 }
82 }
83
84 if input.is_none() && output.is_none() && error.is_none() && associated_metadata.is_none() {
85 debug!("No batch files were found in directory: {:?}", &self.workdir());
86 Ok(None)
87 } else {
88 debug!(
89 "Batch files located - Input: {:?}, Output: {:?}, Error: {:?}, AssociatedMetadata: {:?}",
90 input, output, error, associated_metadata
91 );
92 Ok(Some(BatchFileTriple::new_direct(index, input, output, error, associated_metadata, self.clone())))
93 }
94 }
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// A numeric index locates its input, output and error files.
    #[traced_test]
    async fn test_locate_batch_files_usize() -> Result<(),BatchWorkspaceError> {

        let workspace = BatchWorkspace::new_temp().await?;
        let workdir   = workspace.workdir();

        let names = ["batch_input_4.jsonl", "batch_output_4.jsonl", "batch_error_4.jsonl"];
        for name in names {
            fs::write(workdir.join(name), b"test").await?;
        }

        let located = workspace.locate_batch_files(&BatchIndex::Usize(4)).await?.unwrap();

        assert_eq!(*located.input(),  Some(workdir.join(names[0])));
        assert_eq!(*located.output(), Some(workdir.join(names[1])));
        assert_eq!(*located.error(),  Some(workdir.join(names[2])));

        Ok(())
    }

    /// A UUID index locates its input and output files; the missing error file stays `None`.
    #[traced_test]
    async fn test_locate_batch_files_uuid() -> Result<(),BatchWorkspaceError> {

        let workspace = BatchWorkspace::new_temp().await?;
        let workdir   = workspace.workdir();

        let uuid        = "550e8400-e29b-41d4-a716-446655440000";
        let input_name  = format!("batch_input_{}.jsonl", uuid);
        let output_name = format!("batch_output_{}.jsonl", uuid);

        fs::write(workdir.join(&input_name), b"test").await?;
        fs::write(workdir.join(&output_name), b"test").await?;

        let index   = BatchIndex::from_uuid_str(uuid)?;
        let located = workspace.locate_batch_files(&index).await?.unwrap();

        assert_eq!(*located.input(),  Some(workdir.join(&input_name)));
        assert_eq!(*located.output(), Some(workdir.join(&output_name)));
        assert_eq!(*located.error(),  None);

        Ok(())
    }

    /// An empty working directory yields `None` rather than an empty triple.
    #[traced_test]
    async fn test_locate_batch_files_no_files() -> Result<(),BatchWorkspaceError> {

        let workspace = BatchWorkspace::new_temp().await?;

        let located = workspace.locate_batch_files(&BatchIndex::Usize(4)).await?;
        assert!(located.is_none());

        Ok(())
    }

    /// A file that only shares a prefix with a real batch file is expected to be ignored.
    #[traced_test]
    async fn test_locate_batch_files_ignores_invalid_files() -> Result<(),BatchWorkspaceError> {

        let workspace = BatchWorkspace::new_temp().await?;
        let workdir   = workspace.workdir();

        let valid_name = "batch_input_4.jsonl";
        fs::write(workdir.join(valid_name), b"test").await?;
        // If this name were treated as a second input file, locating would fail with a
        // duplicate-input error instead of succeeding.
        fs::write(workdir.join("batch_input_4_duplicate.jsonl"), b"test").await?;

        let result = workspace.locate_batch_files(&BatchIndex::Usize(4)).await?;
        assert!(result.is_some(), "Expected to find the valid batch input file");

        let located = result.unwrap();
        assert_eq!(*located.input(), Some(workdir.join(valid_name)));
        assert!(located.output().is_none());
        assert!(located.error().is_none());

        Ok(())
    }
}