use std::{
marker::PhantomData,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, Weak,
},
};
use crossbeam::queue::MsQueue;
use hibitset::BitSet;
use rayon::ThreadPool;
use amethyst_core::{
specs::{
prelude::{Component, Read, ReadExpect, System, VecStorage, Write},
storage::UnprotectedStorage,
},
Time,
};
use crate::{
asset::{Asset, FormatValue},
error::{Error, ErrorKind, Result, ResultExt},
progress::Tracker,
reload::{HotReloadStrategy, Reload},
};
/// Hands out consecutive ids, starting at zero for a default-constructed allocator.
#[derive(Debug, Default)]
pub struct Allocator {
    /// Number of ids handed out so far, which is also the next id to return.
    store_count: AtomicUsize,
}

impl Allocator {
    /// Reserves the next id and returns it.
    ///
    /// The counter is incremented atomically, so a shared reference is sufficient.
    pub fn next_id(&self) -> usize {
        // `fetch_add` returns the value held *before* the increment.
        AtomicUsize::fetch_add(&self.store_count, 1, Ordering::Relaxed)
    }
}
/// Storage for all assets of type `A`, addressed through `Handle<A>`.
///
/// New and reloaded asset data arrives through the `processed` queue and is turned into
/// stored assets by `process` / `process_custom_drop`.
pub struct AssetStorage<A: Asset> {
/// The asset values, indexed by handle id.
assets: VecStorage<A>,
/// Ids that currently have a value in `assets`; consulted before every access.
bitset: BitSet,
/// One strong handle per stored asset. When this is the last strong reference left,
/// the asset is considered unused and is removed during processing.
handles: Vec<Handle<A>>,
/// Source of fresh handle ids.
handle_alloc: Allocator,
/// Queue of loaded data waiting to be processed. It is behind an `Arc` so that
/// hot-reload tasks running on the thread pool can push to it.
pub(crate) processed: Arc<MsQueue<Processed<A>>>,
/// Reload objects for stored assets, each paired with a weak handle to its asset.
reloads: Vec<(WeakHandle<A>, Box<dyn Reload<A>>)>,
/// Handles whose ids were freed and can be handed out again by `allocate`.
unused_handles: MsQueue<Handle<A>>,
/// Scratch list for entries that were still loading during a processing pass; they are
/// moved back into `processed` once the pass is over.
requeue: Mutex<Vec<Processed<A>>>,
}
/// Result of one processing step applied to loaded asset data.
pub enum ProcessingState<A>
where
A: Asset,
{
/// The asset is not complete yet. The carried data is put back on the queue and handed
/// to the processing function again on a later pass.
Loading(A::Data),
/// The asset is complete and ready to be stored.
Loaded(A),
}
impl<A: Asset> AssetStorage<A> {
pub fn new() -> Self {
Default::default()
}
/// Returns a handle for a new asset.
///
/// A previously freed handle is reused when one is available; otherwise a handle with a
/// brand-new id is created.
pub(crate) fn allocate(&self) -> Handle<A> {
    match self.unused_handles.try_pop() {
        Some(recycled) => recycled,
        None => self.allocate_new(),
    }
}
/// Builds a handle around an id that has never been handed out before.
fn allocate_new(&self) -> Handle<A> {
    // NOTE: `next_id` yields a `usize`; the `as u32` cast keeps only its low 32 bits.
    let fresh_id = Arc::new(self.handle_alloc.next_id() as u32);
    Handle {
        id: fresh_id,
        marker: PhantomData,
    }
}
pub fn clone_asset(&mut self, handle: &Handle<A>) -> Option<Handle<A>>
where
A: Clone,
{
if let Some(asset) = self.get(handle).map(A::clone) {
let h = self.allocate();
let id = h.id();
self.bitset.add(id);
self.handles.push(h.clone());
unsafe {
self.assets.insert(id, asset);
}
Some(h)
} else {
None
}
}
/// Returns a shared reference to the asset behind `handle`, or `None` if this storage
/// holds no asset for that handle's id.
pub fn get(&self, handle: &Handle<A>) -> Option<&A> {
    let id = handle.id();
    if !self.bitset.contains(id) {
        return None;
    }
    // SAFETY (assumes `get` requires an occupied slot — confirm against the storage
    // contract): the bitset check above confirmed that `id` has a value.
    Some(unsafe { self.assets.get(id) })
}
/// Returns a mutable reference to the asset behind `handle`, or `None` if this storage
/// holds no asset for that handle's id.
pub fn get_mut(&mut self, handle: &Handle<A>) -> Option<&mut A> {
    let id = handle.id();
    if !self.bitset.contains(id) {
        return None;
    }
    // SAFETY (assumes `get_mut` requires an occupied slot — confirm against the storage
    // contract): the bitset check above confirmed that `id` has a value.
    Some(unsafe { self.assets.get_mut(id) })
}
/// Processes queued asset data, frees unused assets and triggers hot reloading when due.
///
/// This is `process_custom_drop` with a drop function that does nothing beyond letting
/// the removed asset go out of scope.
pub fn process<F>(
    &mut self,
    f: F,
    frame_number: u64,
    pool: &ThreadPool,
    strategy: Option<&HotReloadStrategy>,
) where
    F: FnMut(A::Data) -> Result<ProcessingState<A>>,
{
    // Removed assets need no special treatment here; they are simply dropped.
    let discard = |_: A| {};
    self.process_custom_drop(f, discard, frame_number, pool, strategy);
}
/// Processes queued asset data, frees unused assets and triggers hot reloading when due.
///
/// The work happens in three phases:
///
/// 1. Every entry currently in the `processed` queue is popped and its data is passed
///    to `f`. Finished assets are stored, entries that are still loading are put back on
///    the queue once the pass is over, and failures are logged.
/// 2. Every stored asset whose entry in `handles` is the only strong reference left is
///    removed and handed to `drop_fn`; its id becomes available for reuse.
/// 3. If `strategy` is present and reports that a reload is due for `frame_number`,
///    `hot_reload` is run with `pool`.
///
/// # Panics
///
/// Panics if the `requeue` mutex is poisoned, or if a hot-reload result arrives for a
/// handle that has no asset in this storage.
pub fn process_custom_drop<F, D>(
&mut self,
mut f: F,
mut drop_fn: D,
frame_number: u64,
pool: &ThreadPool,
strategy: Option<&HotReloadStrategy>,
) where
D: FnMut(A),
F: FnMut(A::Data) -> Result<ProcessingState<A>>,
{
{
// Phase 1: drain the queue of loaded data.
//
// Entries that are still loading are parked in `requeue`; pushing them straight back
// onto `processed` would make the pop loop below see them again in this same call.
let requeue = self
.requeue
.get_mut()
.expect("The mutex of `requeue` in `AssetStorage` was poisoned");
while let Some(processed) = self.processed.try_pop() {
// Borrow the fields individually so they can be used alongside `requeue`.
let assets = &mut self.assets;
let bitset = &mut self.bitset;
let handles = &mut self.handles;
let reloads = &mut self.reloads;
let f = &mut f;
let (reload_obj, handle) = match processed {
Processed::NewAsset {
data,
handle,
name,
tracker,
} => {
// Split the loaded value into data and reload object, run the data through `f`,
// and attach the asset name to any error.
let (asset, reload_obj) = match data
.map(|FormatValue { data, reload }| (data, reload))
.and_then(|(d, rel)| f(d).map(|a| (a, rel)))
.chain_err(|| ErrorKind::Asset(name.clone()))
{
Ok((ProcessingState::Loaded(x), r)) => {
debug!(
"{:?}: Asset {:?} (handle id: {:?}) has been loaded successfully",
A::NAME,
name,
handle,
);
// A unique handle means nothing outside this queue entry refers to the asset any
// more. That is reported to the tracker as a failure; the asset is still stored below.
if handle.is_unique() {
warn!(
"Loading unnecessary asset. Handle {} is unique ",
handle.id()
);
tracker.fail(
handle.id(),
A::NAME,
name,
Error::from_kind(ErrorKind::UnusedHandle),
);
} else {
tracker.success();
}
(x, r)
}
Ok((ProcessingState::Loading(x), r)) => {
debug!(
"{:?}: Asset {:?} (handle id: {:?}) is not complete, readding to queue",
A::NAME,
name,
handle,
);
// Rebuild the queue entry around the intermediate data so `f` sees it again later.
requeue.push(Processed::NewAsset {
data: Ok(FormatValue { data: x, reload: r }),
handle,
name,
tracker,
});
continue;
}
Err(e) => {
error!(
"{:?}: Asset {:?} (handle id: {:?}) could not be loaded: {}",
A::NAME,
name,
handle,
e,
);
tracker.fail(handle.id(), A::NAME, name, e);
continue;
}
};
// Register the finished asset: mark the id as occupied, keep a strong handle for the
// unused-asset sweep, and store the value.
let id = handle.id();
bitset.add(id);
handles.push(handle.clone());
// SAFETY (assumes `insert` requires a vacant slot — confirm against the storage
// contract): NOTE(review) this relies on `handle` coming from `allocate` and on no
// earlier `NewAsset` entry having been stored for the same id — verify with the loader.
unsafe {
assets.insert(id, asset);
}
(reload_obj, handle)
}
Processed::HotReload {
data,
handle,
name,
old_reload,
} => {
// Same pipeline as for new assets: unpack, process with `f`, tag errors with the name.
let (asset, reload_obj) = match data
.map(|FormatValue { data, reload }| (data, reload))
.and_then(|(d, rel)| f(d).map(|a| (a, rel)))
.chain_err(|| ErrorKind::Asset(name.clone()))
{
Ok((ProcessingState::Loaded(x), r)) => (x, r),
Ok((ProcessingState::Loading(x), r)) => {
debug!(
"{:?}: Asset {:?} (handle id: {:?}) is not complete, readding to queue",
A::NAME,
name,
handle,
);
requeue.push(Processed::HotReload {
data: Ok(FormatValue { data: x, reload: r }),
handle,
name,
old_reload,
});
continue;
}
Err(e) => {
error!(
"{:?}: Failed to hot-reload asset {:?} (handle id: {:?}): {}\n\
Falling back to old reload object.",
A::NAME,
name,
handle,
e,
);
// Keep the previously stored asset and keep tracking it with the old reload object.
reloads.push((handle.downgrade(), old_reload));
continue;
}
};
let id = handle.id();
assert!(
bitset.contains(id),
"Expected handle {:?} to be valid, but the asset storage says otherwise",
handle,
);
// SAFETY (assumes `get_mut` requires an occupied slot — confirm against the storage
// contract): the assertion above guarantees `id` has a value. Assigning through the
// reference drops the old asset in place; `drop_fn` is not involved here.
unsafe {
let old = assets.get_mut(id);
*old = asset;
}
(reload_obj, handle)
}
};
// Track the asset for future hot reloads if the loaded value came with a reload object.
if let Some(reload_obj) = reload_obj {
reloads.push((handle.downgrade(), reload_obj));
}
}
// Hand the still-loading entries back to the queue for the next call.
for p in requeue.drain(..) {
self.processed.push(p);
}
}
// Phase 2: remove assets that nothing but the storage itself refers to.
let mut count = 0;
// Index from which the next search starts; everything before it is known to be in use.
let mut skip = 0;
while let Some(i) = self.handles.iter().skip(skip).position(Handle::is_unique) {
count += 1;
// `position` is relative to the skipped iterator, so convert it to an absolute index.
let i = skip + i;
// `swap_remove` moves the last handle into slot `i`, which has not been examined yet,
// so the next search resumes at `i` rather than after it.
skip = i;
let handle = self.handles.swap_remove(i);
let id = handle.id();
// SAFETY (assumes `remove` requires an occupied slot — confirm against the storage
// contract): NOTE(review) handles are pushed to `handles` together with storing their
// asset, so `id` is expected to be occupied here.
unsafe {
drop_fn(self.assets.remove(id));
}
self.bitset.remove(id);
// The id is recycled inside a fresh `Arc` instead of reusing `handle`: once `handle` is
// dropped at the end of this iteration, weak handles to the old `Arc` can no longer be
// upgraded.
self.unused_handles.push(Handle {
id: Arc::new(id),
marker: PhantomData,
});
}
if count != 0 {
debug!("{:?}: Freed {} handle ids", A::NAME, count,);
}
// Phase 3: check for hot reloads, but only when a strategy is present and says it is time.
if strategy
.map(|s| s.needs_reload(frame_number))
.unwrap_or(false)
{
trace!("{:?}: Testing for asset reloads..", A::NAME);
self.hot_reload(pool);
}
}
/// Starts a reload for every tracked asset whose reload object reports a change.
///
/// Entries whose asset handle is no longer alive are discarded first. Each remaining
/// entry is then asked `needs_reload` exactly once. Entries that need a reload are
/// removed from the list and reloaded on `pool`; the outcome is pushed onto the
/// `processed` queue as `Processed::HotReload`, where a later `process_custom_drop`
/// call picks it up.
///
/// The list is walked in a single pass. The previous implementation restarted the
/// search from the first entry after every hit, which re-ran `needs_reload` on entries
/// already known not to need one.
fn hot_reload(&mut self, pool: &ThreadPool) {
    self.reloads.retain(|&(ref handle, _)| !handle.is_dead());

    let mut idx = 0;
    while idx < self.reloads.len() {
        if !self.reloads[idx].1.needs_reload() {
            idx += 1;
            continue;
        }

        // `swap_remove` moves the last entry into `idx`. That entry has not been checked
        // yet, so `idx` is deliberately not advanced after a removal.
        let (handle, rel): (WeakHandle<_>, Box<dyn Reload<_>>) = self.reloads.swap_remove(idx);

        let name = rel.name();
        let format = rel.format();
        let handle = handle.upgrade();

        debug!(
            "{:?}: Asset {:?} (handle id: {:?}) needs a reload using format {:?}",
            A::NAME,
            name,
            handle,
            format,
        );

        // The asset may have been released since the `retain` above; in that case the
        // entry is simply dropped.
        if let Some(handle) = handle {
            let processed = self.processed.clone();
            pool.spawn(move || {
                // Keep a copy of the reload object so processing can fall back to it if
                // the reloaded data turns out to be unusable.
                let old_reload = rel.clone();
                let data = rel.reload().chain_err(|| ErrorKind::Format(format));

                processed.push(Processed::HotReload {
                    data,
                    name,
                    handle,
                    old_reload,
                });
            });
        }
    }
}
}
impl<A: Asset> Default for AssetStorage<A> {
fn default() -> Self {
AssetStorage {
assets: Default::default(),
bitset: Default::default(),
handles: Default::default(),
handle_alloc: Default::default(),
processed: Arc::new(MsQueue::new()),
reloads: Default::default(),
unused_handles: MsQueue::new(),
requeue: Mutex::new(Vec::default()),
}
}
}
impl<A: Asset> Drop for AssetStorage<A> {
fn drop(&mut self) {
let bitset = &self.bitset;
unsafe { self.assets.clean(bitset) }
}
}
/// System that keeps the `AssetStorage` for assets of type `A` up to date.
///
/// The type holds no state of its own; `A` only selects which storage is processed.
pub struct Processor<A> {
    marker: PhantomData<A>,
}

