hydrate_pipeline/build/job_system/
job_system_traits.rs

1use super::{JobId, JobTypeId};
2use crate::build::{AssetArtifactIdPair, BuiltArtifact};
3use crate::import::{ImportData, ImportJobs};
4use crate::{BuildLogEvent, LogEventLevel, PipelineResult};
5use hydrate_base::handle::DummySerdeContextHandle;
6use hydrate_base::hashing::HashMap;
7use hydrate_base::{ArtifactId, AssetId, BuiltArtifactHeaderData, Handle};
8use hydrate_data::{
9    DataContainerRef, DataSet, DataSetError, FieldRef, HashObjectMode, PropertyPath, Record,
10    SchemaSet, SingleObject,
11};
12use serde::{Deserialize, Serialize};
13use siphasher::sip128::Hasher128;
14use std::cell::RefCell;
15use std::hash::Hash;
16use std::panic::RefUnwindSafe;
17use std::rc::Rc;
18use std::sync::Arc;
19use type_uuid::{TypeUuid, TypeUuidDynamic};
20
/// Abstraction over a source of import data, so the job system can be driven
/// by the real `ImportJobs` (see the impl below) or by a substitute provider.
pub trait ImportDataProvider {
    /// Returns a snapshot mapping each asset id to its import-data metadata
    /// hash (presumably used for change detection — see `ImportJobs`).
    fn clone_import_data_metadata_hashes(&self) -> HashMap<AssetId, u64>;

    /// Loads the full import data for a single asset.
    fn load_import_data(
        &self,
        schema_set: &SchemaSet,
        asset_id: AssetId,
    ) -> PipelineResult<ImportData>;
}
30
/// `ImportJobs` is the production implementation of `ImportDataProvider`;
/// both methods forward to the import module.
impl ImportDataProvider for ImportJobs {
    fn clone_import_data_metadata_hashes(&self) -> HashMap<AssetId, u64> {
        // Delegates to the inherent method of the same name on ImportJobs.
        self.clone_import_data_metadata_hashes()
    }

    fn load_import_data(
        &self,
        schema_set: &SchemaSet,
        asset_id: AssetId,
    ) -> PipelineResult<ImportData> {
        // Import data lives on disk under the import-data root path.
        crate::import::load_import_data(self.import_data_root_path(), schema_set, asset_id)
    }
}
44
/// A job request handed to `JobApi::enqueue_job` (see `enqueue_job` in this
/// file for how these fields are populated).
pub struct NewJob {
    /// Which processor should run this job; derived from the processor's `TypeUuid`.
    pub job_type: JobTypeId,
    /// 128-bit SipHash of the job's input value.
    pub input_hash: u128,
    /// The job input, bincode-serialized.
    pub input_data: Vec<u8>,
}
50
51fn create_artifact_id<T: Hash>(
52    asset_id: AssetId,
53    artifact_key: Option<T>,
54) -> ArtifactId {
55    if let Some(artifact_key) = artifact_key {
56        let mut hasher = siphasher::sip128::SipHasher::default();
57        asset_id.hash(&mut hasher);
58        artifact_key.hash(&mut hasher);
59        let input_hash = hasher.finish128().as_u128();
60        ArtifactId::from_u128(input_hash)
61    } else {
62        ArtifactId::from_uuid(asset_id.as_uuid())
63    }
64}
65
/// Identifies who asked for a job to be enqueued, used to attribute log
/// events to the right asset or parent job.
#[derive(Debug, Copy, Clone)]
pub enum JobRequestor {
    /// Requested by a builder while processing the given asset.
    Builder(AssetId),
    /// Requested by another (parent) job — jobs can spawn jobs via
    /// `RunContext::enqueue_job`.
    Job(JobId),
}
71
72//
73// API Design
74//
/// The interface the job system exposes to builders and running jobs.
/// `Send + Sync` because jobs may interact with it from worker threads.
pub trait JobApi: Send + Sync {
    /// Queues a new job and returns its id. `debug_name` is diagnostic only;
    /// warnings/errors raised while enqueueing are appended to `log_events`.
    fn enqueue_job(
        &self,
        job_requestor: JobRequestor,
        data_set: &DataSet,
        schema_set: &SchemaSet,
        job: NewJob,
        debug_name: String,
        log_events: &mut Vec<BuildLogEvent>,
    ) -> PipelineResult<JobId>;

