use crossbeam_channel::Receiver;
use hydrate_base::hashing::HashMap;
use hydrate_base::AssetId;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::import::import_storage::ImportDataMetadata;
use crate::import::import_thread_pool::{
    ImportThreadOutcome, ImportThreadRequest, ImportThreadRequestImport, ImportWorkerThreadPool,
};
use crate::import::import_util::RequestedImportable;
use crate::{
    DynEditorModel, HydrateProjectConfiguration, ImportJobToQueue, ImportLogData, ImportLogEvent,
    LogEventLevel, PipelineResult,
};
use hydrate_base::uuid_path::{path_to_uuid, uuid_to_path};
use hydrate_data::ImportableName;
use hydrate_data::{ImporterId, SchemaSet, SingleObject};

use super::import_types::*;
use super::importer_registry::*;

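/// Loads previously written import data for `asset_id` from the import data root and returns it
/// along with a hash of the import data file's metadata (modified time and length).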
pub fn load_import_data(
    import_data_root_path: &Path,
    schema_set: &SchemaSet,
    asset_id: AssetId,
) -> PipelineResult<ImportData> {
    profiling::scope!(&format!("Load asset import data {:?}", asset_id));
    let path = uuid_to_path(import_data_root_path, asset_id.as_uuid(), "if");

    let file = std::fs::File::open(&path)?;
    let mut buf_reader = BufReader::new(file);
    let import_data =
        super::import_storage::load_import_data_from_b3f(schema_set, &mut buf_reader)?;

    let metadata = path.metadata()?;
    let metadata_hash = hash_file_metadata(&metadata);

    Ok(ImportData {
        import_data: import_data.single_object,
        contents_hash: import_data.metadata.import_data_contents_hash,
        metadata_hash,
    })
}

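/// Hashes a file's modified timestamp and length with SipHash, producing a fingerprint that
/// changes when the file is rewritten without needing to read its contents.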
pub(super) fn hash_file_metadata(metadata: &std::fs::Metadata) -> u64 {
    let mut hasher = siphasher::sip::SipHasher::default();
    metadata.modified().unwrap().hash(&mut hasher);
    metadata.len().hash(&mut hasher);
    hasher.finish()
}

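/// The metadata hash of an asset's import data file.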
pub struct ImportDataMetadataHash {
    pub metadata_hash: u64,
}

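/// Import data loaded from disk for a single asset, plus hashes of its contents and of the
/// file's metadata.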
pub struct ImportData {
    pub import_data: SingleObject,
    pub contents_hash: u64,
    pub metadata_hash: u64,
}

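/// Controls whether a queued import runs unconditionally or only when existing import data is stale.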
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ImportType {
    ImportAlways,
    ImportIfImportDataStale,
}

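/// A single source file to import: the importables requested from it, the importer to use,
/// the source file path, and the import type.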
#[derive(Clone, Debug)]
pub struct ImportOp {
    pub requested_importables: HashMap<ImportableName, RequestedImportable>,
    pub importer_id: ImporterId,
    pub path: PathBuf,
    pub import_type: ImportType,
}

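/// Cached state for one asset's import job: whether import data exists on disk, whether the asset
/// exists in the data set, and the metadata hash of the import data file if it was found.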
struct ImportJob {
    import_data_exists: bool,
    asset_exists: bool,
    imported_data_hash: Option<u64>,
}

impl ImportJob {
    pub fn new() -> Self {
        ImportJob {
            import_data_exists: false,
            asset_exists: false,
            imported_data_hash: None,
        }
    }
}

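/// Progress counters reported while an import task is running.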
pub struct ImportStatusImporting {
    pub total_job_count: usize,
    pub completed_job_count: usize,
}

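/// Status returned by `ImportJobs::update`: nothing to do, an import in progress, or a finished
/// import along with its log data.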
pub enum ImportStatus {
    Idle,
    Importing(ImportStatusImporting),
    Completed(Arc<ImportLogData>),
}

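/// An in-flight import: the worker thread pool, the number of submitted jobs, the channel results
/// are received on, and the log accumulated so far.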
struct ImportTask {
    thread_pool: ImportWorkerThreadPool,
    job_count: usize,
    result_rx: Receiver<ImportThreadOutcome>,
    log_data: ImportLogData,
}

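/// Owns the queue of pending import operations, the known per-asset import jobs, and the
/// currently running import task, if any.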
pub struct ImportJobs {
    project_config: HydrateProjectConfiguration,
    import_data_root_path: PathBuf,
    import_jobs: HashMap<AssetId, ImportJob>,
    import_operations: VecDeque<ImportJobToQueue>,
    current_import_task: Option<ImportTask>,
}

impl ImportJobs {
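    /// Copies the import data file of `old_asset_id` so it is also available under `new_asset_id`.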
    pub fn duplicate_import_data(
        &self,
        old_asset_id: AssetId,
        new_asset_id: AssetId,
    ) -> PipelineResult<()> {
        let old_path = uuid_to_path(&self.import_data_root_path, old_asset_id.as_uuid(), "if");
        let new_path = uuid_to_path(&self.import_data_root_path, new_asset_id.as_uuid(), "if");
        std::fs::create_dir_all(new_path.parent().unwrap())?;
        std::fs::copy(old_path, new_path)?;
        Ok(())
    }

    pub fn current_import_log(&self) -> Option<&ImportLogData> {
        self.current_import_task.as_ref().map(|x| &x.log_data)
    }

    pub fn is_importing(&self) -> bool {
        self.current_import_task.is_some()
    }

    pub fn import_data_root_path(&self) -> &Path {
        &self.import_data_root_path
    }

    pub fn new(
        project_config: &HydrateProjectConfiguration,
        importer_registry: &ImporterRegistry,
        editor_model: &dyn DynEditorModel,
        import_data_root_path: &Path,
    ) -> Self {
        let import_jobs =
            ImportJobs::find_all_jobs(importer_registry, editor_model, import_data_root_path);

        ImportJobs {
            project_config: project_config.clone(),
            import_data_root_path: import_data_root_path.to_path_buf(),
            import_jobs,
            import_operations: Default::default(),
            current_import_task: None,
        }
    }

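    /// Queues an import job to be started by a later call to `update`. Empty jobs are dropped
    /// with a warning.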
    pub fn queue_import_operation(
        &mut self,
        import_job_to_queue: ImportJobToQueue,
    ) {
        if import_job_to_queue.is_empty() {
            log::warn!("Dropping empty import job")
        } else {
            self.import_operations.push_back(import_job_to_queue);
        }
    }

    pub fn load_import_data_hash(
        &self,
        asset_id: AssetId,
    ) -> ImportDataMetadataHash {
        let path = uuid_to_path(&self.import_data_root_path, asset_id.as_uuid(), "if");
        let metadata = path.metadata().unwrap();
        let metadata_hash = hash_file_metadata(&metadata);
        ImportDataMetadataHash { metadata_hash }
    }

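    /// Returns the metadata hash for every asset whose import data file was found on disk.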
    pub fn clone_import_data_metadata_hashes(&self) -> HashMap<AssetId, u64> {
        let mut metadata_hashes = HashMap::default();
        for (k, v) in &self.import_jobs {
            if let Some(imported_data_hash) = v.imported_data_hash {
                metadata_hashes.insert(*k, imported_data_hash);
            }
        }

        metadata_hashes
    }

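    /// Spawns a worker thread pool and submits one import request per source file in
    /// `import_job_to_queue`, returning the resulting in-flight task.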
    #[profiling::function]
    pub fn start_import_task(
        &mut self,
        import_job_to_queue: ImportJobToQueue,
        importer_registry: &ImporterRegistry,
        editor_model: &mut dyn DynEditorModel,
    ) -> PipelineResult<ImportTask> {
        log::info!(
            "Starting import task for {} source files",
            import_job_to_queue.import_job_source_files.len()
        );

        let import_operations: Vec<_> = import_job_to_queue
            .import_job_source_files
            .into_iter()
            .map(|x| ImportOp {
                requested_importables: x.requested_importables,
                importer_id: x.importer_id,
                path: x.source_file_path,
                import_type: x.import_type,
            })
            .collect();

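        // Snapshot the import metadata currently recorded on each asset; this snapshot is shared
        // with the worker thread pool created below.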
        let mut existing_asset_import_state = HashMap::default();
        for (asset_id, asset_info) in editor_model.data_set().assets() {
            if let Some(import_info) = asset_info.import_info() {
                let import_metadata = ImportDataMetadata {
                    source_file_size: import_info.source_file_size(),
                    source_file_modified_timestamp: import_info.source_file_modified_timestamp(),
                    import_data_contents_hash: import_info.import_data_contents_hash(),
                };
                existing_asset_import_state.insert(*asset_id, import_metadata);
            }
        }
        let existing_asset_import_state = Arc::new(existing_asset_import_state);

        let thread_count = num_cpus::get();
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let thread_pool = ImportWorkerThreadPool::new(
            &self.project_config,
            importer_registry,
            editor_model.schema_set(),
            &existing_asset_import_state,
            &self.import_data_root_path,
            thread_count,
            result_tx,
        );

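        // Submit one request per source file; each request carries the requested importables and
        // their path references.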
        let mut job_count = 0;
        for import_op in import_operations {
            let mut importable_assets = HashMap::<ImportableName, ImportableAsset>::default();
            for (name, requested_importable) in &import_op.requested_importables {
                let canonical_path_references =
                    requested_importable.canonical_path_references.clone();
                let path_references = requested_importable.path_references.clone();

                importable_assets.insert(
                    name.clone(),
                    ImportableAsset {
                        id: requested_importable.asset_id,
                        canonical_path_references,
                        path_references,
                    },
                );
            }

            job_count += 1;
            thread_pool.add_request(ImportThreadRequest::RequestImport(
                ImportThreadRequestImport {
                    import_op,
                    importable_assets,
                },
            ));
        }

        Ok(ImportTask {
            thread_pool,
            job_count,
            result_rx,
            log_data: import_job_to_queue.log_data,
        })
    }

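    /// Drives the import pipeline: reports progress of a running task, finalizes a finished task
    /// by applying its results to the editor model, or starts the next queued import job.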
    #[profiling::function]
    pub fn update(
        &mut self,
        importer_registry: &ImporterRegistry,
        editor_model: &mut dyn DynEditorModel,
    ) -> PipelineResult<ImportStatus> {
        profiling::scope!("Process Import Operations");

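        // If an import task is running and its thread pool still has active requests, just
        // report progress.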
        if let Some(current_import_task) = &self.current_import_task {
            if !current_import_task.thread_pool.is_idle() {
                return Ok(ImportStatus::Importing(ImportStatusImporting {
                    total_job_count: current_import_task.job_count,
                    completed_job_count: current_import_task.job_count
                        - current_import_task.thread_pool.active_request_count(),
                }));
            }
        }

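        // The running task's thread pool is idle: shut it down, drain the results, apply the
        // completed imports to the editor model, and report completion with the accumulated log.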
        if let Some(mut finished_import_task) = self.current_import_task.take() {
            finished_import_task.thread_pool.finish();

            for outcome in finished_import_task.result_rx.try_iter() {
                match outcome {
                    ImportThreadOutcome::Complete(msg) => match msg.result {
                        Ok(result) => {
                            for (name, imported_asset) in result {
                                if let Some(requested_importable) =
                                    msg.request.import_op.requested_importables.get(&name)
                                {
                                    editor_model.handle_import_complete(
                                        requested_importable.asset_id,
                                        requested_importable.asset_name.clone(),
                                        requested_importable.asset_location.clone(),
                                        &imported_asset.default_asset,
                                        requested_importable.replace_with_default_asset,
                                        imported_asset.import_info,
                                        &requested_importable.canonical_path_references,
                                        &requested_importable.path_references,
                                    )?;
                                }
                            }
                        }
                        Err(e) => finished_import_task
                            .log_data
                            .log_events
                            .push(ImportLogEvent {
                                path: msg.request.import_op.path.clone(),
                                asset_id: None,
                                level: LogEventLevel::FatalError,
                                message: format!("Importer returned error: {}", e),
                            }),
                    },
                }
            }

            return Ok(ImportStatus::Completed(Arc::new(
                finished_import_task.log_data,
            )));
        }

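        // Nothing is running. If an import job is queued, start it; otherwise report idle.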
        let Some(import_job_to_queue) = self.import_operations.pop_front() else {
            return Ok(ImportStatus::Idle);
        };

        let import_task =
            self.start_import_task(import_job_to_queue, importer_registry, editor_model)?;
        let status = ImportStatus::Importing(ImportStatusImporting {
            total_job_count: import_task.job_count,
            completed_job_count: 0,
        });

        assert!(self.current_import_task.is_none());
        self.current_import_task = Some(import_task);

        Ok(status)
    }

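    /// Builds the initial `ImportJob` map by scanning the import data directory for `.if` files
    /// and cross-referencing assets in the editor model that have import info.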
    fn find_all_jobs(
        importer_registry: &ImporterRegistry,
        editor_model: &dyn DynEditorModel,
        import_data_root_path: &Path,
    ) -> HashMap<AssetId, ImportJob> {
        let mut import_jobs = HashMap::<AssetId, ImportJob>::default();

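        // Walk the import data root for .if files; each file's path encodes the asset ID it
        // belongs to.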
        let walker = globwalk::GlobWalkerBuilder::from_patterns(import_data_root_path, &["**.if"])
            .file_type(globwalk::FileType::FILE)
            .build()
            .unwrap();

        for file in walker {
            if let Ok(file) = file {
                let file = dunce::canonicalize(&file.path()).unwrap();
                let import_file_uuid = path_to_uuid(import_data_root_path, &file).unwrap();
                let asset_id = AssetId::from_uuid(import_file_uuid);
                let job = import_jobs
                    .entry(asset_id)
                    .or_insert_with(|| ImportJob::new());

                let file_metadata = file.metadata().unwrap();
                let import_data_hash = hash_file_metadata(&file_metadata);

                job.import_data_exists = true;
                job.imported_data_hash = Some(import_data_hash);
            }
        }

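        // Mark which assets exist in the data set and have a registered importer.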
        for (asset_id, _) in editor_model.data_set().assets() {
            if let Some(import_info) = editor_model.data_set().import_info(*asset_id) {
                let importer_id = import_info.importer_id();
                let importer = importer_registry.importer(importer_id);
                if importer.is_some() {
                    let job = import_jobs
                        .entry(*asset_id)
                        .or_insert_with(|| ImportJob::new());
                    job.asset_exists = true;
                }
            }
        }

        import_jobs
    }
}