batch_mode_batch_workspace/
find_existing_batch_indices.rs1crate::ix!();
3
4static BATCH_FILE_RE: Lazy<Regex> = Lazy::new(|| {
6 Regex::new(
12 r"^batch_(?P<kind>input|output|error|metadata)_(?P<idx>\d+|[0-9A-Za-z\-]{36})\.jsonl$",
13 )
14 .unwrap()
15});
16
17#[async_trait]
18impl<T> FindExistingBatchFileIndices for T
19where
20 T: BatchWorkspaceInterface + Send + Sync + 'static,
21{
22 type Error = BatchWorkspaceError;
23
24 async fn find_existing_batch_file_indices(self: Arc<Self>) -> Result<HashSet<BatchIndex>, Self::Error> {
25 let mut indices = HashSet::new();
26 let mut dir_entries = fs::read_dir(self.workdir()).await?;
27 while let Some(entry) = dir_entries.next_entry().await? {
28 let path = entry.path();
29 if let Some(filename) = path.file_name().and_then(|s| s.to_str()) {
30 if let Some(caps) = BATCH_FILE_RE.captures(filename) {
31 let idx_str = &caps["idx"];
32 let index = match idx_str.parse::<usize>() {
33 Ok(n) => BatchIndex::Usize(n),
34 Err(_) => BatchIndex::from_uuid_str(idx_str)?,
35 };
36 indices.insert(index);
37 }
38 }
39 }
40 Ok(indices)
41 }
42}
43
#[cfg(test)]
mod find_existing_batch_file_indices_exhaustive_tests {
    use super::*;

    /// Scans the mock workspace and checks that exactly the expected numeric
    /// and UUID indices are reported, then cleans the workspace up.
    #[traced_test]
    async fn test_find_indices() -> Result<(), BatchWorkspaceError> {
        debug!("creating a mock workspace for test_find_indices");
        let workspace = BatchWorkspace::new_mock().await?;
        let indices = workspace.clone().find_existing_batch_file_indices().await?;
        debug!("found indices in test: {:?}", indices);

        let mut expected_indices = HashSet::new();
        expected_indices.insert(BatchIndex::Usize(0));
        expected_indices.insert(BatchIndex::Usize(1));
        expected_indices.insert(BatchIndex::Usize(12345));
        expected_indices
            .insert(BatchIndex::from_uuid_str("550e8400-e29b-41d4-a716-446655440000").unwrap());
        expected_indices
            .insert(BatchIndex::from_uuid_str("f47ac10b-58cc-4372-a567-0e02b2c3d479").unwrap());

        pretty_assert_eq!(indices, expected_indices);

        workspace.cleanup_if_temporary().await
    }

    /// A freshly created temporary workspace yields no indices.
    #[traced_test]
    async fn returns_empty_set_when_no_files_present() {
        info!("Starting test: returns_empty_set_when_no_files_present");

        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temporary workspace");

        let indices = workspace
            .clone()
            .find_existing_batch_file_indices()
            .await
            .expect("Should succeed even if directory is empty");

        debug!("Collected indices: {:?}", indices);
        assert!(indices.is_empty(), "Expected empty set of indices");

        info!("Finished test: returns_empty_set_when_no_files_present");
    }

    /// A single `batch_input_<n>.jsonl` file produces exactly one numeric index.
    #[traced_test]
    async fn finds_single_usize_index_with_one_file() {
        info!("Starting test: finds_single_usize_index_with_one_file");

        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temporary workspace");
        let idx = 42;
        let fname = format!("batch_input_{}.jsonl", idx);

        let path = workspace.workdir().join(&fname);
        fs::write(path, b"dummy content")
            .await
            .expect("Failed to write file");

        let indices = workspace
            .clone()
            .find_existing_batch_file_indices()
            .await
            .expect("Should succeed reading directory");

        debug!("Collected indices: {:?}", indices);
        pretty_assert_eq!(indices.len(), 1, "Expected exactly one index");
        pretty_assert_eq!(
            indices.iter().next().unwrap(),
            &BatchIndex::Usize(idx),
            "The found index should match the one we created"
        );

        info!("Finished test: finds_single_usize_index_with_one_file");
    }

    /// A single `batch_output_<uuid>.jsonl` file produces exactly one UUID index.
    #[traced_test]
    async fn finds_single_uuid_index_with_one_file() {
        info!("Starting test: finds_single_uuid_index_with_one_file");

        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temporary workspace");
        let uuid_str = "f47ac10b-58cc-4372-a567-0e02b2c3d479";
        let fname = format!("batch_output_{}.jsonl", uuid_str);
        let path = workspace.workdir().join(&fname);

        fs::write(path, b"dummy content")
            .await
            .expect("Failed to write file");

        let indices = workspace
            .clone()
            .find_existing_batch_file_indices()
            .await
            .expect("Should succeed reading directory");
        debug!("Collected indices: {:?}", indices);

        pretty_assert_eq!(indices.len(), 1, "Expected exactly one UUID index");
        pretty_assert_eq!(
            indices.iter().next().unwrap(),
            &BatchIndex::from_uuid_str(uuid_str).unwrap(),
            "The found index should match the UUID we created"
        );

        info!("Finished test: finds_single_uuid_index_with_one_file");
    }

