// assemble_core/task/work_handler.rs

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::error::Error;
use std::fs::{create_dir_all, File};
use std::io;
use std::io::Read;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use once_cell::sync::OnceCell;
use serde::de::DeserializeOwned;
use serde::ser::Error as _;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use time::OffsetDateTime;

use crate::__export::from_str;
use crate::cryptography::{hash_file_sha256, Sha256};
use crate::error::PayloadError;
use crate::exception::BuildError;
use crate::file_collection::{FileCollection, FileSet};
use crate::identifier::TaskId;
use crate::lazy_evaluation::anonymous::AnonymousProvider;
use crate::lazy_evaluation::{IntoProvider, Prop, Provider, ProviderExt, VecProp};
use crate::project::buildable::IntoBuildable;
use crate::project::error::ProjectResult;
use crate::provider;
use crate::task::work_handler::output::Output;
use crate::task::work_handler::serializer::Serializable;
use input::Input;

pub mod input;
pub mod output;
pub mod serializer;

36pub struct WorkHandler {
37    task_id: TaskId,
38    cache_location: PathBuf,
39    inputs: VecProp<Serializable>,
40    outputs: Option<FileSet>,
41    serialized_output: HashMap<String, AnonymousProvider<Serializable>>,
42    final_input: OnceCell<Input>,
43    final_output: OnceCell<Option<Output>>,
44    execution_history: OnceCell<TaskExecutionHistory>,
45    up_to_date_status: OnceCell<bool>,
46    did_work: bool,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50struct TaskExecutionHistory {
51    input: Input,
52    output: Output,
53}
54
55impl WorkHandler {
56    pub fn new(id: &TaskId, cache_loc: PathBuf) -> Self {
57        Self {
58            task_id: id.clone(),
59            cache_location: cache_loc,
60            inputs: VecProp::new(id.join("inputs").unwrap()),
61            outputs: None,
62            serialized_output: Default::default(),
63            final_input: OnceCell::new(),
64            final_output: OnceCell::new(),
65            execution_history: OnceCell::new(),
66            up_to_date_status: OnceCell::new(),
67            did_work: true,
68        }
69    }
70
71    pub fn has_inputs_and_outputs(&self) -> bool {
72        !self.inputs.get().is_empty() && self.outputs.is_some()
73    }
74
75    /// Removes execution history, if it exists.
76    pub fn remove_execution_history(&self) -> io::Result<()> {
77        let path = self.task_id.as_path();
78        let file_location = self.cache_location.join(path);
79        if file_location.exists() {
80            std::fs::remove_file(file_location)?;
81        }
82        Ok(())
83    }
84
85    /// Store execution data. Will only perform a store if there's both an input and an output
86    pub fn store_execution_history(&self) -> ProjectResult<()> {
87        let input = self.get_input()?.clone();
88        if !input.any_inputs() {
89            return Ok(());
90        }
91        let output = if let Some(output) = self.get_output()? {
92            output.clone()
93        } else {
94            return Ok(());
95        };
96        let history = TaskExecutionHistory { input, output };
97        let path = self.task_id.as_path();
98        let file_location = self.cache_location.join(path);
99        if let Some(parent) = file_location.parent() {
100            create_dir_all(parent).map_err(PayloadError::new)?;
101        }
102
103        let mut file = File::options()
104            .write(true)
105            .truncate(true)
106            .create(true)
107            .open(file_location).map_err(PayloadError::new)?;
108
109        serializer::to_writer(&mut file, &history)?;
110        Ok(())
111    }
112
113    pub fn cache_input(&self, input: Input) -> io::Result<()> {
114        let path = self.task_id.as_path();
115        let file_location = self.cache_location.join(path);
116        if let Some(parent) = file_location.parent() {
117            create_dir_all(parent)?;
118        }
119
120        let mut file = File::options()
121            .write(true)
122            .truncate(true)
123            .create(true)
124            .open(file_location)?;
125
126        serializer::to_writer(&mut file, &input).unwrap();
127        Ok(())
128    }
129
130    fn try_get_execution_history(&self) -> Option<&TaskExecutionHistory> {
131        self.execution_history
132            .get_or_try_init(|| -> Result<TaskExecutionHistory, Box<dyn Error>> {
133                let path = self.task_id.as_path();
134                let file_location = self.cache_location.join(path);
135                if file_location.exists() {
136                    let mut read = File::open(&file_location)?;
137                    let mut buffer = String::new();
138                    read.read_to_string(&mut buffer)
139                        .unwrap_or_else(|_| panic!("Could not read to end of {:?}", file_location));
140                    Ok(from_str(&buffer)?)
141                } else {
142                    Err(Box::new(BuildError::new("no file found for cache")))
143                }
144            })
145            .ok()
146    }
147
148    pub fn try_get_prev_input(&self) -> Option<&Input> {
149        self.try_get_execution_history().map(|h| &h.input)
150    }
151
152    pub fn add_input<T: Serialize + Send + Sync + Clone + 'static, P: IntoProvider<T>>(
153        &mut self,
154        id: &str,
155        value: P,
156    ) -> ProjectResult
157    where
158        <P as IntoProvider<T>>::Provider: 'static,
159    {
160        let mut prop: Prop<Serializable> = self.task_id.prop(id).map_err(PayloadError::new)?;
161        let value_provider = value.into_provider();
162        prop.set_with(value_provider.flat_map(|v| Serializable::new(v))).map_err(PayloadError::new)?;
163        self.inputs.push_with(prop);
164        Ok(())
165    }
166
167    pub fn add_input_file<Pa: AsRef<Path> + 'static, P: IntoProvider<Pa>>(
168        &mut self,
169        id: &str,
170        value: P,
171    ) -> ProjectResult
172    where
173        Pa: Send + Sync + Clone,
174        <P as IntoProvider<Pa>>::Provider: 'static + Clone,
175    {
176        let mut prop: Prop<Serializable> = self.task_id.prop(id).map_err(PayloadError::new)?;
177        let provider = value.into_provider();
178        let path_provider = provider.flat_map(|p| Serializable::new(InputFile::new(p.as_ref())));
179        prop.set_with(path_provider).map_err(PayloadError::new)?;
180        self.inputs.push_with(prop);
181        Ok(())
182    }
183
184    pub fn add_input_files<Pa, P: IntoProvider<Pa>>(&mut self, id: &str, value: P) -> ProjectResult
185    where
186        Pa: FileCollection,
187        Pa: Send + Sync + Clone + 'static,
188        <P as IntoProvider<Pa>>::Provider: 'static + Clone,
189    {
190        let mut prop: Prop<Serializable> = self.task_id.prop(id).map_err(PayloadError::new)?;
191        let provider = value.into_provider();
192        let path_provider = provider.flat_map(|p: Pa| Serializable::new(InputFiles::new(p)));
193        prop.set_with(path_provider).map_err(PayloadError::new)?;
194        self.inputs.push_with(prop);
195        Ok(())
196    }
197
198    pub fn add_input_prop<T: Serialize + Send + Sync + Clone + 'static, P>(
199        &mut self,
200        prop: &P,
201    ) -> ProjectResult
202    where
203        P: IntoProvider<T> + Clone,
204        <P as IntoProvider<T>>::Provider: 'static,
205    {
206        let prop = prop.clone().into_provider();
207        let string_prov = AnonymousProvider::new(prop.flat_map(Serializable::new));
208        self.inputs.push_with(string_prov);
209        Ok(())
210    }
211
212    pub fn get_input(&self) -> ProjectResult<&Input> {
213        self.final_input.get_or_try_init(|| {
214            let inputs = self.inputs.fallible_get().map_err(PayloadError::new)?;
215            let input = Input::new(&self.task_id, inputs);
216            Ok(input)
217        })
218    }
219
220    /// Add some output file collection. Can add outputs until [`get_output`](WorkHandler::get_output) is called.
221    pub fn add_output<F: FileCollection>(&mut self, fc: F) {
222        *self.outputs.get_or_insert(FileSet::new()) += FileSet::from_iter(fc.files());
223    }
224
225    /// Add some output file collection. Can add outputs until [`get_output`](WorkHandler::get_output) is called.
226    pub fn add_output_provider<P, F>(&mut self, fc_provider: P)
227    where
228        P: Provider<F> + 'static,
229        F: FileCollection + Send + Sync + Clone + 'static,
230    {
231        *self.outputs.get_or_insert(FileSet::new()) += FileSet::with_provider(fc_provider);
232    }
233
234    /// Add data that can be serialized, then deserialized later for reuse
235    pub fn add_serialized_data<P, T: Serialize + DeserializeOwned + 'static + Send + Sync + Clone>(
236        &mut self,
237        id: &str,
238        value: P,
239    ) where
240        P: IntoProvider<T>,
241        P::Provider: 'static,
242    {
243        let mapped = value
244            .into_provider()
245            .flat_map(|s| Serializable::new(s).ok());
246
247        self.serialized_output
248            .insert(id.to_string(), AnonymousProvider::new(mapped));
249    }
250
251    /// Add data that can be serialized, then deserialized later for reuse
252    pub fn add_empty_serialized_data(&mut self, id: &str) {
253        self.serialized_output.insert(
254            id.to_string(),
255            AnonymousProvider::new(provider!(|| Serializable::new(()).unwrap())),
256        );
257    }
258
259    /// Get the output of this file collection
260    pub fn get_output(&self) -> ProjectResult<Option<&Output>> {
261        self.final_output
262            .get_or_try_init(|| -> ProjectResult<Option<Output>> {
263                let mut serialized = HashMap::new();
264
265                for (key, data) in &self.serialized_output {
266                    serialized.insert(key.clone(), data.fallible_get().map_err(PayloadError::new)?);
267                }
268
269                Ok(self
270                    .outputs
271                    .as_ref()
272                    .map(|o| Output::new(o.clone(), serialized.clone()))
273                    .or_else(|| {
274                        if serialized.is_empty() {
275                            Some(Output::new(FileSet::new(), serialized.clone()))
276                        } else {
277                            None
278                        }
279                    }))
280            })
281            .map(|o| o.as_ref())
282    }
283
284    pub fn prev_work(&self) -> Option<(&Input, &Output)> {
285        self.try_get_prev_input().zip(self.try_get_prev_output())
286    }
287
288    /// Try to get the output of the previous run
289    pub fn try_get_prev_output(&self) -> Option<&Output> {
290        self.try_get_execution_history().map(|h| &h.output)
291    }
292
293    pub fn did_work(&self) -> bool {
294        self.did_work
295    }
296
297    pub fn set_did_work(&mut self, did_work: bool) {
298        self.did_work = did_work;
299    }
300
301    pub fn set_up_to_date(&mut self, up_to_date_status: bool) {
302        self.up_to_date_status
303            .set(up_to_date_status)
304            .expect("up to date status already set")
305    }
306
307    pub fn up_to_date(&self) -> &bool {
308        self.up_to_date_status
309            .get()
310            .expect("up to date status not set")
311    }
312
313    fn serialize_data<T: Serialize>(val: T) -> impl Provider<String> {
314        let string = serializer::to_string(&val).ok();
315        // let owned = Arc::new(val) as Arc<dyn Serialize>;
316        provider!(move || { string.clone() })
317    }
318}
319
320impl IntoBuildable for &WorkHandler {
321    type Buildable = VecProp<Serializable>;
322
323    fn into_buildable(self) -> Self::Buildable {
324        // let mut container = BuiltByContainer::new();
325        // for i in &self.inputs {
326        //     container.add(i.clone());
327        // }
328        // container
329        self.inputs.clone().into_buildable()
330    }
331}
332
/// A path that is serialized together with a hash of the file's contents, so that a change to
/// the file's contents shows up as a change to the serialized value.
#[derive(Debug)]
pub struct InputFile(PathBuf);

