hydrate_pipeline/import/
import_jobs.rs

use crossbeam_channel::Receiver;
use hydrate_base::hashing::HashMap;
use hydrate_base::AssetId;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::import::import_storage::ImportDataMetadata;
use crate::import::import_thread_pool::{
    ImportThreadOutcome, ImportThreadRequest, ImportThreadRequestImport, ImportWorkerThreadPool,
};
use crate::import::import_util::RequestedImportable;
use crate::{
    DynEditorModel, HydrateProjectConfiguration, ImportJobToQueue, ImportLogData, ImportLogEvent,
    LogEventLevel, PipelineResult,
};
use hydrate_base::uuid_path::{path_to_uuid, uuid_to_path};
use hydrate_data::ImportableName;
use hydrate_data::{ImporterId, SchemaSet, SingleObject};

use super::import_types::*;
use super::importer_registry::*;

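// Loads the import data for a single asset from its ".if" file under the import data root,
// and hashes the file's metadata so callers can cheaply detect later changes on disk.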
pub fn load_import_data(
    import_data_root_path: &Path,
    schema_set: &SchemaSet,
    asset_id: AssetId,
) -> PipelineResult<ImportData> {
    profiling::scope!(&format!("Load asset import data {:?}", asset_id));
    let path = uuid_to_path(import_data_root_path, asset_id.as_uuid(), "if");

    // Import data is stored on disk in the b3f container format
    let file = std::fs::File::open(&path)?;
    let mut buf_reader = BufReader::new(file);
    let import_data =
        super::import_storage::load_import_data_from_b3f(schema_set, &mut buf_reader)?;

    let metadata = path.metadata()?;
    let metadata_hash = hash_file_metadata(&metadata);

    Ok(ImportData {
        import_data: import_data.single_object,
        contents_hash: import_data.metadata.import_data_contents_hash,
        metadata_hash,
    })
}

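// Hashes file metadata (modified timestamp + length) rather than file contents. This is a
// cheap change-detection proxy: if neither the mtime nor the size changed, we assume the
// import data on disk is unchanged.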
pub(super) fn hash_file_metadata(metadata: &std::fs::Metadata) -> u64 {
    let mut hasher = siphasher::sip::SipHasher::default();
    metadata.modified().unwrap().hash(&mut hasher);
    metadata.len().hash(&mut hasher);
    hasher.finish()
}

pub struct ImportDataMetadataHash {
    pub metadata_hash: u64,
}

pub struct ImportData {
    pub import_data: SingleObject,
    pub contents_hash: u64,
    pub metadata_hash: u64,
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ImportType {
    // Used when the asset doesn't exist
    ImportAlways,
    // Used if the asset already exists
    ImportIfImportDataStale,
}

// An in-flight import operation we want to perform
#[derive(Clone, Debug)]
pub struct ImportOp {
    // The key is the importable name
    pub requested_importables: HashMap<ImportableName, RequestedImportable>,
    pub importer_id: ImporterId,
    pub path: PathBuf,
    pub import_type: ImportType,
}

// A known import job. Each existing asset that imports data has an associated import job.
// It could be in a completed state, or there could be a problem with it and we need to re-run it.
struct ImportJob {
    import_data_exists: bool,
    asset_exists: bool,
    imported_data_hash: Option<u64>,
}

impl ImportJob {
    pub fn new() -> Self {
        ImportJob {
            import_data_exists: false,
            asset_exists: false,
            imported_data_hash: None,
        }
    }
}

pub struct ImportStatusImporting {
    pub total_job_count: usize,
    pub completed_job_count: usize,
}

pub enum ImportStatus {
    Idle,
    Importing(ImportStatusImporting),
    Completed(Arc<ImportLogData>),
}
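
// A minimal sketch of how a caller might poll the pipeline each frame (the `import_jobs`,
// `importer_registry`, and `editor_model` values here are hypothetical, not part of this module):
//
//     match import_jobs.update(&importer_registry, editor_model)? {
//         ImportStatus::Idle => { /* nothing queued */ }
//         ImportStatus::Importing(progress) => log::info!(
//             "importing {}/{} jobs",
//             progress.completed_job_count,
//             progress.total_job_count
//         ),
//         ImportStatus::Completed(log_data) => { /* inspect the returned log for errors */ }
//     }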

struct ImportTask {
    thread_pool: ImportWorkerThreadPool,
    job_count: usize,
    result_rx: Receiver<ImportThreadOutcome>,
    log_data: ImportLogData,
}

// Cache of all known import jobs. This includes imports that are complete, in progress, or not started.
// We find these by scanning existing assets and import data. We also inspect the asset and imported
// data to see if the job is complete, or is in a failed or stale state.
pub struct ImportJobs {
    //import_editor_model: EditorModel
    project_config: HydrateProjectConfiguration,
    import_data_root_path: PathBuf,
    import_jobs: HashMap<AssetId, ImportJob>,
    import_operations: VecDeque<ImportJobToQueue>,
    current_import_task: Option<ImportTask>,
}

impl ImportJobs {
    pub fn duplicate_import_data(
        &self,
        old_asset_id: AssetId,
        new_asset_id: AssetId,
    ) -> PipelineResult<()> {
        let old_path = uuid_to_path(&self.import_data_root_path, old_asset_id.as_uuid(), "if");
        let new_path = uuid_to_path(&self.import_data_root_path, new_asset_id.as_uuid(), "if");
        std::fs::create_dir_all(new_path.parent().unwrap())?;
        std::fs::copy(old_path, new_path)?;
        Ok(())
    }

    pub fn current_import_log(&self) -> Option<&ImportLogData> {
        self.current_import_task.as_ref().map(|x| &x.log_data)
    }

    pub fn is_importing(&self) -> bool {
        self.current_import_task.is_some()
    }

    pub fn import_data_root_path(&self) -> &Path {
        &self.import_data_root_path
    }

    pub fn new(
        project_config: &HydrateProjectConfiguration,
        importer_registry: &ImporterRegistry,
        editor_model: &dyn DynEditorModel,
        import_data_root_path: &Path,
    ) -> Self {
        let import_jobs =
            ImportJobs::find_all_jobs(importer_registry, editor_model, import_data_root_path);

        ImportJobs {
            project_config: project_config.clone(),
            import_data_root_path: import_data_root_path.to_path_buf(),
            import_jobs,
            import_operations: Default::default(),
            current_import_task: None,
        }
    }

    pub fn queue_import_operation(
        &mut self,
        import_job_to_queue: ImportJobToQueue,
    ) {
        if import_job_to_queue.is_empty() {
            log::warn!("Dropping empty import job");
        } else {
            self.import_operations.push_back(import_job_to_queue);
        }
    }

    pub fn load_import_data_hash(
        &self,
        asset_id: AssetId,
    ) -> ImportDataMetadataHash {
        let path = uuid_to_path(&self.import_data_root_path, asset_id.as_uuid(), "if");
        //println!("LOAD DATA HASH PATH {:?}", path);
        let metadata = path.metadata().unwrap();
        let metadata_hash = hash_file_metadata(&metadata);
        ImportDataMetadataHash { metadata_hash }
    }

    // We return a clone so this data can be processed in the background; at the end of the build
    // we can detect whether the import data changed, which would invalidate the build.
    pub fn clone_import_data_metadata_hashes(&self) -> HashMap<AssetId, u64> {
        let mut metadata_hashes = HashMap::default();
        for (k, v) in &self.import_jobs {
            if let Some(imported_data_hash) = v.imported_data_hash {
                metadata_hashes.insert(*k, imported_data_hash);
            }
        }

        metadata_hashes
    }
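
    // A hedged sketch of the staleness check described above (the build driver here is
    // hypothetical, not part of this module):
    //
    //     let hashes_before = import_jobs.clone_import_data_metadata_hashes();
    //     // ... run the build in the background ...
    //     let hashes_after = import_jobs.clone_import_data_metadata_hashes();
    //     if hashes_before != hashes_after {
    //         // Import data changed mid-build; the build output is stale.
    //     }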

    #[profiling::function]
    pub fn start_import_task(
        &mut self,
        import_job_to_queue: ImportJobToQueue,
        importer_registry: &ImporterRegistry,
        editor_model: &mut dyn DynEditorModel,
    ) -> PipelineResult<ImportTask> {
        log::info!(
            "Starting import task for {} source files",
            import_job_to_queue.import_job_source_files.len()
        );

        let import_operations: Vec<_> = import_job_to_queue
            .import_job_source_files
            .into_iter()
            .map(|x| ImportOp {
                requested_importables: x.requested_importables,
                importer_id: x.importer_id,
                path: x.source_file_path,
                import_type: x.import_type,
            })
            .collect();

        //
        // Cache the import info for all assets
        //
        let mut existing_asset_import_state = HashMap::default();
        for (asset_id, asset_info) in editor_model.data_set().assets() {
            if let Some(import_info) = asset_info.import_info() {
                let import_metadata = ImportDataMetadata {
                    source_file_size: import_info.source_file_size(),
                    source_file_modified_timestamp: import_info.source_file_modified_timestamp(),
                    import_data_contents_hash: import_info.import_data_contents_hash(),
                };
                existing_asset_import_state.insert(*asset_id, import_metadata);
            }
        }
        let existing_asset_import_state = Arc::new(existing_asset_import_state);

        //
        // Create the thread pool
        //
        let thread_count = num_cpus::get();
        //let thread_count = 1;

        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let thread_pool = ImportWorkerThreadPool::new(
            &self.project_config,
            importer_registry,
            editor_model.schema_set(),
            &existing_asset_import_state,
            &self.import_data_root_path,
            thread_count,
            result_tx,
        );

        //
        // Queue the import operations
        //
        let mut job_count = 0;
        for import_op in import_operations {
            let mut importable_assets = HashMap::<ImportableName, ImportableAsset>::default();
            for (name, requested_importable) in &import_op.requested_importables {
                let canonical_path_references =
                    requested_importable.canonical_path_references.clone();
                let path_references = requested_importable.path_references.clone();

                // We could merge in any paths that were already configured in the asset DB. However,
                // for now we rely on the code queueing the update to decide whether it wants to do
                // that or not.
                // if !requested_importable.replace_with_default_asset {
                //     let asset_referenced_paths = editor_model
                //         .data_set()
                //         .resolve_all_path_references(requested_importable.asset_id)
                //         .unwrap_or_default();
                //
                //     for (k, v) in asset_referenced_paths {
                //         path_references.insert(k, v);
                //     }
                // }

                importable_assets.insert(
                    name.clone(),
                    ImportableAsset {
                        id: requested_importable.asset_id,
                        canonical_path_references,
                        path_references,
                    },
                );
            }

            job_count += 1;
            thread_pool.add_request(ImportThreadRequest::RequestImport(
                ImportThreadRequestImport {
                    import_op,
                    importable_assets,
                },
            ));
        }

        Ok(ImportTask {
            thread_pool,
            job_count,
            result_rx,
            log_data: import_job_to_queue.log_data,
        })
    }

    #[profiling::function]
    pub fn update(
        &mut self,
        importer_registry: &ImporterRegistry,
        editor_model: &mut dyn DynEditorModel,
    ) -> PipelineResult<ImportStatus> {
        profiling::scope!("Process Import Operations");

        //
        // If we already have an import task running, report progress
        //
        if let Some(current_import_task) = &self.current_import_task {
            if !current_import_task.thread_pool.is_idle() {
                return Ok(ImportStatus::Importing(ImportStatusImporting {
                    total_job_count: current_import_task.job_count,
                    completed_job_count: current_import_task.job_count
                        - current_import_task.thread_pool.active_request_count(),
                }));
            }
        }

        //
        // If we have a completed import task, merge results back into the editor model
        //
        if let Some(mut finished_import_task) = self.current_import_task.take() {
            finished_import_task.thread_pool.finish();

            //
            // Commit the imports
            //
            for outcome in finished_import_task.result_rx.try_iter() {
                match outcome {
                    ImportThreadOutcome::Complete(msg) => match msg.result {
                        Ok(result) => {
                            for (name, imported_asset) in result {
                                if let Some(requested_importable) =
                                    msg.request.import_op.requested_importables.get(&name)
                                {
                                    editor_model.handle_import_complete(
                                        requested_importable.asset_id,
                                        requested_importable.asset_name.clone(),
                                        requested_importable.asset_location.clone(),
                                        &imported_asset.default_asset,
                                        requested_importable.replace_with_default_asset,
                                        imported_asset.import_info,
                                        &requested_importable.canonical_path_references,
                                        &requested_importable.path_references,
                                    )?;
                                }
                            }
                        }
                        Err(e) => finished_import_task
                            .log_data
                            .log_events
                            .push(ImportLogEvent {
                                path: msg.request.import_op.path.clone(),
                                asset_id: None,
                                level: LogEventLevel::FatalError,
                                message: format!("Importer returned error: {}", e),
                            }),
                    },
                }
            }

            return Ok(ImportStatus::Completed(Arc::new(
                finished_import_task.log_data,
            )));
        }

        //
        // Check if we have pending imports/should start a new import task
        //
        let Some(import_job_to_queue) = self.import_operations.pop_front() else {
            // No imports are pending
            return Ok(ImportStatus::Idle);
        };

        //
        // Start a new import task with all pending imports
        //
        let import_task =
            self.start_import_task(import_job_to_queue, importer_registry, editor_model)?;
        let status = ImportStatus::Importing(ImportStatusImporting {
            total_job_count: import_task.job_count,
            completed_job_count: 0,
        });

        assert!(self.current_import_task.is_none());
        self.current_import_task = Some(import_task);

        Ok(status)
    }

    fn find_all_jobs(
        importer_registry: &ImporterRegistry,
        editor_model: &dyn DynEditorModel,
        import_data_root_path: &Path,
    ) -> HashMap<AssetId, ImportJob> {
        let mut import_jobs = HashMap::<AssetId, ImportJob>::default();

        //
        // Scan import dir for known import data
        //
        let walker = globwalk::GlobWalkerBuilder::from_patterns(import_data_root_path, &["**.if"])
            .file_type(globwalk::FileType::FILE)
            .build()
            .unwrap();

        for file in walker {
            if let Ok(file) = file {
                let file = dunce::canonicalize(file.path()).unwrap();
                //println!("import file {:?}", file);
                let import_file_uuid = path_to_uuid(import_data_root_path, &file).unwrap();
                let asset_id = AssetId::from_uuid(import_file_uuid);
                let job = import_jobs
                    .entry(asset_id)
                    .or_insert_with(ImportJob::new);

                let file_metadata = file.metadata().unwrap();
                let import_data_hash = hash_file_metadata(&file_metadata);

                job.import_data_exists = true;
                job.imported_data_hash = Some(import_data_hash);
            }
        }

        //
        // Scan assets to find any asset that has an associated importer
        //
        for (asset_id, _) in editor_model.data_set().assets() {
            if let Some(import_info) = editor_model.data_set().import_info(*asset_id) {
                let importer_id = import_info.importer_id();
                let importer = importer_registry.importer(importer_id);
                if importer.is_some() {
                    let job = import_jobs
                        .entry(*asset_id)
                        .or_insert_with(ImportJob::new);
                    job.asset_exists = true;
                }
            }
        }

        import_jobs
    }
}