hydrate_pipeline/build/
build_jobs.rs

1use crate::import::ImportJobs;
2use crate::{BuildLogData, BuildLogEvent, DynEditorModel, LogEventLevel, PipelineResult};
3use hydrate_base::hashing::HashSet;
4use hydrate_base::{hashing::HashMap, AssetId};
5use hydrate_base::{
6    ArtifactId, BuiltArtifactHeaderData, DebugArtifactManifestDataJson, DebugManifestFileJson,
7    StringHash,
8};
9use hydrate_data::{DataSet, HashObjectMode, SchemaSet};
10use std::cell::RefCell;
11use std::collections::VecDeque;
12use std::hash::{Hash, Hasher};
13use std::io::Write;
14use std::path::PathBuf;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use super::*;
19
/// A request to run the build for a single asset. Requests are queued while a
/// build task is in flight and de-duplicated against already-started ops.
struct BuildRequest {
    asset_id: AssetId,
}
23
// A known build job, each existing asset will have an associated build job.
// It could be in a completed state, or there could be a problem with it and we need to re-run it.
struct BuildJob {
    // (artifact_id, build_hash) pairs for which built data has been produced during a build
    build_data_exists: HashSet<(ArtifactId, u64)>,
    // Set to true once we have seen the asset that this job is associated with
    asset_exists: bool,
}
30
31impl BuildJob {
32    pub fn new() -> Self {
33        BuildJob {
34            build_data_exists: Default::default(),
35            asset_exists: false,
36        }
37    }
38}
39
/// Progress snapshot reported while a build task is in flight.
pub struct BuildStatusBuilding {
    /// Number of jobs known to the executor for the current build
    pub total_job_count: usize,
    /// How many of those jobs have finished so far
    pub completed_job_count: usize,
}
44
/// Result of pumping the build pipeline via `BuildJobs::update`.
pub enum BuildStatus {
    /// No build was requested and nothing is in flight
    Idle,
    /// A build task is in progress; carries job progress counts
    Building(BuildStatusBuilding),
    /// A build task just finished; carries the log of the completed build
    Completed(Arc<BuildLogData>),
}
50
// Bookkeeping kept for every artifact written during the current build task;
// consumed at the end of the build to emit manifest entries.
struct BuiltArtifactInfo {
    // The asset that produced this artifact
    asset_id: AssetId,
    // Optional human-readable artifact key, appended to debug names in the manifest
    artifact_key_debug_name: Option<String>,
    // Header metadata (asset type, dependencies, ...) of the built artifact
    metadata: BuiltArtifactHeaderData,
}
56
// State for a build that is currently in flight. Created when a build starts
// and consumed when it completes.
struct BuildTask {
    // Assets whose builders still need their jobs kicked off
    requested_build_ops: VecDeque<BuildRequest>,
    // Assets for which start_jobs has already been called (prevents duplicates)
    started_build_ops: HashSet<AssetId>,
    // Build hash of every artifact written so far in this build
    build_hashes: HashMap<ArtifactId, u64>,
    // Maps each artifact back to the asset that produced it
    artifact_asset_lookup: HashMap<ArtifactId, AssetId>,
    // Per-artifact metadata gathered for manifest generation
    built_artifact_info: HashMap<ArtifactId, BuiltArtifactInfo>,
    // Snapshot of the dataset taken when the build started
    data_set: Arc<DataSet>,
    schema_set: SchemaSet,
    // Hash of everything feeding this build; also names the manifest files on disk
    manifest_build_hash: u64,
    // Log events accumulated over the course of the build
    log_data: BuildLogData,
}
68
// Cache of all build jobs. This includes builds that are complete, in progress, or not started.
// We find these by scanning existing assets. We also inspect the asset and built data to see if the
// job is complete, or is in a failed or stale state.
pub struct BuildJobs {
    // Root directory where built data, manifests, and TOC files are written
    build_data_root_path: PathBuf,
    // Schedules and runs the individual build jobs
    job_executor: JobExecutor,
    // One entry per asset that has produced built data
    build_jobs: HashMap<AssetId, BuildJob>,
    //force_rebuild_operations: Vec<BuildOp>
    // The build currently in flight, if any
    current_build_task: Option<BuildTask>,
    // Manifest hash of the last completed build; used to detect changes
    previous_manifest_build_hash: Option<u64>,
    // Set by build(); consumed on the next update() to start a full build
    request_build: bool,
    // True when asset/import data has changed since the last build
    needs_build: bool,
    // Assets explicitly queued for building even without a full build request
    force_build_queue: HashSet<AssetId>,
}
83
84impl BuildJobs {
85    pub fn current_build_log(&self) -> Option<&BuildLogData> {
86        self.current_build_task.as_ref().map(|x| &x.log_data)
87    }
88
89    pub fn is_building(&self) -> bool {
90        self.current_build_task.is_some()
91    }
92
93    pub fn new(
94        schema_set: &SchemaSet,
95        job_processor_registry: &JobProcessorRegistry,
96        import_data_root_path: PathBuf,
97        job_data_root_path: PathBuf,
98        build_data_root_path: PathBuf,
99    ) -> Self {
100        //TODO: May need to scan disk to see what is cached?
101        let job_executor = JobExecutor::new(
102            schema_set,
103            job_processor_registry,
104            import_data_root_path,
105            job_data_root_path,
106            build_data_root_path.clone(),
107        );
108        let build_jobs = Default::default();
109
110        BuildJobs {
111            build_data_root_path,
112            job_executor,
113            build_jobs,
114            //force_rebuild_operations: Default::default()
115            current_build_task: None,
116            previous_manifest_build_hash: None,
117            request_build: false,
118            needs_build: false,
119            force_build_queue: Default::default(),
120        }
121    }
122
123    pub fn queue_build_operation(
124        &mut self,
125        asset_id: AssetId,
126    ) {
127        self.force_build_queue.insert(asset_id);
128    }
129
130    pub fn build(&mut self) {
131        self.request_build = true;
132    }
133
134    pub fn needs_build(&self) -> bool {
135        self.needs_build
136    }
137
138    #[profiling::function]
139    pub fn update(
140        &mut self,
141        builder_registry: &BuilderRegistry,
142        editor_model: &mut dyn DynEditorModel,
143        import_jobs: &ImportJobs,
144    ) -> PipelineResult<BuildStatus> {
145        profiling::scope!("Process Build Operations");
146
147        if let Some(build_task) = &mut self.current_build_task {
148            //
149            // For all the requested assets, see if there is a builder for the asset. If there is,
150            // kick off the jobs needed to produce the asset for it
151            //
152            {
153                //profiling::scope!("Start Jobs");
154                while let Some(request) = build_task.requested_build_ops.pop_front() {
155                    if build_task.started_build_ops.contains(&request.asset_id) {
156                        continue;
157                    }
158
159                    let asset_id = request.asset_id;
160                    build_task.started_build_ops.insert(asset_id);
161
162                    // If this unwrap trips, possibly there is a handle to an artifact with an asset
163                    // ID that doesn't exist
164                    let Some(asset_type) = editor_model.data_set().asset_schema(asset_id) else {
165                        panic!("Asset id {:?} was referenced but does not exist", asset_id);
166                    };
167
168                    let Some(builder) =
169                        builder_registry.builder_for_asset(asset_type.fingerprint())
170                    else {
171                        continue;
172                    };
173
174                    if let Err(e) = builder.start_jobs(BuilderContext {
175                        asset_id,
176                        data_set: &build_task.data_set,
177                        schema_set: &build_task.schema_set,
178                        job_api: self.job_executor.job_api(),
179                        log_events: &Rc::new(RefCell::new(&mut build_task.log_data.log_events)),
180                    }) {
181                        let log_event = BuildLogEvent {
182                            job_id: None,
183                            asset_id: Some(asset_id),
184                            level: LogEventLevel::FatalError,
185                            message: format!("start_jobs returned error: {}", e.to_string()),
186                        };
187                        log::error!("Build Error: {:?}", log_event);
188                        build_task.log_data.log_events.push(log_event);
189                    }
190                }
191            }
192
193            //
194            // Pump the job executor, this will schedule work to be done on threads
195            //
196            {
197                //profiling::scope!("Job Executor Update");
198                self.job_executor
199                    .update(&build_task.data_set, &mut build_task.log_data);
200            }
201
202            {
203                //profiling::scope!("Take written artifacts");
204
205                //
206                // Jobs will produce artifacts. We will save these to disk and possibly trigger
207                // additional jobs for assets that they reference.
208                //
209                let written_artifacts = self
210                    .job_executor
211                    .take_written_artifacts(&mut build_task.artifact_asset_lookup);
212
213                for written_artifact in written_artifacts {
214                    //
215                    // Trigger building any dependencies.
216                    //
217                    for &dependency_artifact_id in &written_artifact.metadata.dependencies {
218                        let dependency_asset_id = *build_task
219                            .artifact_asset_lookup
220                            .get(&dependency_artifact_id)
221                            .unwrap();
222                        build_task.requested_build_ops.push_back(BuildRequest {
223                            asset_id: dependency_asset_id,
224                        });
225                    }
226
227                    //
228                    // Ensure the artifact will be in the metadata
229                    //
230                    build_task
231                        .build_hashes
232                        .insert(written_artifact.artifact_id, written_artifact.build_hash);
233
234                    let job = self
235                        .build_jobs
236                        .entry(written_artifact.asset_id)
237                        .or_insert_with(|| BuildJob::new());
238                    job.asset_exists = true;
239                    job.build_data_exists
240                        .insert((written_artifact.artifact_id, written_artifact.build_hash));
241
242                    build_task.built_artifact_info.insert(
243                        written_artifact.artifact_id,
244                        BuiltArtifactInfo {
245                            asset_id: written_artifact.asset_id,
246                            artifact_key_debug_name: written_artifact.artifact_key_debug_name,
247                            metadata: written_artifact.metadata,
248                        },
249                    );
250                }
251            }
252
253            if !(build_task.requested_build_ops.is_empty() && self.job_executor.is_idle()) {
254                //
255                // We are still processing
256                //
257                let completed_job_count = self.job_executor.completed_job_count();
258                let total_job_count = self.job_executor.current_job_count();
259                return Ok(BuildStatus::Building(BuildStatusBuilding {
260                    total_job_count,
261                    completed_job_count,
262                }));
263            }
264        }
265
266        //
267        // Finish the current build task
268        //
269        if let Some(build_task) = self.current_build_task.take() {
270            //
271            // Write the manifest file
272            //TODO: Only if it doesn't already exist? We could skip the whole building process in that case
273            //
274            let mut manifest_path = self.build_data_root_path.clone();
275            manifest_path.push("manifests");
276            std::fs::create_dir_all(&manifest_path).unwrap();
277
278            // This is a more compact file that is run at release
279            let manifest_path_release = manifest_path.join(format!(
280                "{:0>16x}.manifest_release",
281                build_task.manifest_build_hash
282            ));
283            let manifest_release_file = std::fs::File::create(manifest_path_release).unwrap();
284            let mut manifest_release_file_writer = std::io::BufWriter::new(manifest_release_file);
285
286            // This is a json file that supplements the release manifest
287            let manifest_path_debug = manifest_path.join(format!(
288                "{:0>16x}.manifest_debug",
289                build_task.manifest_build_hash
290            ));
291
292            let mut manifest_json = DebugManifestFileJson::default();
293
294            let mut all_hashes = HashSet::default();
295            for (&artifact_id, &build_hash) in &build_task.build_hashes {
296                let built_artifact_info = build_task.built_artifact_info.get(&artifact_id).unwrap();
297                let asset_id = built_artifact_info.asset_id;
298
299                fn add_dependencies_recursively(
300                    artifact_id: ArtifactId,
301                    combined_hash: &mut u64,
302                    all_dependencies: &mut HashSet<ArtifactId>,
303                    build_task: &BuildTask,
304                ) {
305                    // Get the hash and combine it with the hash so far
306                    *combined_hash ^= build_task.build_hashes.get(&artifact_id).unwrap();
307
308                    // Visit all of its dependencies
309                    for dependency in &build_task
310                        .built_artifact_info
311                        .get(&artifact_id)
312                        .unwrap()
313                        .metadata
314                        .dependencies
315                    {
316                        // Visit each artifact only once
317                        if !all_dependencies.contains(&dependency) {
318                            add_dependencies_recursively(
319                                *dependency,
320                                combined_hash,
321                                all_dependencies,
322                                build_task,
323                            );
324                        }
325                    }
326                }
327
328                let mut combined_build_hash = 0;
329                let mut all_dependencies = HashSet::<ArtifactId>::default();
330                add_dependencies_recursively(
331                    artifact_id,
332                    &mut combined_build_hash,
333                    &mut all_dependencies,
334                    &build_task,
335                );
336
337                let is_default_artifact = artifact_id.as_uuid() == asset_id.as_uuid();
338                let symbol_name = if is_default_artifact {
339                    // editor_model.path_node_id_to_path(asset_id.get)
340                    // //let location = edit_context.asset_location(asset_id).unwrap();
341                    //TODO: Assert the cached asset path tree is not stale?
342                    let path = editor_model.asset_display_name_long(asset_id);
343                    assert!(!path.is_empty());
344                    Some(path)
345                } else {
346                    None
347                };
348
349                let symbol_name_hash =
350                    StringHash::from_runtime_str(&symbol_name.clone().unwrap_or_default()).hash();
351                if symbol_name_hash != 0 {
352                    let newly_inserted = all_hashes.insert(symbol_name_hash);
353                    if !newly_inserted {
354                        // We have a hash collision if this fires
355                        panic!("Two artifacts have been produced with the same symbol name. Check for assets with the same name: {:?}", symbol_name);
356                    }
357                }
358
359                let debug_name = if let Some(artifact_key_debug_name) =
360                    &built_artifact_info.artifact_key_debug_name
361                {
362                    format!(
363                        "{}#{}",
364                        editor_model.asset_display_name_long(asset_id),
365                        artifact_key_debug_name
366                    )
367                } else {
368                    editor_model.asset_display_name_long(asset_id)
369                };
370
371                manifest_json.artifacts.push(DebugArtifactManifestDataJson {
372                    artifact_id,
373                    build_hash: format!("{:0>16x}", build_hash),
374                    combined_build_hash: format!("{:0>16x}", combined_build_hash),
375                    symbol_hash: format!("{:0>32x}", symbol_name_hash),
376                    symbol_name: symbol_name.unwrap_or_default(),
377                    artifact_type: built_artifact_info.metadata.asset_type,
378                    debug_name,
379                    //dependencies: artifact_metadata.dependencies.clone(),
380                });
381
382                // Write the artifact ID, build hash, asset type, and hash of symbol name in CSV (this could be very compact binary one day
383                write!(
384                    manifest_release_file_writer,
385                    "{:0>32x},{:0>16x},{:0>16x},{:0>32x},{:0>32x}\n",
386                    artifact_id.as_u128(),
387                    build_hash,
388                    combined_build_hash,
389                    built_artifact_info.metadata.asset_type.as_u128(),
390                    symbol_name_hash
391                )
392                .unwrap();
393            }
394
395            drop(manifest_release_file_writer);
396
397            {
398                profiling::scope!("Write debug manifest data");
399                let json = {
400                    profiling::scope!("serde_json::to_string_pretty");
401                    serde_json::to_string_pretty(&manifest_json).unwrap()
402                };
403
404                profiling::scope!("std::fs::write");
405                std::fs::write(manifest_path_debug, json).unwrap();
406            }
407
408            //
409            // Write a new TOC with summary of this build
410            //
411            let mut toc_path = self.build_data_root_path.clone();
412            toc_path.push("toc");
413            std::fs::create_dir_all(&toc_path).unwrap();
414
415            let timestamp = std::time::SystemTime::now()
416                .duration_since(std::time::SystemTime::UNIX_EPOCH)
417                .unwrap()
418                .as_millis();
419            toc_path.push(format!("{:0>16x}.toc", timestamp));
420
421            std::fs::write(
422                toc_path,
423                format!("{:0>16x}", build_task.manifest_build_hash),
424            )
425            .unwrap();
426
427            self.previous_manifest_build_hash = Some(build_task.manifest_build_hash);
428            return Ok(BuildStatus::Completed(Arc::new(build_task.log_data)));
429        }
430
431        //
432        // Consider starting a new build task
433        //
434        //
435        // If we don't have any pending import jobs, and we don't have a build in-flight, and
436        // something has been changed since the last build, we can start a build now. We need to
437        // first store the hashes of everything that will potentially go into the build.
438        //
439        let mut manifest_build_hash = 0;
440        let mut asset_hashes = HashMap::default();
441        for (asset_id, object) in editor_model.data_set().assets() {
442            let mut hash = editor_model
443                .data_set()
444                .hash_object(*asset_id, HashObjectMode::PropertiesOnly)
445                .unwrap();
446
447            if let Some(import_data) = editor_model.data_set().import_info(*asset_id) {
448                hash ^= import_data.import_data_contents_hash();
449            }
450
451            if !editor_model.is_path_node_or_root(object.schema()) {
452                asset_hashes.insert(*asset_id, hash);
453            }
454
455            let mut inner_hasher = siphasher::sip::SipHasher::default();
456            asset_id.hash(&mut inner_hasher);
457            hash.hash(&mut inner_hasher);
458            manifest_build_hash = manifest_build_hash ^ inner_hasher.finish();
459        }
460
461        let import_data_metadata_hashes = import_jobs.clone_import_data_metadata_hashes();
462        for (k, v) in &import_data_metadata_hashes {
463            let mut inner_hasher = siphasher::sip::SipHasher::default();
464            k.hash(&mut inner_hasher);
465            v.hash(&mut inner_hasher);
466            manifest_build_hash = manifest_build_hash ^ inner_hasher.finish();
467        }
468
469        self.needs_build =
470            if let Some(previous_manifest_build_hash) = self.previous_manifest_build_hash {
471                previous_manifest_build_hash != manifest_build_hash
472            } else {
473                true
474            };
475
476        //
477        // Decide what assets we will initially request. This could be everything or just
478        // a small set of assets (like a level, or all assets marked as "always export")
479        //
480        let mut requested_build_ops = VecDeque::default();
481        if self.request_build {
482            self.request_build = false;
483            // if !needs_rebuild {
484            //     return Ok(BuildStatus::Idle);
485            // } else {
486            for (&asset_id, _) in &asset_hashes {
487                assert!(!editor_model.is_path_node_or_root(
488                    &editor_model.data_set().asset_schema(asset_id).unwrap()
489                ));
490
491                //TODO: Skip assets that aren't explicitly requested, if any were requested
492                //      For now just build everything
493                requested_build_ops.push_back(BuildRequest { asset_id });
494            }
495            //}
496        } else if !self.force_build_queue.is_empty() {
497            for asset_id in self.force_build_queue.drain() {
498                requested_build_ops.push_back(BuildRequest { asset_id });
499            }
500        } else {
501            return Ok(BuildStatus::Idle);
502        }
503
504        self.job_executor.reset();
505
506        let data_set = {
507            profiling::scope!("Clone Dataset");
508            Arc::new(editor_model.data_set().clone())
509        };
510        let schema_set = editor_model.schema_set().clone();
511
512        assert!(self.current_build_task.is_none());
513        let total_job_count = requested_build_ops.len();
514        self.current_build_task = Some(BuildTask {
515            requested_build_ops,
516            started_build_ops: Default::default(),
517            build_hashes: Default::default(),
518            artifact_asset_lookup: Default::default(),
519            built_artifact_info: Default::default(),
520            data_set,
521            schema_set,
522            manifest_build_hash: manifest_build_hash,
523            log_data: Default::default(),
524        });
525
526        Ok(BuildStatus::Building(BuildStatusBuilding {
527            total_job_count,
528            completed_job_count: 0,
529        }))
530    }
531}