batch_mode_batch_workspace/
interface.rs1crate::ix!();
3
4impl BatchWorkspaceInterface for BatchWorkspace {}
5
6impl GetDoneDirectory for BatchWorkspace {
7
8 fn get_done_directory(&self) -> &PathBuf {
9 self.done_dir()
10 }
11}
12
13impl GetInputFilenameAtIndex for BatchWorkspace {
14
15 fn input_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
16 self.workdir().join(format!("batch_input_{}.jsonl", batch_idx))
17 }
18}
19
20impl GetOutputFilenameAtIndex for BatchWorkspace {
21
22 fn output_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
23 self.workdir().join(format!("batch_output_{}.jsonl", batch_idx))
24 }
25}
26
27impl GetErrorFilenameAtIndex for BatchWorkspace {
28
29 fn error_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
30 self.workdir().join(format!("batch_error_{}.jsonl", batch_idx))
31 }
32}
33
34impl GetMetadataFilenameAtIndex for BatchWorkspace {
35
36 fn metadata_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
37 self.workdir().join(format!("batch_metadata_{}.jsonl", batch_idx))
38 }
39}
40
41impl GetTargetPath for BatchWorkspace {
42
43 type Item = Arc<dyn GetTargetPathForAIExpansion + Send + Sync + 'static>;
44
45 fn target_path(
46 &self,
47 item: &Self::Item,
48 expected_content_type: &ExpectedContentType
49 ) -> PathBuf {
50
51 let mut path = item.target_path_for_ai_json_expansion(
53 &self.target_dir(),
54 expected_content_type
55 );
56
57 match expected_content_type {
60 ExpectedContentType::Json => {
61 if path.extension().map(|ext| ext != "json").unwrap_or(true) {
62 path.set_extension("json");
63 }
64 }
65 ExpectedContentType::JsonLines => {
66 if path.extension().map(|ext| ext != "jsonl").unwrap_or(true) {
67 path.set_extension("jsonl");
68 }
69 }
70 ExpectedContentType::PlainText => {
71 if path.extension().map(|ext| ext != "txt").unwrap_or(true) {
72 path.set_extension("txt");
73 }
74 }
75 }
76 debug!("final target_path => {:?}", path);
77 path
78 }
79}
80
81impl GetFailedJsonRepairsDir for BatchWorkspace {
82
83 fn failed_json_repairs_dir(&self) -> PathBuf {
84 self.failed_json_repairs_dir().to_path_buf()
85 }
86}
87
88impl GetFailedItemsDir for BatchWorkspace {
89
90 fn failed_items_dir(&self) -> PathBuf {
91 self.failed_items_dir().to_path_buf()
92 }
93}
94
95impl GetTextStoragePath for BatchWorkspace {
96 fn text_storage_path(&self, batch_idx: &BatchIndex) -> PathBuf {
97 trace!("computing text_storage_path for index: {:?}", batch_idx);
98 let suffix = match batch_idx {
101 BatchIndex::Usize(u) => format!("{}", u),
102 BatchIndex::Uuid(u) => format!("{}", u),
103 };
104 let path = self.workdir().join(format!("batch_text_{}.txt", suffix));
105 debug!("calculated text_storage_path => {:?}", path);
106 path
107 }
108}
109
110impl GetWorkdir for BatchWorkspace {
111
112 fn workdir(&self) -> PathBuf {
113 self.workdir().clone()
114 }
115}
116
#[cfg(test)]
mod batch_workspace_interface_exhaustive_tests {
    use super::*;
    use std::sync::Arc;
    use std::path::PathBuf;
    use tracing::*;
    // NOTE(review): `Runtime` is not referenced by name in this module. It is
    // kept only in case the `#[traced_test]` expansion for the async test
    // relies on it — confirm, and drop it if unused.
    use tokio::runtime::Runtime;

    /// Minimal item used to exercise `GetTargetPath`.
    ///
    /// Its target-path behavior comes from the `NamedItem` derive (presumably
    /// derived from `name` — confirm).
    #[derive(NamedItem, Debug)]
    struct MockItemWithTargetPath {
        name: String,
    }

