1use super::{JobId, JobTypeId};
2use crate::build::{AssetArtifactIdPair, BuiltArtifact};
3use crate::import::{ImportData, ImportJobs};
4use crate::{BuildLogEvent, LogEventLevel, PipelineResult};
5use hydrate_base::handle::DummySerdeContextHandle;
6use hydrate_base::hashing::HashMap;
7use hydrate_base::{ArtifactId, AssetId, BuiltArtifactHeaderData, Handle};
8use hydrate_data::{
9 DataContainerRef, DataSet, DataSetError, FieldRef, HashObjectMode, PropertyPath, Record,
10 SchemaSet, SingleObject,
11};
12use serde::{Deserialize, Serialize};
13use siphasher::sip128::Hasher128;
14use std::cell::RefCell;
15use std::hash::Hash;
16use std::panic::RefUnwindSafe;
17use std::rc::Rc;
18use std::sync::Arc;
19use type_uuid::{TypeUuid, TypeUuidDynamic};
20
21pub trait ImportDataProvider {
22 fn clone_import_data_metadata_hashes(&self) -> HashMap<AssetId, u64>;
23
24 fn load_import_data(
25 &self,
26 schema_set: &SchemaSet,
27 asset_id: AssetId,
28 ) -> PipelineResult<ImportData>;
29}
30
31impl ImportDataProvider for ImportJobs {
32 fn clone_import_data_metadata_hashes(&self) -> HashMap<AssetId, u64> {
33 self.clone_import_data_metadata_hashes()
34 }
35
36 fn load_import_data(
37 &self,
38 schema_set: &SchemaSet,
39 asset_id: AssetId,
40 ) -> PipelineResult<ImportData> {
41 crate::import::load_import_data(self.import_data_root_path(), schema_set, asset_id)
42 }
43}
44
45pub struct NewJob {
46 pub job_type: JobTypeId,
47 pub input_hash: u128,
48 pub input_data: Vec<u8>,
49}
50
51fn create_artifact_id<T: Hash>(
52 asset_id: AssetId,
53 artifact_key: Option<T>,
54) -> ArtifactId {
55 if let Some(artifact_key) = artifact_key {
56 let mut hasher = siphasher::sip128::SipHasher::default();
57 asset_id.hash(&mut hasher);
58 artifact_key.hash(&mut hasher);
59 let input_hash = hasher.finish128().as_u128();
60 ArtifactId::from_u128(input_hash)
61 } else {
62 ArtifactId::from_uuid(asset_id.as_uuid())
63 }
64}
65
66#[derive(Debug, Copy, Clone)]
67pub enum JobRequestor {
68 Builder(AssetId),
69 Job(JobId),
70}
71
72pub trait JobApi: Send + Sync {
76 fn enqueue_job(
77 &self,
78 job_requestor: JobRequestor,
79 data_set: &DataSet,
80 schema_set: &SchemaSet,
81 job: NewJob,
82 debug_name: String,
83 log_events: &mut Vec<BuildLogEvent>,
84 ) -> PipelineResult<JobId>;
85
86 fn artifact_handle_created(
87 &self,
88 asset_id: AssetId,
89 artifact_id: ArtifactId,
90 );
91
92 fn produce_artifact(
93 &self,
94 artifact: BuiltArtifact,
95 );
96
97 fn fetch_import_data(
98 &self,
99 asset_id: AssetId,
100 ) -> PipelineResult<ImportData>;
101}
102
103pub trait JobInput: Hash + Serialize + for<'a> Deserialize<'a> {}
107
108pub trait JobOutput: Serialize + for<'a> Deserialize<'a> {}
109
110#[derive(Default, Clone)]
111pub struct JobEnumeratedDependencies {
112 pub upstream_jobs: Vec<JobId>,
128}
129
130pub(crate) trait JobProcessorAbstract: Send + Sync + RefUnwindSafe {
131 fn version_inner(&self) -> u32;
132
133 fn enumerate_dependencies_inner(
134 &self,
135 job_id: JobId,
136 job_requestor: JobRequestor,
137 input: &Vec<u8>,
138 data_set: &DataSet,
139 schema_set: &SchemaSet,
140 log_events: &mut Vec<BuildLogEvent>,
141 ) -> PipelineResult<JobEnumeratedDependencies>;
142
143 fn run_inner(
144 &self,
145 job_id: JobId,
146 input: &Vec<u8>,
147 data_set: &DataSet,
148 schema_set: &SchemaSet,
149 job_api: &dyn JobApi,
150 fetched_asset_data: &mut HashMap<AssetId, FetchedAssetData>,
151 fetched_import_data: &mut HashMap<AssetId, FetchedImportData>,
152 log_events: &mut Vec<BuildLogEvent>,
153 ) -> PipelineResult<Arc<Vec<u8>>>;
154}
155
156pub struct EnumerateDependenciesContext<'a, InputT> {
157 pub job_id: JobId,
158 pub(crate) job_requestor: JobRequestor,
159 pub input: &'a InputT,
160 pub data_set: &'a DataSet,
161 pub schema_set: &'a SchemaSet,
162 pub(crate) log_events: &'a Rc<RefCell<&'a mut Vec<BuildLogEvent>>>,
163}
164
165impl<'a, InputT> EnumerateDependenciesContext<'a, InputT> {
166 pub fn warn<T: Into<String>>(
167 &self,
168 message: T,
169 ) {
170 let (asset_id, job_id) = match self.job_requestor {
171 JobRequestor::Builder(asset_id) => (Some(asset_id), None),
172 JobRequestor::Job(job_id) => (None, Some(job_id)),
173 };
174
175 let mut log_events = self.log_events.borrow_mut();
176 let log_event = BuildLogEvent {
177 asset_id,
178 job_id,
179 level: LogEventLevel::Warning,
180 message: format!(
181 "While enumerating dependencies for new job {}: {}",
182 self.job_id.as_uuid(),
183 message.into()
184 ),
185 };
186 log::warn!("Build Warning: {:?}", log_event);
187 log_events.push(log_event);
188 }
189
190 pub fn error<T: Into<String>>(
191 &self,
192 message: T,
193 ) {
194 let (asset_id, job_id) = match self.job_requestor {
195 JobRequestor::Builder(asset_id) => (Some(asset_id), None),
196 JobRequestor::Job(job_id) => (None, Some(job_id)),
197 };
198
199 let mut log_events = self.log_events.borrow_mut();
200 let log_event = BuildLogEvent {
201 asset_id,
202 job_id,
203 level: LogEventLevel::Error,
204 message: format!(
205 "While enumerating dependencies for new job {}: {}",
206 self.job_id.as_uuid(),
207 message.into()
208 ),
209 };
210 log::error!("Build Error: {:?}", log_event);
211 log_events.push(log_event);
212 }
213}
214
/// Bookkeeping entry recorded for every asset a job reads through `RunContext::asset`.
pub(crate) struct FetchedAssetData {
    /// Hash of the asset's contents at the time it was first read.
    pub(crate) _contents_hash: u64,
}
218
/// Hashes describing a piece of import data that a job fetched.
pub(crate) struct FetchedImportDataInfo {
    /// Contents hash reported alongside the fetched import data.
    pub(crate) _contents_hash: u64,
    /// Metadata hash reported alongside the fetched import data.
    pub(crate) _metadata_hash: u64,
}
223
224pub(crate) struct FetchedImportData {
225 pub(crate) _info: FetchedImportDataInfo,
226 pub(crate) import_data: Arc<SingleObject>,
227}
228
229#[derive(Copy, Clone)]
230pub struct RunContext<'a, InputT> {
231 pub job_id: JobId,
232 pub input: &'a InputT,
233 pub data_set: &'a DataSet,
234 pub schema_set: &'a SchemaSet,
235 pub(crate) fetched_asset_data: &'a Rc<RefCell<&'a mut HashMap<AssetId, FetchedAssetData>>>,
236 pub(crate) fetched_import_data: &'a Rc<RefCell<&'a mut HashMap<AssetId, FetchedImportData>>>,
237 pub(crate) job_api: &'a dyn JobApi,
238 pub(crate) log_events: &'a Rc<RefCell<&'a mut Vec<BuildLogEvent>>>,
239}
240
241impl<'a, InputT> RunContext<'a, InputT> {
242 pub fn warn<T: Into<String>>(
243 &self,
244 message: T,
245 ) {
246 let mut log_events = self.log_events.borrow_mut();
247 let log_event = BuildLogEvent {
248 asset_id: None,
249 job_id: Some(self.job_id),
250 level: LogEventLevel::Warning,
251 message: message.into(),
252 };
253 log::warn!("Build Warning: {:?}", log_event);
254 log_events.push(log_event);
255 }
256
257 pub fn error<T: Into<String>>(
258 &self,
259 message: T,
260 ) {
261 let mut log_events = self.log_events.borrow_mut();
262 let log_event = BuildLogEvent {
263 asset_id: None,
264 job_id: Some(self.job_id),
265 level: LogEventLevel::Error,
266 message: message.into(),
267 };
268 log::error!("Build Error: {:?}", log_event);
269 log_events.push(log_event);
270 }
271
272 pub fn asset<T: Record>(
273 &'a self,
274 asset_id: AssetId,
275 ) -> PipelineResult<T::Reader<'a>> {
276 if self
277 .data_set
278 .asset_schema(asset_id)
279 .ok_or(DataSetError::AssetNotFound)?
280 .name()
281 != T::schema_name()
282 {
283 Err(DataSetError::InvalidSchema)?;
284 }
285
286 let mut fetched_asset_data = self.fetched_asset_data.borrow_mut();
287 fetched_asset_data
288 .entry(asset_id)
289 .or_insert_with(|| FetchedAssetData {
290 _contents_hash: self
291 .data_set
292 .hash_object(asset_id, HashObjectMode::PropertiesOnly)
293 .unwrap(),
294 });
295
296 Ok(<T as Record>::Reader::new(
297 PropertyPath::default(),
298 DataContainerRef::from_dataset(self.data_set, self.schema_set, asset_id),
299 ))
300 }
301
302 pub fn imported_data<T: Record>(
303 &'a self,
304 asset_id: AssetId,
305 ) -> PipelineResult<T::Reader<'a>> {
306 let mut fetched_import_data = self.fetched_import_data.borrow_mut();
307 let import_data = if let Some(fetched_import_data) = fetched_import_data.get(&asset_id) {
308 fetched_import_data.import_data.clone()
309 } else {
310 let newly_fetched_import_data = self.job_api.fetch_import_data(asset_id)?;
311 let import_data = Arc::new(newly_fetched_import_data.import_data);
312
313 let old = fetched_import_data.insert(
314 asset_id,
315 FetchedImportData {
316 import_data: import_data.clone(),
317 _info: FetchedImportDataInfo {
318 _contents_hash: newly_fetched_import_data.contents_hash,
319 _metadata_hash: newly_fetched_import_data.metadata_hash,
320 },
321 },
322 );
323 assert!(old.is_none());
324 import_data
325 };
326
327 if import_data.schema().name() != T::schema_name() {
328 Err(DataSetError::InvalidSchema)?;
329 }
330
331 return Ok(<T as Record>::Reader::new(
332 PropertyPath::default(),
333 DataContainerRef::from_single_object_arc(import_data.clone(), self.schema_set),
334 ));
335 }
336
337 pub fn enqueue_job<JobProcessorT: JobProcessor>(
338 &self,
339 input: <JobProcessorT as JobProcessor>::InputT,
340 ) -> PipelineResult<JobId> {
341 enqueue_job::<JobProcessorT>(
342 JobRequestor::Job(self.job_id),
343 self.data_set,
344 self.schema_set,
345 self.job_api,
346 input,
347 &mut self.log_events.borrow_mut(),
348 )
349 }
350
351 pub fn produce_artifact<KeyT: Hash + std::fmt::Display, ArtifactT: TypeUuid + Serialize>(
352 &self,
353 asset_id: AssetId,
354 artifact_key: Option<KeyT>,
355 asset: ArtifactT,
356 ) -> PipelineResult<AssetArtifactIdPair> {
357 produce_artifact(self.job_api, asset_id, artifact_key, asset)
358 }
359
360 pub fn produce_artifact_with_handles<
361 KeyT: Hash + std::fmt::Display,
362 ArtifactT: TypeUuid + Serialize,
363 F: FnOnce(HandleFactory) -> PipelineResult<ArtifactT>,
364 >(
365 &self,
366 asset_id: AssetId,
367 artifact_key: Option<KeyT>,
368 asset_fn: F,
369 ) -> PipelineResult<ArtifactId> {
370 produce_artifact_with_handles(self.job_api, asset_id, artifact_key, asset_fn)
371 }
372
373 pub fn produce_default_artifact<AssetT: TypeUuid + Serialize>(
374 &self,
375 asset_id: AssetId,
376 asset: AssetT,
377 ) -> PipelineResult<ArtifactId> {
378 produce_default_artifact(self.job_api, asset_id, asset)
379 }
380
381 pub fn produce_default_artifact_with_handles<
382 AssetT: TypeUuid + Serialize,
383 F: FnOnce(HandleFactory) -> PipelineResult<AssetT>,
384 >(
385 &self,
386 asset_id: AssetId,
387 asset_fn: F,
388 ) -> PipelineResult<ArtifactId> {
389 produce_default_artifact_with_handles(self.job_api, asset_id, asset_fn)
390 }
391}
392
393pub trait JobProcessor: TypeUuid {
394 type InputT: JobInput + 'static;
395 type OutputT: JobOutput + 'static;
396
397 fn version(&self) -> u32;
398
399 fn enumerate_dependencies(
400 &self,
401 _context: EnumerateDependenciesContext<Self::InputT>,
402 ) -> PipelineResult<JobEnumeratedDependencies> {
403 Ok(JobEnumeratedDependencies::default())
404 }
405
406 fn run<'a>(
407 &'a self,
408 context: &'a RunContext<'a, Self::InputT>,
409 ) -> PipelineResult<Self::OutputT>;
410}
411
412pub(crate) fn enqueue_job<T: JobProcessor>(
413 job_requestor: JobRequestor,
414 data_set: &DataSet,
415 schema_set: &SchemaSet,
416 job_api: &dyn JobApi,
417 input: <T as JobProcessor>::InputT,
418 log_events: &mut Vec<BuildLogEvent>,
419) -> PipelineResult<JobId> {
420 let mut hasher = siphasher::sip128::SipHasher::default();
421 input.hash(&mut hasher);
422 let input_hash = hasher.finish128().as_u128();
423
424 let input_data = bincode::serialize(&input).unwrap();
425
426 let queued_job = NewJob {
427 job_type: JobTypeId::from_bytes(T::UUID),
428 input_hash,
429 input_data,
430 };
431
432 let debug_name = format!("{}", std::any::type_name::<T>());
433 job_api.enqueue_job(
434 job_requestor,
435 data_set,
436 schema_set,
437 queued_job,
438 debug_name,
439 log_events,
440 )
441}
442
443fn produce_default_artifact<T: TypeUuid + Serialize>(
444 job_api: &dyn JobApi,
445 asset_id: AssetId,
446 asset: T,
447) -> PipelineResult<ArtifactId> {
448 produce_artifact_with_handles(job_api, asset_id, None::<u32>, |_handle_factory| Ok(asset))
449}
450
451fn produce_default_artifact_with_handles<
452 T: TypeUuid + Serialize,
453 F: FnOnce(HandleFactory) -> PipelineResult<T>,
454>(
455 job_api: &dyn JobApi,
456 asset_id: AssetId,
457 asset_fn: F,
458) -> PipelineResult<ArtifactId> {
459 produce_artifact_with_handles(job_api, asset_id, None::<u32>, asset_fn)
460}
461
462fn produce_artifact<T: TypeUuid + Serialize, U: Hash + std::fmt::Display>(
463 job_api: &dyn JobApi,
464 asset_id: AssetId,
465 artifact_key: Option<U>,
466 asset: T,
467) -> PipelineResult<AssetArtifactIdPair> {
468 let artifact_id =
469 produce_artifact_with_handles(job_api, asset_id, artifact_key, |_handle_factory| {
470 Ok(asset)
471 })?;
472 Ok(AssetArtifactIdPair {
473 asset_id,
474 artifact_id,
475 })
476}
477
478fn produce_artifact_with_handles<
479 T: TypeUuid + Serialize,
480 U: Hash + std::fmt::Display,
481 F: FnOnce(HandleFactory) -> PipelineResult<T>,
482>(
483 job_api: &dyn JobApi,
484 asset_id: AssetId,
485 artifact_key: Option<U>,
486 asset_fn: F,
487) -> PipelineResult<ArtifactId> {
488 let artifact_key_debug_name = artifact_key.as_ref().map(|x| format!("{}", x));
489 let artifact_id = create_artifact_id(asset_id, artifact_key);
490
491 let mut ctx = DummySerdeContextHandle::default();
492 ctx.begin_serialize_artifact(artifact_id);
493
494 let (built_data, asset_type) = ctx.scope(|| {
495 let asset = (asset_fn)(HandleFactory { job_api });
496 asset.map(|x| (bincode::serialize(&x), x.uuid()))
497 })?;
498
499 let referenced_assets = ctx.end_serialize_artifact(artifact_id);
500
501 log::trace!(
502 "produce_artifact {:?} {:?} {:?}",
503 asset_id,
504 artifact_id,
505 artifact_key_debug_name
506 );
507 job_api.produce_artifact(BuiltArtifact {
508 asset_id,
509 artifact_id,
510 metadata: BuiltArtifactHeaderData {
511 dependencies: referenced_assets
512 .into_iter()
513 .map(|x| ArtifactId::from_uuid(x.0.as_uuid()))
514 .collect(),
515 asset_type: uuid::Uuid::from_bytes(asset_type),
516 },
517 data: built_data?,
518 artifact_key_debug_name,
519 });
520
521 Ok(artifact_id)
522}
523
524#[derive(Copy, Clone)]
525pub struct HandleFactory<'a> {
526 job_api: &'a dyn JobApi,
527}
528
529impl<'a> HandleFactory<'a> {
530 pub fn make_handle_to_default_artifact<T>(
531 &self,
532 asset_id: AssetId,
533 ) -> Handle<T> {
534 self.make_handle_to_artifact_key(asset_id, None::<u32>)
535 }
536
537 pub fn make_handle_to_artifact<T>(
538 &self,
539 asset_artifact_id_pair: AssetArtifactIdPair,
540 ) -> Handle<T> {
541 self.job_api.artifact_handle_created(
542 asset_artifact_id_pair.asset_id,
543 asset_artifact_id_pair.artifact_id,
544 );
545 hydrate_base::handle::make_handle_within_serde_context::<T>(
546 asset_artifact_id_pair.artifact_id,
547 )
548 }
549
550 pub fn make_handle_to_artifact_raw<T>(
551 &self,
552 asset_id: AssetId,
553 artifact_id: ArtifactId,
554 ) -> Handle<T> {
555 self.job_api.artifact_handle_created(asset_id, artifact_id);
556 hydrate_base::handle::make_handle_within_serde_context::<T>(artifact_id)
557 }
558
559 pub fn make_handle_to_artifact_key<T, K: Hash>(
560 &self,
561 asset_id: AssetId,
562 artifact_key: Option<K>,
563 ) -> Handle<T> {
564 let artifact_id = create_artifact_id(asset_id, artifact_key);
565 self.job_api.artifact_handle_created(asset_id, artifact_id);
566 hydrate_base::handle::make_handle_within_serde_context::<T>(artifact_id)
567 }
568}