    /// Notifies the job system that a handle referencing `artifact_id` was
    /// created. Called by `HandleFactory` whenever a handle is minted.
    fn artifact_handle_created(
        &self,
        asset_id: AssetId,
        artifact_id: ArtifactId,
    );

    /// Submits a finished artifact (serialized data plus header metadata).
    fn produce_artifact(
        &self,
        artifact: BuiltArtifact,
    );

    /// Loads import data for an asset on behalf of a running job.
    fn fetch_import_data(
        &self,
        asset_id: AssetId,
    ) -> PipelineResult<ImportData>;
}
102
103//
104// Job Traits
105//
/// Marker trait for job inputs: hashable (the hash identifies the job run —
/// see `enqueue_job`) and serde round-trippable (inputs cross the job-system
/// boundary as bincode bytes).
pub trait JobInput: Hash + Serialize + for<'a> Deserialize<'a> {}
107
/// Marker trait for job outputs: serde round-trippable, since outputs are
/// returned through the type-erased `JobProcessorAbstract::run_inner` as bytes.
pub trait JobOutput: Serialize + for<'a> Deserialize<'a> {}
109
/// Dependencies reported by `JobProcessor::enumerate_dependencies` before a
/// job runs. Defaults to "no dependencies".
#[derive(Default, Clone)]
pub struct JobEnumeratedDependencies {
    // The contents of assets can affect the output so we need to include a hash of the contents of
    // the asset. But assets can ref other assets, task needs to list all assets that are touched
    // (including prototypes of those assets).
    //
    // We could do it at asset type granularity? (i.e. if you change an asset of type X all jobs that
    // read an asset of type X have to rerun.
    //
    // What if we provide a data_set reader that keeps track of what was read? When we run the task
    // the first time we don't know what we will touch or how to hash it but we can store it. Second
    // build we can check if anything that was read last time was modified.
    //
    // Alternatively, jobs that read assets must always copy data out of the data set into a hashable
    // form and pass it as input to a job.
    //pub import_data: Vec<AssetId>,
    //pub built_data: Vec<ArtifactId>,
    /// Ids of jobs this job depends on (upstream in the build graph).
    pub upstream_jobs: Vec<JobId>,
}
129
/// Object-safe, type-erased counterpart to `JobProcessor`, allowing the job
/// system to store heterogeneous processors behind one trait object. Typed
/// inputs/outputs cross this boundary as bincode-serialized bytes.
/// `RefUnwindSafe` suggests jobs are run under `catch_unwind` — TODO confirm.
pub(crate) trait JobProcessorAbstract: Send + Sync + RefUnwindSafe {
    /// See `JobProcessor::version`.
    fn version_inner(&self) -> u32;

    /// See `JobProcessor::enumerate_dependencies`. `input` is the serialized
    /// job input; warnings/errors are appended to `log_events`.
    fn enumerate_dependencies_inner(
        &self,
        job_id: JobId,
        job_requestor: JobRequestor,
        input: &Vec<u8>,
        data_set: &DataSet,
        schema_set: &SchemaSet,
        log_events: &mut Vec<BuildLogEvent>,
    ) -> PipelineResult<JobEnumeratedDependencies>;

