batch_mode_batch_workspace/
interface.rs1crate::ix!();
3
4#[async_trait]
5impl BatchWorkspaceInterface for BatchWorkspace {}
6
7impl GetDoneDirectory for BatchWorkspace {
8
9 fn get_done_directory(&self) -> &PathBuf {
10 self.done_dir()
11 }
12}
13
14impl GetInputFilenameAtIndex for BatchWorkspace {
15
16 fn input_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
17 self.workdir().join(format!("batch_input_{}.jsonl", batch_idx))
18 }
19}
20
21impl GetOutputFilenameAtIndex for BatchWorkspace {
22
23 fn output_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
24 self.workdir().join(format!("batch_output_{}.jsonl", batch_idx))
25 }
26}
27
28impl GetErrorFilenameAtIndex for BatchWorkspace {
29
30 fn error_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
31 self.workdir().join(format!("batch_error_{}.jsonl", batch_idx))
32 }
33}
34
35impl GetMetadataFilenameAtIndex for BatchWorkspace {
36
37 fn metadata_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
38 self.workdir().join(format!("batch_metadata_{}.jsonl", batch_idx))
39 }
40}
41
42impl GetSeedManifestFilenameAtIndex for BatchWorkspace {
43 fn seed_manifest_filename(&self, idx: &BatchIndex) -> PathBuf {
44 self.workdir().join(format!("batch_seed_manifest_{}.jsonl", idx))
45 }
46}
47
48impl GetTargetPath for BatchWorkspace {
49
50 type Item = Arc<dyn GetTargetPathForAIExpansion + Send + Sync + 'static>;
51
52 fn target_path(
53 &self,
54 item: &Self::Item,
55 expected_content_type: &ExpectedContentType
56 ) -> PathBuf {
57
58 let mut path = item.target_path_for_ai_json_expansion(
60 &self.target_dir(),
61 expected_content_type
62 );
63
64 match expected_content_type {
67 ExpectedContentType::Json => {
68 if path.extension().map(|ext| ext != "json").unwrap_or(true) {
69 path.set_extension("json");
70 }
71 }
72 ExpectedContentType::JsonLines => {
73 if path.extension().map(|ext| ext != "jsonl").unwrap_or(true) {
74 path.set_extension("jsonl");
75 }
76 }
77 ExpectedContentType::PlainText => {
78 if path.extension().map(|ext| ext != "txt").unwrap_or(true) {
79 path.set_extension("txt");
80 }
81 }
82 }
83 debug!("final target_path => {:?}", path);
84 path
85 }
86}
87
88impl GetFailedJsonRepairsDir for BatchWorkspace {
89
90 fn failed_json_repairs_dir(&self) -> PathBuf {
91 self.failed_json_repairs_dir().to_path_buf()
92 }
93}
94
95impl GetFailedItemsDir for BatchWorkspace {
96
97 fn failed_items_dir(&self) -> PathBuf {
98 self.failed_items_dir().to_path_buf()
99 }
100}
101
102impl GetTextStoragePath for BatchWorkspace {
103 fn text_storage_path(&self, batch_idx: &BatchIndex) -> PathBuf {
104 trace!("computing text_storage_path for index: {:?}", batch_idx);
105 let suffix = match batch_idx {
108 BatchIndex::Usize(u) => format!("{}", u),
109 BatchIndex::Uuid(u) => format!("{}", u),
110 };
111 let path = self.workdir().join(format!("batch_text_{}.txt", suffix));
112 debug!("calculated text_storage_path => {:?}", path);
113 path
114 }
115}
116
117impl GetWorkdir for BatchWorkspace {
118
119 fn workdir(&self) -> PathBuf {
120 self.workdir().clone()
121 }
122}
123
#[cfg(test)]
mod batch_workspace_interface_exhaustive_tests {
    use super::*;
    use std::sync::Arc;
    use std::path::PathBuf;
    use tracing::*;

    /// Minimal item used to exercise `GetTargetPath`. Its `GetTargetPathForAIExpansion`
    /// behaviour is assumed to come from the `NamedItem` derive, keyed on `name` — TODO confirm.
    #[derive(NamedItem, Debug)]
    struct MockItemWithTargetPath {
        name: String,
    }

