1use std::{
2 collections::{HashMap, HashSet},
3 path::PathBuf,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc,
7 },
8};
9
10use crossbeam_channel::{unbounded, Receiver, Sender};
11use dashmap::DashMap;
12use distill_core::{ArtifactMetadata, AssetMetadata, AssetRef, AssetTypeId, AssetUuid};
13use log::error;
14
15use crate::{
16 handle::{RefOp, SerdeContext},
17 io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest},
18 storage::{
19 AssetLoadOp, AssetStorage, AtomicHandleAllocator, HandleAllocator, HandleOp,
20 IndirectIdentifier, IndirectionResolver, IndirectionTable, LoadHandle, LoadInfo,
21 LoadStatus, LoaderInfoProvider,
22 },
23 Result,
24};
25
26#[derive(Copy, Clone, PartialEq, Debug)]
28enum LoadState {
29 None,
31 WaitingForMetadata,
33 RequestingMetadata,
35 RequestDependencies,
37 WaitingForDependencies,
39 WaitingForData,
41 RequestingData,
43 LoadingAsset,
45 LoadedUncommitted,
48 Loaded,
50 UnloadRequested,
52 Unloading,
54}
55
56#[derive(Copy, Clone, PartialEq, Debug)]
58enum IndirectHandleState {
59 None,
60 WaitingForMetadata,
61 RequestingMetadata,
62 Resolved,
63}
64
65#[derive(Debug)]
66struct IndirectLoad {
67 id: IndirectIdentifier,
68 state: IndirectHandleState,
69 resolved_uuid: Option<AssetUuid>,
70 refs: AtomicUsize,
71 pending_reresolve: bool,
72}
73
74#[derive(Debug, Clone)]
75struct AssetVersionLoad {
76 state: LoadState,
77 metadata: Option<ArtifactMetadata>,
78 asset_metadata: Option<AssetMetadata>,
79 asset_type: Option<AssetTypeId>,
80 auto_commit: bool,
81 version: u32,
82}
83#[derive(Debug)]
84struct AssetLoad {
85 asset_id: AssetUuid,
86 last_state_change_instant: std::time::Instant,
87 refs: AtomicUsize,
88 versions: Vec<AssetVersionLoad>,
89 version_counter: u32,
90 pending_reload: bool,
91}
92
93struct PendingReload {
95 asset_id: AssetUuid,
97 version_before: u32,
99}
100
101pub struct LoaderState {
102 handle_allocator: Arc<dyn HandleAllocator>,
103 load_states: DashMap<LoadHandle, AssetLoad>,
104 uuid_to_load: DashMap<AssetUuid, LoadHandle>,
105 op_tx: Sender<HandleOp>,
106 op_rx: Receiver<HandleOp>,
107 invalidate_tx: Sender<AssetUuid>,
108 invalidate_rx: Receiver<AssetUuid>,
109 #[cfg(feature = "invalidate_path")]
110 invalidate_path_tx: Sender<PathBuf>,
111 #[cfg(feature = "invalidate_path")]
112 invalidate_path_rx: Receiver<PathBuf>,
113 pending_reloads: Vec<PendingReload>,
114 indirect_states: DashMap<LoadHandle, IndirectLoad>,
115 indirect_to_load: DashMap<IndirectIdentifier, LoadHandle>,
116 indirect_table: IndirectionTable,
117 responses: IORequestChannels,
118}
119
120#[allow(clippy::type_complexity)]
121struct IORequestChannels {
122 data_rx: Receiver<(Result<Vec<u8>>, LoadHandle, u32)>,
123 data_tx: Sender<(Result<Vec<u8>>, LoadHandle, u32)>,
124 metadata_rx: Receiver<(
125 Result<Vec<MetadataRequestResult>>,
126 HashMap<AssetUuid, (LoadHandle, u32)>,
127 )>,
128 metadata_tx: Sender<(
129 Result<Vec<MetadataRequestResult>>,
130 HashMap<AssetUuid, (LoadHandle, u32)>,
131 )>,
132 resolve_rx: Receiver<(
133 Result<Vec<(PathBuf, Vec<AssetMetadata>)>>,
134 IndirectIdentifier,
135 LoadHandle,
136 )>,
137 resolve_tx: Sender<(
138 Result<Vec<(PathBuf, Vec<AssetMetadata>)>>,
139 IndirectIdentifier,
140 LoadHandle,
141 )>,
142}
143
144struct AssetLoadResult {
145 new_state: LoadState,
146 asset_type: Option<AssetTypeId>,
147}
148
149impl AssetLoadResult {
150 pub fn from_state(new_state: LoadState) -> Self {
151 Self {
152 new_state,
153 asset_type: None,
154 }
155 }
156}
157
158impl LoaderState {
159 fn get_or_insert_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
160 if let Some(handle) = self.indirect_to_load.get(&id) {
161 *handle
162 } else {
163 let new_handle = self.handle_allocator.alloc();
164 let new_handle = new_handle.set_indirect();
165 log::trace!(
166 "Inserting indirect load for {:?} load handle {:?}",
167 id,
168 new_handle
169 );
170
171 self.indirect_states.insert(
172 new_handle,
173 IndirectLoad {
174 id: id.clone(),
175 state: IndirectHandleState::None,
176 resolved_uuid: None,
177 refs: AtomicUsize::new(0),
178 pending_reresolve: false,
179 },
180 );
181 self.indirect_to_load.insert(id, new_handle);
182 new_handle
183 }
184 }
185
186 fn get_or_insert(&self, id: AssetUuid) -> LoadHandle {
187 let handle = *self.uuid_to_load.entry(id).or_insert_with(|| {
188 let new_handle = self.handle_allocator.alloc();
189
190 log::trace!(
191 "Inserting load state for {:?} load handle {:?}",
192 id,
193 new_handle
194 );
195
196 self.load_states.insert(
197 new_handle,
198 AssetLoad {
199 asset_id: id,
200 versions: vec![AssetVersionLoad {
201 asset_type: None,
202 auto_commit: true,
203 metadata: None,
204 asset_metadata: None,
205 state: LoadState::None,
206 version: 1,
207 }],
208 version_counter: 1,
209 last_state_change_instant: std::time::Instant::now(),
210 refs: AtomicUsize::new(0),
211 pending_reload: false,
212 },
213 );
214 new_handle
215 });
216 handle
217 }
218
219 fn add_refs(&self, id: AssetUuid, num_refs: usize) -> LoadHandle {
220 let handle = self.get_or_insert(id);
221 self.add_ref_handle(handle, num_refs);
222 handle
223 }
224
225 fn add_ref_handle(&self, handle: LoadHandle, num_refs: usize) {
226 if handle.is_indirect() {
227 let state = self.indirect_states.get(&handle).unwrap();
228 if let Some(uuid) = state.resolved_uuid {
229 self.add_refs(uuid, 1);
230 }
231 state.refs.fetch_add(1, Ordering::Relaxed);
232 } else {
233 self.load_states
234 .get(&handle)
235 .map(|h| h.refs.fetch_add(num_refs, Ordering::Relaxed));
236 }
237 }
238
239 fn get_asset(&self, load: LoadHandle) -> Option<AssetTypeId> {
240 let load = if load.is_indirect() {
241 self.indirect_table.resolve(load)?
242 } else {
243 load
244 };
245 self.load_states
246 .get(&load)
247 .map(|load| {
248 load.versions
249 .iter()
250 .find(|version| matches!(version.state, LoadState::Loaded))
251 .map(|version| version.asset_type)
252 .unwrap_or(None)
253 })
254 .unwrap_or(None)
255 }
256
257 fn get_load_info(&self, load: LoadHandle) -> Option<LoadInfo> {
258 let load = if load.is_indirect() {
259 self.indirect_table.resolve(load)?
260 } else {
261 load
262 };
263 self.load_states.get(&load).map(|s| {
264 let (path, file_name, asset_name) = s
265 .versions
266 .iter()
267 .filter(|v| v.metadata.is_some())
268 .max_by_key(|v| v.version)
269 .and_then(|v| {
270 v.asset_metadata.as_ref().map(|a| {
271 let path = a
272 .search_tags
273 .iter()
274 .find(|(tag, _)| tag == "path")
275 .and_then(|(_, v)| v.clone());
276 let file_name = a
277 .search_tags
278 .iter()
279 .find(|(tag, _)| tag == "file_name")
280 .and_then(|(_, v)| v.clone());
281 let asset_name = a
282 .search_tags
283 .iter()
284 .find(|(tag, _)| tag == "name")
285 .and_then(|(_, v)| v.clone());
286 (path, file_name, asset_name)
287 })
288 })
289 .unwrap_or((None, None, None));
290 LoadInfo {
291 asset_id: s.asset_id,
292 refs: s.refs.load(Ordering::Relaxed) as u32,
293 path,
294 file_name,
295 asset_name,
296 }
297 })
298 }
299
300 fn remove_refs(&self, load: LoadHandle, num_refs: usize) {
301 if load.is_indirect() {
302 if let Some(state) = self.indirect_states.get_mut(&load) {
303 if let Some(uuid) = state.resolved_uuid {
304 let uuid_handle = self.get_or_insert(uuid);
305 self.remove_refs(uuid_handle, num_refs);
306 }
307 assert!(
308 state.refs.fetch_sub(num_refs, Ordering::Relaxed) < usize::MAX - num_refs,
309 "refcount underflow for indirect load {:?}:{:?}",
310 load.0,
311 *state
312 );
313 }
314 } else {
315 self.load_states.get(&load).map(|h| {
316 assert!(
317 h.refs.fetch_sub(num_refs, Ordering::Relaxed) < usize::MAX - num_refs,
318 "refcount underflow for asset {:?}",
319 self.get_load_info(load),
320 );
321 });
322 }
323 }
324
325 fn add_ref_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
326 let handle = self.get_or_insert_indirect(id);
327 self.add_ref_handle(handle, 1);
328 handle
329 }
330
331 fn process_load_states(&self, asset_storage: &dyn AssetStorage) {
332 let mut to_remove = Vec::new();
333 let keys: Vec<_> = self.load_states.iter().map(|x| *x.key()).collect();
334
335 for key in keys {
336 let mut versions_to_remove = Vec::new();
337
338 let mut entry = self.load_states.get_mut(&key).unwrap();
339 let load = entry.value_mut();
340
341 let has_refs = load.refs.load(Ordering::Relaxed) > 0;
342 if !has_refs && load.versions.is_empty() {
343 to_remove.push(key);
344 } else {
345 if has_refs && load.pending_reload {
346 if load
348 .versions
349 .iter()
350 .all(|v| matches!(v.state, LoadState::Loaded))
351 {
352 load.version_counter += 1;
353 let new_version = load.version_counter;
354 load.versions.push(AssetVersionLoad {
355 asset_type: None,
356 metadata: None,
357 asset_metadata: None,
358 auto_commit: false,
361 state: LoadState::None,
362 version: new_version,
363 });
364 load.pending_reload = false;
365 }
366 }
367 let last_state_change_instant = load.last_state_change_instant;
368 let mut versions = load.versions.clone();
369 drop(entry);
371 let mut state_change = false;
372 let mut log_old_state = None;
373 let mut log_new_state = None;
374 let newest_version = versions.iter().map(|v| v.version).max().unwrap_or(0);
375 for version_load in &mut versions {
376 let new_state = match version_load.state {
377 LoadState::None if has_refs => {
378 if newest_version > version_load.version {
380 versions_to_remove.push(version_load.version);
381 LoadState::None
382 } else if version_load.metadata.is_some() {
383 LoadState::RequestDependencies
384 } else {
385 LoadState::WaitingForMetadata
386 }
387 }
388 LoadState::None => {
389 if newest_version > version_load.version {
392 versions_to_remove.push(version_load.version);
393 }
394 LoadState::None
395 }
396 LoadState::WaitingForMetadata => {
397 if version_load.metadata.is_some() {
398 LoadState::RequestDependencies
399 } else {
400 LoadState::WaitingForMetadata
401 }
402 }
403 LoadState::RequestingMetadata => LoadState::RequestingMetadata,
404 LoadState::RequestDependencies => {
405 if let Some(artifact) = version_load.metadata.as_ref() {
407 for dependency_asset_id in &artifact.load_deps {
408 if let AssetRef::Uuid(uuid) = dependency_asset_id {
409 self.add_refs(*uuid, 1);
410 }
411 }
412 }
413
414 LoadState::WaitingForDependencies
415 }
416 LoadState::WaitingForDependencies => {
417 let asset_metadata = version_load.metadata.as_ref().unwrap();
418
419 let asset_dependencies_committed =
421 asset_metadata.load_deps.iter().all(|dependency_asset_id| {
422 self.uuid_to_load
423 .get(dependency_asset_id.expect_uuid())
424 .as_ref()
425 .and_then(|dep_load_handle| {
426 self.load_states.get(dep_load_handle)
427 })
428 .map(|dep_load| {
429 dep_load.versions.iter().all(|v| {
434 matches!(
435 v.state,
436 LoadState::Loaded
437 | LoadState::LoadedUncommitted
438 )
439 })
440 })
441 .unwrap_or(false)
442 });
443
444 if asset_dependencies_committed {
445 LoadState::WaitingForData
446 } else {
447 LoadState::WaitingForDependencies
448 }
449 }
450 LoadState::WaitingForData => LoadState::WaitingForData,
451 LoadState::RequestingData => LoadState::RequestingData,
452 LoadState::LoadingAsset => LoadState::LoadingAsset,
453 LoadState::LoadedUncommitted => LoadState::LoadedUncommitted,
454 LoadState::Loaded => {
455 if !has_refs {
456 LoadState::UnloadRequested
457 } else {
458 LoadState::Loaded
459 }
460 }
461 LoadState::UnloadRequested => {
462 if let Some(asset_type) = version_load.asset_type.take() {
463 asset_storage.free(&asset_type, key, version_load.version);
464 }
465
466 if let Some(asset_metadata) = version_load.metadata.as_ref() {
467 asset_metadata
468 .load_deps
469 .iter()
470 .for_each(|dependency_asset_id| {
471 let uuid = dependency_asset_id.expect_uuid();
472 let dependency_load_handle =
474 self.uuid_to_load.get(uuid).unwrap_or_else(|| {
475 panic!(
476 "Expected load handle to exist for asset `{:?}`.",
477 uuid
478 )
479 });
480 log::debug!("Removing ref from `{:?}`", uuid);
481 self.remove_refs(*dependency_load_handle, 1)
483 });
484 }
485
486 LoadState::Unloading
487 }
488 LoadState::Unloading => {
489 LoadState::None
491 }
492 };
493 if version_load.state != new_state {
494 state_change = true;
495 log_new_state = Some(new_state);
496 log_old_state = Some(version_load.state);
497 version_load.state = new_state;
498 }
499 }
500 let mut entry = self.load_states.get_mut(&key).unwrap();
501
502 for version in versions_to_remove {
503 versions.retain(|v| v.version != version);
504 }
505
506 entry.value_mut().versions = versions;
507 if state_change {
508 let time_in_state = std::time::Instant::now()
509 .duration_since(last_state_change_instant)
510 .as_secs_f32();
511 log::debug!(
512 "{:?} {:?} => {:?} in {}s",
513 key,
514 log_old_state.unwrap(),
515 log_new_state.unwrap(),
516 time_in_state
517 );
518
519 entry.value_mut().last_state_change_instant = std::time::Instant::now();
520 } else {
521 let time_in_state = std::time::Instant::now()
522 .duration_since(last_state_change_instant)
523 .as_secs_f32();
524 log::trace!(
525 "process_load_states Key: {:?} State: {:?} Time in state: {}",
526 key,
527 entry
528 .value()
529 .versions
530 .iter()
531 .map(|v| format!("{:?}", v.state))
532 .collect::<Vec<_>>()
533 .join(", "),
534 time_in_state
535 );
536 }
537 }
538
539 }
550 for _i in to_remove {
551 }
559 }
560
561 fn process_metadata_requests(&self, io: &mut dyn LoaderIO) {
562 while let Ok(mut response) = self.responses.metadata_rx.try_recv() {
563 let request_data = &mut response.1;
564 match response.0 {
565 Ok(metadata_list) => {
566 for metadata in metadata_list {
567 let request_data =
568 request_data.remove(&metadata.artifact_metadata.asset_id);
569 let load_handle = if let Some((handle, _)) = request_data {
570 handle
571 } else {
572 self.get_or_insert(metadata.artifact_metadata.asset_id)
573 };
574 let mut load = self
575 .load_states
576 .get_mut(&load_handle)
577 .expect("uuid in uuid_to_load but not in load_states");
578 log::trace!(
579 "received metadata for {:?} after {} secs",
580 load.asset_id,
581 std::time::Instant::now()
582 .duration_since(load.last_state_change_instant)
583 .as_secs_f32()
584 );
585 let version_load = load.versions.iter_mut().find(|v| {
586 if let Some((_, requesting_version)) = request_data {
587 v.version == requesting_version
588 } else {
589 v.metadata.is_none()
590 }
591 });
592 if let Some(version_load) = version_load {
593 version_load.metadata = Some(metadata.artifact_metadata);
594 version_load.asset_metadata = metadata.asset_metadata;
595 if let LoadState::RequestingMetadata = version_load.state {
596 version_load.state = LoadState::RequestDependencies
597 }
598 } else if request_data.is_some() {
599 load.version_counter += 1;
600 let new_version = load.version_counter;
601 load.versions.push(AssetVersionLoad {
602 asset_type: None,
603 auto_commit: true,
604 metadata: Some(metadata.artifact_metadata),
605 asset_metadata: metadata.asset_metadata,
606 state: LoadState::None,
607 version: new_version,
608 });
609 }
610 }
611 }
612 Err(err) => {
613 error!("metadata request failed: {}", err);
614 }
615 }
616 for (handle, version) in request_data.values() {
617 let mut load = self
618 .load_states
619 .get_mut(&handle)
620 .expect("load in metadata request but not in load_states");
621 let version_load = load
622 .versions
623 .iter_mut()
624 .find(|v| v.version == *version)
625 .expect("load in metadata request but not in load.versions");
626 if let LoadState::RequestingMetadata = version_load.state {
627 version_load.state = LoadState::WaitingForMetadata
628 }
629 }
630 }
631 let mut assets_to_request = HashMap::new();
632 for mut entry in self.load_states.iter_mut() {
633 let handle = *entry.key();
634 let load = entry.value_mut();
635 for version_load in &mut load.versions {
636 if let LoadState::WaitingForMetadata = version_load.state {
637 version_load.state = LoadState::RequestingMetadata;
638 assets_to_request.insert(load.asset_id, (handle, version_load.version));
639 }
640 }
641 }
642 if !assets_to_request.is_empty() {
643 io.get_asset_metadata_with_dependencies(MetadataRequest {
644 tx: self.responses.metadata_tx.clone(),
645 requests: Some(assets_to_request),
646 include_asset_metadata: true, })
648 }
649 }
650
651 fn process_data_requests(&self, storage: &dyn AssetStorage, io: &mut dyn LoaderIO) {
652 while let Ok(response) = self.responses.data_rx.try_recv() {
653 let result = response.0;
654 let handle = response.1;
655 let version = response.2;
656 let load = self
657 .load_states
658 .get(&handle)
659 .expect("load did not exist when data request completed");
660 let load_result = match result {
661 Ok(artifact_data) => {
662 let version_load = load
663 .versions
664 .iter()
665 .find(|v| v.version == version)
666 .expect("load version did not exist when data request completed");
667
668 let artifact_type = version_load.metadata.as_ref().unwrap().type_id;
669 let asset_id = load.asset_id;
670 log::trace!("asset data request succeeded for asset {:?}", load.asset_id);
671 drop(load);
674 let update_result = storage.update_asset(
675 self,
676 &artifact_type,
677 artifact_data,
678 response.1,
679 AssetLoadOp::new(self.op_tx.clone(), handle, version),
680 response.2,
681 );
682 if let Err(storage_error) = update_result {
683 error!(
684 "AssetStorage implementor error when updating asset {:?}: {}",
685 asset_id, storage_error
686 );
687 AssetLoadResult::from_state(LoadState::WaitingForData)
688 } else {
689 AssetLoadResult {
690 asset_type: Some(artifact_type),
691 new_state: LoadState::LoadingAsset,
692 }
693 }
694 }
695 Err(err) => {
696 error!(
697 "asset data request failed for asset {:?}: {}",
698 load.asset_id, err
699 );
700 AssetLoadResult::from_state(LoadState::WaitingForMetadata)
701 }
702 };
703 let mut load = self
704 .load_states
705 .get_mut(&response.1)
706 .expect("load did not exist when data request completed");
707 let version_load = load
708 .versions
709 .iter_mut()
710 .find(|v| v.version == version)
711 .expect("load version did not exist when data request completed");
712 version_load.state = load_result.new_state;
713 if let Some(asset_type) = load_result.asset_type {
714 version_load.asset_type = Some(asset_type);
715 }
716 }
717 let mut assets_to_request = Vec::new();
718 for mut load in self.load_states.iter_mut() {
719 let handle = *load.key();
720 let load = load.value_mut();
721
722 if let Some(version_load) = load
723 .versions
724 .iter_mut()
725 .find(|v| matches!(v.state, LoadState::WaitingForData))
726 {
727 version_load.state = LoadState::RequestingData;
728 let artifact_id = version_load.metadata.as_ref().unwrap().id;
729 assets_to_request.push(DataRequest {
730 tx: self.responses.data_tx.clone(),
731 asset_id: load.asset_id,
732 artifact_id,
733 request_data: Some((handle, version_load.version)),
734 });
735 }
736 }
737 if !assets_to_request.is_empty() {
738 io.get_artifacts(assets_to_request);
739 }
740 }
741
742 fn process_load_ops(&self, asset_storage: &dyn AssetStorage) {
743 while let Ok(op) = self.op_rx.try_recv() {
744 match op {
745 HandleOp::Error(_handle, _version, err) => {
746 panic!("load error {}", err);
747 }
748 HandleOp::Complete(handle, version) => {
749 let mut load = self
750 .load_states
751 .get_mut(&handle)
752 .expect("load op completed but load state does not exist");
753 let load_version = load
754 .versions
755 .iter_mut()
756 .find(|v| v.version == version)
757 .expect("loade op completed but version not found in load");
758 if load_version.auto_commit {
759 commit_asset(handle, load.value_mut(), version, asset_storage);
760 } else {
761 load_version.state = LoadState::LoadedUncommitted;
762 }
763 }
764 HandleOp::Drop(handle, version) => {
765 panic!(
766 "load op dropped without calling complete/error, handle {:?} version {}",
767 handle, version
768 )
769 }
770 }
771 }
772 }
773
774 #[cfg(feature = "invalidate_path")]
775 fn process_path_changes(&mut self) {
776 let mut changes = HashSet::new();
777 while let Ok(path) = self.invalidate_path_rx.try_recv() {
778 log::trace!("process_path_changes invalidate_path_rx path: {:?}", path);
779 changes.insert(path);
780 }
781 for entry in self.indirect_to_load.iter() {
782 let indirect_id = entry.key();
783 let handle = entry.value();
784 let cleaned_path =
785 distill_core::utils::canonicalize_path(&PathBuf::from(indirect_id.path()));
786 for change in &changes {
787 if change == &cleaned_path {
788 if let Some(mut indirect) = self.indirect_states.get_mut(&handle) {
789 indirect.pending_reresolve = true;
790 }
791 }
792 }
793 }
794 }
795
796 fn process_asset_changes(&mut self, asset_storage: &dyn AssetStorage) {
798 if self.pending_reloads.is_empty() {
799 let mut changes = HashSet::new();
801 while let Ok(asset) = self.invalidate_rx.try_recv() {
802 log::trace!("process_asset_changes invalidate_rx asset: {:?}", asset);
803 changes.insert(asset);
804 }
805 if !changes.is_empty() {
806 for asset_id in &changes {
808 let current_version = self
809 .uuid_to_load
810 .get(asset_id)
811 .map(|l| *l)
812 .and_then(|load_handle| {
813 self.load_states
814 .get(&load_handle)
815 .map(|load| (load_handle, load))
816 })
817 .map(|(load_handle, load)| {
818 (load_handle, load.versions.iter().map(|v| v.version).max())
819 });
820 if let Some((handle, Some(current_version))) = current_version {
821 let mut load = self
822 .load_states
823 .get_mut(&handle)
824 .expect("load state should exist for pending reload");
825 load.pending_reload = true;
826 self.pending_reloads.push(PendingReload {
827 asset_id: *asset_id,
828 version_before: current_version,
829 });
830 }
831 }
832 }
833 } else {
834 let is_finished = self.pending_reloads.iter().all(|reload| {
835 self.uuid_to_load
836 .get(&reload.asset_id)
837 .as_ref()
838 .and_then(|load_handle| self.load_states.get(load_handle))
839 .map(|load| {
840 load.versions.iter().any(|v| {
843 matches!(v.state, LoadState::Loaded | LoadState::LoadedUncommitted)
844 && v.version > reload.version_before
845 })
846 })
847 .unwrap_or(true)
850 });
851 log::trace!("reload unfinished");
852 if is_finished {
853 for reload in &self.pending_reloads {
854 if let Some((load_handle, mut load)) = self
855 .uuid_to_load
856 .get_mut(&reload.asset_id)
857 .as_ref()
858 .and_then(|load_handle| {
859 self.load_states
860 .get_mut(load_handle)
861 .map(|load| (load_handle, load))
862 })
863 {
864 if let Some(version_to_commit) = load
865 .versions
866 .iter()
867 .find(|v| matches!(v.state, LoadState::LoadedUncommitted))
868 .map(|v| v.version)
869 {
870 log::trace!("committing version");
871 commit_asset(
873 **load_handle,
874 load.value_mut(),
875 version_to_commit,
876 asset_storage,
877 );
878 }
879 }
880 }
881 self.pending_reloads.clear();
882 }
883 }
884 }
885
886 fn process_indirect_states(&self) {
887 for mut entry in self.indirect_states.iter_mut() {
888 let has_refs = entry.refs.load(Ordering::Relaxed) > 0;
889 let new_state = match entry.state {
890 IndirectHandleState::None if has_refs => IndirectHandleState::WaitingForMetadata,
891 IndirectHandleState::Resolved if entry.pending_reresolve => {
892 entry.pending_reresolve = false;
893 IndirectHandleState::WaitingForMetadata
894 }
895 state => state,
896 };
897 entry.state = new_state;
898 }
899 }
900
901 fn process_resolve_requests(&self, io: &mut dyn LoaderIO, resolver: &dyn IndirectionResolver) {
902 while let Ok(response) = self.responses.resolve_rx.try_recv() {
903 let result = response.0;
904 let id = response.1;
905 let load_handle = response.2;
906 let mut state = self
907 .indirect_states
908 .get_mut(&load_handle)
909 .expect("indirect state did not exist when resolve request completed");
910 match result {
911 Ok(candidates) => {
912 let num_refs = state.refs.load(Ordering::Relaxed);
913 let new_uuid = resolver.resolve(&id, candidates);
914 if let Some(existing_uuid) = state.resolved_uuid {
915 let uuid_handle = self.get_or_insert(existing_uuid);
916 self.remove_refs(uuid_handle, num_refs);
917 }
918 if let Some(new_uuid) = new_uuid {
919 let uuid_handle = self.get_or_insert(new_uuid);
920 self.add_refs(new_uuid, num_refs);
921 self.indirect_table.0.insert(load_handle, uuid_handle);
922 } else {
923 self.indirect_table.0.remove(&load_handle);
924 }
925 state.resolved_uuid = new_uuid;
926 state.state = IndirectHandleState::Resolved;
927 }
928 Err(err) => {
929 error!("resolve request failed for id {:?}: {}", id, err);
930 state.state = IndirectHandleState::None;
931 }
932 }
933 }
934 let mut assets_to_request = Vec::new();
935 for mut load in self.indirect_states.iter_mut() {
936 if let IndirectHandleState::WaitingForMetadata = load.state {
937 load.state = IndirectHandleState::RequestingMetadata;
938 assets_to_request.push(ResolveRequest {
939 tx: self.responses.resolve_tx.clone(),
940 id: Some((load.id.clone(), *load.key())),
941 });
942 }
943 }
944 if !assets_to_request.is_empty() {
945 io.get_asset_candidates(assets_to_request);
946 }
947 }
948
949 pub fn invalidate_assets(&self, assets: &[AssetUuid]) {
950 for asset in assets {
951 let _ = self.invalidate_tx.send(*asset);
952 }
953 }
954
955 #[cfg(feature = "invalidate_path")]
956 pub fn invalidate_paths(&self, paths: &[PathBuf]) {
957 for path in paths {
958 let _ = self.invalidate_path_tx.send(path.clone());
959 }
960 }
961}
962
963pub struct Loader {
965 io: Box<dyn LoaderIO>,
966 data: LoaderState,
967}
968
969impl LoaderInfoProvider for LoaderState {
970 fn get_load_handle(&self, id: &AssetRef) -> Option<LoadHandle> {
971 self.uuid_to_load.get(id.expect_uuid()).map(|l| *l)
972 }
973
974 fn get_asset_id(&self, load: LoadHandle) -> Option<AssetUuid> {
975 self.load_states.get(&load).map(|l| l.asset_id)
976 }
977}
978
979impl Loader {
980 pub fn new(io: Box<dyn LoaderIO>) -> Loader {
981 Self::new_with_handle_allocator(io, Arc::new(AtomicHandleAllocator::default()))
982 }
983
984 pub fn new_with_handle_allocator(
985 io: Box<dyn LoaderIO>,
986 handle_allocator: Arc<dyn HandleAllocator>,
987 ) -> Loader {
988 let (op_tx, op_rx) = unbounded();
989 let (invalidate_tx, invalidate_rx) = unbounded();
990 #[cfg(feature = "invalidate_path")]
991 let (invalidate_path_tx, invalidate_path_rx) = unbounded();
992 let (metadata_tx, metadata_rx) = unbounded();
993 let (data_tx, data_rx) = unbounded();
994 let (resolve_tx, resolve_rx) = unbounded();
995 Loader {
996 data: LoaderState {
997 handle_allocator,
998 load_states: DashMap::default(),
999 uuid_to_load: DashMap::default(),
1000 op_rx,
1001 op_tx,
1002 invalidate_rx,
1003 invalidate_tx,
1004 #[cfg(feature = "invalidate_path")]
1005 invalidate_path_rx,
1006 #[cfg(feature = "invalidate_path")]
1007 invalidate_path_tx,
1008 pending_reloads: Vec::new(),
1009 indirect_states: DashMap::new(),
1010 indirect_to_load: DashMap::new(),
1011 indirect_table: IndirectionTable(Arc::new(DashMap::new())),
1012 responses: IORequestChannels {
1013 metadata_rx,
1014 metadata_tx,
1015 data_tx,
1016 data_rx,
1017 resolve_tx,
1018 resolve_rx,
1019 },
1020 },
1021 io,
1022 }
1023 }
1024
1025 pub fn with_serde_context<R>(&self, tx: &Sender<RefOp>, mut f: impl FnMut() -> R) -> R {
1026 let mut result = None;
1027 self.io.with_runtime(&mut |runtime| {
1028 result =
1029 Some(runtime.block_on(SerdeContext::with(&self.data, tx.clone(), async { f() })));
1030 });
1031 result.unwrap()
1032 }
1033
1034 pub fn get_load(&self, id: AssetUuid) -> Option<LoadHandle> {
1042 self.data.uuid_to_load.get(&id).map(|l| *l)
1043 }
1044
1045 pub fn get_load_info(&self, load: LoadHandle) -> Option<LoadInfo> {
1054 self.data.get_load_info(load)
1055 }
1056
1057 pub fn get_active_loads(&self) -> Vec<LoadHandle> {
1059 self.data
1060 .load_states
1061 .iter()
1062 .filter(|v| v.value().refs.load(Ordering::Relaxed) > 0)
1063 .map(|l| *l.key())
1064 .collect()
1065 }
1066
1067 pub fn get_load_status(&self, load: LoadHandle) -> LoadStatus {
1073 let load = if load.is_indirect() {
1074 if let Some(load) = self.data.indirect_table.resolve(load) {
1075 load
1076 } else {
1077 return LoadStatus::Unresolved;
1078 }
1079 } else {
1080 load
1081 };
1082 if let Some(load) = self.data.load_states.get(&load) {
1083 let version = load.versions.iter().max_by_key(|v| v.version);
1084 version
1085 .map(|v| match v.state {
1086 LoadState::None => {
1087 if load.refs.load(Ordering::Relaxed) > 0 {
1088 LoadStatus::Loading
1089 } else {
1090 LoadStatus::NotRequested
1091 }
1092 }
1093 LoadState::Loaded => LoadStatus::Loaded,
1094 LoadState::UnloadRequested | LoadState::Unloading => LoadStatus::Unloading,
1095 _ => LoadStatus::Loading,
1096 })
1097 .unwrap_or(LoadStatus::NotRequested)
1098 } else {
1099 LoadStatus::NotRequested
1100 }
1101 }
1102
1103 pub fn add_ref_handle(&self, handle: LoadHandle) {
1104 self.data.add_ref_handle(handle, 1);
1105 }
1106
1107 pub fn add_ref<U: Into<AssetUuid>>(&self, id: U) -> LoadHandle {
1116 self.data.add_refs(id.into(), 1)
1117 }
1118
1119 pub fn add_ref_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
1125 self.data.add_ref_indirect(id)
1126 }
1127
1128 pub fn get_asset_type(&self, load: LoadHandle) -> Option<AssetTypeId> {
1134 self.data.get_asset(load)
1135 }
1136
1137 pub fn remove_ref(&self, load: LoadHandle) {
1143 self.data.remove_refs(load, 1);
1144 }
1145
1146 pub fn process(
1160 &mut self,
1161 asset_storage: &dyn AssetStorage,
1162 resolver: &dyn IndirectionResolver,
1163 ) -> Result<()> {
1164 self.io.tick(&mut self.data);
1165 self.data.process_asset_changes(asset_storage);
1166 self.data.process_path_changes();
1167 self.data.process_load_ops(asset_storage);
1168 self.data.process_load_states(asset_storage);
1169 self.data.process_indirect_states();
1170 self.data.process_metadata_requests(self.io.as_mut());
1171 self.data
1172 .process_resolve_requests(self.io.as_mut(), resolver);
1173 self.data
1174 .process_data_requests(asset_storage, self.io.as_mut());
1175 Ok(())
1176 }
1177
1178 pub fn indirection_table(&self) -> IndirectionTable {
1185 self.data.indirect_table.clone()
1186 }
1187
1188 pub fn invalidate_assets(&self, assets: &[AssetUuid]) {
1192 self.data.invalidate_assets(assets);
1193 }
1194
1195 pub fn invalidate_paths(&self, paths: &[PathBuf]) {
1199 self.data.invalidate_paths(paths);
1200 }
1201}
1202
1203fn commit_asset(
1204 handle: LoadHandle,
1205 load: &mut AssetLoad,
1206 version: u32,
1207 asset_storage: &dyn AssetStorage,
1208) {
1209 let version_load = load
1210 .versions
1211 .iter_mut()
1212 .find(|v| v.version == version)
1213 .expect("expected version in load when committing asset");
1214 assert!(
1215 LoadState::LoadingAsset == version_load.state
1216 || LoadState::LoadedUncommitted == version_load.state
1217 );
1218 let asset_type = version_load
1219 .asset_type
1220 .as_ref()
1221 .expect("in LoadingAsset state but asset_type is None");
1222 asset_storage.commit_asset_version(asset_type, handle, version_load.version);
1223 version_load.state = LoadState::Loaded;
1224 for version_load in load.versions.iter_mut() {
1225 if version_load.version != version {
1226 assert_eq!(LoadState::Loaded, version_load.state);
1227 version_load.state = LoadState::UnloadRequested;
1228 }
1229 }
1230}