1use crate::__export::from_str;
2use crate::cryptography::{hash_file_sha256, Sha256};
3use crate::exception::BuildError;
4use crate::file_collection::{FileCollection, FileSet};
5use crate::identifier::TaskId;
6use crate::lazy_evaluation::anonymous::AnonymousProvider;
7use crate::lazy_evaluation::{IntoProvider, Prop, Provider, ProviderExt, VecProp};
8use crate::project::buildable::IntoBuildable;
9use crate::project::error::ProjectResult;
10
11use crate::provider;
12use crate::task::work_handler::output::Output;
13use crate::task::work_handler::serializer::Serializable;
14use input::Input;
15
16use once_cell::sync::OnceCell;
17use serde::de::DeserializeOwned;
18use serde::ser::Error as _;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use std::collections::{HashMap, HashSet};
21use std::error::Error;
22use std::fs::{create_dir_all, File};
23use std::io;
24use std::io::Read;
25use std::io::Write;
26use std::path::{Path, PathBuf};
27
28use std::time::SystemTime;
29use time::OffsetDateTime;
30use crate::error::PayloadError;
31
32pub mod input;
33pub mod output;
34pub mod serializer;
35
36pub struct WorkHandler {
37 task_id: TaskId,
38 cache_location: PathBuf,
39 inputs: VecProp<Serializable>,
40 outputs: Option<FileSet>,
41 serialized_output: HashMap<String, AnonymousProvider<Serializable>>,
42 final_input: OnceCell<Input>,
43 final_output: OnceCell<Option<Output>>,
44 execution_history: OnceCell<TaskExecutionHistory>,
45 up_to_date_status: OnceCell<bool>,
46 did_work: bool,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50struct TaskExecutionHistory {
51 input: Input,
52 output: Output,
53}
54
55impl WorkHandler {
56 pub fn new(id: &TaskId, cache_loc: PathBuf) -> Self {
57 Self {
58 task_id: id.clone(),
59 cache_location: cache_loc,
60 inputs: VecProp::new(id.join("inputs").unwrap()),
61 outputs: None,
62 serialized_output: Default::default(),
63 final_input: OnceCell::new(),
64 final_output: OnceCell::new(),
65 execution_history: OnceCell::new(),
66 up_to_date_status: OnceCell::new(),
67 did_work: true,
68 }
69 }
70
71 pub fn has_inputs_and_outputs(&self) -> bool {
72 !self.inputs.get().is_empty() && self.outputs.is_some()
73 }
74
75 pub fn remove_execution_history(&self) -> io::Result<()> {
77 let path = self.task_id.as_path();
78 let file_location = self.cache_location.join(path);
79 if file_location.exists() {
80 std::fs::remove_file(file_location)?;
81 }
82 Ok(())
83 }
84
85 pub fn store_execution_history(&self) -> ProjectResult<()> {
87 let input = self.get_input()?.clone();
88 if !input.any_inputs() {
89 return Ok(());
90 }
91 let output = if let Some(output) = self.get_output()? {
92 output.clone()
93 } else {
94 return Ok(());
95 };
96 let history = TaskExecutionHistory { input, output };
97 let path = self.task_id.as_path();
98 let file_location = self.cache_location.join(path);
99 if let Some(parent) = file_location.parent() {
100 create_dir_all(parent).map_err(PayloadError::new)?;
101 }
102
103 let mut file = File::options()
104 .write(true)
105 .truncate(true)
106 .create(true)
107 .open(file_location).map_err(PayloadError::new)?;
108
109 serializer::to_writer(&mut file, &history)?;
110 Ok(())
111 }
112
113 pub fn cache_input(&self, input: Input) -> io::Result<()> {
114 let path = self.task_id.as_path();
115 let file_location = self.cache_location.join(path);
116 if let Some(parent) = file_location.parent() {
117 create_dir_all(parent)?;
118 }
119
120 let mut file = File::options()
121 .write(true)
122 .truncate(true)
123 .create(true)
124 .open(file_location)?;
125
126 serializer::to_writer(&mut file, &input).unwrap();
127 Ok(())
128 }
129
130 fn try_get_execution_history(&self) -> Option<&TaskExecutionHistory> {
131 self.execution_history
132 .get_or_try_init(|| -> Result<TaskExecutionHistory, Box<dyn Error>> {
133 let path = self.task_id.as_path();
134 let file_location = self.cache_location.join(path);
135 if file_location.exists() {
136 let mut read = File::open(&file_location)?;
137 let mut buffer = String::new();
138 read.read_to_string(&mut buffer)
139 .unwrap_or_else(|_| panic!("Could not read to end of {:?}", file_location));
140 Ok(from_str(&buffer)?)
141 } else {
142 Err(Box::new(BuildError::new("no file found for cache")))
143 }
144 })
145 .ok()
146 }
147
148 pub fn try_get_prev_input(&self) -> Option<&Input> {
149 self.try_get_execution_history().map(|h| &h.input)
150 }
151
152 pub fn add_input<T: Serialize + Send + Sync + Clone + 'static, P: IntoProvider<T>>(
153 &mut self,
154 id: &str,
155 value: P,
156 ) -> ProjectResult
157 where
158 <P as IntoProvider<T>>::Provider: 'static,
159 {
160 let mut prop: Prop<Serializable> = self.task_id.prop(id).map_err(PayloadError::new)?;
161 let value_provider = value.into_provider();
162 prop.set_with(value_provider.flat_map(|v| Serializable::new(v))).map_err(PayloadError::new)?;
163 self.inputs.push_with(prop);
164 Ok(())
165 }
166
167 pub fn add_input_file<Pa: AsRef<Path> + 'static, P: IntoProvider<Pa>>(
168 &mut self,
169 id: &str,
170 value: P,
171 ) -> ProjectResult
172 where
173 Pa: Send + Sync + Clone,
174 <P as IntoProvider<Pa>>::Provider: 'static + Clone,
175 {
176 let mut prop: Prop<Serializable> = self.task_id.prop(id).map_err(PayloadError::new)?;
177 let provider = value.into_provider();
178 let path_provider = provider.flat_map(|p| Serializable::new(InputFile::new(p.as_ref())));
179 prop.set_with(path_provider).map_err(PayloadError::new)?;
180 self.inputs.push_with(prop);
181 Ok(())
182 }
183
184 pub fn add_input_files<Pa, P: IntoProvider<Pa>>(&mut self, id: &str, value: P) -> ProjectResult
185 where
186 Pa: FileCollection,
187 Pa: Send + Sync + Clone + 'static,
188 <P as IntoProvider<Pa>>::Provider: 'static + Clone,
189 {
190 let mut prop: Prop<Serializable> = self.task_id.prop(id).map_err(PayloadError::new)?;
191 let provider = value.into_provider();
192 let path_provider = provider.flat_map(|p: Pa| Serializable::new(InputFiles::new(p)));
193 prop.set_with(path_provider).map_err(PayloadError::new)?;
194 self.inputs.push_with(prop);
195 Ok(())
196 }
197
198 pub fn add_input_prop<T: Serialize + Send + Sync + Clone + 'static, P>(
199 &mut self,
200 prop: &P,
201 ) -> ProjectResult
202 where
203 P: IntoProvider<T> + Clone,
204 <P as IntoProvider<T>>::Provider: 'static,
205 {
206 let prop = prop.clone().into_provider();
207 let string_prov = AnonymousProvider::new(prop.flat_map(Serializable::new));
208 self.inputs.push_with(string_prov);
209 Ok(())
210 }
211
212 pub fn get_input(&self) -> ProjectResult<&Input> {
213 self.final_input.get_or_try_init(|| {
214 let inputs = self.inputs.fallible_get().map_err(PayloadError::new)?;
215 let input = Input::new(&self.task_id, inputs);
216 Ok(input)
217 })
218 }
219
220 pub fn add_output<F: FileCollection>(&mut self, fc: F) {
222 *self.outputs.get_or_insert(FileSet::new()) += FileSet::from_iter(fc.files());
223 }
224
225 pub fn add_output_provider<P, F>(&mut self, fc_provider: P)
227 where
228 P: Provider<F> + 'static,
229 F: FileCollection + Send + Sync + Clone + 'static,
230 {
231 *self.outputs.get_or_insert(FileSet::new()) += FileSet::with_provider(fc_provider);
232 }
233
234 pub fn add_serialized_data<P, T: Serialize + DeserializeOwned + 'static + Send + Sync + Clone>(
236 &mut self,
237 id: &str,
238 value: P,
239 ) where
240 P: IntoProvider<T>,
241 P::Provider: 'static,
242 {
243 let mapped = value
244 .into_provider()
245 .flat_map(|s| Serializable::new(s).ok());
246
247 self.serialized_output
248 .insert(id.to_string(), AnonymousProvider::new(mapped));
249 }
250
251 pub fn add_empty_serialized_data(&mut self, id: &str) {
253 self.serialized_output.insert(
254 id.to_string(),
255 AnonymousProvider::new(provider!(|| Serializable::new(()).unwrap())),
256 );
257 }
258
259 pub fn get_output(&self) -> ProjectResult<Option<&Output>> {
261 self.final_output
262 .get_or_try_init(|| -> ProjectResult<Option<Output>> {
263 let mut serialized = HashMap::new();
264
265 for (key, data) in &self.serialized_output {
266 serialized.insert(key.clone(), data.fallible_get().map_err(PayloadError::new)?);
267 }
268
269 Ok(self
270 .outputs
271 .as_ref()
272 .map(|o| Output::new(o.clone(), serialized.clone()))
273 .or_else(|| {
274 if serialized.is_empty() {
275 Some(Output::new(FileSet::new(), serialized.clone()))
276 } else {
277 None
278 }
279 }))
280 })
281 .map(|o| o.as_ref())
282 }
283
284 pub fn prev_work(&self) -> Option<(&Input, &Output)> {
285 self.try_get_prev_input().zip(self.try_get_prev_output())
286 }
287
288 pub fn try_get_prev_output(&self) -> Option<&Output> {
290 self.try_get_execution_history().map(|h| &h.output)
291 }
292
293 pub fn did_work(&self) -> bool {
294 self.did_work
295 }
296
297 pub fn set_did_work(&mut self, did_work: bool) {
298 self.did_work = did_work;
299 }
300
301 pub fn set_up_to_date(&mut self, up_to_date_status: bool) {
302 self.up_to_date_status
303 .set(up_to_date_status)
304 .expect("up to date status already set")
305 }
306
307 pub fn up_to_date(&self) -> &bool {
308 self.up_to_date_status
309 .get()
310 .expect("up to date status not set")
311 }
312
313 fn serialize_data<T: Serialize>(val: T) -> impl Provider<String> {
314 let string = serializer::to_string(&val).ok();
315 provider!(move || { string.clone() })
317 }
318}
319
320impl IntoBuildable for &WorkHandler {
321 type Buildable = VecProp<Serializable>;
322
323 fn into_buildable(self) -> Self::Buildable {
324 self.inputs.clone().into_buildable()
330 }
331}
332
333#[derive(Debug)]
335pub struct InputFile(PathBuf);
336
337impl InputFile {
338 pub fn new(path: impl AsRef<Path>) -> Self {
339 let path = path.as_ref().to_path_buf();
340 Self(path)
341 }
342
343 pub fn serialize<P: AsRef<Path>, S: Serializer>(
345 path: P,
346 serializer: S,
347 ) -> Result<S::Ok, S::Error> {
348 Self::new(path).serialize(serializer)
349 }
350
351 pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<PathBuf, D::Error> {
352 let data = InputFileData::deserialize(deserializer)?;
353 Ok(data.path)
354 }
355}
356
357#[derive(Serialize, Deserialize)]
358struct InputFileData {
359 path: PathBuf,
360 data: Sha256,
361}
362
363impl Serialize for InputFile {
364 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
365 where
366 S: Serializer,
367 {
368 if self.0.exists() {
369 InputFileData {
370 path: self.0.clone(),
371 data: hash_file_sha256(&self.0).map_err(S::Error::custom)?,
372 }
373 .serialize(serializer)
374 } else {
375 ().serialize(serializer)
376 }
377 }
378}
379
380#[derive(Default)]
382pub enum ChangeStatus {
383 Deleted,
385 Modified,
387 #[default]
388 Added,
389 Same,
390}
391
392pub fn normalize_system_time(system_time: SystemTime) -> OffsetDateTime {
394 let duration = system_time
395 .duration_since(SystemTime::UNIX_EPOCH)
396 .expect("Couldn't determine duration since UNIX EPOCH");
397 let start = OffsetDateTime::UNIX_EPOCH;
398 start + duration
399}
400
401pub struct InputFiles(FileSet);
403
404impl InputFiles {
405 fn new<F: FileCollection>(fc: F) -> Self {
406 let fileset = FileSet::from_iter(fc.files());
407 Self(fileset)
408 }
409}
410
411impl Serialize for InputFiles {
412 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
413 where
414 S: Serializer,
415 {
416 let files = self.0.files();
417 if !files.is_empty() {
418 let data = InputFilesData::new(self.0.clone());
419 data.serialize(serializer)
420 } else {
421 ().serialize(serializer)
422 }
423 }
424}
425
426#[derive(Debug, Serialize)]
427struct InputFilesData {
428 all_files: HashSet<PathBuf>,
429 data: HashMap<PathBuf, InputFile>,
430}
431
432impl InputFilesData {
433 pub fn new(set: FileSet) -> Self {
434 let files = set.files();
435 Self {
436 all_files: files.clone(),
437 data: files
438 .into_iter()
439 .map(|f| (f.clone(), InputFile::new(f)))
440 .collect(),
441 }
442 }
443}