    #[traced_test]
    fn test_get_done_directory() {
        info!("Starting test: test_get_done_directory");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/some/root/workdir")
            .logdir("/some/root/logs")
            .done_dir("/some/root/done")
            .failed_items_dir("/some/root/failed-items")
            .target_dir("/some/root/target")
            .failed_json_repairs_dir("/some/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let done_dir = ws.get_done_directory();
        debug!("Returned done_dir: {:?}", done_dir);
        pretty_assert_eq!(
            *done_dir,
            PathBuf::from("/some/root/done"),
            "Should match the expected done directory"
        );
        info!("Finished test: test_get_done_directory");
    }

    #[traced_test]
    fn test_get_input_filename_at_index_usize() {
        info!("Starting test: test_get_input_filename_at_index_usize");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/my/workdir")
            .logdir("/my/logs")
            .done_dir("/my/done")
            .failed_items_dir("/my/failed-items")
            .target_dir("/my/target")
            .failed_json_repairs_dir("/my/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(42);
        let path = ws.input_filename(&idx);
        debug!("input_filename => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/my/workdir/batch_input_42.jsonl"));
        info!("Finished test: test_get_input_filename_at_index_usize");
    }

    #[traced_test]
    fn test_get_input_filename_at_index_uuid() {
        info!("Starting test: test_get_input_filename_at_index_uuid");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/my/workdir")
            .logdir("/my/logs")
            .done_dir("/my/done")
            .failed_items_dir("/my/failed-items")
            .target_dir("/my/target")
            .failed_json_repairs_dir("/my/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx_uuid = BatchIndex::from_uuid_str("550e8400-e29b-41d4-a716-446655440000")
            .unwrap();
        let path = ws.input_filename(&idx_uuid);
        debug!("input_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/my/workdir/batch_input_550e8400-e29b-41d4-a716-446655440000.jsonl")
        );
        info!("Finished test: test_get_input_filename_at_index_uuid");
    }

    #[traced_test]
    fn test_get_output_filename_at_index() {
        info!("Starting test: test_get_output_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(99);
        let path = ws.output_filename(&idx);
        debug!("output_filename => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/data/workdir/batch_output_99.jsonl"));
        info!("Finished test: test_get_output_filename_at_index");
    }

    #[traced_test]
    fn test_get_error_filename_at_index() {
        info!("Starting test: test_get_error_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::from_uuid_str("f47ac10b-58cc-4372-a567-0e02b2c3d479").unwrap();
        let path = ws.error_filename(&idx);
        debug!("error_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/data/workdir/batch_error_f47ac10b-58cc-4372-a567-0e02b2c3d479.jsonl")
        );
        info!("Finished test: test_get_error_filename_at_index");
    }

    #[traced_test]
    fn test_get_metadata_filename_at_index() {
        info!("Starting test: test_get_metadata_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(0);
        let path = ws.metadata_filename(&idx);
        debug!("metadata_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/data/workdir/batch_metadata_0.jsonl")
        );
        info!("Finished test: test_get_metadata_filename_at_index");
    }

    #[traced_test]
    fn test_get_target_path_for_item() {
        info!("Starting test: test_get_target_path_for_item");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let item: Arc<dyn GetTargetPathForAIExpansion + Send + Sync> = Arc::new(
            MockItemWithTargetPath { name: "my_item_name".to_string() }
        );

        let path = ws.target_path(&item, &ExpectedContentType::Json);
        debug!("target_path => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/root/target/my_item_name.json"));

        let path2 = ws.target_path(&item, &ExpectedContentType::PlainText);
        debug!("target_path (PlainText) => {:?}", path2);
        pretty_assert_eq!(path2, PathBuf::from("/root/target/my_item_name.txt"));

        info!("Finished test: test_get_target_path_for_item");
    }

    #[traced_test]
    fn test_get_failed_json_repairs_dir() {
        info!("Starting test: test_get_failed_json_repairs_dir");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        // Call through the trait explicitly. Method syntax prefers an inherent
        // method of the same name (presumably the field getter), which would
        // leave the `GetFailedJsonRepairsDir` impl untested.
        let dir = GetFailedJsonRepairsDir::failed_json_repairs_dir(&ws);
        debug!("failed_json_repairs_dir => {:?}", dir);
        pretty_assert_eq!(dir, PathBuf::from("/root/failed-json-repairs"));

        info!("Finished test: test_get_failed_json_repairs_dir");
    }

