use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use dashmap::DashMap;
use distill_core::{ArtifactMetadata, AssetMetadata, AssetRef, AssetTypeId, AssetUuid};
use log::error;
use crate::{
handle::{RefOp, SerdeContext},
io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest},
storage::{
AssetLoadOp, AssetStorage, AtomicHandleAllocator, HandleAllocator, HandleOp,
IndirectIdentifier, IndirectionResolver, IndirectionTable, LoadHandle, LoadInfo,
LoadStatus, LoaderInfoProvider,
},
Result,
};
#[derive(Copy, Clone, PartialEq, Debug)]
enum LoadState {
None,
WaitingForMetadata,
RequestingMetadata,
RequestDependencies,
WaitingForDependencies,
WaitingForData,
RequestingData,
LoadingAsset,
LoadedUncommitted,
Loaded,
UnloadRequested,
Unloading,
}
#[derive(Copy, Clone, PartialEq, Debug)]
enum IndirectHandleState {
None,
WaitingForMetadata,
RequestingMetadata,
Resolved,
}
#[derive(Debug)]
struct IndirectLoad {
id: IndirectIdentifier,
state: IndirectHandleState,
resolved_uuid: Option<AssetUuid>,
refs: AtomicUsize,
pending_reresolve: bool,
}
#[derive(Debug, Clone)]
struct AssetVersionLoad {
state: LoadState,
metadata: Option<ArtifactMetadata>,
asset_metadata: Option<AssetMetadata>,
asset_type: Option<AssetTypeId>,
auto_commit: bool,
version: u32,
}
#[derive(Debug)]
struct AssetLoad {
asset_id: AssetUuid,
last_state_change_instant: std::time::Instant,
refs: AtomicUsize,
versions: Vec<AssetVersionLoad>,
version_counter: u32,
pending_reload: bool,
}
struct PendingReload {
asset_id: AssetUuid,
version_before: u32,
}
pub struct LoaderState {
handle_allocator: Arc<dyn HandleAllocator>,
load_states: DashMap<LoadHandle, AssetLoad>,
uuid_to_load: DashMap<AssetUuid, LoadHandle>,
op_tx: Sender<HandleOp>,
op_rx: Receiver<HandleOp>,
invalidate_tx: Sender<AssetUuid>,
invalidate_rx: Receiver<AssetUuid>,
#[cfg(feature = "invalidate_path")]
invalidate_path_tx: Sender<PathBuf>,
#[cfg(feature = "invalidate_path")]
invalidate_path_rx: Receiver<PathBuf>,
pending_reloads: Vec<PendingReload>,
indirect_states: DashMap<LoadHandle, IndirectLoad>,
indirect_to_load: DashMap<IndirectIdentifier, LoadHandle>,
indirect_table: IndirectionTable,
responses: IORequestChannels,
}
#[allow(clippy::type_complexity)]
struct IORequestChannels {
data_rx: Receiver<(Result<Vec<u8>>, LoadHandle, u32)>,
data_tx: Sender<(Result<Vec<u8>>, LoadHandle, u32)>,
metadata_rx: Receiver<(
Result<Vec<MetadataRequestResult>>,
HashMap<AssetUuid, (LoadHandle, u32)>,
)>,
metadata_tx: Sender<(
Result<Vec<MetadataRequestResult>>,
HashMap<AssetUuid, (LoadHandle, u32)>,
)>,
resolve_rx: Receiver<(
Result<Vec<(PathBuf, Vec<AssetMetadata>)>>,
IndirectIdentifier,
LoadHandle,
)>,
resolve_tx: Sender<(
Result<Vec<(PathBuf, Vec<AssetMetadata>)>>,
IndirectIdentifier,
LoadHandle,
)>,
}
struct AssetLoadResult {
new_state: LoadState,
asset_type: Option<AssetTypeId>,
}
impl AssetLoadResult {
pub fn from_state(new_state: LoadState) -> Self {
Self {
new_state,
asset_type: None,
}
}
}
impl LoaderState {
fn get_or_insert_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
if let Some(handle) = self.indirect_to_load.get(&id) {
*handle
} else {
let new_handle = self.handle_allocator.alloc();
let new_handle = new_handle.set_indirect();
log::trace!(
"Inserting indirect load for {:?} load handle {:?}",
id,
new_handle
);
self.indirect_states.insert(
new_handle,
IndirectLoad {
id: id.clone(),
state: IndirectHandleState::None,
resolved_uuid: None,
refs: AtomicUsize::new(0),
pending_reresolve: false,
},
);
self.indirect_to_load.insert(id, new_handle);
new_handle
}
}
fn get_or_insert(&self, id: AssetUuid) -> LoadHandle {
let handle = *self.uuid_to_load.entry(id).or_insert_with(|| {
let new_handle = self.handle_allocator.alloc();
log::trace!(
"Inserting load state for {:?} load handle {:?}",
id,
new_handle
);
self.load_states.insert(
new_handle,
AssetLoad {
asset_id: id,
versions: vec![AssetVersionLoad {
asset_type: None,
auto_commit: true,
metadata: None,
asset_metadata: None,
state: LoadState::None,
version: 1,
}],
version_counter: 1,
last_state_change_instant: std::time::Instant::now(),
refs: AtomicUsize::new(0),
pending_reload: false,
},
);
new_handle
});
handle
}
fn add_refs(&self, id: AssetUuid, num_refs: usize) -> LoadHandle {
let handle = self.get_or_insert(id);
self.add_ref_handle(handle, num_refs);
handle
}
fn add_ref_handle(&self, handle: LoadHandle, num_refs: usize) {
if handle.is_indirect() {
let state = self.indirect_states.get(&handle).unwrap();
if let Some(uuid) = state.resolved_uuid {
self.add_refs(uuid, 1);
}
state.refs.fetch_add(1, Ordering::Relaxed);
} else {
self.load_states
.get(&handle)
.map(|h| h.refs.fetch_add(num_refs, Ordering::Relaxed));
}
}
fn get_asset(&self, load: LoadHandle) -> Option<AssetTypeId> {
let load = if load.is_indirect() {
self.indirect_table.resolve(load)?
} else {
load
};
self.load_states
.get(&load)
.map(|load| {
load.versions
.iter()
.find(|version| matches!(version.state, LoadState::Loaded))
.map(|version| version.asset_type)
.unwrap_or(None)
})
.unwrap_or(None)
}
fn get_load_info(&self, load: LoadHandle) -> Option<LoadInfo> {
let load = if load.is_indirect() {
self.indirect_table.resolve(load)?
} else {
load
};
self.load_states.get(&load).map(|s| {
let (path, file_name, asset_name) = s
.versions
.iter()
.filter(|v| v.metadata.is_some())
.max_by_key(|v| v.version)
.and_then(|v| {
v.asset_metadata.as_ref().map(|a| {
let path = a
.search_tags
.iter()
.find(|(tag, _)| tag == "path")
.and_then(|(_, v)| v.clone());
let file_name = a
.search_tags
.iter()
.find(|(tag, _)| tag == "file_name")
.and_then(|(_, v)| v.clone());
let asset_name = a
.search_tags
.iter()
.find(|(tag, _)| tag == "name")
.and_then(|(_, v)| v.clone());
(path, file_name, asset_name)
})
})
.unwrap_or((None, None, None));
LoadInfo {
asset_id: s.asset_id,
refs: s.refs.load(Ordering::Relaxed) as u32,
path,
file_name,
asset_name,
}
})
}
fn remove_refs(&self, load: LoadHandle, num_refs: usize) {
if load.is_indirect() {
if let Some(state) = self.indirect_states.get_mut(&load) {
if let Some(uuid) = state.resolved_uuid {
let uuid_handle = self.get_or_insert(uuid);
self.remove_refs(uuid_handle, num_refs);
}
assert!(
state.refs.fetch_sub(num_refs, Ordering::Relaxed) < usize::MAX - num_refs,
"refcount underflow for indirect load {:?}:{:?}",
load.0,
*state
);
}
} else {
self.load_states.get(&load).map(|h| {
assert!(
h.refs.fetch_sub(num_refs, Ordering::Relaxed) < usize::MAX - num_refs,
"refcount underflow for asset {:?}",
self.get_load_info(load),
);
});
}
}
fn add_ref_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
let handle = self.get_or_insert_indirect(id);
self.add_ref_handle(handle, 1);
handle
}
fn process_load_states(&self, asset_storage: &dyn AssetStorage) {
let mut to_remove = Vec::new();
let keys: Vec<_> = self.load_states.iter().map(|x| *x.key()).collect();
for key in keys {
let mut versions_to_remove = Vec::new();
let mut entry = self.load_states.get_mut(&key).unwrap();
let load = entry.value_mut();
let has_refs = load.refs.load(Ordering::Relaxed) > 0;
if !has_refs && load.versions.is_empty() {
to_remove.push(key);
} else {
if has_refs && load.pending_reload {
if load
.versions
.iter()
.all(|v| matches!(v.state, LoadState::Loaded))
{
load.version_counter += 1;
let new_version = load.version_counter;
load.versions.push(AssetVersionLoad {
asset_type: None,
metadata: None,
asset_metadata: None,
auto_commit: false,
state: LoadState::None,
version: new_version,
});
load.pending_reload = false;
}
}
let last_state_change_instant = load.last_state_change_instant;
let mut versions = load.versions.clone();
drop(entry);
let mut state_change = false;
let mut log_old_state = None;
let mut log_new_state = None;
let newest_version = versions.iter().map(|v| v.version).max().unwrap_or(0);
for version_load in &mut versions {
let new_state = match version_load.state {
LoadState::None if has_refs => {
if newest_version > version_load.version {
versions_to_remove.push(version_load.version);
LoadState::None
} else if version_load.metadata.is_some() {
LoadState::RequestDependencies
} else {
LoadState::WaitingForMetadata
}
}
LoadState::None => {
if newest_version > version_load.version {
versions_to_remove.push(version_load.version);
}
LoadState::None
}
LoadState::WaitingForMetadata => {
if version_load.metadata.is_some() {
LoadState::RequestDependencies
} else {
LoadState::WaitingForMetadata
}
}
LoadState::RequestingMetadata => LoadState::RequestingMetadata,
LoadState::RequestDependencies => {
if let Some(artifact) = version_load.metadata.as_ref() {
for dependency_asset_id in &artifact.load_deps {
if let AssetRef::Uuid(uuid) = dependency_asset_id {
self.add_refs(*uuid, 1);
}
}
}
LoadState::WaitingForDependencies
}
LoadState::WaitingForDependencies => {
let asset_metadata = version_load.metadata.as_ref().unwrap();
let asset_dependencies_committed =
asset_metadata.load_deps.iter().all(|dependency_asset_id| {
self.uuid_to_load
.get(dependency_asset_id.expect_uuid())
.as_ref()
.and_then(|dep_load_handle| {
self.load_states.get(dep_load_handle)
})
.map(|dep_load| {
dep_load.versions.iter().all(|v| {
matches!(
v.state,
LoadState::Loaded
| LoadState::LoadedUncommitted
)
})
})
.unwrap_or(false)
});
if asset_dependencies_committed {
LoadState::WaitingForData
} else {
LoadState::WaitingForDependencies
}
}
LoadState::WaitingForData => LoadState::WaitingForData,
LoadState::RequestingData => LoadState::RequestingData,
LoadState::LoadingAsset => LoadState::LoadingAsset,
LoadState::LoadedUncommitted => LoadState::LoadedUncommitted,
LoadState::Loaded => {
if !has_refs {
LoadState::UnloadRequested
} else {
LoadState::Loaded
}
}
LoadState::UnloadRequested => {
if let Some(asset_type) = version_load.asset_type.take() {
asset_storage.free(&asset_type, key, version_load.version);
}
if let Some(asset_metadata) = version_load.metadata.as_ref() {
asset_metadata
.load_deps
.iter()
.for_each(|dependency_asset_id| {
let uuid = dependency_asset_id.expect_uuid();
let dependency_load_handle =
self.uuid_to_load.get(uuid).unwrap_or_else(|| {
panic!(
"Expected load handle to exist for asset `{:?}`.",
uuid
)
});
log::debug!("Removing ref from `{:?}`", uuid);
self.remove_refs(*dependency_load_handle, 1)
});
}
LoadState::Unloading
}
LoadState::Unloading => {
LoadState::None
}
};
if version_load.state != new_state {
state_change = true;
log_new_state = Some(new_state);
log_old_state = Some(version_load.state);
version_load.state = new_state;
}
}
let mut entry = self.load_states.get_mut(&key).unwrap();
for version in versions_to_remove {
versions.retain(|v| v.version != version);
}
entry.value_mut().versions = versions;
if state_change {
let time_in_state = std::time::Instant::now()
.duration_since(last_state_change_instant)
.as_secs_f32();
log::debug!(
"{:?} {:?} => {:?} in {}s",
key,
log_old_state.unwrap(),
log_new_state.unwrap(),
time_in_state
);
entry.value_mut().last_state_change_instant = std::time::Instant::now();
} else {
let time_in_state = std::time::Instant::now()
.duration_since(last_state_change_instant)
.as_secs_f32();
log::trace!(
"process_load_states Key: {:?} State: {:?} Time in state: {}",
key,
entry
.value()
.versions
.iter()
.map(|v| format!("{:?}", v.state))
.collect::<Vec<_>>()
.join(", "),
time_in_state
);
}
}
}
for _i in to_remove {
}
}
fn process_metadata_requests(&self, io: &mut dyn LoaderIO) {
while let Ok(mut response) = self.responses.metadata_rx.try_recv() {
let request_data = &mut response.1;
match response.0 {
Ok(metadata_list) => {
for metadata in metadata_list {
let request_data =
request_data.remove(&metadata.artifact_metadata.asset_id);
let load_handle = if let Some((handle, _)) = request_data {
handle
} else {
self.get_or_insert(metadata.artifact_metadata.asset_id)
};
let mut load = self
.load_states
.get_mut(&load_handle)
.expect("uuid in uuid_to_load but not in load_states");
log::trace!(
"received metadata for {:?} after {} secs",
load.asset_id,
std::time::Instant::now()
.duration_since(load.last_state_change_instant)
.as_secs_f32()
);
let version_load = load.versions.iter_mut().find(|v| {
if let Some((_, requesting_version)) = request_data {
v.version == requesting_version
} else {
v.metadata.is_none()
}
});
if let Some(version_load) = version_load {
version_load.metadata = Some(metadata.artifact_metadata);
version_load.asset_metadata = metadata.asset_metadata;
if let LoadState::RequestingMetadata = version_load.state {
version_load.state = LoadState::RequestDependencies
}
} else if request_data.is_some() {
load.version_counter += 1;
let new_version = load.version_counter;
load.versions.push(AssetVersionLoad {
asset_type: None,
auto_commit: true,
metadata: Some(metadata.artifact_metadata),
asset_metadata: metadata.asset_metadata,
state: LoadState::None,
version: new_version,
});
}
}
}
Err(err) => {
error!("metadata request failed: {}", err);
}
}
for (handle, version) in request_data.values() {
let mut load = self
.load_states
.get_mut(&handle)
.expect("load in metadata request but not in load_states");
let version_load = load
.versions
.iter_mut()
.find(|v| v.version == *version)
.expect("load in metadata request but not in load.versions");
if let LoadState::RequestingMetadata = version_load.state {
version_load.state = LoadState::WaitingForMetadata
}
}
}
let mut assets_to_request = HashMap::new();
for mut entry in self.load_states.iter_mut() {
let handle = *entry.key();
let load = entry.value_mut();
for version_load in &mut load.versions {
if let LoadState::WaitingForMetadata = version_load.state {
version_load.state = LoadState::RequestingMetadata;
assets_to_request.insert(load.asset_id, (handle, version_load.version));
}
}
}
if !assets_to_request.is_empty() {
io.get_asset_metadata_with_dependencies(MetadataRequest {
tx: self.responses.metadata_tx.clone(),
requests: Some(assets_to_request),
include_asset_metadata: true, })
}
}
fn process_data_requests(&self, storage: &dyn AssetStorage, io: &mut dyn LoaderIO) {
while let Ok(response) = self.responses.data_rx.try_recv() {
let result = response.0;
let handle = response.1;
let version = response.2;
let load = self
.load_states
.get(&handle)
.expect("load did not exist when data request completed");
let load_result = match result {
Ok(artifact_data) => {
let version_load = load
.versions
.iter()
.find(|v| v.version == version)
.expect("load version did not exist when data request completed");
let artifact_type = version_load.metadata.as_ref().unwrap().type_id;
let asset_id = load.asset_id;
log::trace!("asset data request succeeded for asset {:?}", load.asset_id);
drop(load);
let update_result = storage.update_asset(
self,
&artifact_type,
artifact_data,
response.1,
AssetLoadOp::new(self.op_tx.clone(), handle, version),
response.2,
);
if let Err(storage_error) = update_result {
error!(
"AssetStorage implementor error when updating asset {:?}: {}",
asset_id, storage_error
);
AssetLoadResult::from_state(LoadState::WaitingForData)
} else {
AssetLoadResult {
asset_type: Some(artifact_type),
new_state: LoadState::LoadingAsset,
}
}
}
Err(err) => {
error!(
"asset data request failed for asset {:?}: {}",
load.asset_id, err
);
AssetLoadResult::from_state(LoadState::WaitingForMetadata)
}
};
let mut load = self
.load_states
.get_mut(&response.1)
.expect("load did not exist when data request completed");
let version_load = load
.versions
.iter_mut()
.find(|v| v.version == version)
.expect("load version did not exist when data request completed");
version_load.state = load_result.new_state;
if let Some(asset_type) = load_result.asset_type {
version_load.asset_type = Some(asset_type);
}
}
let mut assets_to_request = Vec::new();
for mut load in self.load_states.iter_mut() {
let handle = *load.key();
let load = load.value_mut();
if let Some(version_load) = load
.versions
.iter_mut()
.find(|v| matches!(v.state, LoadState::WaitingForData))
{
version_load.state = LoadState::RequestingData;
let artifact_id = version_load.metadata.as_ref().unwrap().id;
assets_to_request.push(DataRequest {
tx: self.responses.data_tx.clone(),
asset_id: load.asset_id,
artifact_id,
request_data: Some((handle, version_load.version)),
});
}
}
if !assets_to_request.is_empty() {
io.get_artifacts(assets_to_request);
}
}
fn process_load_ops(&self, asset_storage: &dyn AssetStorage) {
while let Ok(op) = self.op_rx.try_recv() {
match op {
HandleOp::Error(_handle, _version, err) => {
panic!("load error {}", err);
}
HandleOp::Complete(handle, version) => {
let mut load = self
.load_states
.get_mut(&handle)
.expect("load op completed but load state does not exist");
let load_version = load
.versions
.iter_mut()
.find(|v| v.version == version)
.expect("loade op completed but version not found in load");
if load_version.auto_commit {
commit_asset(handle, load.value_mut(), version, asset_storage);
} else {
load_version.state = LoadState::LoadedUncommitted;
}
}
HandleOp::Drop(handle, version) => {
panic!(
"load op dropped without calling complete/error, handle {:?} version {}",
handle, version
)
}
}
}
}
#[cfg(feature = "invalidate_path")]
fn process_path_changes(&mut self) {
let mut changes = HashSet::new();
while let Ok(path) = self.invalidate_path_rx.try_recv() {
log::trace!("process_path_changes invalidate_path_rx path: {:?}", path);
changes.insert(path);
}
for entry in self.indirect_to_load.iter() {
let indirect_id = entry.key();
let handle = entry.value();
let cleaned_path =
distill_core::utils::canonicalize_path(&PathBuf::from(indirect_id.path()));
for change in &changes {
if change == &cleaned_path {
if let Some(mut indirect) = self.indirect_states.get_mut(&handle) {
indirect.pending_reresolve = true;
}
}
}
}
}
fn process_asset_changes(&mut self, asset_storage: &dyn AssetStorage) {
if self.pending_reloads.is_empty() {
let mut changes = HashSet::new();
while let Ok(asset) = self.invalidate_rx.try_recv() {
log::trace!("process_asset_changes invalidate_rx asset: {:?}", asset);
changes.insert(asset);
}
if !changes.is_empty() {
for asset_id in &changes {
let current_version = self
.uuid_to_load
.get(asset_id)
.map(|l| *l)
.and_then(|load_handle| {
self.load_states
.get(&load_handle)
.map(|load| (load_handle, load))
})
.map(|(load_handle, load)| {
(load_handle, load.versions.iter().map(|v| v.version).max())
});
if let Some((handle, Some(current_version))) = current_version {
let mut load = self
.load_states
.get_mut(&handle)
.expect("load state should exist for pending reload");
load.pending_reload = true;
self.pending_reloads.push(PendingReload {
asset_id: *asset_id,
version_before: current_version,
});
}
}
}
} else {
let is_finished = self.pending_reloads.iter().all(|reload| {
self.uuid_to_load
.get(&reload.asset_id)
.as_ref()
.and_then(|load_handle| self.load_states.get(load_handle))
.map(|load| {
load.versions.iter().any(|v| {
matches!(v.state, LoadState::Loaded | LoadState::LoadedUncommitted)
&& v.version > reload.version_before
})
})
.unwrap_or(true)
});
log::trace!("reload unfinished");
if is_finished {
for reload in &self.pending_reloads {
if let Some((load_handle, mut load)) = self
.uuid_to_load
.get_mut(&reload.asset_id)
.as_ref()
.and_then(|load_handle| {
self.load_states
.get_mut(load_handle)
.map(|load| (load_handle, load))
})
{
if let Some(version_to_commit) = load
.versions
.iter()
.find(|v| matches!(v.state, LoadState::LoadedUncommitted))
.map(|v| v.version)
{
log::trace!("committing version");
commit_asset(
**load_handle,
load.value_mut(),
version_to_commit,
asset_storage,
);
}
}
}
self.pending_reloads.clear();
}
}
}
fn process_indirect_states(&self) {
for mut entry in self.indirect_states.iter_mut() {
let has_refs = entry.refs.load(Ordering::Relaxed) > 0;
let new_state = match entry.state {
IndirectHandleState::None if has_refs => IndirectHandleState::WaitingForMetadata,
IndirectHandleState::Resolved if entry.pending_reresolve => {
entry.pending_reresolve = false;
IndirectHandleState::WaitingForMetadata
}
state => state,
};
entry.state = new_state;
}
}
fn process_resolve_requests(&self, io: &mut dyn LoaderIO, resolver: &dyn IndirectionResolver) {
while let Ok(response) = self.responses.resolve_rx.try_recv() {
let result = response.0;
let id = response.1;
let load_handle = response.2;
let mut state = self
.indirect_states
.get_mut(&load_handle)
.expect("indirect state did not exist when resolve request completed");
match result {
Ok(candidates) => {
let num_refs = state.refs.load(Ordering::Relaxed);
let new_uuid = resolver.resolve(&id, candidates);
if let Some(existing_uuid) = state.resolved_uuid {
let uuid_handle = self.get_or_insert(existing_uuid);
self.remove_refs(uuid_handle, num_refs);
}
if let Some(new_uuid) = new_uuid {
let uuid_handle = self.get_or_insert(new_uuid);
self.add_refs(new_uuid, num_refs);
self.indirect_table.0.insert(load_handle, uuid_handle);
} else {
self.indirect_table.0.remove(&load_handle);
}
state.resolved_uuid = new_uuid;
state.state = IndirectHandleState::Resolved;
}
Err(err) => {
error!("resolve request failed for id {:?}: {}", id, err);
state.state = IndirectHandleState::None;
}
}
}
let mut assets_to_request = Vec::new();
for mut load in self.indirect_states.iter_mut() {
if let IndirectHandleState::WaitingForMetadata = load.state {
load.state = IndirectHandleState::RequestingMetadata;
assets_to_request.push(ResolveRequest {
tx: self.responses.resolve_tx.clone(),
id: Some((load.id.clone(), *load.key())),
});
}
}
if !assets_to_request.is_empty() {
io.get_asset_candidates(assets_to_request);
}
}
pub fn invalidate_assets(&self, assets: &[AssetUuid]) {
for asset in assets {
let _ = self.invalidate_tx.send(*asset);
}
}
#[cfg(feature = "invalidate_path")]
pub fn invalidate_paths(&self, paths: &[PathBuf]) {
for path in paths {
let _ = self.invalidate_path_tx.send(path.clone());
}
}
}
pub struct Loader {
io: Box<dyn LoaderIO>,
data: LoaderState,
}
impl LoaderInfoProvider for LoaderState {
fn get_load_handle(&self, id: &AssetRef) -> Option<LoadHandle> {
self.uuid_to_load.get(id.expect_uuid()).map(|l| *l)
}
fn get_asset_id(&self, load: LoadHandle) -> Option<AssetUuid> {
self.load_states.get(&load).map(|l| l.asset_id)
}
}
impl Loader {
pub fn new(io: Box<dyn LoaderIO>) -> Loader {
Self::new_with_handle_allocator(io, Arc::new(AtomicHandleAllocator::default()))
}
pub fn new_with_handle_allocator(
io: Box<dyn LoaderIO>,
handle_allocator: Arc<dyn HandleAllocator>,
) -> Loader {
let (op_tx, op_rx) = unbounded();
let (invalidate_tx, invalidate_rx) = unbounded();
#[cfg(feature = "invalidate_path")]
let (invalidate_path_tx, invalidate_path_rx) = unbounded();
let (metadata_tx, metadata_rx) = unbounded();
let (data_tx, data_rx) = unbounded();
let (resolve_tx, resolve_rx) = unbounded();
Loader {
data: LoaderState {
handle_allocator,
load_states: DashMap::default(),
uuid_to_load: DashMap::default(),
op_rx,
op_tx,
invalidate_rx,
invalidate_tx,
#[cfg(feature = "invalidate_path")]
invalidate_path_rx,
#[cfg(feature = "invalidate_path")]
invalidate_path_tx,
pending_reloads: Vec::new(),
indirect_states: DashMap::new(),
indirect_to_load: DashMap::new(),
indirect_table: IndirectionTable(Arc::new(DashMap::new())),
responses: IORequestChannels {
metadata_rx,
metadata_tx,
data_tx,
data_rx,
resolve_tx,
resolve_rx,
},
},
io,
}
}
pub fn with_serde_context<R>(&self, tx: &Sender<RefOp>, mut f: impl FnMut() -> R) -> R {
let mut result = None;
self.io.with_runtime(&mut |runtime| {
result =
Some(runtime.block_on(SerdeContext::with(&self.data, tx.clone(), async { f() })));
});
result.unwrap()
}
pub fn get_load(&self, id: AssetUuid) -> Option<LoadHandle> {
self.data.uuid_to_load.get(&id).map(|l| *l)
}
pub fn get_load_info(&self, load: LoadHandle) -> Option<LoadInfo> {
self.data.get_load_info(load)
}
pub fn get_active_loads(&self) -> Vec<LoadHandle> {
self.data
.load_states
.iter()
.filter(|v| v.value().refs.load(Ordering::Relaxed) > 0)
.map(|l| *l.key())
.collect()
}
pub fn get_load_status(&self, load: LoadHandle) -> LoadStatus {
let load = if load.is_indirect() {
if let Some(load) = self.data.indirect_table.resolve(load) {
load
} else {
return LoadStatus::Unresolved;
}
} else {
load
};
if let Some(load) = self.data.load_states.get(&load) {
let version = load.versions.iter().max_by_key(|v| v.version);
version
.map(|v| match v.state {
LoadState::None => {
if load.refs.load(Ordering::Relaxed) > 0 {
LoadStatus::Loading
} else {
LoadStatus::NotRequested
}
}
LoadState::Loaded => LoadStatus::Loaded,
LoadState::UnloadRequested | LoadState::Unloading => LoadStatus::Unloading,
_ => LoadStatus::Loading,
})
.unwrap_or(LoadStatus::NotRequested)
} else {
LoadStatus::NotRequested
}
}
pub fn add_ref_handle(&self, handle: LoadHandle) {
self.data.add_ref_handle(handle, 1);
}
pub fn add_ref<U: Into<AssetUuid>>(&self, id: U) -> LoadHandle {
self.data.add_refs(id.into(), 1)
}
pub fn add_ref_indirect(&self, id: IndirectIdentifier) -> LoadHandle {
self.data.add_ref_indirect(id)
}
pub fn get_asset_type(&self, load: LoadHandle) -> Option<AssetTypeId> {
self.data.get_asset(load)
}
pub fn remove_ref(&self, load: LoadHandle) {
self.data.remove_refs(load, 1);
}
pub fn process(
&mut self,
asset_storage: &dyn AssetStorage,
resolver: &dyn IndirectionResolver,
) -> Result<()> {
self.io.tick(&mut self.data);
self.data.process_asset_changes(asset_storage);
self.data.process_path_changes();
self.data.process_load_ops(asset_storage);
self.data.process_load_states(asset_storage);
self.data.process_indirect_states();
self.data.process_metadata_requests(self.io.as_mut());
self.data
.process_resolve_requests(self.io.as_mut(), resolver);
self.data
.process_data_requests(asset_storage, self.io.as_mut());
Ok(())
}
pub fn indirection_table(&self) -> IndirectionTable {
self.data.indirect_table.clone()
}
pub fn invalidate_assets(&self, assets: &[AssetUuid]) {
self.data.invalidate_assets(assets);
}
pub fn invalidate_paths(&self, paths: &[PathBuf]) {
self.data.invalidate_paths(paths);
}
}
fn commit_asset(
handle: LoadHandle,
load: &mut AssetLoad,
version: u32,
asset_storage: &dyn AssetStorage,
) {
let version_load = load
.versions
.iter_mut()
.find(|v| v.version == version)
.expect("expected version in load when committing asset");
assert!(
LoadState::LoadingAsset == version_load.state
|| LoadState::LoadedUncommitted == version_load.state
);
let asset_type = version_load
.asset_type
.as_ref()
.expect("in LoadingAsset state but asset_type is None");
asset_storage.commit_asset_version(asset_type, handle, version_load.version);
version_load.state = LoadState::Loaded;
for version_load in load.versions.iter_mut() {
if version_load.version != version {
assert_eq!(LoadState::Loaded, version_load.state);
version_load.state = LoadState::UnloadRequested;
}
}
}