337impl InputFile {
338    pub fn new(path: impl AsRef<Path>) -> Self {
339        let path = path.as_ref().to_path_buf();
340        Self(path)
341    }
342
343    /// Direct implementaiton of serialize
344    pub fn serialize<P: AsRef<Path>, S: Serializer>(
345        path: P,
346        serializer: S,
347    ) -> Result<S::Ok, S::Error> {
348        Self::new(path).serialize(serializer)
349    }
350
351    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<PathBuf, D::Error> {
352        let data = InputFileData::deserialize(deserializer)?;
353        Ok(data.path)
354    }
355}
356
357#[derive(Serialize, Deserialize)]
358struct InputFileData {
359    path: PathBuf,
360    data: Sha256,
361}
362
363impl Serialize for InputFile {
364    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
365    where
366        S: Serializer,
367    {
368        if self.0.exists() {
369            InputFileData {
370                path: self.0.clone(),
371                data: hash_file_sha256(&self.0).map_err(S::Error::custom)?,
372            }
373            .serialize(serializer)
374        } else {
375            ().serialize(serializer)
376        }
377    }
378}
379
/// How a value changed relative to the previous run.
#[derive(Default)]
pub enum ChangeStatus {
    /// Value was deleted.
    Deleted,
    /// Value was modified.
    Modified,
    /// Value was added. This is the default status.
    #[default]
    Added,
    /// Value is unchanged.
    Same,
}