    #[traced_test]
    fn test_get_failed_items_dir() {
        info!("Starting test: test_get_failed_items_dir");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        // Call through the trait explicitly, so the `GetFailedItemsDir` impl is
        // exercised rather than an inherent getter of the same name.
        let dir = GetFailedItemsDir::failed_items_dir(&ws);
        debug!("failed_items_dir => {:?}", dir);
        pretty_assert_eq!(dir, PathBuf::from("/root/failed-items"));

        info!("Finished test: test_get_failed_items_dir");
    }

    #[traced_test]
    fn test_get_text_storage_path() {
        info!("Starting test: test_get_text_storage_path");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let path = ws.text_storage_path(&BatchIndex::Usize(123));
        debug!("text_storage_path => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/root/workdir/batch_text_123.txt"));

        let idx_uuid = BatchIndex::from_uuid_str("550e8400-e29b-41d4-a716-446655440000")
            .unwrap();
        let uuid_path = ws.text_storage_path(&idx_uuid);
        debug!("text_storage_path (uuid) => {:?}", uuid_path);
        pretty_assert_eq!(
            uuid_path,
            PathBuf::from("/root/workdir/batch_text_550e8400-e29b-41d4-a716-446655440000.txt")
        );

        info!("Finished test: test_get_text_storage_path");
    }

    #[traced_test]
    fn test_get_workdir() {
        info!("Starting test: test_get_workdir");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/some/workdir")
            .logdir("/some/logdir")
            .done_dir("/some/done")
            .failed_items_dir("/some/failed-items")
            .target_dir("/some/target")
            .failed_json_repairs_dir("/some/repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        // Call through the trait explicitly, so the `GetWorkdir` impl is
        // exercised rather than an inherent getter of the same name.
        let wd = GetWorkdir::workdir(&ws);
        debug!("workdir => {:?}", wd);
        pretty_assert_eq!(wd, PathBuf::from("/some/workdir"));

        info!("Finished test: test_get_workdir");
    }

    #[traced_test]
    async fn concurrency_test_on_trait_methods() {
        info!("Starting test: concurrency_test_on_trait_methods");

        let workspace = BatchWorkspaceBuilder::default()
            .workdir("/test/workdir")
            .logdir("/test/logs")
            .done_dir("/test/done")
            .failed_items_dir("/test/failed-items")
            .target_dir("/test/target")
            .failed_json_repairs_dir("/test/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let arc_ws = Arc::new(workspace);

        let mut tasks = Vec::new();
        for i in 0..4 {
            let ws_clone = arc_ws.clone();
            tasks.push(tokio::spawn(async move {
                debug!("Task #{} => calling trait methods on workspace", i);
                let done_dir       = ws_clone.get_done_directory();
                let input_filename = ws_clone.input_filename(&BatchIndex::Usize(42));
                let output_filename= ws_clone.output_filename(&BatchIndex::Usize(999));
                let error_filename = ws_clone.error_filename(
                    &BatchIndex::from_uuid_str("f47ac10b-58cc-4372-a567-0e02b2c3d479").unwrap()
                );
                let meta_filename  = ws_clone.metadata_filename(&BatchIndex::Usize(0));
                let text_path      = ws_clone.text_storage_path(&BatchIndex::Usize(7));
                // Trait-qualified so the trait impls (not same-named inherent
                // getters) are the ones exercised.
                let failed_dir  = GetFailedItemsDir::failed_items_dir(&*ws_clone);
                let repairs_dir = GetFailedJsonRepairsDir::failed_json_repairs_dir(&*ws_clone);
                let wd          = GetWorkdir::workdir(&*ws_clone);

                debug!("done_dir = {:?}", done_dir);
                debug!("input_filename = {:?}", input_filename);
                debug!("output_filename = {:?}", output_filename);
                debug!("error_filename = {:?}", error_filename);
                debug!("metadata_filename = {:?}", meta_filename);
                debug!("text_storage_path = {:?}", text_path);
                debug!("failed_dir = {:?}", failed_dir);
                debug!("repairs_dir = {:?}", repairs_dir);
                debug!("workdir = {:?}", wd);
            }));
        }

        let results = futures::future::join_all(tasks).await;
        for (i, res) in results.into_iter().enumerate() {
            match res {
                Ok(_) => debug!("Task #{} completed successfully", i),
                Err(e) => panic!("Task #{} => join error: {:?}", i, e),
            }
        }

        info!("Finished test: concurrency_test_on_trait_methods");
    }
}
447}