impl<A> Processor<A> {
    /// Creates a new processor for assets of type `A`.
    pub fn new() -> Self {
        Processor {
            marker: PhantomData,
        }
    }
}

// A type with a public zero-argument `new` should also be constructible through
// `Default`. It is implemented by hand so that no `A: Default` bound is imposed.
impl<A> Default for Processor<A> {
    /// Equivalent to [`Processor::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<'a, A> System<'a> for Processor<A>
where
    A: Asset,
    A::Data: Into<Result<ProcessingState<A>>>,
{
    type SystemData = (
        Write<'a, AssetStorage<A>>,
        ReadExpect<'a, Arc<ThreadPool>>,
        Read<'a, Time>,
        Option<Read<'a, HotReloadStrategy>>,
    );

    /// Runs one processing pass over the asset storage.
    ///
    /// Asset data is converted with its `Into` implementation. The current frame number
    /// and the hot-reload strategy (if that resource exists) are forwarded so the
    /// storage can decide whether to check for reloads.
    fn run(&mut self, (mut storage, pool, time, strategy): Self::SystemData) {
        let frame = time.frame_number();
        // Borrow the strategy resource, if any, as a plain reference.
        let reload_strategy: Option<&HotReloadStrategy> = strategy.as_ref().map(|s| &**s);
        let thread_pool: &ThreadPool = &**pool;

        storage.process(Into::into, frame, thread_pool, reload_strategy);
    }
}
/// Reference-counted handle to an asset of type `A`.
///
/// All clones of a handle share one `Arc`, whose strong count is used to tell whether
/// anything still refers to the asset.
// NOTE(review): the empty `bound = ""` arguments presumably stop the derives from
// adding `A: Trait` bounds — confirm against the `derivative` documentation.
#[derive(Derivative)]
#[derivative(
Clone(bound = ""),
Eq(bound = ""),
Hash(bound = ""),
PartialEq(bound = ""),
Debug(bound = "")
)]
pub struct Handle<A: ?Sized> {
/// Id of the asset inside its storage, shared between all clones of the handle.
id: Arc<u32>,
/// Ties the handle to the asset type without storing a value of it.
#[derivative(Debug = "ignore")]
marker: PhantomData<A>,
}
impl<A> Handle<A> {
    /// Returns the numeric id this handle refers to.
    pub fn id(&self) -> u32 {
        *self.id
    }