    /// See `JobProcessor::run`. `input` is the serialized job input and the
    /// return value is the serialized output. `fetched_asset_data` /
    /// `fetched_import_data` collect what the job read during the run.
    fn run_inner(
        &self,
        job_id: JobId,
        input: &Vec<u8>,
        data_set: &DataSet,
        schema_set: &SchemaSet,
        job_api: &dyn JobApi,
        fetched_asset_data: &mut HashMap<AssetId, FetchedAssetData>,
        fetched_import_data: &mut HashMap<AssetId, FetchedImportData>,
        log_events: &mut Vec<BuildLogEvent>,
    ) -> PipelineResult<Arc<Vec<u8>>>;
}
155
/// What a `JobProcessor::enumerate_dependencies` implementation can see: the
/// job's id and typed input, the current data/schema sets, and a log sink.
pub struct EnumerateDependenciesContext<'a, InputT> {
    pub job_id: JobId,
    /// Who requested the job; used to attribute `warn`/`error` events.
    pub(crate) job_requestor: JobRequestor,
    pub input: &'a InputT,
    pub data_set: &'a DataSet,
    pub schema_set: &'a SchemaSet,
    /// `Rc<RefCell<&mut Vec>>` so the context can append log events through a
    /// shared borrow while itself being passed around immutably.
    pub(crate) log_events: &'a Rc<RefCell<&'a mut Vec<BuildLogEvent>>>,
}
164
165impl<'a, InputT> EnumerateDependenciesContext<'a, InputT> {
166    pub fn warn<T: Into<String>>(
167        &self,
168        message: T,
169    ) {
170        let (asset_id, job_id) = match self.job_requestor {
171            JobRequestor::Builder(asset_id) => (Some(asset_id), None),
172            JobRequestor::Job(job_id) => (None, Some(job_id)),
173        };
174
175        let mut log_events = self.log_events.borrow_mut();
176        let log_event = BuildLogEvent {
177            asset_id,
178            job_id,
179            level: LogEventLevel::Warning,
180            message: format!(
181                "While enumerating dependencies for new job {}: {}",
182                self.job_id.as_uuid(),
183                message.into()
184            ),
185        };
186        log::warn!("Build Warning: {:?}", log_event);
187        log_events.push(log_event);
188    }
189
190    pub fn error<T: Into<String>>(
191        &self,
192        message: T,
193    ) {
194        let (asset_id, job_id) = match self.job_requestor {
195            JobRequestor::Builder(asset_id) => (Some(asset_id), None),
196            JobRequestor::Job(job_id) => (None, Some(job_id)),
197        };
198
199        let mut log_events = self.log_events.borrow_mut();
200        let log_event = BuildLogEvent {
201            asset_id,
202            job_id,
203            level: LogEventLevel::Error,
204            message: format!(
205                "While enumerating dependencies for new job {}: {}",
206                self.job_id.as_uuid(),
207                message.into()
208            ),
209        };
210        log::error!("Build Error: {:?}", log_event);
211        log_events.push(log_event);
212    }
213}
214
/// Recorded when a job reads an asset via `RunContext::asset`, so the job
/// system knows which asset data a run depended on.
pub(crate) struct FetchedAssetData {
    /// Hash of the asset's properties at read time
    /// (`HashObjectMode::PropertiesOnly`). Underscore-prefixed: not read in
    /// this file; presumably consumed elsewhere for change detection.
    pub(crate) _contents_hash: u64,
}
218
/// Hashes captured when import data is fetched. Underscore-prefixed fields
/// are not read in this file; presumably used elsewhere for change detection.
pub(crate) struct FetchedImportDataInfo {
    /// Hash of the import data contents.
    pub(crate) _contents_hash: u64,
    /// Hash of the import data metadata.
    pub(crate) _metadata_hash: u64,
}
223
/// Cache entry for import data fetched during a job run (see
/// `RunContext::imported_data`); keeps the data alive and its hashes.
pub(crate) struct FetchedImportData {
    pub(crate) _info: FetchedImportDataInfo,
    /// `Arc` so repeat reads of the same asset share one copy.
    pub(crate) import_data: Arc<SingleObject>,
}
228
/// Everything a `JobProcessor::run` implementation can see and do: read
/// assets and import data, log, spawn child jobs, and produce artifacts.
/// `Copy` works because all mutable state is behind `&Rc<RefCell<..>>`.
#[derive(Copy, Clone)]
pub struct RunContext<'a, InputT> {
    pub job_id: JobId,
    /// The job's deserialized input.
    pub input: &'a InputT,
    pub data_set: &'a DataSet,
    pub schema_set: &'a SchemaSet,
    /// Records which assets the job read (see `asset`).
    pub(crate) fetched_asset_data: &'a Rc<RefCell<&'a mut HashMap<AssetId, FetchedAssetData>>>,
    /// Caches import data fetched during the run (see `imported_data`).
    pub(crate) fetched_import_data: &'a Rc<RefCell<&'a mut HashMap<AssetId, FetchedImportData>>>,
    pub(crate) job_api: &'a dyn JobApi,
    /// Sink for `warn`/`error` events.
    pub(crate) log_events: &'a Rc<RefCell<&'a mut Vec<BuildLogEvent>>>,
}
240
241impl<'a, InputT> RunContext<'a, InputT> {
242    pub fn warn<T: Into<String>>(
243        &self,
244        message: T,
245    ) {
246        let mut log_events = self.log_events.borrow_mut();
247        let log_event = BuildLogEvent {
248            asset_id: None,
249            job_id: Some(self.job_id),
250            level: LogEventLevel::Warning,
251            message: message.into(),
252        };
253        log::warn!("Build Warning: {:?}", log_event);
254        log_events.push(log_event);
255    }
256
257    pub fn error<T: Into<String>>(
258        &self,
259        message: T,
260    ) {
261        let mut log_events = self.log_events.borrow_mut();
262        let log_event = BuildLogEvent {
263            asset_id: None,
264            job_id: Some(self.job_id),
265            level: LogEventLevel::Error,
266            message: message.into(),
267        };
268        log::error!("Build Error: {:?}", log_event);
269        log_events.push(log_event);
270    }
271
272    pub fn asset<T: Record>(
273        &'a self,
274        asset_id: AssetId,
275    ) -> PipelineResult<T::Reader<'a>> {
276        if self
277            .data_set
278            .asset_schema(asset_id)
279            .ok_or(DataSetError::AssetNotFound)?
280            .name()
281            != T::schema_name()
282        {
283            Err(DataSetError::InvalidSchema)?;
284        }
285
286        let mut fetched_asset_data = self.fetched_asset_data.borrow_mut();
287        fetched_asset_data
288            .entry(asset_id)
289            .or_insert_with(|| FetchedAssetData {
290                _contents_hash: self
291                    .data_set
292                    .hash_object(asset_id, HashObjectMode::PropertiesOnly)
293                    .unwrap(),
294            });
295
296        Ok(<T as Record>::Reader::new(
297            PropertyPath::default(),
298            DataContainerRef::from_dataset(self.data_set, self.schema_set, asset_id),
299        ))
300    }
301
302    pub fn imported_data<T: Record>(
303        &'a self,
304        asset_id: AssetId,
305    ) -> PipelineResult<T::Reader<'a>> {
306        let mut fetched_import_data = self.fetched_import_data.borrow_mut();
307        let import_data = if let Some(fetched_import_data) = fetched_import_data.get(&asset_id) {
308            fetched_import_data.import_data.clone()
309        } else {
310            let newly_fetched_import_data = self.job_api.fetch_import_data(asset_id)?;
311            let import_data = Arc::new(newly_fetched_import_data.import_data);
312
313            let old = fetched_import_data.insert(
314                asset_id,
315                FetchedImportData {
316                    import_data: import_data.clone(),
317                    _info: FetchedImportDataInfo {
318                        _contents_hash: newly_fetched_import_data.contents_hash,
319                        _metadata_hash: newly_fetched_import_data.metadata_hash,
320                    },
321                },
322            );
323            assert!(old.is_none());
324            import_data
325        };
326
327        if import_data.schema().name() != T::schema_name() {
328            Err(DataSetError::InvalidSchema)?;
329        }
330
331        return Ok(<T as Record>::Reader::new(
332            PropertyPath::default(),
333            DataContainerRef::from_single_object_arc(import_data.clone(), self.schema_set),
334        ));
335    }
336
337    pub fn enqueue_job<JobProcessorT: JobProcessor>(
338        &self,
339        input: <JobProcessorT as JobProcessor>::InputT,
340    ) -> PipelineResult<JobId> {
341        enqueue_job::<JobProcessorT>(
342            JobRequestor::Job(self.job_id),
343            self.data_set,
344            self.schema_set,
345            self.job_api,
346            input,
347            &mut self.log_events.borrow_mut(),
348        )
349    }
350
351    pub fn produce_artifact<KeyT: Hash + std::fmt::Display, ArtifactT: TypeUuid + Serialize>(
352        &self,
353        asset_id: AssetId,
354        artifact_key: Option<KeyT>,
355        asset: ArtifactT,
356    ) -> PipelineResult<AssetArtifactIdPair> {
357        produce_artifact(self.job_api, asset_id, artifact_key, asset)
358    }
359
360    pub fn produce_artifact_with_handles<
361        KeyT: Hash + std::fmt::Display,
362        ArtifactT: TypeUuid + Serialize,
363        F: FnOnce(HandleFactory) -> PipelineResult<ArtifactT>,
364    >(
365        &self,
366        asset_id: AssetId,
367        artifact_key: Option<KeyT>,
368        asset_fn: F,
369    ) -> PipelineResult<ArtifactId> {
370        produce_artifact_with_handles(self.job_api, asset_id, artifact_key, asset_fn)
371    }
372
373    pub fn produce_default_artifact<AssetT: TypeUuid + Serialize>(
374        &self,
375        asset_id: AssetId,
376        asset: AssetT,
377    ) -> PipelineResult<ArtifactId> {
378        produce_default_artifact(self.job_api, asset_id, asset)
379    }
380
381    pub fn produce_default_artifact_with_handles<
382        AssetT: TypeUuid + Serialize,
383        F: FnOnce(HandleFactory) -> PipelineResult<AssetT>,
384    >(
385        &self,
386        asset_id: AssetId,
387        asset_fn: F,
388    ) -> PipelineResult<ArtifactId> {
389        produce_default_artifact_with_handles(self.job_api, asset_id, asset_fn)
390    }
391}
392
/// Implemented once per job type. The `TypeUuid` identifies the processor
/// (it becomes the `JobTypeId` when enqueueing — see `enqueue_job`).
pub trait JobProcessor: TypeUuid {
    /// Typed job input; hashed and serialized when the job is enqueued.
    type InputT: JobInput + 'static;
    /// Typed job output; serialized when the job completes.
    type OutputT: JobOutput + 'static;

