1crate::ix!();
3
/// Directory layout of a batch-processing workspace.
///
/// Each path field is a sub-directory of one root directory; `new_in` and
/// `new_temp` fill them in as `workdir`, `logs`, `done`, `target`,
/// `failed-json-repairs` and `failed-items` respectively.
///
/// Public getters and setters are generated for every field by `getset`, and a
/// builder (whose setters accept `Into<FieldType>`) by `derive_builder`.
#[derive(Getters,Setters,Builder,Debug)]
#[getset(get="pub",set="pub")]
#[builder(setter(into))]
pub struct BatchWorkspace {
    /// `<root>/workdir`; `new_mock` writes its sample `batch_*` files here.
    workdir: PathBuf,
    /// `<root>/logs`; parent of the batch expansion error log files.
    logdir: PathBuf,
    /// `<root>/done`.
    done_dir: PathBuf,
    /// `<root>/failed-items`.
    failed_items_dir: PathBuf,
    /// `<root>/target`; the path reported by the `GetTargetDir` impl.
    target_dir: PathBuf,
    /// `<root>/failed-json-repairs`.
    failed_json_repairs_dir: PathBuf,
    /// Handle to the backing temporary directory: `Some` only when built by
    /// `new_temp`, `None` when built by `new_in`. The builder exposes no setter
    /// for it and defaults it to `None`; the `PartialEq` impl ignores it.
    /// NOTE(review): presumably held so the directory outlives the workspace
    /// (tempfile-style drop semantics) — confirm against `TempDir`.
    #[builder(setter(skip))]
    #[builder(default = "None")]
    temp_dir: Option<TempDir>,
    /// `true` for workspaces made by `new_temp`, `false` for `new_in`; gates
    /// the test-only `cleanup_if_temporary`.
    temporary: bool,
}
19
20impl PartialEq for BatchWorkspace {
21 fn eq(&self, other: &Self) -> bool {
22 self.workdir == other.workdir &&
23 self.logdir == other.logdir &&
24 self.done_dir == other.done_dir &&
25 self.target_dir == other.target_dir &&
26 self.failed_json_repairs_dir == other.failed_json_repairs_dir &&
27 self.failed_items_dir == other.failed_items_dir &&
28 self.temporary == other.temporary
29 }
31}
32
// Marker impl promoting the hand-written `PartialEq` to `Eq`. That comparison
// only uses `PathBuf` and `bool` equality, both of which are themselves `Eq`.
impl Eq for BatchWorkspace {}
34
// SAFETY: NOTE(review) — no justification is recorded for these manual impls.
// The struct's fields are `PathBuf`, `bool` and `Option<TempDir>`. If `TempDir`
// is itself `Send + Sync` (presumably `tempfile::TempDir` — confirm), the
// compiler would already grant both auto traits and these `unsafe impl`s are
// redundant and could be removed. If it is not, the invariant that makes
// sharing sound must be written down here.
unsafe impl Send for BatchWorkspace {}
unsafe impl Sync for BatchWorkspace {}
37
38impl GetTargetDir for BatchWorkspace {
39 fn get_target_dir(&self) -> PathBuf {
40 self.target_dir().to_path_buf()
41 }
42}
43
44impl BatchWorkspace {
45
46 pub async fn find_existing_triple_with_given_index(
47 self: Arc<BatchWorkspace>,
48 index: &BatchIndex
49 ) -> Result<BatchFileTriple,BatchWorkspaceError>
50 {
51 trace!("attempting to find existing triple with index: {:?}", index);
52 let maybe_triple = self.locate_batch_files(index).await?;
53 match maybe_triple {
54 Some(triple) => {
55 debug!("found existing triple for index {:?}", index);
56 Ok(triple)
57 },
58 None => {
59 warn!("no existing triple found for index {:?}", index);
60 Err(BatchWorkspaceError::NoBatchFileTripleAtIndex {
61 index: index.clone()
62 })
63 },
64 }
65 }
66
67 pub async fn new_in(product_root: impl AsRef<Path>) -> Result<Arc<Self>, BatchWorkspaceError> {
68
69 info!("creating new workspace in {:?}", product_root.as_ref());
70
71 let product_root = product_root.as_ref();
72 tokio::fs::create_dir_all(product_root).await?;
73
74 let workspace = Self {
75 workdir: product_root.join("workdir"),
76 logdir: product_root.join("logs"),
77 done_dir: product_root.join("done"),
78 target_dir: product_root.join("target"),
79 failed_json_repairs_dir: product_root.join("failed-json-repairs"),
80 failed_items_dir: product_root.join("failed-items"),
81 temp_dir: None, temporary: false,
83 };
84
85 workspace.create_directories_if_dne().await?;
86
87 Ok(Arc::new(workspace))
88 }
89
90 pub async fn new_temp() -> Result<Arc<Self>, BatchWorkspaceError> {
91
92 let temp_dir = tempdir()?;
93 let temp_dir_path = temp_dir.path().to_path_buf();
94
95 info!("creating new temporary workspace in {:?}", temp_dir_path);
96
97 let workspace = Self {
98 workdir: temp_dir_path.join("workdir"),
99 logdir: temp_dir_path.join("logs"),
100 done_dir: temp_dir_path.join("done"),
101 target_dir: temp_dir_path.join("target"),
102 failed_json_repairs_dir: temp_dir_path.join("failed-json-repairs"),
103 failed_items_dir: temp_dir_path.join("failed-items"),
104 temp_dir: Some(temp_dir), temporary: true,
106 };
107
108 workspace.create_directories_if_dne().await?;
109
110 Ok(Arc::new(workspace))
111 }
112
113 pub async fn new_mock() -> Result<Arc<Self>,BatchWorkspaceError> {
114
115 let workspace = Self::new_temp().await?;
116 let workdir = workspace.workdir();
117
118 let filenames = [
119 "batch_input_0.jsonl",
120 "batch_output_1.jsonl",
121 "batch_error_12345.jsonl",
122 "batch_input_550e8400-e29b-41d4-a716-446655440000.jsonl",
123 "batch_output_f47ac10b-58cc-4372-a567-0e02b2c3d479.jsonl",
124 "batch_error_invalid.jsonl", "random_file.txt", ];
127
128 info!("writing mock files {:#?} in our mock workspace", filenames);
129
130 for filename in filenames {
131 fs::write(workdir.join(filename), "").await?;
132 }
133
134 Ok(workspace)
135 }
136
137 #[cfg(test)]
138 pub async fn cleanup_if_temporary(&self) -> Result<(),BatchWorkspaceError> {
139 if self.temporary {
140 fs::remove_dir_all(&self.workdir).await?;
141 fs::remove_dir_all(&self.logdir).await?;
142 fs::remove_dir_all(&self.done_dir).await?;
143 fs::remove_dir_all(&self.target_dir).await?;
144 fs::remove_dir_all(&self.failed_json_repairs_dir).await?;
145 fs::remove_dir_all(&self.failed_items_dir).await?;
146 }
147 Ok(())
148 }
149
150 pub(crate) async fn create_directories_if_dne(&self) -> Result<(),BatchWorkspaceError> {
151 tokio::fs::create_dir_all(&self.workdir).await?;
153 tokio::fs::create_dir_all(&self.logdir).await?;
154 tokio::fs::create_dir_all(&self.done_dir).await?;
155 tokio::fs::create_dir_all(&self.target_dir).await?;
156 tokio::fs::create_dir_all(&self.failed_json_repairs_dir).await?;
157 tokio::fs::create_dir_all(&self.failed_items_dir).await?;
158 Ok(())
159 }
160
161 pub fn batch_expansion_error_log_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
162 self.logdir.join(format!("batch_expansion_error_log_{}.jsonl", batch_idx))
163 }
164}
165
166#[cfg(test)]
167mod batch_workspace_exhaustive_tests {
168 use super::*;
169 use std::fs::File;
170 use std::io::Write;
171 use std::time::Duration;
172 use tokio::runtime::Runtime;
173 use tokio::fs;
174 use tokio::time::sleep;
175 use tracing::*;
176
177 #[traced_test]
178 async fn ensures_equality_ignores_temp_dir() {
179 info!("Starting test: ensures_equality_ignores_temp_dir");
180 let ws1 = BatchWorkspace {
181 workdir: PathBuf::from("/some/path/workdir"),
182 logdir: PathBuf::from("/some/path/logs"),
183 done_dir: PathBuf::from("/some/path/done"),
184 failed_items_dir: PathBuf::from("/some/path/failed-items"),
185 target_dir: PathBuf::from("/some/path/target"),
186 failed_json_repairs_dir: PathBuf::from("/some/path/failed-json-repairs"),
187 temp_dir: None,
188 temporary: false,
189 };
190 let ws2 = BatchWorkspace {
191 workdir: PathBuf::from("/some/path/workdir"),
192 logdir: PathBuf::from("/some/path/logs"),
193 done_dir: PathBuf::from("/some/path/done"),
194 failed_items_dir: PathBuf::from("/some/path/failed-items"),
195 target_dir: PathBuf::from("/some/path/target"),
196 failed_json_repairs_dir: PathBuf::from("/some/path/failed-json-repairs"),
197 temp_dir: None, temporary: false,
199 };
200
201 debug!("Comparing ws1 and ws2:\n ws1: {:?}\n ws2: {:?}", ws1, ws2);
202 pretty_assert_eq!(ws1, ws2, "Workspaces should be considered equal ignoring temp_dir");
203 info!("Finished test: ensures_equality_ignores_temp_dir");
204 }
205
206 #[traced_test]
207 async fn ensures_inequality_if_any_path_differs() {
208 info!("Starting test: ensures_inequality_if_any_path_differs");
209 let mut ws1 = BatchWorkspace {
210 workdir: PathBuf::from("/some/path/workdir"),
211 logdir: PathBuf::from("/some/path/logs"),
212 done_dir: PathBuf::from("/some/path/done"),
213 failed_items_dir: PathBuf::from("/some/path/failed-items"),
214 target_dir: PathBuf::from("/some/path/target"),
215 failed_json_repairs_dir: PathBuf::from("/some/path/failed-json-repairs"),
216 temp_dir: None,
217 temporary: false,
218 };
219 let mut ws2 = ws1.deep_clone().expect("expected the clone to succeed");
220 debug!("Initially, ws1 == ws2 => {:?}", ws1 == ws2);
221
222 ws2.done_dir = PathBuf::from("/different/path/done");
223 assert_ne!(
224 ws1, ws2,
225 "Changing done_dir should lead to inequality"
226 );
227
228 ws2.done_dir = ws1.done_dir.clone();
230 ws2.target_dir = PathBuf::from("/changed/target/dir");
231 assert_ne!(
232 ws1, ws2,
233 "Changing target_dir should lead to inequality"
234 );
235
236 debug!("Now verifying partial eq with changed target_dir: ws1={:?} vs ws2={:?}", ws1, ws2);
237 info!("Finished test: ensures_inequality_if_any_path_differs");
238 }
239
240 #[traced_test]
241 async fn test_find_similar_target_path_no_similar_files() {
242 info!("Starting test: test_find_similar_target_path_no_similar_files");
243 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
244 let check_path = workspace.target_dir().join("brand_new_file.json");
245 let result = workspace.find_similar_target_path(&check_path);
246 debug!("No existing files in target_dir => result should be None");
247 assert!(result.is_none(), "Expected no similar file to be found in an empty directory");
248 info!("Finished test: test_find_similar_target_path_no_similar_files");
249 }
250
251 #[traced_test]
252 async fn test_find_similar_target_path_finds_close_match() {
253 info!("Starting test: test_find_similar_target_path_finds_close_match");
254 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
255 let target_dir = workspace.target_dir();
256 fs::create_dir_all(&target_dir).await.expect("Failed to create target_dir");
257
258 let existing_filename = target_dir.join("my_token_data.json");
260 {
261 let mut file = File::create(&existing_filename).expect("Failed to create existing file");
262 writeln!(file, "dummy content").expect("Failed to write dummy content");
263 }
264
265 let check_path = target_dir.join("my_token_dada.json"); debug!("We expect Levenshtein distance <= 2 for 'data' vs 'dada'");
268 let found = workspace.find_similar_target_path(&check_path);
269 assert!(
270 found.is_some(),
271 "Should find a close match to my_token_data.json"
272 );
273 let found_path = found.unwrap();
274 debug!("Found similar path: {:?}", found_path);
275 pretty_assert_eq!(found_path, existing_filename);
276 info!("Finished test: test_find_similar_target_path_finds_close_match");
277 }
278
279 #[traced_test]
280 async fn test_find_existing_triple_with_given_index_found() {
281 info!("Starting test: test_find_existing_triple_with_given_index_found");
282 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
283 let workdir = workspace.workdir();
284
285 let index = BatchIndex::Usize(99);
287 let input_name = format!("batch_input_{}.jsonl", index);
288 let input_path = workdir.join(&input_name);
289 fs::write(&input_path, "test data").await.expect("Failed to write input file");
290
291 let triple = workspace
293 .clone()
294 .find_existing_triple_with_given_index(&index)
295 .await;
296 debug!("Resulting triple: {:?}", triple);
297 assert!(triple.is_ok(), "We have an input file => triple is found");
298 let triple = triple.unwrap();
299 pretty_assert_eq!(*triple.index(), index);
300 pretty_assert_eq!(*triple.input(), Some(input_path));
301 assert!(triple.output().is_none());
302 assert!(triple.error().is_none());
303
304 info!("Finished test: test_find_existing_triple_with_given_index_found");
305 }
306
307 #[traced_test]
308 async fn test_find_existing_triple_with_given_index_not_found() {
309 info!("Starting test: test_find_existing_triple_with_given_index_not_found");
310 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
311 let index = BatchIndex::Usize(9999);
312
313 let triple_result = workspace
315 .clone()
316 .find_existing_triple_with_given_index(&index)
317 .await;
318 debug!("Result: {:?}", triple_result);
319 assert!(
320 matches!(triple_result, Err(BatchWorkspaceError::NoBatchFileTripleAtIndex { .. })),
321 "Expected NoBatchFileTripleAtIndex error"
322 );
323 info!("Finished test: test_find_existing_triple_with_given_index_not_found");
324 }
325
326 #[traced_test]
327 async fn test_new_mock_has_expected_mock_files() {
328 info!("Starting test: test_new_mock_has_expected_mock_files");
329 let workspace = BatchWorkspace::new_mock().await.expect("Failed to create mock workspace");
330 let workdir = workspace.workdir();
331 debug!("Mock workspace created at {:?}", workdir);
332
333 let filenames = [
335 "batch_input_0.jsonl",
336 "batch_output_1.jsonl",
337 "batch_error_12345.jsonl",
338 "batch_input_550e8400-e29b-41d4-a716-446655440000.jsonl",
339 "batch_output_f47ac10b-58cc-4372-a567-0e02b2c3d479.jsonl",
340 "batch_error_invalid.jsonl",
341 "random_file.txt",
342 ];
343 for fname in &filenames {
344 let path = workdir.join(fname);
345 trace!("Verifying existence of {:?}", path);
346 assert!(path.exists(), "Expected mock file to exist");
347 }
348
349 info!("Finished test: test_new_mock_has_expected_mock_files");
350 }
351
352 #[traced_test]
353 async fn test_cleanup_if_temporary_cleans_up_temp_dirs() {
354 info!("Starting test: test_cleanup_if_temporary_cleans_up_temp_dirs");
355 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
356 let root_dir = workspace.workdir().parent().unwrap().to_path_buf();
357
358 debug!("Temporary workspace's root dir: {:?}", root_dir);
359
360 assert!(workspace.workdir().exists(), "workdir should exist");
362 assert!(workspace.logdir().exists(), "logdir should exist");
363 assert!(workspace.done_dir().exists(), "done_dir should exist");
364 assert!(workspace.target_dir().exists(), "target_dir should exist");
365 assert!(workspace.failed_json_repairs_dir().exists(), "failed_json_repairs_dir should exist");
366 assert!(workspace.failed_items_dir().exists(), "failed_items_dir should exist");
367
368 workspace.cleanup_if_temporary().await.expect("Cleanup should not fail");
370
371 sleep(Duration::from_millis(200)).await;
373
374 debug!("Post-cleanup: Checking if subdirs are gone or remain. OS-specific behavior may vary.");
377
378 info!("Finished test: test_cleanup_if_temporary_cleans_up_temp_dirs");
379 }
380
381 #[traced_test]
382 async fn test_create_directories_if_dne_with_inaccessible_path() {
383 info!("Starting test: test_create_directories_if_dne_with_inaccessible_path");
384 let temp = tempdir().expect("Failed to create base tempdir");
386 let read_only_dir = temp.path().join("read_only");
387 std::fs::create_dir_all(&read_only_dir).expect("Failed to create read_only dir");
388
389 #[cfg(unix)]
391 {
392 use std::os::unix::fs::PermissionsExt;
393 let mut perms = std::fs::metadata(&read_only_dir).unwrap().permissions();
394 perms.set_mode(0o400); std::fs::set_permissions(&read_only_dir, perms).unwrap();
396 }
397
398 let workspace = BatchWorkspace {
400 workdir: read_only_dir.join("workdir"),
401 logdir: read_only_dir.join("logs"),
402 done_dir: read_only_dir.join("done"),
403 target_dir: read_only_dir.join("target"),
404 failed_json_repairs_dir: read_only_dir.join("failed-json-repairs"),
405 failed_items_dir: read_only_dir.join("failed-items"),
406 temp_dir: None,
407 temporary: false,
408 };
409
410 let res = workspace.create_directories_if_dne().await;
411 debug!("create_directories_if_dne result: {:?}", res);
412
413 match res {
416 Err(BatchWorkspaceError::IoError(e)) => {
417 warn!("Got expected IoError: {:?}", e);
418 }
419 Err(other) => panic!("Expected IoError, got {:?}", other),
420 Ok(_) => {
421 #[cfg(unix)]
424 panic!("Expected an error but got Ok() on a read-only directory (Unix).");
425 #[cfg(not(unix))]
426 warn!("On this OS, read-only directories might not cause an error. Ok() accepted.");
427 }
428 }
429
430 info!("Finished test: test_create_directories_if_dne_with_inaccessible_path");
431 }
432
433 #[traced_test]
434 async fn test_get_target_directory_files_lists_existing_target_files() {
435 info!("Starting test: test_get_target_directory_files_lists_existing_target_files");
436 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
437 let target_dir = workspace.target_dir();
438 fs::create_dir_all(&target_dir).await.expect("Failed to create target_dir");
439
440 let sample_file_1 = target_dir.join("file1.json");
441 let sample_file_2 = target_dir.join("file2.txt");
442 fs::write(&sample_file_1, "data1").await.expect("Failed to write file1.json");
443 fs::write(&sample_file_2, "data2").await.expect("Failed to write file2.txt");
444
445 let files = workspace.get_target_directory_files();
446 debug!("Discovered files in target directory: {:?}", files);
447
448 pretty_assert_eq!(files.len(), 2, "We wrote exactly 2 files in the target dir");
449 assert!(files.contains(&sample_file_1));
450 assert!(files.contains(&sample_file_2));
451
452 info!("Finished test: test_get_target_directory_files_lists_existing_target_files");
453 }
454
455 #[traced_test]
456 async fn test_batch_expansion_error_log_filename() {
457 info!("Starting test: test_batch_expansion_error_log_filename");
458 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
459
460 let idx_usize = BatchIndex::Usize(1234);
461 let logname_usize = workspace.batch_expansion_error_log_filename(&idx_usize);
462 debug!("logname_usize => {:?}", logname_usize);
463 assert!(logname_usize.to_string_lossy().contains("batch_expansion_error_log_1234.jsonl"));
464
465 let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
466 let idx_uuid = BatchIndex::from_uuid_str(uuid_str).expect("Failed to parse test UUID");
467 let logname_uuid = workspace.batch_expansion_error_log_filename(&idx_uuid);
468 debug!("logname_uuid => {:?}", logname_uuid);
469 assert!(logname_uuid.to_string_lossy().contains("batch_expansion_error_log_550e8400-e29b-41d4-a716-446655440000.jsonl"));
470
471 info!("Finished test: test_batch_expansion_error_log_filename");
472 }
473
474 #[traced_test]
475 async fn test_concurrent_new_temp_workspaces() {
476 info!("Starting test: test_concurrent_new_temp_workspaces");
477 let num_concurrent = 5;
479 let mut tasks = Vec::new();
480
481 for _ in 0..num_concurrent {
482 tasks.push(tokio::spawn(async {
483 BatchWorkspace::new_temp().await
484 }));
485 }
486
487 let results = futures::future::join_all(tasks).await;
488 let mut success_count = 0;
489 for r in results {
490 match r {
491 Ok(Ok(ws)) => {
492 debug!("Successfully created a new_temp workspace: {:?}", ws);
493 assert!(ws.workdir().exists());
494 success_count += 1;
495 },
496 Ok(Err(e)) => {
497 error!("Failed to create new_temp workspace: {:?}", e);
498 },
499 Err(e) => {
500 error!("Join error for new_temp creation task: {:?}", e);
501 }
502 }
503 }
504 debug!("Total successful new_temp creations: {}", success_count);
505 pretty_assert_eq!(success_count, num_concurrent, "All tasks should succeed");
506 info!("Finished test: test_concurrent_new_temp_workspaces");
507 }
508
509 #[traced_test]
510 async fn test_concurrent_find_existing_triple_with_given_index() {
511 info!("Starting test: test_concurrent_find_existing_triple_with_given_index");
512 let workspace = BatchWorkspace::new_temp().await.expect("Failed to create temp workspace");
513 let workdir = workspace.workdir();
514
515 let index = BatchIndex::Usize(77);
517 fs::write(workdir.join(format!("batch_input_{}.jsonl", 77)), "dummy").await.expect("Failed to write input file");
518
519 let arc_ws = workspace.clone();
520 let mut tasks = Vec::new();
521
522 for i in 0..5 {
524 let w = arc_ws.clone();
525 let index_clone = index.clone();
526 tasks.push(tokio::spawn(async move {
527 debug!("Task #{} searching for index {:?}", i, index_clone);
528 w.find_existing_triple_with_given_index(&index_clone).await
529 }));
530 }
531
532 let results = futures::future::join_all(tasks).await;
533 for (i, r) in results.into_iter().enumerate() {
534 match r {
535 Ok(Ok(triple)) => {
536 pretty_assert_eq!(*triple.index(), index, "Task #{} found the correct triple", i);
537 },
538 other => panic!("Task #{} unexpected result: {:?}", i, other),
539 }
540 }
541
542 info!("Finished test: test_concurrent_find_existing_triple_with_given_index");
543 }
544
545 #[traced_test]
546 async fn test_new_in_creates_proper_directories() {
547 info!("Starting test: test_new_in_creates_proper_directories");
548 let temp = tempdir().expect("Failed to create tempdir for test");
549 let dir_path = temp.path().join("product_root");
550 std::fs::create_dir_all(&dir_path).expect("Failed to create product root on disk");
551
552 let workspace = BatchWorkspace::new_in(&dir_path).await
554 .expect("Failed to create new_in workspace");
555
556 debug!("Created workspace in: {:?}", dir_path);
557
558 assert!(workspace.workdir().is_dir(), "workdir should exist");
560 assert!(workspace.logdir().is_dir(), "logdir should exist");
561 assert!(workspace.done_dir().is_dir(), "done_dir should exist");
562 assert!(workspace.target_dir().is_dir(), "target_dir should exist");
563 assert!(workspace.failed_json_repairs_dir().is_dir(), "failed_json_repairs_dir should exist");
564 assert!(workspace.failed_items_dir().is_dir(), "failed_items_dir should exist");
565 assert!(!workspace.temporary, "should not be marked temporary");
566
567 info!("Finished test: test_new_in_creates_proper_directories");
568 }
569
570 #[traced_test]
571 async fn test_new_temp_creates_proper_directories() {
572 info!("Starting test: test_new_temp_creates_proper_directories");
573
574 let workspace = BatchWorkspace::new_temp().await
575 .expect("Failed to create new_temp workspace");
576 debug!("Created new temp workspace: {:?}", workspace);
577
578 assert!(workspace.workdir().is_dir(), "workdir should exist");
580 assert!(workspace.logdir().is_dir(), "logdir should exist");
581 assert!(workspace.done_dir().is_dir(), "done_dir should exist");
582 assert!(workspace.target_dir().is_dir(), "target_dir should exist");
583 assert!(workspace.failed_json_repairs_dir().is_dir(), "failed_json_repairs_dir should exist");
584 assert!(workspace.failed_items_dir().is_dir(), "failed_items_dir should exist");
585 assert!(workspace.temporary, "should be marked temporary");
586
587 info!("Finished test: test_new_temp_creates_proper_directories");
588 }
589}