batch_mode_batch_workspace/
workspace.rs

// ---------------- [ File: src/workspace.rs ]
2crate::ix!();
3
4#[derive(Getters,Debug)]
5#[getset(get="pub")]
6pub struct BatchWorkspace {
7    workdir:                    PathBuf,
8    logdir:                     PathBuf,
9    done_dir:                   PathBuf,
10    failed_items_dir:           PathBuf,
11    target_dir:                 PathBuf,
12    failed_json_repairs_dir:    PathBuf,
13    temp_dir:                   Option<TempDir>, // Added this field
14    temporary:                  bool,
15}
16
17impl PartialEq for BatchWorkspace {
18    fn eq(&self, other: &Self) -> bool {
19        self.workdir                    == other.workdir &&
20        self.logdir                     == other.logdir &&
21        self.done_dir                   == other.done_dir &&
22        self.target_dir                 == other.target_dir &&
23        self.failed_json_repairs_dir    == other.failed_json_repairs_dir &&
24        self.failed_items_dir           == other.failed_items_dir &&
25        self.temporary                  == other.temporary
26        // Exclude `temp_dir` from equality comparison
27    }
28}
29
30impl Eq for BatchWorkspace {}
31
32unsafe impl Send for BatchWorkspace {}
33unsafe impl Sync for BatchWorkspace {}
34
35impl BatchWorkspace {
36
37    pub async fn find_existing_triple_with_given_index(self: Arc<BatchWorkspace>, index: &BatchIndex) 
38        -> Result<BatchFileTriple,BatchWorkspaceError> 
39    {
40        let maybe_triple = self.locate_batch_files(index).await?;
41        match maybe_triple {
42            Some(triple) => Ok(triple),
43            None         => Err(BatchWorkspaceError::NoBatchFileTripleAtIndex { index: index.clone() }),
44        }
45    }
46
47    pub async fn new_in(product_root: impl AsRef<Path>) -> Result<Arc<Self>, BatchWorkspaceError> {
48
49        info!("creating new workspace in {:?}", product_root.as_ref());
50
51        let product_root = product_root.as_ref();
52        tokio::fs::create_dir_all(product_root).await?;
53
54        let workspace = Self {
55            workdir:                 product_root.join("workdir"),
56            logdir:                  product_root.join("logs"),
57            done_dir:                product_root.join("done"),
58            target_dir:              product_root.join("target"),
59            failed_json_repairs_dir: product_root.join("failed-json-repairs"),
60            failed_items_dir:        product_root.join("failed-items"),
61            temp_dir:                None, // No TempDir here
62            temporary:               false,
63        };
64
65        workspace.create_directories_if_dne().await?;
66
67        Ok(Arc::new(workspace))
68    }
69
70    pub async fn new_temp() -> Result<Arc<Self>, BatchWorkspaceError> {
71
72        let temp_dir = tempdir()?;
73        let temp_dir_path = temp_dir.path().to_path_buf();
74
75        info!("creating new temporary workspace in {:?}", temp_dir_path);
76
77        let workspace = Self {
78            workdir:                 temp_dir_path.join("workdir"),
79            logdir:                  temp_dir_path.join("logs"),
80            done_dir:                temp_dir_path.join("done"),
81            target_dir:              temp_dir_path.join("target"),
82            failed_json_repairs_dir: temp_dir_path.join("failed-json-repairs"),
83            failed_items_dir:        temp_dir_path.join("failed-items"),
84            temp_dir:                Some(temp_dir), // Store TempDir here
85            temporary:               true,
86        };
87
88        workspace.create_directories_if_dne().await?;
89
90        Ok(Arc::new(workspace))
91    }
92
93    pub async fn new_mock() -> Result<Arc<Self>,BatchWorkspaceError> {
94
95        let workspace = Self::new_temp().await?;
96        let workdir = workspace.workdir();
97        
98        let filenames = [
99            "batch_input_0.jsonl",
100            "batch_output_1.jsonl",
101            "batch_error_12345.jsonl",
102            "batch_input_550e8400-e29b-41d4-a716-446655440000.jsonl",
103            "batch_output_f47ac10b-58cc-4372-a567-0e02b2c3d479.jsonl",
104            "batch_error_invalid.jsonl", // Should be ignored
105            "random_file.txt",           // Should be ignored
106        ];
107        
108        info!("writing mock files {:#?} in our mock workspace", filenames);
109
110        for filename in filenames {
111            fs::write(workdir.join(filename), "").await?;
112        }
113        
114        Ok(workspace)
115    }
116
117    #[cfg(test)]
118    pub async fn cleanup_if_temporary(&self) -> Result<(),BatchWorkspaceError> {
119        if self.temporary {
120            fs::remove_dir_all(&self.workdir).await?;
121            fs::remove_dir_all(&self.logdir).await?;
122            fs::remove_dir_all(&self.done_dir).await?;
123            fs::remove_dir_all(&self.target_dir).await?;
124            fs::remove_dir_all(&self.failed_json_repairs_dir).await?;
125            fs::remove_dir_all(&self.failed_items_dir).await?;
126        }
127        Ok(())
128    }
129
130    async fn create_directories_if_dne(&self) -> Result<(),BatchWorkspaceError> {
131        // Ensure the work directories exist
132        tokio::fs::create_dir_all(&self.workdir).await?;
133        tokio::fs::create_dir_all(&self.logdir).await?;
134        tokio::fs::create_dir_all(&self.done_dir).await?;
135        tokio::fs::create_dir_all(&self.target_dir).await?;
136        tokio::fs::create_dir_all(&self.failed_json_repairs_dir).await?;
137        tokio::fs::create_dir_all(&self.failed_items_dir).await?;
138        Ok(())
139    }
140
141    pub fn get_target_directory_files(&self) -> Vec<PathBuf> {
142        // Example implementation: scan the target directory for existing files
143        std::fs::read_dir(&self.target_dir)
144            .unwrap()
145            .filter_map(|entry| entry.ok().map(|e| e.path()))
146            .collect()
147    }
148
149    pub fn batch_expansion_error_log_filename(&self, batch_idx: &BatchIndex) -> PathBuf {
150        self.logdir.join(format!("batch_expansion_error_log_{}.jsonl", batch_idx))
151    }
152}