use crate::{
database::path::{AssetPath, AssetPathStatic},
fetch::{AssetAwaitsAsyncFetch, AssetFetch},
};
use anput::{
bundle::DynamicBundle, third_party::intuicio_data::managed::value::ManagedValue, world::World,
};
use moirai::{
job::{JobHandle, JobLocation, JobResult},
jobs::Jobs,
};
use std::{
collections::HashMap,
error::Error,
sync::{Arc, RwLock},
};
/// An [`AssetFetch`] decorator that offloads the inner fetch engine's
/// `load_bytes` work onto background jobs, delivering results later during
/// [`AssetFetch::maintain`].
pub struct DeferredAssetFetch<Fetch: AssetFetch> {
    /// Inner fetch engine, shared with spawned async jobs via `Arc<RwLock<_>>`.
    fetch: Arc<RwLock<Fetch>>,
    /// Jobs runner used to execute fetch tasks. Owned by default; may instead
    /// reference an external runner (see the `jobs` builder method).
    jobs: ManagedValue<Jobs>,
    /// Handles of in-flight fetch jobs, keyed by the asset path they load.
    #[allow(clippy::type_complexity)]
    job_handles: RwLock<HashMap<AssetPathStatic, JobHandle<Result<DynamicBundle, String>>>>,
}
impl<Fetch: AssetFetch> DeferredAssetFetch<Fetch> {
pub fn new(fetch: Fetch) -> Self {
Self {
fetch: Arc::new(RwLock::new(fetch)),
jobs: ManagedValue::Owned(Default::default()),
job_handles: Default::default(),
}
}
pub fn jobs(mut self, jobs: impl Into<ManagedValue<Jobs>>) -> Self {
self.jobs = jobs.into();
self
}
}
impl<Fetch: AssetFetch> AssetFetch for DeferredAssetFetch<Fetch> {
    /// Spawns an async job that performs the actual `load_bytes` on the inner
    /// fetch engine, and immediately returns a bundle containing only the
    /// [`AssetAwaitsAsyncFetch`] marker component. The real result is applied
    /// to the asset entity later, in `maintain`.
    fn load_bytes(&self, path: AssetPath) -> Result<DynamicBundle, Box<dyn Error>> {
        // Owned copies: one moves into the async job, one keys the handle map.
        let path = path.into_static();
        let path2 = path.clone();
        let fetch = self.fetch.clone();
        let job = async move {
            // Both failure modes (lock poisoning, inner fetch error) are
            // flattened to `String` so they fit the job's result type.
            fetch.read().map_err(|error| {
                format!(
                    "Failed to get read access to inner fetch engine in async fetch for asset: `{path}`. Error: {error}"
                )
            })?.load_bytes(path.clone()).map_err(|error| {
                format!(
                    "Failed async fetch for asset: `{path}`. Error: {error}"
                )
            })
        };
        let jobs = self.jobs.read().ok_or_else(|| {
            format!("Failed to get read access to jobs runner in async fetch for asset: `{path2}`")
        })?;
        // Schedule off the calling thread so loading never blocks it.
        let handle = jobs.spawn(JobLocation::other_than_current_thread(), job);
        // NOTE(review): a second `load_bytes` for the same path silently
        // overwrites the previous handle here, detaching (not cancelling) the
        // older job — confirm this is the intended behavior.
        self.job_handles
            .write()
            .map_err(|error| format!("{error}"))?
            .insert(path2, handle);
        // Placeholder bundle marking the asset as awaiting its async fetch.
        let mut bundle = DynamicBundle::default();
        let _ = bundle.add_component(AssetAwaitsAsyncFetch);
        Ok(bundle)
    }

    /// Pumps the jobs runner (when owned), maintains the inner fetch engine,
    /// then resolves every finished job: strips the [`AssetAwaitsAsyncFetch`]
    /// marker and inserts the loaded bundle into the matching asset entity,
    /// or returns an error on failure/cancellation.
    fn maintain(&mut self, storage: &mut World) -> Result<(), Box<dyn Error>> {
        // Only drive the runner when we own it; a borrowed runner is
        // presumably pumped by its external owner — TODO confirm.
        if let ManagedValue::Owned(jobs) = &self.jobs {
            jobs.read()
                .ok_or("Failed to get read access to jobs runner in deferred fetch maintainance.")?
                .run_local();
        }
        self.fetch
            .write()
            .map_err(|error| format!("Failed deferred fetch engine maintainance. Error: {error}"))?
            .maintain(storage)?;
        // Collect finished paths under a read lock first, so it is released
        // before the write lock below is taken on the same `RwLock`.
        let complete = self
            .job_handles
            .read()
            .map_err(|error| format!("{error}"))?
            .iter()
            .filter(|(_, handle)| handle.is_done())
            .map(|(path, _)| path.clone())
            .collect::<Vec<_>>();
        for path in complete.into_iter().rev() {
            let handle = self
                .job_handles
                .write()
                .map_err(|error| format!("{error}"))?
                // unwrap: `path` was just taken from this map, and `&mut self`
                // rules out concurrent removal between the two lock scopes.
                .remove(&path)
                .unwrap();
            match handle.take() {
                JobResult::Completed(result) => {
                    // Clear the pending marker regardless of the job outcome.
                    if let Some(entity) = storage.find_by::<true, _>(&path) {
                        storage.remove::<(AssetAwaitsAsyncFetch,)>(entity)?;
                    }
                    let result = result.map_err(|error| {
                        format!("Async fetch execution of `{path}` asset panicked! Error: {error}")
                    })?;
                    // Attach the loaded bundle to the asset entity, if it is
                    // still present in storage.
                    if let Some(entity) = storage.find_by::<true, _>(&path) {
                        storage.insert(entity, result)?;
                    }
                }
                JobResult::Cancelled | JobResult::Consumed => {
                    if let Some(entity) = storage.find_by::<true, _>(&path) {
                        storage.remove::<(AssetAwaitsAsyncFetch,)>(entity)?;
                    }
                    // NOTE(review): this early return leaves the remaining
                    // finished jobs unresolved until the next maintain pass —
                    // confirm that is acceptable.
                    return Err(format!(
                        "Async fetch execution of `{path}` asset failed with undefined error!"
                    )
                    .into());
                }
                JobResult::InProgress => {
                    // `is_done()` reported done but `take()` disagreed —
                    // presumably a benign race in the job runner; put the
                    // handle back and retry on a later maintain pass.
                    self.job_handles
                        .write()
                        .map_err(|error| format!("{error}"))?
                        .insert(path.clone(), handle);
                }
            };
        }
        Ok(())
    }
}