    /// Several files of different kinds are reduced to their distinct indices;
    /// index 1 appears twice (input and error) but must be counted once.
    #[traced_test]
    async fn finds_multiple_indices_among_multiple_files() {
        info!("Starting test: finds_multiple_indices_among_multiple_files");

        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temporary workspace");
        let wd = workspace.workdir();

        let filenames = [
            "batch_input_1.jsonl",
            "batch_output_2.jsonl",
            "batch_error_3.jsonl",
            "batch_input_10.jsonl",
            "batch_error_1.jsonl",
        ];
        for fname in filenames {
            fs::write(wd.join(fname), b"test").await.unwrap();
        }

        let indices = workspace
            .clone()
            .find_existing_batch_file_indices()
            .await
            .expect("Should succeed reading directory");
        debug!("Collected indices: {:?}", indices);

        pretty_assert_eq!(indices.len(), 4, "Expected 4 distinct indices");
        for i in [1, 2, 3, 10] {
            assert!(indices.contains(&BatchIndex::Usize(i)), "Missing index {}", i);
        }

        info!("Finished test: finds_multiple_indices_among_multiple_files");
    }

    /// Files whose names deviate from the expected pattern are ignored, while
    /// the well-formed ones next to them are still picked up.
    #[traced_test]
    async fn ignores_files_that_dont_match_pattern() {
        info!("Starting test: ignores_files_that_dont_match_pattern");

        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create workspace");
        let wd = workspace.workdir();

        let valid_names = ["batch_input_123.jsonl", "batch_error_999.jsonl"];
        for name in valid_names {
            fs::write(wd.join(name), b"valid pattern").await.unwrap();
        }

        let invalid_names = [
            "batchinput_123.jsonl",
            "batch_input_123.txt",
            "batch_inp_999.jsonl",
            "random_file.jsonl",
            "batch_input_notanumber.jsonl",
            "batch_something_else_123.jsonl",
        ];
        for name in invalid_names {
            fs::write(wd.join(name), b"invalid pattern").await.unwrap();
        }

        let indices = workspace
            .clone()
            .find_existing_batch_file_indices()
            .await
            .expect("Should succeed ignoring invalid files");
        debug!("Collected indices: {:?}", indices);

        pretty_assert_eq!(indices.len(), 2, "We wrote exactly 2 valid pattern files");
        assert!(indices.contains(&BatchIndex::Usize(123)));
        assert!(indices.contains(&BatchIndex::Usize(999)));

        info!("Finished test: ignores_files_that_dont_match_pattern");
    }

    /// Five spawned tasks scan the same workspace concurrently; each must see
    /// the same four indices.
    #[traced_test]
    async fn concurrency_test_for_finding_indices() {
        info!("Starting test: concurrency_test_for_finding_indices");
        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        let files = [
            "batch_input_1.jsonl",
            "batch_output_2.jsonl",
            "batch_error_3.jsonl",
            "batch_input_4.jsonl",
        ];
        for f in files {
            fs::write(wd.join(f), b"concurrent test").await.unwrap();
        }

        let mut tasks = Vec::new();
        for i in 0..5 {
            // Each task gets its own handle to the shared workspace.
            let w = workspace.clone();
            tasks.push(tokio::spawn(async move {
                debug!("Task #{} calling find_existing_batch_file_indices...", i);
                w.find_existing_batch_file_indices().await
            }));
        }

        let results = futures::future::join_all(tasks).await;
        for (i, r) in results.into_iter().enumerate() {
            match r {
                Ok(Ok(set)) => {
                    debug!("Task #{} => found indices: {:?}", i, set);
                    pretty_assert_eq!(set.len(), 4, "Expected 4 distinct indices");
                    for j in 1..=4 {
                        assert!(set.contains(&BatchIndex::Usize(j)), "Missing index {}", j);
                    }
                }
                Ok(Err(e)) => panic!("Task #{} => unexpected error: {:?}", i, e),
                Err(e) => panic!("Task #{} => join error: {:?}", i, e),
            }
        }

        info!("Finished test: concurrency_test_for_finding_indices");
    }