    /// Processor version number — presumably bumped to invalidate previously
    /// cached results when the processor's logic changes; TODO confirm.
    fn version(&self) -> u32;

    /// Reports the job's dependencies before it runs. Defaults to none.
    fn enumerate_dependencies(
        &self,
        _context: EnumerateDependenciesContext<Self::InputT>,
    ) -> PipelineResult<JobEnumeratedDependencies> {
        Ok(JobEnumeratedDependencies::default())
    }

    /// Executes the job, reading data and producing artifacts via `context`.
    fn run<'a>(
        &'a self,
        context: &'a RunContext<'a, Self::InputT>,
    ) -> PipelineResult<Self::OutputT>;
}
411
412pub(crate) fn enqueue_job<T: JobProcessor>(
413    job_requestor: JobRequestor,
414    data_set: &DataSet,
415    schema_set: &SchemaSet,
416    job_api: &dyn JobApi,
417    input: <T as JobProcessor>::InputT,
418    log_events: &mut Vec<BuildLogEvent>,
419) -> PipelineResult<JobId> {
420    let mut hasher = siphasher::sip128::SipHasher::default();
421    input.hash(&mut hasher);
422    let input_hash = hasher.finish128().as_u128();
423
424    let input_data = bincode::serialize(&input).unwrap();
425
426    let queued_job = NewJob {
427        job_type: JobTypeId::from_bytes(T::UUID),
428        input_hash,
429        input_data,
430    };
431
432    let debug_name = format!("{}", std::any::type_name::<T>());
433    job_api.enqueue_job(
434        job_requestor,
435        data_set,
436        schema_set,
437        queued_job,
438        debug_name,
439        log_events,
440    )
441}
442
/// Produces `asset` as the asset's default artifact (no artifact key), so the
/// artifact id equals the asset id (see `create_artifact_id`). `None::<u32>`
/// only pins the unused key type parameter.
fn produce_default_artifact<T: TypeUuid + Serialize>(
    job_api: &dyn JobApi,
    asset_id: AssetId,
    asset: T,
) -> PipelineResult<ArtifactId> {
    produce_artifact_with_handles(job_api, asset_id, None::<u32>, |_handle_factory| Ok(asset))
}
450
/// Default-artifact (keyless) variant of `produce_artifact_with_handles`:
/// `asset_fn` builds the artifact and may mint handles via the factory.
/// `None::<u32>` only pins the unused key type parameter.
fn produce_default_artifact_with_handles<
    T: TypeUuid + Serialize,
    F: FnOnce(HandleFactory) -> PipelineResult<T>,