    /// Creates a `WeakHandle` to the same id that does not keep the asset alive.
    pub fn downgrade(&self) -> WeakHandle<A> {
        WeakHandle {
            id: Arc::downgrade(&self.id),
            marker: PhantomData,
        }
    }

    /// Returns `true` when this is the only strong handle left for its id.
    ///
    /// Weak handles are not counted.
    fn is_unique(&self) -> bool {
        let strong_refs = Arc::strong_count(&self.id);
        strong_refs == 1
    }
}
/// Lets asset handles be attached to entities; the storage type is chosen by the asset.
impl<A: Asset> Component for Handle<A> {
    type Storage = A::HandleStorage;
}
/// An entry of the `processed` queue: loaded data waiting to be turned into an asset.
pub(crate) enum Processed<A: Asset> {
/// Data for an asset that is not stored yet.
NewAsset {
/// The loaded data, or the error that occurred while loading it.
data: Result<FormatValue<A>>,
/// Handle under which the finished asset will be stored.
handle: Handle<A>,
/// Asset name, used in log messages and error reports.
name: String,
/// Notified about the success or failure of this load.
tracker: Box<dyn Tracker>,
},
/// Freshly reloaded data for an asset that is already stored.
HotReload {
/// The reloaded data, or the error that occurred while reloading it.
data: Result<FormatValue<A>>,
/// Handle of the asset to replace.
handle: Handle<A>,
/// Asset name, used in log messages and error reports.
name: String,
/// The reload object in use before this reload; it is put back into the reload list
/// if the new data cannot be processed.
old_reload: Box<dyn Reload<A>>,
},
}
/// Weak counterpart of `Handle`: it refers to the same id but does not keep the asset
/// alive. Use `upgrade` to get a `Handle` back while strong handles still exist.
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
pub struct WeakHandle<A> {
/// Weak reference to the id shared by the strong handles.
id: Weak<u32>,
/// Ties the handle to the asset type without storing a value of it.
marker: PhantomData<A>,
}
impl<A> WeakHandle<A> {
    /// Tries to turn this weak handle into a strong `Handle`.
    ///
    /// Returns `None` once every strong handle for the id has been dropped.
    #[inline]
    pub fn upgrade(&self) -> Option<Handle<A>> {
        let id = self.id.upgrade()?;
        Some(Handle {
            id,
            marker: PhantomData,
        })
    }

    /// Returns `true` if no strong handle for the id exists any more.
    #[inline]
    pub fn is_dead(&self) -> bool {
        self.id.upgrade().is_none()
    }
}