    /// Scanning a directory whose permissions forbid reading must fail.
    ///
    /// The check only runs when the platform actually enforces the permission
    /// change (it is skipped where the directory stays readable, e.g. on
    /// non-Unix targets or for a user who bypasses permission checks), and the
    /// permissions are restored before asserting so the temporary directory
    /// can be removed afterwards.
    #[traced_test]
    async fn returns_error_on_unreadable_workdir() {
        info!("Starting test: returns_error_on_unreadable_workdir");
        let tmp = tempdir().expect("Failed to create base tempdir");
        let read_only_dir = tmp.path().join("read_only");
        std::fs::create_dir_all(&read_only_dir).expect("Failed to create read_only directory");

        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let mut perms = std::fs::metadata(&read_only_dir).unwrap().permissions();
            perms.set_mode(0o000);
            std::fs::set_permissions(&read_only_dir, perms).unwrap();
        }

        // Probe with a plain read: if it succeeds, the directory is not
        // actually unreadable here and asserting an error would be spurious.
        let permissions_enforced = std::fs::read_dir(&read_only_dir).is_err();

        // `Some(is_err)` when the scan ran, `None` when there was nothing to assert.
        let scan_failed = if permissions_enforced {
            match BatchWorkspace::new_in(&read_only_dir).await {
                Ok(ws) => {
                    let res = ws.clone().find_existing_batch_file_indices().await;
                    debug!("Result from find_existing_batch_file_indices: {:?}", res);
                    Some(res.is_err())
                }
                Err(e) => {
                    warn!("new_in() failed as expected: {:?}", e);
                    None
                }
            }
        } else {
            warn!("Directory is still readable after chmod; skipping unreadable-workdir check");
            None
        };

        // Restore access before asserting so the tempdir can be cleaned up even
        // if the assertion below panics.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let restored = std::fs::Permissions::from_mode(0o755);
            if let Err(e) = std::fs::set_permissions(&read_only_dir, restored) {
                warn!("Failed to restore permissions on test directory: {:?}", e);
            }
        }

        if let Some(got_error) = scan_failed {
            assert!(
                got_error,
                "We expect an error reading from an unreadable directory"
            );
        }

        info!("Finished test: returns_error_on_unreadable_workdir");
    }

    /// A file whose name is not valid UTF-8 is skipped rather than causing an
    /// error; the valid file beside it is still reported.
    #[cfg(all(unix, not(target_os = "macos")))]
    #[traced_test]
    async fn handles_non_utf8_filenames_gracefully() {
        use std::os::unix::ffi::OsStrExt;

        info!("Starting test: handles_non_utf8_filenames_gracefully");

        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        fs::write(wd.join("batch_output_10.jsonl"), b"okay data")
            .await
            .unwrap();

        // 0xFF can never appear in well-formed UTF-8.
        let invalid_name = std::ffi::OsStr::from_bytes(b"batch_input_11\xFF.jsonl");
        let invalid_path = wd.join(invalid_name);
        let _ = std::fs::File::create(&invalid_path)
            .expect("Failed to create a file with invalid UTF-8 name on non-macOS Unix");

        let indices = workspace
            .clone()
            .find_existing_batch_file_indices()
            .await
            .expect("Should succeed skipping non-UTF8 names");
        debug!("Collected indices: {:?}", indices);

        pretty_assert_eq!(indices.len(), 1);
        assert!(indices.contains(&BatchIndex::Usize(10)));

        info!("Finished test: handles_non_utf8_filenames_gracefully");
    }

    /// A 36-character index that has the right shape but is not a valid UUID
    /// makes the whole scan fail with `UuidParseError`, even though a valid
    /// numeric file is present as well.
    #[traced_test]
    async fn returns_error_if_uuid_parse_fails() {
        info!("Starting test: returns_error_if_uuid_parse_fails");
        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("Failed to create temp workspace");
        let wd = workspace.workdir();

        // 36 characters with hyphens in the usual places, but 'Z' is not a hex digit.
        let bad_uuid = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaZaaaa";
        let fname = format!("batch_input_{bad_uuid}.jsonl");
        let path = wd.join(&fname);

        fs::write(&path, b"corrupt uuid")
            .await
            .expect("Failed to write file");

        fs::write(wd.join("batch_input_99.jsonl"), b"valid numeric index")
            .await
            .expect("Failed to write numeric file");

        let res = workspace.clone().find_existing_batch_file_indices().await;
        debug!("Result of find_existing_batch_file_indices: {:?}", res);

        match res {
            Err(BatchWorkspaceError::UuidParseError(e)) => {
                info!("Got expected UuidParseError for invalid UUID: {:?}", e);
            }
            Err(other) => panic!("Expected a UuidParseError but got {:?}", other),
            Ok(val) => panic!(
                "Expected an error due to invalid UUID, but got Ok({:?})",
                val
            ),
        }

        info!("Finished test: returns_error_if_uuid_parse_fails");
    }
}