hydrate_pipeline/build/job_system/
executor.rs

1use crate::build::{BuiltArtifact, WrittenArtifact};
2use crate::import::ImportData;
3use crate::{BuildLogData, BuildLogEvent, LogEventLevel, PipelineResult};
4use crossbeam_channel::{Receiver, Sender};
5use hydrate_base::hashing::HashMap;
6use hydrate_base::uuid_path::uuid_and_hash_to_path;
7use hydrate_base::{ArtifactId, AssetId};
8use hydrate_data::{DataSet, SchemaSet};
9use serde::{Deserialize, Serialize};
10use std::cell::RefCell;
11use std::hash::Hasher;
12use std::io::{BufWriter, Write};
13use std::panic::RefUnwindSafe;
14use std::path::PathBuf;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use super::*;
19
20struct JobWrapper<T: JobProcessor>(T);
21
22impl<T: JobProcessor + Send + Sync + RefUnwindSafe> JobProcessorAbstract for JobWrapper<T>
23where
24    <T as JobProcessor>::InputT: for<'a> Deserialize<'a> + 'static,
25    <T as JobProcessor>::OutputT: Serialize + 'static,
26{
27    fn version_inner(&self) -> u32 {
28        self.0.version()
29    }
30
31    fn enumerate_dependencies_inner(
32        &self,
33        job_id: JobId,
34        job_requestor: JobRequestor,
35        input: &Vec<u8>,
36        data_set: &DataSet,
37        schema_set: &SchemaSet,
38        log_events: &mut Vec<BuildLogEvent>,
39    ) -> PipelineResult<JobEnumeratedDependencies> {
40        let data: <T as JobProcessor>::InputT = bincode::deserialize(input.as_slice()).unwrap();
41        self.0.enumerate_dependencies(EnumerateDependenciesContext {
42            job_id,
43            job_requestor,
44            input: &data,
45            data_set,
46            schema_set,
47            log_events: &Rc::new(RefCell::new(log_events)),
48        })
49    }
50
51    fn run_inner(
52        &self,
53        job_id: JobId,
54        input: &Vec<u8>,
55        data_set: &DataSet,
56        schema_set: &SchemaSet,
57        job_api: &dyn JobApi,
58        fetched_asset_data: &mut HashMap<AssetId, FetchedAssetData>,
59        fetched_import_data: &mut HashMap<AssetId, FetchedImportData>,
60        log_events: &mut Vec<BuildLogEvent>,
61    ) -> PipelineResult<Arc<Vec<u8>>> {
62        let data: <T as JobProcessor>::InputT = bincode::deserialize(input.as_slice()).unwrap();
63        let output = {
64            profiling::scope!(&format!("{:?}::run", std::any::type_name::<T>()));
65            self.0.run(&RunContext {
66                job_id,
67                input: &data,
68                data_set,
69                schema_set,
70                fetched_asset_data: &Rc::new(RefCell::new(fetched_asset_data)),
71                fetched_import_data: &Rc::new(RefCell::new(fetched_import_data)),
72                job_api,
73                log_events: &Rc::new(RefCell::new(log_events)),
74            })
75        }?;
76        Ok(Arc::new(bincode::serialize(&output)?))
77    }
78}
79
80// struct JobHistory {
81//     // version() returned from the processor, if it bumps we invalidate the job
82//     job_version: u32,
83//
84//     // The dependencies that existed when we ran this job last time (may not need this?)
85//     dependencies: JobEnumeratedDependencies,
86//     // Hash of import data used to run the job. If our import data changed, the job results can't be
87//     // reused
88//     import_data_hashes: HashMap<AssetId, u128>,
89//     // All the jobs this job produced. Even if we can reuse the results of this job, we will have
90//     // to check downstream jobs do not detect an input data change.
91//     downstream_jobs: Vec<QueuedJob>,
92// }
93
94struct JobState {
95    job_type: JobTypeId,
96    dependencies: Arc<JobEnumeratedDependencies>,
97    input_data: Arc<Vec<u8>>,
98    debug_name: Arc<String>,
99
100    // When we send the job to the thread pool, this is set to true
101    has_been_scheduled: bool,
102    // This would eventually be stored on file system
103    output_data: Option<JobStateOutput>,
104}
105
106struct JobStateOutput {
107    _output_data: PipelineResult<Arc<Vec<u8>>>,
108    _fetched_asset_data: HashMap<AssetId, FetchedAssetData>,
109    _fetched_import_data: HashMap<AssetId, FetchedImportData>,
110}
111
112//TODO: Future optimization, we clone this and it could be big, especially when we re-run jobs. We
113// could just enqueue the ID of the job if we have the job history
114#[derive(Clone)]
115struct QueuedJob {
116    job_id: JobId,
117    job_requestor: JobRequestor,
118    job_type: JobTypeId,
119    input_data: Arc<Vec<u8>>,
120    dependencies: PipelineResult<JobEnumeratedDependencies>,
121    debug_name: Arc<String>,
122}
123
124#[derive(Default)]
125pub struct JobProcessorRegistryBuilder {
126    job_processors: HashMap<JobTypeId, Arc<dyn JobProcessorAbstract>>,
127}
128
129impl JobProcessorRegistryBuilder {
130    pub fn register_job_processor<
131        T: JobProcessor + Send + Sync + RefUnwindSafe + Default + 'static,
132    >(
133        &mut self
134    ) where
135        <T as JobProcessor>::InputT: for<'a> Deserialize<'a>,
136        <T as JobProcessor>::OutputT: Serialize,
137    {
138        let old = self.job_processors.insert(
139            JobTypeId::from_bytes(T::UUID),
140            Arc::new(JobWrapper(T::default())),
141        );
142        if old.is_some() {
143            panic!("Multiple job processors registered with the same UUID");
144        }
145    }
146
147    pub fn register_job_processor_instance<
148        T: JobProcessor + Send + Sync + RefUnwindSafe + 'static,
149    >(
150        &mut self,
151        job_processor: T,
152    ) where
153        <T as JobProcessor>::InputT: for<'a> Deserialize<'a>,
154        <T as JobProcessor>::OutputT: Serialize,
155    {
156        let old = self.job_processors.insert(
157            JobTypeId::from_bytes(T::UUID),
158            Arc::new(JobWrapper(job_processor)),
159        );
160        if old.is_some() {
161            panic!("Multiple job processors registered with the same UUID");
162        }
163    }
164
165    pub fn build(self) -> JobProcessorRegistry {
166        let inner = JobProcessorRegistryInner {
167            job_processors: self.job_processors,
168        };
169
170        JobProcessorRegistry {
171            inner: Arc::new(inner),
172        }
173    }
174}
175
176pub struct JobProcessorRegistryInner {
177    job_processors: HashMap<JobTypeId, Arc<dyn JobProcessorAbstract>>,
178}
179
180#[derive(Clone)]
181pub struct JobProcessorRegistry {
182    inner: Arc<JobProcessorRegistryInner>,
183}
184
185impl JobProcessorRegistry {
186    fn get(
187        &self,
188        job_type_id: JobTypeId,
189    ) -> Option<&dyn JobProcessorAbstract> {
190        self.inner.job_processors.get(&job_type_id).map(|x| &**x)
191    }
192
193    fn contains_key(
194        &self,
195        job_type_id: JobTypeId,
196    ) -> bool {
197        self.inner.job_processors.contains_key(&job_type_id)
198    }
199
200    pub(crate) fn get_processor(
201        &self,
202        job_type: JobTypeId,
203    ) -> Option<Arc<dyn JobProcessorAbstract>> {
204        self.inner.job_processors.get(&job_type).cloned()
205    }
206}
207
208struct JobApiImplInner {
209    schema_set: SchemaSet,
210    import_data_root_path: PathBuf,
211    build_data_root_path: PathBuf,
212    job_processor_registry: JobProcessorRegistry,
213    job_create_queue_tx: Sender<QueuedJob>,
214    artifact_handle_created_tx: Sender<AssetArtifactIdPair>,
215    written_artifact_queue_tx: Sender<WrittenArtifact>,
216}
217
218#[derive(Clone)]
219pub struct JobApiImpl {
220    inner: Arc<JobApiImplInner>,
221}
222
223impl JobApi for JobApiImpl {
224    fn enqueue_job(
225        &self,
226        job_requestor: JobRequestor,
227        data_set: &DataSet,
228        schema_set: &SchemaSet,
229        new_job: NewJob,
230        debug_name: String,
231        log_events: &mut Vec<BuildLogEvent>,
232    ) -> PipelineResult<JobId> {
233        // Dependencies:
234        // - Job Versioning - so if logic changes we can bump version of the processor and kick jobs to rerun
235        // - Asset (we need to know hash of data in it)
236        // - Import Data (we need to know hash of data in it)
237        // - Intermediate data (we need the job's input hash, which takes into account the parameters of the job including
238        //   hashes of above stuff
239        // - Build Data (we need the build hash, which takes into account the asset/import data
240        let job_id = JobId::from_u128(new_job.input_hash);
241        let processor = self
242            .inner
243            .job_processor_registry
244            .get(new_job.job_type)
245            .unwrap();
246
247        let dependencies = processor.enumerate_dependencies_inner(
248            job_id,
249            job_requestor,
250            &new_job.input_data,
251            data_set,
252            schema_set,
253            log_events,
254        );
255
256        self.inner
257            .job_create_queue_tx
258            .send(QueuedJob {
259                job_id,
260                job_requestor,
261                job_type: new_job.job_type,
262                input_data: Arc::new(new_job.input_data),
263                dependencies,
264                debug_name: Arc::new(debug_name),
265            })
266            .unwrap();
267
268        Ok(job_id)
269    }
270
271    fn artifact_handle_created(
272        &self,
273        asset_id: AssetId,
274        artifact_id: ArtifactId,
275    ) {
276        //TODO: Is this necessary, can we handle it when the job result is returned?
277        self.inner
278            .artifact_handle_created_tx
279            .send(AssetArtifactIdPair {
280                asset_id,
281                artifact_id,
282            })
283            .unwrap();
284    }
285
286    fn produce_artifact(
287        &self,
288        artifact: BuiltArtifact,
289    ) {
290        profiling::scope!("Write Asset to Disk");
291        //
292        // Hash the artifact
293        //
294        let mut hasher = siphasher::sip::SipHasher::default();
295        artifact.data.hash(&mut hasher);
296        artifact.metadata.hash(&mut hasher);
297        let build_hash = hasher.finish();
298
299        //
300        // Determine where we will store the asset and ensure the directory exists
301        //
302        let path = uuid_and_hash_to_path(
303            &self.inner.build_data_root_path,
304            artifact.artifact_id.as_uuid(),
305            build_hash,
306            "bf",
307        );
308
309        if let Some(parent) = path.parent() {
310            std::fs::create_dir_all(parent).unwrap();
311        }
312
313        //
314        // Serialize the artifacts to disk
315        //
316        let file = std::fs::File::create(&path).unwrap();
317        let mut buf_writer = BufWriter::new(file);
318        artifact.metadata.write_header(&mut buf_writer).unwrap();
319        buf_writer.write(&artifact.data).unwrap();
320
321        //
322        // Send info about the written asset back to main thread for inclusion in the manifest
323        //
324        //TODO: Is this necessary, can we handle it when the job result is returned?
325        self.inner
326            .written_artifact_queue_tx
327            .send(WrittenArtifact {
328                asset_id: artifact.asset_id,
329                artifact_id: artifact.artifact_id,
330                metadata: artifact.metadata,
331                build_hash,
332                artifact_key_debug_name: artifact.artifact_key_debug_name,
333            })
334            .unwrap();
335    }
336
337    fn fetch_import_data(
338        &self,
339        asset_id: AssetId,
340    ) -> PipelineResult<ImportData> {
341        crate::import::load_import_data(
342            &self.inner.import_data_root_path,
343            &self.inner.schema_set,
344            asset_id,
345        )
346    }
347}
348
349#[derive(Clone, Debug)]
350pub struct AssetArtifactIdPair {
351    pub asset_id: AssetId,
352    pub artifact_id: ArtifactId,
353}
354
355pub struct JobExecutor {
356    // Will be needed when we start doing job caching
357    _root_path: PathBuf,
358    job_api_impl: JobApiImpl,
359
360    job_processor_registry: JobProcessorRegistry,
361
362    // All the jobs that we have run or will run in this job batch
363    current_jobs: HashMap<JobId, JobState>,
364
365    // Queue for jobs to request additional jobs to run
366    job_create_queue_rx: Receiver<QueuedJob>,
367
368    //TODO: We will have additional deques for jobs that are in a ready state to avoid O(n) iteration
369    artifact_handle_created_rx: Receiver<AssetArtifactIdPair>,
370
371    written_artifact_queue_rx: Receiver<WrittenArtifact>,
372
373    thread_pool_result_rx: Receiver<JobExecutorThreadPoolOutcome>,
374    thread_pool: Option<JobExecutorThreadPool>,
375
376    completed_job_count: usize,
377    last_job_print_time: Option<std::time::Instant>,
378}
379
380impl Drop for JobExecutor {
381    fn drop(&mut self) {
382        let thread_pool = self.thread_pool.take().unwrap();
383        thread_pool.finish();
384    }
385}
386
387impl JobExecutor {
388    pub fn reset(&mut self) {
389        assert!(self.is_idle());
390        self.current_jobs.clear();
391        self.completed_job_count = 0;
392    }
393
394    pub fn new(
395        schema_set: &SchemaSet,
396        job_processor_registry: &JobProcessorRegistry,
397        import_data_root_path: PathBuf,
398        job_data_root_path: PathBuf,
399        build_data_root_path: PathBuf,
400    ) -> Self {
401        let (job_create_queue_tx, job_create_queue_rx) = crossbeam_channel::unbounded();
402        //let (job_completed_queue_tx, job_completed_queue_rx) = crossbeam_channel::unbounded();
403        //let (built_asset_queue_tx, built_asset_queue_rx) = crossbeam_channel::unbounded();
404
405        let (artifact_handle_created_tx, artifact_handle_created_rx) =
406            crossbeam_channel::unbounded();
407        let (written_artifact_queue_tx, written_artifact_queue_rx) = crossbeam_channel::unbounded();
408
409        let job_api_impl = JobApiImpl {
410            inner: Arc::new(JobApiImplInner {
411                schema_set: schema_set.clone(),
412                import_data_root_path: import_data_root_path.clone(),
413                build_data_root_path,
414                job_processor_registry: job_processor_registry.clone(),
415                job_create_queue_tx,
416                artifact_handle_created_tx,
417                written_artifact_queue_tx,
418            }),
419        };
420
421        let thread_count = num_cpus::get();
422        //let thread_count = 1;
423
424        let (thread_pool_result_tx, thread_pool_result_rx) = crossbeam_channel::unbounded();
425        let thread_pool = JobExecutorThreadPool::new(
426            job_processor_registry.clone(),
427            schema_set.clone(),
428            &job_data_root_path,
429            job_api_impl.clone(),
430            thread_count,
431            thread_pool_result_tx,
432        );
433
434        JobExecutor {
435            _root_path: job_data_root_path,
436            job_api_impl,
437            job_processor_registry: job_processor_registry.clone(),
438            //job_history: Default::default(),
439            current_jobs: Default::default(),
440            //job_create_queue_tx,
441            job_create_queue_rx,
442            //job_completed_queue_tx,
443            //job_completed_queue_rx,
444            // built_asset_queue_tx,
445            // built_asset_queue_rx,
446            //artifact_handle_created_tx,
447            artifact_handle_created_rx,
448            //built_artifact_queue_tx,
449            written_artifact_queue_rx,
450            thread_pool_result_rx,
451            thread_pool: Some(thread_pool),
452            completed_job_count: 0,
453            last_job_print_time: None,
454        }
455    }
456
457    pub fn job_api(&self) -> &dyn JobApi {
458        &self.job_api_impl
459    }
460
461    // pub fn take_built_assets(&self) -> Vec<BuiltAsset> {
462    //     let mut built_assets = Vec::default();
463    //     while let Ok(built_asset) = self.built_asset_queue_rx.try_recv() {
464    //         built_assets.push(built_asset);
465    //     }
466    //
467    //     built_assets
468    // }
469
470    pub fn take_written_artifacts(
471        &self,
472        artifact_asset_lookup: &mut HashMap<ArtifactId, AssetId>,
473    ) -> Vec<WrittenArtifact> {
474        let mut written_artifacts = Vec::default();
475        while let Ok(written_artifact) = self.written_artifact_queue_rx.try_recv() {
476            let old = artifact_asset_lookup
477                .insert(written_artifact.artifact_id, written_artifact.asset_id);
478            //assert!(old.is_none());
479
480            if old.is_some() {
481                // Is it possible a job has already created a handle to this asset, even if the asset hasn't been built yet
482                assert_eq!(old, Some(written_artifact.asset_id));
483            }
484
485            written_artifacts.push(written_artifact);
486        }
487
488        while let Ok(asset_artifact_pair) = self.artifact_handle_created_rx.try_recv() {
489            let old = artifact_asset_lookup.insert(
490                asset_artifact_pair.artifact_id,
491                asset_artifact_pair.asset_id,
492            );
493            if old.is_some() {
494                // This happens after taking built artifacts because the built artifacts might have handles
495                // to artifacts and we need to know the asset ID associated with them.
496                assert_eq!(old, Some(asset_artifact_pair.asset_id));
497            }
498        }
499
500        written_artifacts
501    }
502
503    fn handle_create_queue(
504        &mut self,
505        log_data: &mut BuildLogData,
506    ) {
507        while let Ok(queued_job) = self.job_create_queue_rx.try_recv() {
508            log_data
509                .requestors
510                .entry(queued_job.job_id)
511                .or_default()
512                .push(queued_job.job_requestor);
513            // If key exists, we already queued a job with these exact inputs and we can reuse the outputs
514            if !self.current_jobs.contains_key(&queued_job.job_id) {
515                assert!(self
516                    .job_processor_registry
517                    .contains_key(queued_job.job_type));
518
519                let job_state = match queued_job.dependencies {
520                    Ok(dependencies) => JobState {
521                        job_type: queued_job.job_type,
522                        dependencies: Arc::new(dependencies),
523                        input_data: queued_job.input_data,
524                        debug_name: queued_job.debug_name,
525                        has_been_scheduled: false,
526                        output_data: None,
527                    },
528                    Err(e) => {
529                        let log_error = BuildLogEvent {
530                            job_id: Some(queued_job.job_id),
531                            asset_id: None,
532                            level: LogEventLevel::FatalError,
533                            message: format!(
534                                "enumerate_dependencies returned error: {}",
535                                e.to_string()
536                            ),
537                        };
538                        log::error!("Build Error: {:?}", log_error);
539                        log_data.log_events.push(log_error);
540
541                        JobState {
542                            job_type: queued_job.job_type,
543                            dependencies: Arc::new(JobEnumeratedDependencies::default()),
544                            input_data: queued_job.input_data,
545                            debug_name: queued_job.debug_name,
546                            has_been_scheduled: true,
547                            output_data: Some(JobStateOutput {
548                                _output_data: Err(e),
549                                _fetched_asset_data: Default::default(),
550                                _fetched_import_data: Default::default(),
551                            }),
552                        }
553                    }
554                };
555
556                self.current_jobs.insert(queued_job.job_id, job_state);
557            }
558        }
559    }
560
561    fn handle_completed_queue(
562        &mut self,
563        log_events: &mut Vec<BuildLogEvent>,
564    ) {
565        while let Ok(result) = self.thread_pool_result_rx.try_recv() {
566            match result {
567                JobExecutorThreadPoolOutcome::RunJobComplete(msg) => {
568                    let job = self.current_jobs.get_mut(&msg.request.job_id).unwrap();
569                    match msg.result {
570                        Ok(data) => {
571                            job.output_data = Some(JobStateOutput {
572                                _output_data: Ok(data.output_data),
573                                _fetched_asset_data: data.fetched_asset_data,
574                                _fetched_import_data: data.fetched_import_data,
575                            });
576
577                            for log_event in data.log_events {
578                                log_events.push(log_event);
579                            }
580                        }
581                        Err(e) => {
582                            let log_event = BuildLogEvent {
583                                job_id: Some(msg.request.job_id),
584                                asset_id: None,
585                                level: LogEventLevel::FatalError,
586                                message: format!("Build job returned error: {}", e.to_string()),
587                            };
588                            log::error!("Build Error: {:?}", log_event);
589                            log_events.push(log_event);
590
591                            job.output_data = Some(JobStateOutput {
592                                _output_data: Err(e),
593                                _fetched_asset_data: Default::default(),
594                                _fetched_import_data: Default::default(),
595                            });
596                        }
597                    }
598                    self.completed_job_count += 1;
599
600                    // When we a
601                    //TODO: do we hash stuff here and do anything with it?
602                }
603            }
604        }
605    }
606
607    #[profiling::function]
608    pub fn update(
609        &mut self,
610        data_set: &Arc<DataSet>,
611        log_data: &mut BuildLogData,
612    ) {
613        //
614        // Pull jobs off the create queue. Determine their dependencies and prepare them to run.
615        //
616        self.handle_create_queue(log_data);
617
618        let mut started_jobs = Vec::default();
619
620        //TODO: Don't iterate every job in existence
621        for (&job_id, job_state) in &self.current_jobs {
622            //
623            // See if we already did this job during the current execution cycle
624            //
625            if job_state.has_been_scheduled {
626                continue;
627            }
628
629            assert!(job_state.output_data.is_none());
630
631            //
632            // See if the job we need to wait for has completed
633            //
634            let mut waiting_on_upstream_job = false;
635            for upstream_job in &job_state.dependencies.upstream_jobs {
636                let dependency = self.current_jobs.get(upstream_job);
637                let Some(dependency_job_state) = dependency else {
638                    panic!("Job has a dependency on another job that has not been created");
639                    //TODO: We would not terminate if we remove the panic
640                    //break;
641                };
642
643                if dependency_job_state.output_data.is_none() {
644                    waiting_on_upstream_job = true;
645                    break;
646                }
647            }
648
649            if waiting_on_upstream_job {
650                continue;
651            }
652
653            //
654            // If we've run this job in the past and have a cached result, we can reuse the result.
655            // But we still need to schedule downstream jobs in case their dependencies changed and
656            // they need to be reprocessed
657            //
658            //let mut _has_run_job_before = false;
659            //let mut _can_reuse_result = true;
660            // let job_history = self.job_history.get(&job_id);
661            // if let Some(job_history) = job_history {
662            //     _has_run_job_before = true;
663            //     _can_reuse_result = true;
664            //     //TODO: Check if input data not represented in the job hash changed
665            //     // job_history.import_data_hashed
666            //     // job_history.dependencies
667            //
668            //     // can_reuse_result may be set to false here
669            //
670            //
671            //     if _has_run_job_before && _can_reuse_result {
672            //         // Kick off child jobs
673            //         for downstream_job in &job_history.downstream_jobs {
674            //             self.enqueue_job_internal(downstream_job.clone());
675            //         }
676            //
677            //         // Bail, we will reuse the output from the previous run
678            //         break;
679            //     }
680            // }
681
682            //
683            // At this point we have either never run the job before, or we know the job inputs have changed
684            // Go ahead and run it.
685            //
686            self.thread_pool
687                .as_ref()
688                .unwrap()
689                .add_request(JobExecutorThreadPoolRequest::RunJob(
690                    JobExecutorThreadPoolRequestRunJob {
691                        job_id,
692                        job_type: job_state.job_type,
693                        data_set: data_set.clone(),
694                        _debug_name: job_state.debug_name.clone(),
695                        _dependencies: job_state.dependencies.clone(),
696                        input_data: job_state.input_data.clone(),
697                    },
698                ));
699
700            started_jobs.push(job_id);
701        }
702
703        for job_id in started_jobs {
704            self.current_jobs
705                .get_mut(&job_id)
706                .unwrap()
707                .has_been_scheduled = true;
708        }
709
710        self.handle_completed_queue(&mut log_data.log_events);
711
712        let now = std::time::Instant::now();
713        let mut print_progress = true;
714        if let Some(last_job_print_time) = self.last_job_print_time {
715            if (now - last_job_print_time) < std::time::Duration::from_millis(500) {
716                print_progress = false;
717            }
718        }
719
720        if print_progress {
721            log::info!(
722                "Jobs: {}/{}",
723                self.completed_job_count,
724                self.current_jobs.len()
725            );
726            self.last_job_print_time = Some(now);
727        }
728    }
729
730    pub fn completed_job_count(&self) -> usize {
731        self.completed_job_count
732    }
733
734    pub fn current_job_count(&self) -> usize {
735        self.current_jobs.len()
736    }
737
738    pub fn is_idle(&self) -> bool {
739        if !self.job_create_queue_rx.is_empty() {
740            return false;
741        }
742
743        // if !self.job_completed_queue_rx.is_empty() {
744        //     return false;
745        // }
746
747        if !self.thread_pool.as_ref().unwrap().is_idle() {
748            return false;
749        }
750
751        // if !self.built_asset_queue_rx.is_empty() {
752        //     return false;
753        // }
754
755        if !self.written_artifact_queue_rx.is_empty() {
756            return false;
757        }
758
759        //TODO: Don't iterate, keep a count
760        for (_id, job) in &self.current_jobs {
761            if job.output_data.is_none() {
762                return false;
763            }
764        }
765
766        true
767    }
768}