distill_loader/
loader.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::PathBuf,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc,
7    },
8};
9
10use crossbeam_channel::{unbounded, Receiver, Sender};
11use dashmap::DashMap;
12use distill_core::{ArtifactMetadata, AssetMetadata, AssetRef, AssetTypeId, AssetUuid};
13use log::error;
14
15use crate::{
16    handle::{RefOp, SerdeContext},
17    io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest},
18    storage::{
19        AssetLoadOp, AssetStorage, AtomicHandleAllocator, HandleAllocator, HandleOp,
20        IndirectIdentifier, IndirectionResolver, IndirectionTable, LoadHandle, LoadInfo,
21        LoadStatus, LoaderInfoProvider,
22    },
23    Result,
24};
25
26/// Describes the state of an asset load operation
27#[derive(Copy, Clone, PartialEq, Debug)]
28enum LoadState {
29    /// Indeterminate state - may transition into a load, or result in removal if ref count is == 0
30    None,
31    /// The load operation needs metadata to progress
32    WaitingForMetadata,
33    /// Metadata is being fetched for the load operation
34    RequestingMetadata,
35    /// Dependencies are requested for loading
36    RequestDependencies,
37    /// Waiting for dependencies to complete loading
38    WaitingForDependencies,
39    /// Waiting for asset data to be fetched
40    WaitingForData,
41    /// Asset data is being fetched
42    RequestingData,
43    /// Engine systems are loading asset
44    LoadingAsset,
45    /// Engine systems have loaded asset, but the asset is not committed.
46    /// This state is only reached when AssetVersionLoad.auto_commit == false.
47    LoadedUncommitted,
48    /// Asset is loaded and ready to use
49    Loaded,
50    /// Asset should be unloaded
51    UnloadRequested,
52    /// Asset is being unloaded by engine systems
53    Unloading,
54}
55
56/// Describes the state of an indirect Handle
57#[derive(Copy, Clone, PartialEq, Debug)]
58enum IndirectHandleState {
59    None,
60    WaitingForMetadata,
61    RequestingMetadata,
62    Resolved,
63}
64
65#[derive(Debug)]
66struct IndirectLoad {
67    id: IndirectIdentifier,
68    state: IndirectHandleState,
69    resolved_uuid: Option<AssetUuid>,
70    refs: AtomicUsize,
71    pending_reresolve: bool,
72}
73
74#[derive(Debug, Clone)]
75struct AssetVersionLoad {
76    state: LoadState,
77    metadata: Option<ArtifactMetadata>,
78    asset_metadata: Option<AssetMetadata>,
79    asset_type: Option<AssetTypeId>,
80    auto_commit: bool,
81    version: u32,
82}
83#[derive(Debug)]
84struct AssetLoad {
85    asset_id: AssetUuid,
86    last_state_change_instant: std::time::Instant,
87    refs: AtomicUsize,
88    versions: Vec<AssetVersionLoad>,
89    version_counter: u32,
90    pending_reload: bool,
91}
92
93/// Keeps track of a pending reload
94struct PendingReload {
95    /// ID of asset that should be reloaded
96    asset_id: AssetUuid,
97    /// The version of the asset before it was reloaded
98    version_before: u32,
99}
100
101pub struct LoaderState {
102    handle_allocator: Arc<dyn HandleAllocator>,
103    load_states: DashMap<LoadHandle, AssetLoad>,
104    uuid_to_load: DashMap<AssetUuid, LoadHandle>,
105    op_tx: Sender<HandleOp>,
106    op_rx: Receiver<HandleOp>,
107    invalidate_tx: Sender<AssetUuid>,
108    invalidate_rx: Receiver<AssetUuid>,
109    #[cfg(feature = "invalidate_path")]
110    invalidate_path_tx: Sender<PathBuf>,
111    #[cfg(feature = "invalidate_path")]
112    invalidate_path_rx: Receiver<PathBuf>,
113    pending_reloads: Vec<PendingReload>,
114    indirect_states: DashMap<LoadHandle, IndirectLoad>,
115    indirect_to_load: DashMap<IndirectIdentifier, LoadHandle>,
116    indirect_table: IndirectionTable,
117    responses: IORequestChannels,
118}
119
120#[allow(clippy::type_complexity)]
121struct IORequestChannels {
122    data_rx: Receiver<(Result<Vec<u8>>, LoadHandle, u32)>,
123    data_tx: Sender<(Result<Vec<u8>>, LoadHandle, u32)>,
124    metadata_rx: Receiver<(
125        Result<Vec<MetadataRequestResult>>,
126        HashMap<AssetUuid, (LoadHandle, u32)>,
127    )>,
128    metadata_tx: Sender<(
129        Result<Vec<MetadataRequestResult>>,
130        HashMap<AssetUuid, (LoadHandle, u32)>,
131    )>,
132    resolve_rx: Receiver<(
133        Result<Vec<(PathBuf, Vec<AssetMetadata>)>>,
134        IndirectIdentifier,
135        LoadHandle,
136    )>,
137    resolve_tx: Sender<(
138        Result<Vec<(PathBuf, Vec<AssetMetadata>)>>,
139        IndirectIdentifier,
140        LoadHandle,
141    )>,
142}
143
144struct AssetLoadResult {
145    new_state: LoadState,
146    asset_type: Option<AssetTypeId>,
147}
148
149impl AssetLoadResult {
150    pub fn from_state(new_state: LoadState) -> Self {
151        Self {
152            new_state,
153            asset_type: None,
154        }
155    }
156}
157
158impl LoaderState {
159    fn get_or_insert_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
160        if let Some(handle) = self.indirect_to_load.get(&id) {
161            *handle
162        } else {
163            let new_handle = self.handle_allocator.alloc();
164            let new_handle = new_handle.set_indirect();
165            log::trace!(
166                "Inserting indirect load for {:?} load handle {:?}",
167                id,
168                new_handle
169            );
170
171            self.indirect_states.insert(
172                new_handle,
173                IndirectLoad {
174                    id: id.clone(),
175                    state: IndirectHandleState::None,
176                    resolved_uuid: None,
177                    refs: AtomicUsize::new(0),
178                    pending_reresolve: false,
179                },
180            );
181            self.indirect_to_load.insert(id, new_handle);
182            new_handle
183        }
184    }
185
186    fn get_or_insert(&self, id: AssetUuid) -> LoadHandle {
187        let handle = *self.uuid_to_load.entry(id).or_insert_with(|| {
188            let new_handle = self.handle_allocator.alloc();
189
190            log::trace!(
191                "Inserting load state for {:?} load handle {:?}",
192                id,
193                new_handle
194            );
195
196            self.load_states.insert(
197                new_handle,
198                AssetLoad {
199                    asset_id: id,
200                    versions: vec![AssetVersionLoad {
201                        asset_type: None,
202                        auto_commit: true,
203                        metadata: None,
204                        asset_metadata: None,
205                        state: LoadState::None,
206                        version: 1,
207                    }],
208                    version_counter: 1,
209                    last_state_change_instant: std::time::Instant::now(),
210                    refs: AtomicUsize::new(0),
211                    pending_reload: false,
212                },
213            );
214            new_handle
215        });
216        handle
217    }
218
219    fn add_refs(&self, id: AssetUuid, num_refs: usize) -> LoadHandle {
220        let handle = self.get_or_insert(id);
221        self.add_ref_handle(handle, num_refs);
222        handle
223    }
224
225    fn add_ref_handle(&self, handle: LoadHandle, num_refs: usize) {
226        if handle.is_indirect() {
227            let state = self.indirect_states.get(&handle).unwrap();
228            if let Some(uuid) = state.resolved_uuid {
229                self.add_refs(uuid, 1);
230            }
231            state.refs.fetch_add(1, Ordering::Relaxed);
232        } else {
233            self.load_states
234                .get(&handle)
235                .map(|h| h.refs.fetch_add(num_refs, Ordering::Relaxed));
236        }
237    }
238
239    fn get_asset(&self, load: LoadHandle) -> Option<AssetTypeId> {
240        let load = if load.is_indirect() {
241            self.indirect_table.resolve(load)?
242        } else {
243            load
244        };
245        self.load_states
246            .get(&load)
247            .map(|load| {
248                load.versions
249                    .iter()
250                    .find(|version| matches!(version.state, LoadState::Loaded))
251                    .map(|version| version.asset_type)
252                    .unwrap_or(None)
253            })
254            .unwrap_or(None)
255    }
256
257    fn get_load_info(&self, load: LoadHandle) -> Option<LoadInfo> {
258        let load = if load.is_indirect() {
259            self.indirect_table.resolve(load)?
260        } else {
261            load
262        };
263        self.load_states.get(&load).map(|s| {
264            let (path, file_name, asset_name) = s
265                .versions
266                .iter()
267                .filter(|v| v.metadata.is_some())
268                .max_by_key(|v| v.version)
269                .and_then(|v| {
270                    v.asset_metadata.as_ref().map(|a| {
271                        let path = a
272                            .search_tags
273                            .iter()
274                            .find(|(tag, _)| tag == "path")
275                            .and_then(|(_, v)| v.clone());
276                        let file_name = a
277                            .search_tags
278                            .iter()
279                            .find(|(tag, _)| tag == "file_name")
280                            .and_then(|(_, v)| v.clone());
281                        let asset_name = a
282                            .search_tags
283                            .iter()
284                            .find(|(tag, _)| tag == "name")
285                            .and_then(|(_, v)| v.clone());
286                        (path, file_name, asset_name)
287                    })
288                })
289                .unwrap_or((None, None, None));
290            LoadInfo {
291                asset_id: s.asset_id,
292                refs: s.refs.load(Ordering::Relaxed) as u32,
293                path,
294                file_name,
295                asset_name,
296            }
297        })
298    }
299
300    fn remove_refs(&self, load: LoadHandle, num_refs: usize) {
301        if load.is_indirect() {
302            if let Some(state) = self.indirect_states.get_mut(&load) {
303                if let Some(uuid) = state.resolved_uuid {
304                    let uuid_handle = self.get_or_insert(uuid);
305                    self.remove_refs(uuid_handle, num_refs);
306                }
307                assert!(
308                    state.refs.fetch_sub(num_refs, Ordering::Relaxed) < usize::MAX - num_refs,
309                    "refcount underflow for indirect load {:?}:{:?}",
310                    load.0,
311                    *state
312                );
313            }
314        } else {
315            self.load_states.get(&load).map(|h| {
316                assert!(
317                    h.refs.fetch_sub(num_refs, Ordering::Relaxed) < usize::MAX - num_refs,
318                    "refcount underflow for asset {:?}",
319                    self.get_load_info(load),
320                );
321            });
322        }
323    }
324
325    fn add_ref_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
326        let handle = self.get_or_insert_indirect(id);
327        self.add_ref_handle(handle, 1);
328        handle
329    }
330
331    fn process_load_states(&self, asset_storage: &dyn AssetStorage) {
332        let mut to_remove = Vec::new();
333        let keys: Vec<_> = self.load_states.iter().map(|x| *x.key()).collect();
334
335        for key in keys {
336            let mut versions_to_remove = Vec::new();
337
338            let mut entry = self.load_states.get_mut(&key).unwrap();
339            let load = entry.value_mut();
340
341            let has_refs = load.refs.load(Ordering::Relaxed) > 0;
342            if !has_refs && load.versions.is_empty() {
343                to_remove.push(key);
344            } else {
345                if has_refs && load.pending_reload {
346                    // Make sure we are not already loading something before starting a load of a new version
347                    if load
348                        .versions
349                        .iter()
350                        .all(|v| matches!(v.state, LoadState::Loaded))
351                    {
352                        load.version_counter += 1;
353                        let new_version = load.version_counter;
354                        load.versions.push(AssetVersionLoad {
355                            asset_type: None,
356                            metadata: None,
357                            asset_metadata: None,
358                            // The assets are not auto_commit for reloads to ensure all assets in a
359                            // changeset are made visible together, atomically
360                            auto_commit: false,
361                            state: LoadState::None,
362                            version: new_version,
363                        });
364                        load.pending_reload = false;
365                    }
366                }
367                let last_state_change_instant = load.last_state_change_instant;
368                let mut versions = load.versions.clone();
369                // make sure we drop the lock before we start processing the state
370                drop(entry);
371                let mut state_change = false;
372                let mut log_old_state = None;
373                let mut log_new_state = None;
374                let newest_version = versions.iter().map(|v| v.version).max().unwrap_or(0);
375                for version_load in &mut versions {
376                    let new_state = match version_load.state {
377                        LoadState::None if has_refs => {
378                            // Remove the version if there's a newer one loading or loaded.
379                            if newest_version > version_load.version {
380                                versions_to_remove.push(version_load.version);
381                                LoadState::None
382                            } else if version_load.metadata.is_some() {
383                                LoadState::RequestDependencies
384                            } else {
385                                LoadState::WaitingForMetadata
386                            }
387                        }
388                        LoadState::None => {
389                            // Remove the version only if there's a newer one.
390                            // TODO: reason about the lifetime of metadata in version loads for dependencies, weak handles
391                            if newest_version > version_load.version {
392                                versions_to_remove.push(version_load.version);
393                            }
394                            LoadState::None
395                        }
396                        LoadState::WaitingForMetadata => {
397                            if version_load.metadata.is_some() {
398                                LoadState::RequestDependencies
399                            } else {
400                                LoadState::WaitingForMetadata
401                            }
402                        }
403                        LoadState::RequestingMetadata => LoadState::RequestingMetadata,
404                        LoadState::RequestDependencies => {
405                            // Add ref to each of the dependent assets.
406                            if let Some(artifact) = version_load.metadata.as_ref() {
407                                for dependency_asset_id in &artifact.load_deps {
408                                    if let AssetRef::Uuid(uuid) = dependency_asset_id {
409                                        self.add_refs(*uuid, 1);
410                                    }
411                                }
412                            }
413
414                            LoadState::WaitingForDependencies
415                        }
416                        LoadState::WaitingForDependencies => {
417                            let asset_metadata = version_load.metadata.as_ref().unwrap();
418
419                            // Ensure dependencies are loaded by engine before continuing to load this asset.
420                            let asset_dependencies_committed =
421                                asset_metadata.load_deps.iter().all(|dependency_asset_id| {
422                                    self.uuid_to_load
423                                        .get(dependency_asset_id.expect_uuid())
424                                        .as_ref()
425                                        .and_then(|dep_load_handle| {
426                                            self.load_states.get(dep_load_handle)
427                                        })
428                                        .map(|dep_load| {
429                                            // Note that we accept assets to be uncommitted but loaded
430                                            // This is to support atomically committing a set of changes when hot reloading
431
432                                            // TODO: Properly check that all dependencies have loaded their *new* version
433                                            dep_load.versions.iter().all(|v| {
434                                                matches!(
435                                                    v.state,
436                                                    LoadState::Loaded
437                                                        | LoadState::LoadedUncommitted
438                                                )
439                                            })
440                                        })
441                                        .unwrap_or(false)
442                                });
443
444                            if asset_dependencies_committed {
445                                LoadState::WaitingForData
446                            } else {
447                                LoadState::WaitingForDependencies
448                            }
449                        }
450                        LoadState::WaitingForData => LoadState::WaitingForData,
451                        LoadState::RequestingData => LoadState::RequestingData,
452                        LoadState::LoadingAsset => LoadState::LoadingAsset,
453                        LoadState::LoadedUncommitted => LoadState::LoadedUncommitted,
454                        LoadState::Loaded => {
455                            if !has_refs {
456                                LoadState::UnloadRequested
457                            } else {
458                                LoadState::Loaded
459                            }
460                        }
461                        LoadState::UnloadRequested => {
462                            if let Some(asset_type) = version_load.asset_type.take() {
463                                asset_storage.free(&asset_type, key, version_load.version);
464                            }
465
466                            if let Some(asset_metadata) = version_load.metadata.as_ref() {
467                                asset_metadata
468                                    .load_deps
469                                    .iter()
470                                    .for_each(|dependency_asset_id| {
471                                        let uuid = dependency_asset_id.expect_uuid();
472                                        // look up handle for uuid
473                                        let dependency_load_handle =
474                                            self.uuid_to_load.get(uuid).unwrap_or_else(|| {
475                                                panic!(
476                                                "Expected load handle to exist for asset `{:?}`.",
477                                                uuid
478                                            )
479                                            });
480                                        log::debug!("Removing ref from `{:?}`", uuid);
481                                        // Remove reference from asset dependency.
482                                        self.remove_refs(*dependency_load_handle, 1)
483                                    });
484                            }
485
486                            LoadState::Unloading
487                        }
488                        LoadState::Unloading => {
489                            // Should we have confirmation from engine here?
490                            LoadState::None
491                        }
492                    };
493                    if version_load.state != new_state {
494                        state_change = true;
495                        log_new_state = Some(new_state);
496                        log_old_state = Some(version_load.state);
497                        version_load.state = new_state;
498                    }
499                }
500                let mut entry = self.load_states.get_mut(&key).unwrap();
501
502                for version in versions_to_remove {
503                    versions.retain(|v| v.version != version);
504                }
505
506                entry.value_mut().versions = versions;
507                if state_change {
508                    let time_in_state = std::time::Instant::now()
509                        .duration_since(last_state_change_instant)
510                        .as_secs_f32();
511                    log::debug!(
512                        "{:?} {:?} => {:?} in {}s",
513                        key,
514                        log_old_state.unwrap(),
515                        log_new_state.unwrap(),
516                        time_in_state
517                    );
518
519                    entry.value_mut().last_state_change_instant = std::time::Instant::now();
520                } else {
521                    let time_in_state = std::time::Instant::now()
522                        .duration_since(last_state_change_instant)
523                        .as_secs_f32();
524                    log::trace!(
525                        "process_load_states Key: {:?} State: {:?} Time in state: {}",
526                        key,
527                        entry
528                            .value()
529                            .versions
530                            .iter()
531                            .map(|v| format!("{:?}", v.state))
532                            .collect::<Vec<_>>()
533                            .join(", "),
534                        time_in_state
535                    );
536                }
537            }
538
539            // Uncomment for recursive logging of dependency's load states
540            /*
541            if log::log_enabled!(log::Level::Trace) {
542                for entry in load_states.iter() {
543                    if entry.value().state == LoadState::WaitingForDependencies {
544                        dump_dependencies(&value.asset_id, load_states, uuid_to_load, metadata, 0);
545                    }
546                }
547            }
548            */
549        }
550        for _i in to_remove {
551            // TODO: This will reset the version counter because it's stored in the AssetLoad.
552            // Is this a problem? Should we guarantee that users never see the same version twice, ever?
553            // Should we store version counters separately?
554            //     let load_state = load_states.remove(&i);
555            //     if let Some((_, load_state)) = load_state {
556            //         uuid_to_load.remove(&load_state.asset_id);
557            //     }
558        }
559    }
560
561    fn process_metadata_requests(&self, io: &mut dyn LoaderIO) {
562        while let Ok(mut response) = self.responses.metadata_rx.try_recv() {
563            let request_data = &mut response.1;
564            match response.0 {
565                Ok(metadata_list) => {
566                    for metadata in metadata_list {
567                        let request_data =
568                            request_data.remove(&metadata.artifact_metadata.asset_id);
569                        let load_handle = if let Some((handle, _)) = request_data {
570                            handle
571                        } else {
572                            self.get_or_insert(metadata.artifact_metadata.asset_id)
573                        };
574                        let mut load = self
575                            .load_states
576                            .get_mut(&load_handle)
577                            .expect("uuid in uuid_to_load but not in load_states");
578                        log::trace!(
579                            "received metadata for {:?} after {} secs",
580                            load.asset_id,
581                            std::time::Instant::now()
582                                .duration_since(load.last_state_change_instant)
583                                .as_secs_f32()
584                        );
585                        let version_load = load.versions.iter_mut().find(|v| {
586                            if let Some((_, requesting_version)) = request_data {
587                                v.version == requesting_version
588                            } else {
589                                v.metadata.is_none()
590                            }
591                        });
592                        if let Some(version_load) = version_load {
593                            version_load.metadata = Some(metadata.artifact_metadata);
594                            version_load.asset_metadata = metadata.asset_metadata;
595                            if let LoadState::RequestingMetadata = version_load.state {
596                                version_load.state = LoadState::RequestDependencies
597                            }
598                        } else if request_data.is_some() {
599                            load.version_counter += 1;
600                            let new_version = load.version_counter;
601                            load.versions.push(AssetVersionLoad {
602                                asset_type: None,
603                                auto_commit: true,
604                                metadata: Some(metadata.artifact_metadata),
605                                asset_metadata: metadata.asset_metadata,
606                                state: LoadState::None,
607                                version: new_version,
608                            });
609                        }
610                    }
611                }
612                Err(err) => {
613                    error!("metadata request failed: {}", err);
614                }
615            }
616            for (handle, version) in request_data.values() {
617                let mut load = self
618                    .load_states
619                    .get_mut(&handle)
620                    .expect("load in metadata request but not in load_states");
621                let version_load = load
622                    .versions
623                    .iter_mut()
624                    .find(|v| v.version == *version)
625                    .expect("load in metadata request but not in load.versions");
626                if let LoadState::RequestingMetadata = version_load.state {
627                    version_load.state = LoadState::WaitingForMetadata
628                }
629            }
630        }
631        let mut assets_to_request = HashMap::new();
632        for mut entry in self.load_states.iter_mut() {
633            let handle = *entry.key();
634            let load = entry.value_mut();
635            for version_load in &mut load.versions {
636                if let LoadState::WaitingForMetadata = version_load.state {
637                    version_load.state = LoadState::RequestingMetadata;
638                    assets_to_request.insert(load.asset_id, (handle, version_load.version));
639                }
640            }
641        }
642        if !assets_to_request.is_empty() {
643            io.get_asset_metadata_with_dependencies(MetadataRequest {
644                tx: self.responses.metadata_tx.clone(),
645                requests: Some(assets_to_request),
646                include_asset_metadata: true, // TODO make this a user-controlled feature to reduce memory usage
647            })
648        }
649    }
650
651    fn process_data_requests(&self, storage: &dyn AssetStorage, io: &mut dyn LoaderIO) {
652        while let Ok(response) = self.responses.data_rx.try_recv() {
653            let result = response.0;
654            let handle = response.1;
655            let version = response.2;
656            let load = self
657                .load_states
658                .get(&handle)
659                .expect("load did not exist when data request completed");
660            let load_result = match result {
661                Ok(artifact_data) => {
662                    let version_load = load
663                        .versions
664                        .iter()
665                        .find(|v| v.version == version)
666                        .expect("load version did not exist when data request completed");
667
668                    let artifact_type = version_load.metadata.as_ref().unwrap().type_id;
669                    let asset_id = load.asset_id;
670                    log::trace!("asset data request succeeded for asset {:?}", load.asset_id);
671                    // We don't want to be holding a lock to the load while calling AssetStorage::update_asset in `load_data`,
672                    // so we drop the load ref, and save the state transition as a return value.
673                    drop(load);
674                    let update_result = storage.update_asset(
675                        self,
676                        &artifact_type,
677                        artifact_data,
678                        response.1,
679                        AssetLoadOp::new(self.op_tx.clone(), handle, version),
680                        response.2,
681                    );
682                    if let Err(storage_error) = update_result {
683                        error!(
684                            "AssetStorage implementor error when updating asset {:?}: {}",
685                            asset_id, storage_error
686                        );
687                        AssetLoadResult::from_state(LoadState::WaitingForData)
688                    } else {
689                        AssetLoadResult {
690                            asset_type: Some(artifact_type),
691                            new_state: LoadState::LoadingAsset,
692                        }
693                    }
694                }
695                Err(err) => {
696                    error!(
697                        "asset data request failed for asset {:?}: {}",
698                        load.asset_id, err
699                    );
700                    AssetLoadResult::from_state(LoadState::WaitingForMetadata)
701                }
702            };
703            let mut load = self
704                .load_states
705                .get_mut(&response.1)
706                .expect("load did not exist when data request completed");
707            let version_load = load
708                .versions
709                .iter_mut()
710                .find(|v| v.version == version)
711                .expect("load version did not exist when data request completed");
712            version_load.state = load_result.new_state;
713            if let Some(asset_type) = load_result.asset_type {
714                version_load.asset_type = Some(asset_type);
715            }
716        }
717        let mut assets_to_request = Vec::new();
718        for mut load in self.load_states.iter_mut() {
719            let handle = *load.key();
720            let load = load.value_mut();
721
722            if let Some(version_load) = load
723                .versions
724                .iter_mut()
725                .find(|v| matches!(v.state, LoadState::WaitingForData))
726            {
727                version_load.state = LoadState::RequestingData;
728                let artifact_id = version_load.metadata.as_ref().unwrap().id;
729                assets_to_request.push(DataRequest {
730                    tx: self.responses.data_tx.clone(),
731                    asset_id: load.asset_id,
732                    artifact_id,
733                    request_data: Some((handle, version_load.version)),
734                });
735            }
736        }
737        if !assets_to_request.is_empty() {
738            io.get_artifacts(assets_to_request);
739        }
740    }
741
742    fn process_load_ops(&self, asset_storage: &dyn AssetStorage) {
743        while let Ok(op) = self.op_rx.try_recv() {
744            match op {
745                HandleOp::Error(_handle, _version, err) => {
746                    panic!("load error {}", err);
747                }
748                HandleOp::Complete(handle, version) => {
749                    let mut load = self
750                        .load_states
751                        .get_mut(&handle)
752                        .expect("load op completed but load state does not exist");
753                    let load_version = load
754                        .versions
755                        .iter_mut()
756                        .find(|v| v.version == version)
757                        .expect("loade op completed but version not found in load");
758                    if load_version.auto_commit {
759                        commit_asset(handle, load.value_mut(), version, asset_storage);
760                    } else {
761                        load_version.state = LoadState::LoadedUncommitted;
762                    }
763                }
764                HandleOp::Drop(handle, version) => {
765                    panic!(
766                        "load op dropped without calling complete/error, handle {:?} version {}",
767                        handle, version
768                    )
769                }
770            }
771        }
772    }
773
774    #[cfg(feature = "invalidate_path")]
775    fn process_path_changes(&mut self) {
776        let mut changes = HashSet::new();
777        while let Ok(path) = self.invalidate_path_rx.try_recv() {
778            log::trace!("process_path_changes invalidate_path_rx path: {:?}", path);
779            changes.insert(path);
780        }
781        for entry in self.indirect_to_load.iter() {
782            let indirect_id = entry.key();
783            let handle = entry.value();
784            let cleaned_path =
785                distill_core::utils::canonicalize_path(&PathBuf::from(indirect_id.path()));
786            for change in &changes {
787                if change == &cleaned_path {
788                    if let Some(mut indirect) = self.indirect_states.get_mut(&handle) {
789                        indirect.pending_reresolve = true;
790                    }
791                }
792            }
793        }
794    }
795
796    /// Checks for changed assets that need to be reloaded or unloaded
797    fn process_asset_changes(&mut self, asset_storage: &dyn AssetStorage) {
798        if self.pending_reloads.is_empty() {
799            // if we have no pending hot reloads, poll for new changes
800            let mut changes = HashSet::new();
801            while let Ok(asset) = self.invalidate_rx.try_recv() {
802                log::trace!("process_asset_changes invalidate_rx asset: {:?}", asset);
803                changes.insert(asset);
804            }
805            if !changes.is_empty() {
806                // TODO handle deleted assets
807                for asset_id in &changes {
808                    let current_version = self
809                        .uuid_to_load
810                        .get(asset_id)
811                        .map(|l| *l)
812                        .and_then(|load_handle| {
813                            self.load_states
814                                .get(&load_handle)
815                                .map(|load| (load_handle, load))
816                        })
817                        .map(|(load_handle, load)| {
818                            (load_handle, load.versions.iter().map(|v| v.version).max())
819                        });
820                    if let Some((handle, Some(current_version))) = current_version {
821                        let mut load = self
822                            .load_states
823                            .get_mut(&handle)
824                            .expect("load state should exist for pending reload");
825                        load.pending_reload = true;
826                        self.pending_reloads.push(PendingReload {
827                            asset_id: *asset_id,
828                            version_before: current_version,
829                        });
830                    }
831                }
832            }
833        } else {
834            let is_finished = self.pending_reloads.iter().all(|reload| {
835                self.uuid_to_load
836                    .get(&reload.asset_id)
837                    .as_ref()
838                    .and_then(|load_handle| self.load_states.get(load_handle))
839                    .map(|load| {
840                        // The reload is considered finished if we have a loaded asset with a version
841                        // that is higher than the version observed when the reload was requested
842                        load.versions.iter().any(|v| {
843                            matches!(v.state, LoadState::Loaded | LoadState::LoadedUncommitted)
844                                && v.version > reload.version_before
845                        })
846                    })
847                    // A pending reload for something that is not supposed to be loaded is considered finished.
848                    // The asset could have been unloaded by being unreferenced.
849                    .unwrap_or(true)
850            });
851            log::trace!("reload unfinished");
852            if is_finished {
853                for reload in &self.pending_reloads {
854                    if let Some((load_handle, mut load)) = self
855                        .uuid_to_load
856                        .get_mut(&reload.asset_id)
857                        .as_ref()
858                        .and_then(|load_handle| {
859                            self.load_states
860                                .get_mut(load_handle)
861                                .map(|load| (load_handle, load))
862                        })
863                    {
864                        if let Some(version_to_commit) = load
865                            .versions
866                            .iter()
867                            .find(|v| matches!(v.state, LoadState::LoadedUncommitted))
868                            .map(|v| v.version)
869                        {
870                            log::trace!("committing version");
871                            // Commit reloaded asset
872                            commit_asset(
873                                **load_handle,
874                                load.value_mut(),
875                                version_to_commit,
876                                asset_storage,
877                            );
878                        }
879                    }
880                }
881                self.pending_reloads.clear();
882            }
883        }
884    }
885
886    fn process_indirect_states(&self) {
887        for mut entry in self.indirect_states.iter_mut() {
888            let has_refs = entry.refs.load(Ordering::Relaxed) > 0;
889            let new_state = match entry.state {
890                IndirectHandleState::None if has_refs => IndirectHandleState::WaitingForMetadata,
891                IndirectHandleState::Resolved if entry.pending_reresolve => {
892                    entry.pending_reresolve = false;
893                    IndirectHandleState::WaitingForMetadata
894                }
895                state => state,
896            };
897            entry.state = new_state;
898        }
899    }
900
901    fn process_resolve_requests(&self, io: &mut dyn LoaderIO, resolver: &dyn IndirectionResolver) {
902        while let Ok(response) = self.responses.resolve_rx.try_recv() {
903            let result = response.0;
904            let id = response.1;
905            let load_handle = response.2;
906            let mut state = self
907                .indirect_states
908                .get_mut(&load_handle)
909                .expect("indirect state did not exist when resolve request completed");
910            match result {
911                Ok(candidates) => {
912                    let num_refs = state.refs.load(Ordering::Relaxed);
913                    let new_uuid = resolver.resolve(&id, candidates);
914                    if let Some(existing_uuid) = state.resolved_uuid {
915                        let uuid_handle = self.get_or_insert(existing_uuid);
916                        self.remove_refs(uuid_handle, num_refs);
917                    }
918                    if let Some(new_uuid) = new_uuid {
919                        let uuid_handle = self.get_or_insert(new_uuid);
920                        self.add_refs(new_uuid, num_refs);
921                        self.indirect_table.0.insert(load_handle, uuid_handle);
922                    } else {
923                        self.indirect_table.0.remove(&load_handle);
924                    }
925                    state.resolved_uuid = new_uuid;
926                    state.state = IndirectHandleState::Resolved;
927                }
928                Err(err) => {
929                    error!("resolve request failed for id {:?}: {}", id, err);
930                    state.state = IndirectHandleState::None;
931                }
932            }
933        }
934        let mut assets_to_request = Vec::new();
935        for mut load in self.indirect_states.iter_mut() {
936            if let IndirectHandleState::WaitingForMetadata = load.state {
937                load.state = IndirectHandleState::RequestingMetadata;
938                assets_to_request.push(ResolveRequest {
939                    tx: self.responses.resolve_tx.clone(),
940                    id: Some((load.id.clone(), *load.key())),
941                });
942            }
943        }
944        if !assets_to_request.is_empty() {
945            io.get_asset_candidates(assets_to_request);
946        }
947    }
948
949    pub fn invalidate_assets(&self, assets: &[AssetUuid]) {
950        for asset in assets {
951            let _ = self.invalidate_tx.send(*asset);
952        }
953    }
954
955    #[cfg(feature = "invalidate_path")]
956    pub fn invalidate_paths(&self, paths: &[PathBuf]) {
957        for path in paths {
958            let _ = self.invalidate_path_tx.send(path.clone());
959        }
960    }
961}
962
963/// Loads and tracks lifetimes of asset data.
964pub struct Loader {
965    io: Box<dyn LoaderIO>,
966    data: LoaderState,
967}
968
969impl LoaderInfoProvider for LoaderState {
970    fn get_load_handle(&self, id: &AssetRef) -> Option<LoadHandle> {
971        self.uuid_to_load.get(id.expect_uuid()).map(|l| *l)
972    }
973
974    fn get_asset_id(&self, load: LoadHandle) -> Option<AssetUuid> {
975        self.load_states.get(&load).map(|l| l.asset_id)
976    }
977}
978
979impl Loader {
980    pub fn new(io: Box<dyn LoaderIO>) -> Loader {
981        Self::new_with_handle_allocator(io, Arc::new(AtomicHandleAllocator::default()))
982    }
983
984    pub fn new_with_handle_allocator(
985        io: Box<dyn LoaderIO>,
986        handle_allocator: Arc<dyn HandleAllocator>,
987    ) -> Loader {
988        let (op_tx, op_rx) = unbounded();
989        let (invalidate_tx, invalidate_rx) = unbounded();
990        #[cfg(feature = "invalidate_path")]
991        let (invalidate_path_tx, invalidate_path_rx) = unbounded();
992        let (metadata_tx, metadata_rx) = unbounded();
993        let (data_tx, data_rx) = unbounded();
994        let (resolve_tx, resolve_rx) = unbounded();
995        Loader {
996            data: LoaderState {
997                handle_allocator,
998                load_states: DashMap::default(),
999                uuid_to_load: DashMap::default(),
1000                op_rx,
1001                op_tx,
1002                invalidate_rx,
1003                invalidate_tx,
1004                #[cfg(feature = "invalidate_path")]
1005                invalidate_path_rx,
1006                #[cfg(feature = "invalidate_path")]
1007                invalidate_path_tx,
1008                pending_reloads: Vec::new(),
1009                indirect_states: DashMap::new(),
1010                indirect_to_load: DashMap::new(),
1011                indirect_table: IndirectionTable(Arc::new(DashMap::new())),
1012                responses: IORequestChannels {
1013                    metadata_rx,
1014                    metadata_tx,
1015                    data_tx,
1016                    data_rx,
1017                    resolve_tx,
1018                    resolve_rx,
1019                },
1020            },
1021            io,
1022        }
1023    }
1024
1025    pub fn with_serde_context<R>(&self, tx: &Sender<RefOp>, mut f: impl FnMut() -> R) -> R {
1026        let mut result = None;
1027        self.io.with_runtime(&mut |runtime| {
1028            result =
1029                Some(runtime.block_on(SerdeContext::with(&self.data, tx.clone(), async { f() })));
1030        });
1031        result.unwrap()
1032    }
1033
1034    /// Returns the load handle for the asset with the given UUID, if present.
1035    ///
1036    /// This will only return `Some(..)` if there has been a previous call to [`Loader::add_ref`].
1037    ///
1038    /// # Parameters
1039    ///
1040    /// * `id`: UUID of the asset.
1041    pub fn get_load(&self, id: AssetUuid) -> Option<LoadHandle> {
1042        self.data.uuid_to_load.get(&id).map(|l| *l)
1043    }
1044
1045    /// Returns information about the load of an asset.
1046    ///
1047    /// **Note:** The information is true at the time the `LoadInfo` is retrieved. The actual number
1048    /// of references may change.
1049    ///
1050    /// # Parameters
1051    ///
1052    /// * `load_handle`: ID allocated by `Loader` to track loading of the asset.
1053    pub fn get_load_info(&self, load: LoadHandle) -> Option<LoadInfo> {
1054        self.data.get_load_info(load)
1055    }
1056
1057    /// Returns handles to all active asset loads.
1058    pub fn get_active_loads(&self) -> Vec<LoadHandle> {
1059        self.data
1060            .load_states
1061            .iter()
1062            .filter(|v| v.value().refs.load(Ordering::Relaxed) > 0)
1063            .map(|l| *l.key())
1064            .collect()
1065    }
1066
1067    /// Returns the asset load status.
1068    ///
1069    /// # Parameters
1070    ///
1071    /// * `load`: ID allocated by `Loader` to track loading of the asset.
1072    pub fn get_load_status(&self, load: LoadHandle) -> LoadStatus {
1073        let load = if load.is_indirect() {
1074            if let Some(load) = self.data.indirect_table.resolve(load) {
1075                load
1076            } else {
1077                return LoadStatus::Unresolved;
1078            }
1079        } else {
1080            load
1081        };
1082        if let Some(load) = self.data.load_states.get(&load) {
1083            let version = load.versions.iter().max_by_key(|v| v.version);
1084            version
1085                .map(|v| match v.state {
1086                    LoadState::None => {
1087                        if load.refs.load(Ordering::Relaxed) > 0 {
1088                            LoadStatus::Loading
1089                        } else {
1090                            LoadStatus::NotRequested
1091                        }
1092                    }
1093                    LoadState::Loaded => LoadStatus::Loaded,
1094                    LoadState::UnloadRequested | LoadState::Unloading => LoadStatus::Unloading,
1095                    _ => LoadStatus::Loading,
1096                })
1097                .unwrap_or(LoadStatus::NotRequested)
1098        } else {
1099            LoadStatus::NotRequested
1100        }
1101    }
1102
1103    pub fn add_ref_handle(&self, handle: LoadHandle) {
1104        self.data.add_ref_handle(handle, 1);
1105    }
1106
1107    /// Adds a reference to an asset and returns its [`LoadHandle`].
1108    ///
1109    /// If the asset is already loaded, this returns the existing [`LoadHandle`]. If it is not
1110    /// loaded, this allocates a new [`LoadHandle`] and returns that.
1111    ///
1112    /// # Parameters
1113    ///
1114    /// * `id`: UUID of the asset.
1115    pub fn add_ref<U: Into<AssetUuid>>(&self, id: U) -> LoadHandle {
1116        self.data.add_refs(id.into(), 1)
1117    }
1118
1119    /// Adds a reference to an indirect id and returns its [`LoadHandle`] with [`LoadHandle::is_indirect`] set to `true`.
1120    ///
1121    /// # Parameters
1122    ///
1123    /// * `id`: IndirectIdentifier for the load.
1124    pub fn add_ref_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
1125        self.data.add_ref_indirect(id)
1126    }
1127
1128    /// Returns the [`AssetTypeId`] for the currently loaded asset of the provided load handle.
1129    ///
1130    /// # Parameters
1131    ///
1132    /// * `load`: ID allocated by `Loader` to track loading of the asset.
1133    pub fn get_asset_type(&self, load: LoadHandle) -> Option<AssetTypeId> {
1134        self.data.get_asset(load)
1135    }
1136
1137    /// Removes a reference to an asset.
1138    ///
1139    /// # Parameters
1140    ///
1141    /// * `load_handle`: ID allocated by `Loader` to track loading of the asset.
1142    pub fn remove_ref(&self, load: LoadHandle) {
1143        self.data.remove_refs(load, 1);
1144    }
1145
1146    /// Processes pending load operations.
1147    ///
1148    /// Load operations include:
1149    ///
1150    /// * Requesting asset metadata.
1151    /// * Requesting asset data.
1152    /// * Committing completed [`AssetLoadOp`]s.
1153    /// * Updating the [`LoadStatus`]es of assets.
1154    /// * Resolving active [`IndirectIdentifier`]s.
1155    ///
1156    /// # Parameters
1157    ///
1158    /// * `asset_storage`: Storage for all assets of all asset types.
1159    pub fn process(
1160        &mut self,
1161        asset_storage: &dyn AssetStorage,
1162        resolver: &dyn IndirectionResolver,
1163    ) -> Result<()> {
1164        self.io.tick(&mut self.data);
1165        self.data.process_asset_changes(asset_storage);
1166        self.data.process_path_changes();
1167        self.data.process_load_ops(asset_storage);
1168        self.data.process_load_states(asset_storage);
1169        self.data.process_indirect_states();
1170        self.data.process_metadata_requests(self.io.as_mut());
1171        self.data
1172            .process_resolve_requests(self.io.as_mut(), resolver);
1173        self.data
1174            .process_data_requests(asset_storage, self.io.as_mut());
1175        Ok(())
1176    }
1177
1178    /// Returns a reference to the loader's [`IndirectionTable`].
1179    ///
1180    /// When a user fetches an asset by LoadHandle, implementors of [`AssetStorage`]
1181    /// should resolve LoadHandles where [`LoadHandle::is_indirect`] returns true by using [`IndirectionTable::resolve`].
1182    /// IndirectionTable is Send + Sync + Clone so that it can be retrieved once at startup,
1183    /// then stored in implementors of [`AssetStorage`].
1184    pub fn indirection_table(&self) -> IndirectionTable {
1185        self.data.indirect_table.clone()
1186    }
1187
1188    /// Invalidates the data & metadata of the provided asset IDs.
1189    ///
1190    /// This causes the asset data to be reloaded.
1191    pub fn invalidate_assets(&self, assets: &[AssetUuid]) {
1192        self.data.invalidate_assets(assets);
1193    }
1194
1195    /// Invalidates indirect identifiers that may match the provided paths.
1196    ///
1197    /// This may cause indirect handles to resolve to new assets.
1198    pub fn invalidate_paths(&self, paths: &[PathBuf]) {
1199        self.data.invalidate_paths(paths);
1200    }
1201}
1202
1203fn commit_asset(
1204    handle: LoadHandle,
1205    load: &mut AssetLoad,
1206    version: u32,
1207    asset_storage: &dyn AssetStorage,
1208) {
1209    let version_load = load
1210        .versions
1211        .iter_mut()
1212        .find(|v| v.version == version)
1213        .expect("expected version in load when committing asset");
1214    assert!(
1215        LoadState::LoadingAsset == version_load.state
1216            || LoadState::LoadedUncommitted == version_load.state
1217    );
1218    let asset_type = version_load
1219        .asset_type
1220        .as_ref()
1221        .expect("in LoadingAsset state but asset_type is None");
1222    asset_storage.commit_asset_version(asset_type, handle, version_load.version);
1223    version_load.state = LoadState::Loaded;
1224    for version_load in load.versions.iter_mut() {
1225        if version_load.version != version {
1226            assert_eq!(LoadState::Loaded, version_load.state);
1227            version_load.state = LoadState::UnloadRequested;
1228        }
1229    }
1230}