use crate::build::{BuiltArtifact, WrittenArtifact};
use crate::import::ImportData;
use crate::{BuildLogData, BuildLogEvent, LogEventLevel, PipelineResult};
use crossbeam_channel::{Receiver, Sender};
use hydrate_base::hashing::HashMap;
use hydrate_base::uuid_path::uuid_and_hash_to_path;
use hydrate_base::{ArtifactId, AssetId};
use hydrate_data::{DataSet, SchemaSet};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::hash::{Hash, Hasher};
use std::io::{BufWriter, Write};
use std::panic::RefUnwindSafe;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;

use super::*;

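/// Adapts a concrete, strongly-typed [`JobProcessor`] to the object-safe
/// `JobProcessorAbstract` interface. Job inputs and outputs cross the boundary
/// as bincode-encoded bytes, which lets heterogeneous job types live behind a
/// single trait object.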
struct JobWrapper<T: JobProcessor>(T);

impl<T: JobProcessor + Send + Sync + RefUnwindSafe> JobProcessorAbstract for JobWrapper<T>
where
    <T as JobProcessor>::InputT: for<'a> Deserialize<'a> + 'static,
    <T as JobProcessor>::OutputT: Serialize + 'static,
{
    fn version_inner(&self) -> u32 {
        self.0.version()
    }

    fn enumerate_dependencies_inner(
        &self,
        job_id: JobId,
        job_requestor: JobRequestor,
        input: &Vec<u8>,
        data_set: &DataSet,
        schema_set: &SchemaSet,
        log_events: &mut Vec<BuildLogEvent>,
    ) -> PipelineResult<JobEnumeratedDependencies> {
        // Propagate deserialization failures instead of panicking; the bincode
        // error converts into the pipeline error type, as in run_inner below.
        let data: <T as JobProcessor>::InputT = bincode::deserialize(input.as_slice())?;
        self.0.enumerate_dependencies(EnumerateDependenciesContext {
            job_id,
            job_requestor,
            input: &data,
            data_set,
            schema_set,
            log_events: &Rc::new(RefCell::new(log_events)),
        })
    }

    fn run_inner(
        &self,
        job_id: JobId,
        input: &Vec<u8>,
        data_set: &DataSet,
        schema_set: &SchemaSet,
        job_api: &dyn JobApi,
        fetched_asset_data: &mut HashMap<AssetId, FetchedAssetData>,
        fetched_import_data: &mut HashMap<AssetId, FetchedImportData>,
        log_events: &mut Vec<BuildLogEvent>,
    ) -> PipelineResult<Arc<Vec<u8>>> {
        let data: <T as JobProcessor>::InputT = bincode::deserialize(input.as_slice())?;
        let output = {
            profiling::scope!(&format!("{}::run", std::any::type_name::<T>()));
            self.0.run(&RunContext {
                job_id,
                input: &data,
                data_set,
                schema_set,
                fetched_asset_data: &Rc::new(RefCell::new(fetched_asset_data)),
                fetched_import_data: &Rc::new(RefCell::new(fetched_import_data)),
                job_api,
                log_events: &Rc::new(RefCell::new(log_events)),
            })
        }?;
        Ok(Arc::new(bincode::serialize(&output)?))
    }
}

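/// Per-job book-keeping tracked by the executor. `output_data` stays `None`
/// until the job completes (successfully or with an error), and
/// `has_been_scheduled` keeps a job from being handed to the thread pool twice.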
struct JobState {
    job_type: JobTypeId,
    dependencies: Arc<JobEnumeratedDependencies>,
    input_data: Arc<Vec<u8>>,
    debug_name: Arc<String>,

    has_been_scheduled: bool,
    output_data: Option<JobStateOutput>,
}

struct JobStateOutput {
    _output_data: PipelineResult<Arc<Vec<u8>>>,
    _fetched_asset_data: HashMap<AssetId, FetchedAssetData>,
    _fetched_import_data: HashMap<AssetId, FetchedImportData>,
}

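/// A job request as submitted through `JobApi::enqueue_job`, waiting to be
/// ingested into the executor's `current_jobs` table.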
#[derive(Clone)]
struct QueuedJob {
    job_id: JobId,
    job_requestor: JobRequestor,
    job_type: JobTypeId,
    input_data: Arc<Vec<u8>>,
    dependencies: PipelineResult<JobEnumeratedDependencies>,
    debug_name: Arc<String>,
}

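/// Collects job processors and freezes them into an immutable
/// [`JobProcessorRegistry`]. Each processor is keyed by its `T::UUID`;
/// registering two processors under the same UUID panics.
///
/// A minimal usage sketch (`MyJobProcessor` here is a hypothetical
/// `JobProcessor` implementation, not a type provided by this crate):
///
/// ```ignore
/// let mut builder = JobProcessorRegistryBuilder::default();
/// builder.register_job_processor::<MyJobProcessor>();
/// let registry = builder.build();
/// ```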
#[derive(Default)]
pub struct JobProcessorRegistryBuilder {
    job_processors: HashMap<JobTypeId, Arc<dyn JobProcessorAbstract>>,
}

impl JobProcessorRegistryBuilder {
    pub fn register_job_processor<
        T: JobProcessor + Send + Sync + RefUnwindSafe + Default + 'static,
    >(
        &mut self
    ) where
        <T as JobProcessor>::InputT: for<'a> Deserialize<'a>,
        <T as JobProcessor>::OutputT: Serialize,
    {
        let old = self.job_processors.insert(
            JobTypeId::from_bytes(T::UUID),
            Arc::new(JobWrapper(T::default())),
        );
        if old.is_some() {
            panic!("Multiple job processors registered with the same UUID");
        }
    }

    pub fn register_job_processor_instance<
        T: JobProcessor + Send + Sync + RefUnwindSafe + 'static,
    >(
        &mut self,
        job_processor: T,
    ) where
        <T as JobProcessor>::InputT: for<'a> Deserialize<'a>,
        <T as JobProcessor>::OutputT: Serialize,
    {
        let old = self.job_processors.insert(
            JobTypeId::from_bytes(T::UUID),
            Arc::new(JobWrapper(job_processor)),
        );
        if old.is_some() {
            panic!("Multiple job processors registered with the same UUID");
        }
    }

    pub fn build(self) -> JobProcessorRegistry {
        let inner = JobProcessorRegistryInner {
            job_processors: self.job_processors,
        };

        JobProcessorRegistry {
            inner: Arc::new(inner),
        }
    }
}

pub struct JobProcessorRegistryInner {
    job_processors: HashMap<JobTypeId, Arc<dyn JobProcessorAbstract>>,
}

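/// Immutable map from `JobTypeId` to processor, shared behind an `Arc` so the
/// registry is cheap to clone across threads.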
#[derive(Clone)]
pub struct JobProcessorRegistry {
    inner: Arc<JobProcessorRegistryInner>,
}

impl JobProcessorRegistry {
    fn get(
        &self,
        job_type_id: JobTypeId,
    ) -> Option<&dyn JobProcessorAbstract> {
        self.inner.job_processors.get(&job_type_id).map(|x| &**x)
    }

    fn contains_key(
        &self,
        job_type_id: JobTypeId,
    ) -> bool {
        self.inner.job_processors.contains_key(&job_type_id)
    }

    pub(crate) fn get_processor(
        &self,
        job_type: JobTypeId,
    ) -> Option<Arc<dyn JobProcessorAbstract>> {
        self.inner.job_processors.get(&job_type).cloned()
    }
}

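/// Implements `JobApi` on top of a set of channels. Jobs running on worker
/// threads push new job requests, artifact-handle registrations, and written
/// artifacts through these channels; the executor drains them on its own
/// thread, so no locking is needed.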
struct JobApiImplInner {
    schema_set: SchemaSet,
    import_data_root_path: PathBuf,
    build_data_root_path: PathBuf,
    job_processor_registry: JobProcessorRegistry,
    job_create_queue_tx: Sender<QueuedJob>,
    artifact_handle_created_tx: Sender<AssetArtifactIdPair>,
    written_artifact_queue_tx: Sender<WrittenArtifact>,
}

#[derive(Clone)]
pub struct JobApiImpl {
    inner: Arc<JobApiImplInner>,
}

impl JobApi for JobApiImpl {
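    /// Enqueues a job for execution. The `JobId` is derived from the input
    /// hash, so submitting the same input twice resolves to the same job and
    /// is deduplicated when the create queue is drained.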
    fn enqueue_job(
        &self,
        job_requestor: JobRequestor,
        data_set: &DataSet,
        schema_set: &SchemaSet,
        new_job: NewJob,
        debug_name: String,
        log_events: &mut Vec<BuildLogEvent>,
    ) -> PipelineResult<JobId> {
        let job_id = JobId::from_u128(new_job.input_hash);
        // A missing processor is a registration bug, so fail fast here.
        let processor = self
            .inner
            .job_processor_registry
            .get(new_job.job_type)
            .unwrap();

        let dependencies = processor.enumerate_dependencies_inner(
            job_id,
            job_requestor,
            &new_job.input_data,
            data_set,
            schema_set,
            log_events,
        );

        self.inner
            .job_create_queue_tx
            .send(QueuedJob {
                job_id,
                job_requestor,
                job_type: new_job.job_type,
                input_data: Arc::new(new_job.input_data),
                dependencies,
                debug_name: Arc::new(debug_name),
            })
            .unwrap();

        Ok(job_id)
    }

    fn artifact_handle_created(
        &self,
        asset_id: AssetId,
        artifact_id: ArtifactId,
    ) {
        self.inner
            .artifact_handle_created_tx
            .send(AssetArtifactIdPair {
                asset_id,
                artifact_id,
            })
            .unwrap();
    }

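    /// Writes a built artifact to disk. The path is derived from the artifact
    /// id plus a hash of the artifact's data and metadata, so identical builds
    /// land at the same content-addressed location.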
    fn produce_artifact(
        &self,
        artifact: BuiltArtifact,
    ) {
        profiling::scope!("Write Asset to Disk");
        let mut hasher = siphasher::sip::SipHasher::default();
        artifact.data.hash(&mut hasher);
        artifact.metadata.hash(&mut hasher);
        let build_hash = hasher.finish();

        let path = uuid_and_hash_to_path(
            &self.inner.build_data_root_path,
            artifact.artifact_id.as_uuid(),
            build_hash,
            "bf",
        );

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }

        let file = std::fs::File::create(&path).unwrap();
        let mut buf_writer = BufWriter::new(file);
        artifact.metadata.write_header(&mut buf_writer).unwrap();
        // write_all rather than write: write() may perform a short write.
        buf_writer.write_all(&artifact.data).unwrap();

        self.inner
            .written_artifact_queue_tx
            .send(WrittenArtifact {
                asset_id: artifact.asset_id,
                artifact_id: artifact.artifact_id,
                metadata: artifact.metadata,
                build_hash,
                artifact_key_debug_name: artifact.artifact_key_debug_name,
            })
            .unwrap();
    }

    fn fetch_import_data(
        &self,
        asset_id: AssetId,
    ) -> PipelineResult<ImportData> {
        crate::import::load_import_data(
            &self.inner.import_data_root_path,
            &self.inner.schema_set,
            asset_id,
        )
    }
}

#[derive(Clone, Debug)]
pub struct AssetArtifactIdPair {
    pub asset_id: AssetId,
    pub artifact_id: ArtifactId,
}

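/// Schedules jobs onto a thread pool and tracks them to completion. A typical
/// driver loop (a sketch; the surrounding build code is assumed to supply
/// `data_set` and `log_data`) looks like:
///
/// ```ignore
/// while !job_executor.is_idle() {
///     job_executor.update(&data_set, &mut log_data);
/// }
/// ```
///
/// `update()` drains newly created jobs, schedules every job whose upstream
/// dependencies have produced output, and collects finished results from the
/// thread pool.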
pub struct JobExecutor {
    _root_path: PathBuf,
    job_api_impl: JobApiImpl,

    job_processor_registry: JobProcessorRegistry,

    current_jobs: HashMap<JobId, JobState>,

    job_create_queue_rx: Receiver<QueuedJob>,

    artifact_handle_created_rx: Receiver<AssetArtifactIdPair>,

    written_artifact_queue_rx: Receiver<WrittenArtifact>,

    thread_pool_result_rx: Receiver<JobExecutorThreadPoolOutcome>,
    thread_pool: Option<JobExecutorThreadPool>,

    completed_job_count: usize,
    last_job_print_time: Option<std::time::Instant>,
}

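/// The thread pool owns OS threads holding clones of the executor's channels,
/// so it is shut down explicitly before the executor's fields are dropped.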
impl Drop for JobExecutor {
    fn drop(&mut self) {
        let thread_pool = self.thread_pool.take().unwrap();
        thread_pool.finish();
    }
}

impl JobExecutor {
    pub fn reset(&mut self) {
        assert!(self.is_idle());
        self.current_jobs.clear();
        self.completed_job_count = 0;
    }

    pub fn new(
        schema_set: &SchemaSet,
        job_processor_registry: &JobProcessorRegistry,
        import_data_root_path: PathBuf,
        job_data_root_path: PathBuf,
        build_data_root_path: PathBuf,
    ) -> Self {
        let (job_create_queue_tx, job_create_queue_rx) = crossbeam_channel::unbounded();
        let (artifact_handle_created_tx, artifact_handle_created_rx) =
            crossbeam_channel::unbounded();
        let (written_artifact_queue_tx, written_artifact_queue_rx) = crossbeam_channel::unbounded();

        let job_api_impl = JobApiImpl {
            inner: Arc::new(JobApiImplInner {
                schema_set: schema_set.clone(),
                import_data_root_path: import_data_root_path.clone(),
                build_data_root_path,
                job_processor_registry: job_processor_registry.clone(),
                job_create_queue_tx,
                artifact_handle_created_tx,
                written_artifact_queue_tx,
            }),
        };

        let thread_count = num_cpus::get();
        let (thread_pool_result_tx, thread_pool_result_rx) = crossbeam_channel::unbounded();
        let thread_pool = JobExecutorThreadPool::new(
            job_processor_registry.clone(),
            schema_set.clone(),
            &job_data_root_path,
            job_api_impl.clone(),
            thread_count,
            thread_pool_result_tx,
        );

        JobExecutor {
            _root_path: job_data_root_path,
            job_api_impl,
            job_processor_registry: job_processor_registry.clone(),
            current_jobs: Default::default(),
            job_create_queue_rx,
            artifact_handle_created_rx,
            written_artifact_queue_rx,
            thread_pool_result_rx,
            thread_pool: Some(thread_pool),
            completed_job_count: 0,
            last_job_print_time: None,
        }
    }

    pub fn job_api(&self) -> &dyn JobApi {
        &self.job_api_impl
    }

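    /// Drains artifacts written since the last call and records the
    /// artifact-to-asset mapping. An artifact id may be seen more than once,
    /// but it must always map to the same asset; the asserts below enforce
    /// that invariant.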
    pub fn take_written_artifacts(
        &self,
        artifact_asset_lookup: &mut HashMap<ArtifactId, AssetId>,
    ) -> Vec<WrittenArtifact> {
        let mut written_artifacts = Vec::default();
        while let Ok(written_artifact) = self.written_artifact_queue_rx.try_recv() {
            let old = artifact_asset_lookup
                .insert(written_artifact.artifact_id, written_artifact.asset_id);
            if old.is_some() {
                assert_eq!(old, Some(written_artifact.asset_id));
            }

            written_artifacts.push(written_artifact);
        }

        while let Ok(asset_artifact_pair) = self.artifact_handle_created_rx.try_recv() {
            let old = artifact_asset_lookup.insert(
                asset_artifact_pair.artifact_id,
                asset_artifact_pair.asset_id,
            );
            if old.is_some() {
                assert_eq!(old, Some(asset_artifact_pair.asset_id));
            }
        }

        written_artifacts
    }

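    /// Drains the job-create queue. Requests are deduplicated by `JobId`; a
    /// job whose dependency enumeration failed is recorded as already
    /// completed with an error so downstream bookkeeping stays consistent.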
    fn handle_create_queue(
        &mut self,
        log_data: &mut BuildLogData,
    ) {
        while let Ok(queued_job) = self.job_create_queue_rx.try_recv() {
            log_data
                .requestors
                .entry(queued_job.job_id)
                .or_default()
                .push(queued_job.job_requestor);
            if !self.current_jobs.contains_key(&queued_job.job_id) {
                assert!(self
                    .job_processor_registry
                    .contains_key(queued_job.job_type));

                let job_state = match queued_job.dependencies {
                    Ok(dependencies) => JobState {
                        job_type: queued_job.job_type,
                        dependencies: Arc::new(dependencies),
                        input_data: queued_job.input_data,
                        debug_name: queued_job.debug_name,
                        has_been_scheduled: false,
                        output_data: None,
                    },
                    Err(e) => {
                        let log_error = BuildLogEvent {
                            job_id: Some(queued_job.job_id),
                            asset_id: None,
                            level: LogEventLevel::FatalError,
                            message: format!("enumerate_dependencies returned error: {}", e),
                        };
                        log::error!("Build Error: {:?}", log_error);
                        log_data.log_events.push(log_error);

                        JobState {
                            job_type: queued_job.job_type,
                            dependencies: Arc::new(JobEnumeratedDependencies::default()),
                            input_data: queued_job.input_data,
                            debug_name: queued_job.debug_name,
                            has_been_scheduled: true,
                            output_data: Some(JobStateOutput {
                                _output_data: Err(e),
                                _fetched_asset_data: Default::default(),
                                _fetched_import_data: Default::default(),
                            }),
                        }
                    }
                };

                self.current_jobs.insert(queued_job.job_id, job_state);
            }
        }
    }

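    /// Drains results coming back from the thread pool, storing each job's
    /// output (or error) and forwarding any log events it produced.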
    fn handle_completed_queue(
        &mut self,
        log_events: &mut Vec<BuildLogEvent>,
    ) {
        while let Ok(result) = self.thread_pool_result_rx.try_recv() {
            match result {
                JobExecutorThreadPoolOutcome::RunJobComplete(msg) => {
                    let job = self.current_jobs.get_mut(&msg.request.job_id).unwrap();
                    match msg.result {
                        Ok(data) => {
                            job.output_data = Some(JobStateOutput {
                                _output_data: Ok(data.output_data),
                                _fetched_asset_data: data.fetched_asset_data,
                                _fetched_import_data: data.fetched_import_data,
                            });

                            for log_event in data.log_events {
                                log_events.push(log_event);
                            }
                        }
                        Err(e) => {
                            let log_event = BuildLogEvent {
                                job_id: Some(msg.request.job_id),
                                asset_id: None,
                                level: LogEventLevel::FatalError,
                                message: format!("Build job returned error: {}", e),
                            };
                            log::error!("Build Error: {:?}", log_event);
                            log_events.push(log_event);

                            job.output_data = Some(JobStateOutput {
                                _output_data: Err(e),
                                _fetched_asset_data: Default::default(),
                                _fetched_import_data: Default::default(),
                            });
                        }
                    }
                    self.completed_job_count += 1;
                }
            }
        }
    }

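    /// One pump of the executor: ingest newly created jobs, schedule every
    /// job whose upstream dependencies have all produced output, then collect
    /// finished results and periodically log progress.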
    #[profiling::function]
    pub fn update(
        &mut self,
        data_set: &Arc<DataSet>,
        log_data: &mut BuildLogData,
    ) {
        self.handle_create_queue(log_data);

        let mut started_jobs = Vec::default();

        for (&job_id, job_state) in &self.current_jobs {
            if job_state.has_been_scheduled {
                continue;
            }

            assert!(job_state.output_data.is_none());

            // Only schedule a job once every upstream job it depends on has
            // produced output.
            let mut waiting_on_upstream_job = false;
            for upstream_job in &job_state.dependencies.upstream_jobs {
                let dependency = self.current_jobs.get(upstream_job);
                let Some(dependency_job_state) = dependency else {
                    panic!("Job has a dependency on another job that has not been created");
                };

                if dependency_job_state.output_data.is_none() {
                    waiting_on_upstream_job = true;
                    break;
                }
            }

            if waiting_on_upstream_job {
                continue;
            }

            self.thread_pool
                .as_ref()
                .unwrap()
                .add_request(JobExecutorThreadPoolRequest::RunJob(
                    JobExecutorThreadPoolRequestRunJob {
                        job_id,
                        job_type: job_state.job_type,
                        data_set: data_set.clone(),
                        _debug_name: job_state.debug_name.clone(),
                        _dependencies: job_state.dependencies.clone(),
                        input_data: job_state.input_data.clone(),
                    },
                ));

            started_jobs.push(job_id);
        }

        // Mark jobs as scheduled in a second pass because current_jobs is
        // borrowed immutably while iterating above.
        for job_id in started_jobs {
            self.current_jobs
                .get_mut(&job_id)
                .unwrap()
                .has_been_scheduled = true;
        }

        self.handle_completed_queue(&mut log_data.log_events);

        // Log progress at most once every 500ms.
        let now = std::time::Instant::now();
        let mut print_progress = true;
        if let Some(last_job_print_time) = self.last_job_print_time {
            if (now - last_job_print_time) < std::time::Duration::from_millis(500) {
                print_progress = false;
            }
        }

        if print_progress {
            log::info!(
                "Jobs: {}/{}",
                self.completed_job_count,
                self.current_jobs.len()
            );
            self.last_job_print_time = Some(now);
        }
    }

    pub fn completed_job_count(&self) -> usize {
        self.completed_job_count
    }

    pub fn current_job_count(&self) -> usize {
        self.current_jobs.len()
    }

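    /// Returns true only when every queue is drained, the thread pool has no
    /// in-flight work, and every known job has produced output.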
    pub fn is_idle(&self) -> bool {
        if !self.job_create_queue_rx.is_empty() {
            return false;
        }

        if !self.thread_pool.as_ref().unwrap().is_idle() {
            return false;
        }

        if !self.written_artifact_queue_rx.is_empty() {
            return false;
        }

        for (_id, job) in &self.current_jobs {
            if job.output_data.is_none() {
                return false;
            }
        }

        true
    }
}