>(
    job_api: &dyn JobApi,
    asset_id: AssetId,
    asset_fn: F,
) -> PipelineResult<ArtifactId> {
    produce_artifact_with_handles(job_api, asset_id, None::<u32>, asset_fn)
}
461
462fn produce_artifact<T: TypeUuid + Serialize, U: Hash + std::fmt::Display>(
463    job_api: &dyn JobApi,
464    asset_id: AssetId,
465    artifact_key: Option<U>,
466    asset: T,
467) -> PipelineResult<AssetArtifactIdPair> {
468    let artifact_id =
469        produce_artifact_with_handles(job_api, asset_id, artifact_key, |_handle_factory| {
470            Ok(asset)
471        })?;
472    Ok(AssetArtifactIdPair {
473        asset_id,
474        artifact_id,
475    })
476}
477
/// Core artifact-production path: runs `asset_fn` inside a serde context so
/// that handles minted via `HandleFactory` during serialization are captured,
/// then submits the serialized bytes plus a header listing those references
/// as the artifact's dependencies.
fn produce_artifact_with_handles<
    T: TypeUuid + Serialize,
    U: Hash + std::fmt::Display,
    F: FnOnce(HandleFactory) -> PipelineResult<T>,
>(
    job_api: &dyn JobApi,
    asset_id: AssetId,
    artifact_key: Option<U>,
    asset_fn: F,
) -> PipelineResult<ArtifactId> {
    // Capture a printable form of the key before create_artifact_id consumes it.
    let artifact_key_debug_name = artifact_key.as_ref().map(|x| format!("{}", x));
    let artifact_id = create_artifact_id(asset_id, artifact_key);

    let mut ctx = DummySerdeContextHandle::default();
    ctx.begin_serialize_artifact(artifact_id);

    // Build and serialize the artifact while the serde context is active;
    // `?` here propagates a failure from asset_fn itself. The bincode result
    // is kept as a Result and deferred (see below).
    let (built_data, asset_type) = ctx.scope(|| {
        let asset = (asset_fn)(HandleFactory { job_api });
        asset.map(|x| (bincode::serialize(&x), x.uuid()))
    })?;

    // The handles created inside the scope determine the referenced assets.
    let referenced_assets = ctx.end_serialize_artifact(artifact_id);

    log::trace!(
        "produce_artifact {:?} {:?} {:?}",
        asset_id,
        artifact_id,
        artifact_key_debug_name
    );
    // `built_data?` propagates a bincode failure here — after the serde
    // context has been closed — so begin/end stay balanced even on error.
    job_api.produce_artifact(BuiltArtifact {
        asset_id,
        artifact_id,
        metadata: BuiltArtifactHeaderData {
            dependencies: referenced_assets
                .into_iter()
                .map(|x| ArtifactId::from_uuid(x.0.as_uuid()))
                .collect(),
            asset_type: uuid::Uuid::from_bytes(asset_type),
        },
        data: built_data?,
        artifact_key_debug_name,
    });

    Ok(artifact_id)
}
523
/// Passed to `produce_artifact_with_handles` callbacks so artifact-building
/// code can mint `Handle`s to other artifacts while serialization is active.
#[derive(Copy, Clone)]
pub struct HandleFactory<'a> {
    job_api: &'a dyn JobApi,
}
528
529impl<'a> HandleFactory<'a> {
530    pub fn make_handle_to_default_artifact<T>(
531        &self,
532        asset_id: AssetId,
533    ) -> Handle<T> {
534        self.make_handle_to_artifact_key(asset_id, None::<u32>)
535    }
536
537    pub fn make_handle_to_artifact<T>(
538        &self,
539        asset_artifact_id_pair: AssetArtifactIdPair,
540    ) -> Handle<T> {
541        self.job_api.artifact_handle_created(
542            asset_artifact_id_pair.asset_id,
543            asset_artifact_id_pair.artifact_id,
544        );
545        hydrate_base::handle::make_handle_within_serde_context::<T>(
546            asset_artifact_id_pair.artifact_id,
547        )
548    }
549
550    pub fn make_handle_to_artifact_raw<T>(
551        &self,
552        asset_id: AssetId,
553        artifact_id: ArtifactId,
554    ) -> Handle<T> {
555        self.job_api.artifact_handle_created(asset_id, artifact_id);
556        hydrate_base::handle::make_handle_within_serde_context::<T>(artifact_id)
557    }
558
559    pub fn make_handle_to_artifact_key<T, K: Hash>(
560        &self,
561        asset_id: AssetId,
562        artifact_key: Option<K>,
563    ) -> Handle<T> {
564        let artifact_id = create_artifact_id(asset_id, artifact_key);
565        self.job_api.artifact_handle_created(asset_id, artifact_id);
566        hydrate_base::handle::make_handle_within_serde_context::<T>(artifact_id)
567    }
568}