392/// Normalizes some system time to UTC time
393pub fn normalize_system_time(system_time: SystemTime) -> OffsetDateTime {
394    let duration = system_time
395        .duration_since(SystemTime::UNIX_EPOCH)
396        .expect("Couldn't determine duration since UNIX EPOCH");
397    let start = OffsetDateTime::UNIX_EPOCH;
398    start + duration
399}
400
401/// Used to serialize a fileset
402pub struct InputFiles(FileSet);
403
404impl InputFiles {
405    fn new<F: FileCollection>(fc: F) -> Self {
406        let fileset = FileSet::from_iter(fc.files());
407        Self(fileset)
408    }
409}
410
411impl Serialize for InputFiles {
412    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
413    where
414        S: Serializer,
415    {
416        let files = self.0.files();
417        if !files.is_empty() {
418            let data = InputFilesData::new(self.0.clone());
419            data.serialize(serializer)
420        } else {
421            ().serialize(serializer)
422        }
423    }
424}
425
426#[derive(Debug, Serialize)]
427struct InputFilesData {
428    all_files: HashSet<PathBuf>,
429    data: HashMap<PathBuf, InputFile>,
430}
431
432impl InputFilesData {
433    pub fn new(set: FileSet) -> Self {
434        let files = set.files();
435        Self {
436            all_files: files.clone(),
437            data: files
438                .into_iter()
439                .map(|f| (f.clone(), InputFile::new(f)))
440                .collect(),
441        }
442    }
443}