batch_mode_batch_workspace/
workspace.rs1crate::ix!();
3
4#[derive(Getters,Debug)]
5#[getset(get="pub")]
6pub struct BatchWorkspace {
7 workdir: PathBuf,
8 logdir: PathBuf,
9 done_dir: PathBuf,
10 failed_items_dir: PathBuf,
11 target_dir: PathBuf,
12 failed_json_repairs_dir: PathBuf,
13 temp_dir: Option<TempDir>, temporary: bool,
15}
16
17impl PartialEq for BatchWorkspace {
18 fn eq(&self, other: &Self) -> bool {
19 self.workdir == other.workdir &&
20 self.logdir == other.logdir &&
21 self.done_dir == other.done_dir &&
22 self.target_dir == other.target_dir &&
23 self.failed_json_repairs_dir == other.failed_json_repairs_dir &&
24 self.failed_items_dir == other.failed_items_dir &&
25 self.temporary == other.temporary
26 }
28}
29
30impl Eq for BatchWorkspace {}
31
32unsafe impl Send for BatchWorkspace {}
33unsafe impl Sync for BatchWorkspace {}
34
35impl BatchWorkspace {
36
37 pub async fn find_existing_triple_with_given_index(self: Arc<BatchWorkspace>, index: &BatchIndex)
38 -> Result<BatchFileTriple,BatchWorkspaceError>
39 {
40 let maybe_triple = self.locate_batch_files(index).await?;
41 match maybe_triple {
42 Some(triple) => Ok(triple),
43 None => Err(BatchWorkspaceError::NoBatchFileTripleAtIndex { index: index.clone() }),
44 }
45 }
46
47 pub async fn new_in(product_root: impl AsRef<Path>) -> Result<Arc<Self>, BatchWorkspaceError> {
48
49 info!("creating new workspace in {:?}", product_root.as_ref());
50
51 let product_root = product_root.as_ref();
52 tokio::fs::create_dir_all(product_root).await?;
53
54 let workspace = Self {
55 workdir: product_root.join("workdir"),
56 logdir: product_root.join("logs"),
57 done_dir: product_root.join("done"),
58 target_dir: product_root.join("target"),
59 failed_json_repairs_dir: product_root.join("failed-json-repairs"),
60 failed_items_dir: product_root.join("failed-items"),
61 temp_dir: None, temporary: false,
63 };
64
65 workspace.create_directories_if_dne().await?;
66
67 Ok(Arc::new(workspace))
68 }
69
70 pub async fn new_temp() -> Result<Arc<Self>, BatchWorkspaceError> {
71
72 let temp_dir = tempdir()?;
73 let temp_dir_path = temp_dir.path().to_path_buf();
74
75 info!("creating new temporary workspace in {:?}", temp_dir_path);
76
77 let workspace = Self {
78 workdir: temp_dir_path.join("workdir"),
79 logdir: temp_dir_path.join("logs"),
80 done_dir: temp_dir_path.join("done"),
81 target_dir: temp_dir_path.join("target"),
82 failed_json_repairs_dir: temp_dir_path.join("failed-json-repairs"),
83 failed_items_dir: temp_dir_path.join("failed-items"),
84 temp_dir: Some(temp_dir), temporary: true,
86 };
87
88 workspace.create_directories_if_dne().await?;
89
90 Ok(Arc::new(workspace))
91 }
92
93 pub async fn new_mock() -> Result<Arc<Self>,BatchWorkspaceError> {
94
95 let workspace = Self::new_temp().await?;
96 let workdir = workspace.workdir();
97
98 let filenames = [
99 "batch_input_0.jsonl",
100 "batch_output_1.jsonl",
101 "batch_error_12345.jsonl",
102 "batch_input_550e8400-e29b-41d4-a716-446655440000.jsonl",
103 "batch_output_f47ac10b-58cc-4372-a567-0e02b2c3d479.jsonl",
104 "batch_error_invalid.jsonl", "random_file.txt", ];
107
108 info!("writing mock files {:#?} in our mock workspace", filenames);
109
110 for filename in filenames {
111 fs::write(workdir.join(filename), "").await?;
112 }
113
114 Ok(workspace)
115 }
116
117 #[cfg(test)]
118 pub async fn cleanup_if_temporary(&self) -> Result<(),BatchWorkspaceError> {
119 if self.temporary {
120 fs::remove_dir_all(&self.workdir).await?;
121 fs::remove_dir_all(&self.logdir).await?;
122 fs::remove_dir_all(&self.done_dir).await?;
123 fs::remove_dir_all(&self.target_dir).await?;
124 fs::remove_dir_all(&self.failed_json_repairs_dir).await?;
125 fs::remove_dir_all(&self.failed_items_dir).await?;
126 }
127 Ok(())
128 }
129
130 async fn create_directories_if_dne(&self) -> Result<(),BatchWorkspaceError> {
131 tokio::fs::create_dir_all(&self.workdir).await?;
133 tokio::fs::create_dir_all(&self.logdir).await?;
134 tokio::fs::create_dir_all(&self.done_dir).await?;
135 tokio::fs::create_dir_all(&self.target_dir).await?;
136 tokio::fs::create_dir_all(&self.failed_json_repairs_dir).await?;
137 tokio::fs::create_dir_all(&self.failed_items_dir).await?;
138 Ok(())
139 }
140
141 pub fn get_target_directory_files(&self) -> Vec<PathBuf> {
142 std::fs::read_dir(&self.target_dir)
144 .unwrap()
145 .filter_map(|entry| entry.ok().map(|e| e.path()))
146 .collect()
147 }
148
149 pub fn batch_expansion_error_log_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
150 self.logdir.join(format!("batch_expansion_error_log_{}.jsonl", batch_idx))
151 }
152}