    #[traced_test]
    fn test_get_done_directory() {
        info!("Starting test: test_get_done_directory");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/some/root/workdir")
            .logdir("/some/root/logs")
            .done_dir("/some/root/done")
            .failed_items_dir("/some/root/failed-items")
            .target_dir("/some/root/target")
            .failed_json_repairs_dir("/some/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let done_dir = ws.get_done_directory();
        debug!("Returned done_dir: {:?}", done_dir);
        pretty_assert_eq!(
            *done_dir,
            PathBuf::from("/some/root/done"),
            "Should match the expected done directory"
        );
        info!("Finished test: test_get_done_directory");
    }

    #[traced_test]
    fn test_get_input_filename_at_index_usize() {
        info!("Starting test: test_get_input_filename_at_index_usize");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/my/workdir")
            .logdir("/my/logs")
            .done_dir("/my/done")
            .failed_items_dir("/my/failed-items")
            .target_dir("/my/target")
            .failed_json_repairs_dir("/my/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(42);
        let path = ws.input_filename(&idx);
        debug!("input_filename => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/my/workdir/batch_input_42.jsonl"));
        info!("Finished test: test_get_input_filename_at_index_usize");
    }

    #[traced_test]
    fn test_get_input_filename_at_index_uuid() {
        info!("Starting test: test_get_input_filename_at_index_uuid");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/my/workdir")
            .logdir("/my/logs")
            .done_dir("/my/done")
            .failed_items_dir("/my/failed-items")
            .target_dir("/my/target")
            .failed_json_repairs_dir("/my/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx_uuid = BatchIndex::from_uuid_str("550e8400-e29b-41d4-a716-446655440000")
            .unwrap();
        let path = ws.input_filename(&idx_uuid);
        debug!("input_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/my/workdir/batch_input_550e8400-e29b-41d4-a716-446655440000.jsonl")
        );
        info!("Finished test: test_get_input_filename_at_index_uuid");
    }

    #[traced_test]
    fn test_get_output_filename_at_index() {
        info!("Starting test: test_get_output_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(99);
        let path = ws.output_filename(&idx);
        debug!("output_filename => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/data/workdir/batch_output_99.jsonl"));
        info!("Finished test: test_get_output_filename_at_index");
    }

    #[traced_test]
    fn test_get_error_filename_at_index() {
        info!("Starting test: test_get_error_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::from_uuid_str("f47ac10b-58cc-4372-a567-0e02b2c3d479").unwrap();
        let path = ws.error_filename(&idx);
        debug!("error_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/data/workdir/batch_error_f47ac10b-58cc-4372-a567-0e02b2c3d479.jsonl")
        );
        info!("Finished test: test_get_error_filename_at_index");
    }

    #[traced_test]
    fn test_get_metadata_filename_at_index() {
        info!("Starting test: test_get_metadata_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(0);
        let path = ws.metadata_filename(&idx);
        debug!("metadata_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/data/workdir/batch_metadata_0.jsonl")
        );
        info!("Finished test: test_get_metadata_filename_at_index");
    }

    /// `seed_manifest_filename` previously had no coverage at all.
    #[traced_test]
    fn test_get_seed_manifest_filename_at_index() {
        info!("Starting test: test_get_seed_manifest_filename_at_index");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/data/workdir")
            .logdir("/data/logs")
            .done_dir("/data/done")
            .failed_items_dir("/data/failed")
            .target_dir("/data/target")
            .failed_json_repairs_dir("/data/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let idx = BatchIndex::Usize(7);
        let path = ws.seed_manifest_filename(&idx);
        debug!("seed_manifest_filename => {:?}", path);
        pretty_assert_eq!(
            path,
            PathBuf::from("/data/workdir/batch_seed_manifest_7.jsonl")
        );
        info!("Finished test: test_get_seed_manifest_filename_at_index");
    }

    #[traced_test]
    fn test_get_target_path_for_item() {
        info!("Starting test: test_get_target_path_for_item");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let item: Arc<dyn GetTargetPathForAIExpansion + Send + Sync> = Arc::new(
            MockItemWithTargetPath { name: "my_item_name".to_string() }
        );

        let path = ws.target_path(&item, &ExpectedContentType::Json);
        debug!("target_path => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/root/target/my_item_name.json"));

        let path2 = ws.target_path(&item, &ExpectedContentType::PlainText);
        debug!("target_path (PlainText) => {:?}", path2);
        pretty_assert_eq!(path2, PathBuf::from("/root/target/my_item_name.txt"));

        // The JsonLines arm of `target_path` was previously untested.
        let path3 = ws.target_path(&item, &ExpectedContentType::JsonLines);
        debug!("target_path (JsonLines) => {:?}", path3);
        pretty_assert_eq!(path3, PathBuf::from("/root/target/my_item_name.jsonl"));

        info!("Finished test: test_get_target_path_for_item");
    }

    #[traced_test]
    fn test_get_failed_json_repairs_dir() {
        info!("Starting test: test_get_failed_json_repairs_dir");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let dir = ws.failed_json_repairs_dir();
        debug!("failed_json_repairs_dir => {:?}", dir);
        pretty_assert_eq!(*dir, PathBuf::from("/root/failed-json-repairs"));

        info!("Finished test: test_get_failed_json_repairs_dir");
    }

    #[traced_test]
    fn test_get_failed_items_dir() {
        info!("Starting test: test_get_failed_items_dir");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let dir = ws.failed_items_dir();
        debug!("failed_items_dir => {:?}", dir);
        pretty_assert_eq!(*dir, PathBuf::from("/root/failed-items"));

        info!("Finished test: test_get_failed_items_dir");
    }

    /// `text_storage_path` is implemented, so pin its output for both index variants
    /// instead of merely calling it (the old test was named after a `todo!()` and asserted nothing).
    #[traced_test]
    fn test_get_text_storage_path() {
        info!("Starting test: test_get_text_storage_path");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/root/workdir")
            .logdir("/root/logs")
            .done_dir("/root/done")
            .failed_items_dir("/root/failed-items")
            .target_dir("/root/target")
            .failed_json_repairs_dir("/root/failed-json-repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let path = ws.text_storage_path(&BatchIndex::Usize(123));
        debug!("text_storage_path (usize) => {:?}", path);
        pretty_assert_eq!(path, PathBuf::from("/root/workdir/batch_text_123.txt"));

        let idx_uuid = BatchIndex::from_uuid_str("550e8400-e29b-41d4-a716-446655440000")
            .unwrap();
        let path_uuid = ws.text_storage_path(&idx_uuid);
        debug!("text_storage_path (uuid) => {:?}", path_uuid);
        pretty_assert_eq!(
            path_uuid,
            PathBuf::from("/root/workdir/batch_text_550e8400-e29b-41d4-a716-446655440000.txt")
        );

        info!("Finished test: test_get_text_storage_path");
    }

    #[traced_test]
    fn test_get_workdir() {
        info!("Starting test: test_get_workdir");

        let ws = BatchWorkspaceBuilder::default()
            .workdir("/some/workdir")
            .logdir("/some/logdir")
            .done_dir("/some/done")
            .failed_items_dir("/some/failed-items")
            .target_dir("/some/target")
            .failed_json_repairs_dir("/some/repairs")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let wd = ws.workdir();
        debug!("workdir => {:?}", wd);
        pretty_assert_eq!(*wd, PathBuf::from("/some/workdir"));

        info!("Finished test: test_get_workdir");
    }

    #[traced_test]
    async fn concurrency_test_on_trait_methods() {
        info!("Starting test: concurrency_test_on_trait_methods");

        let workspace = BatchWorkspaceBuilder::default()
            .workdir("/test/workdir")
            .logdir("/test/logs")
            .done_dir("/test/done")
            .failed_items_dir("/test/failed-items")
            .target_dir("/test/target")
            .failed_json_repairs_dir("/test/repair")
            .temporary(false)
            .build()
            .expect("Failed building workspace");

        let arc_ws = Arc::new(workspace);

        let mut tasks = Vec::new();
        for i in 0..4 {
            let ws_clone = arc_ws.clone();
            tasks.push(tokio::spawn(async move {
                debug!("Task #{} => calling trait methods on workspace", i);
                let done_dir       = ws_clone.get_done_directory();
                let input_filename = ws_clone.input_filename(&BatchIndex::Usize(42));
                let output_filename= ws_clone.output_filename(&BatchIndex::Usize(999));
                let error_filename = ws_clone.error_filename(
                    &BatchIndex::from_uuid_str("f47ac10b-58cc-4372-a567-0e02b2c3d479").unwrap()
                );
                let meta_filename  = ws_clone.metadata_filename(&BatchIndex::Usize(0));
                let failed_dir     = ws_clone.failed_items_dir();
                let repairs_dir    = ws_clone.failed_json_repairs_dir();
                let wd             = ws_clone.workdir();

                debug!("done_dir = {:?}", done_dir);
                debug!("input_filename = {:?}", input_filename);
                debug!("output_filename = {:?}", output_filename);
                debug!("error_filename = {:?}", error_filename);
                debug!("metadata_filename = {:?}", meta_filename);
                debug!("failed_dir = {:?}", failed_dir);
                debug!("repairs_dir = {:?}", repairs_dir);
                debug!("workdir = {:?}", wd);
            }));
        }

        let results = futures::future::join_all(tasks).await;
        for (i, res) in results.into_iter().enumerate() {
            match res {
                Ok(_) => debug!("Task #{} completed successfully", i),
                Err(e) => panic!("Task #{} => join error: {:?}", i, e),
            }
        }

        info!("Finished test: concurrency_test_on_trait_methods");
    }
}
454}