1use std::{
2 marker::PhantomData,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc, Weak,
6 },
7};
8
9use crossbeam_queue::SegQueue;
10use derivative::Derivative;
11use log::{debug, error, trace, warn};
12use rayon::ThreadPool;
13
14use amethyst_core::{
15 ecs::{
16 hibitset::BitSet,
17 prelude::{Component, Read, ReadExpect, System, SystemData, VecStorage, World, Write},
18 storage::UnprotectedStorage,
19 },
20 SystemDesc, Time,
21};
22use amethyst_error::{Error, ResultExt};
23
24#[cfg(feature = "profiler")]
25use thread_profiler::profile_scope;
26
27use crate::{
28 asset::{Asset, FormatValue, ProcessableAsset},
29 error,
30 progress::Tracker,
31 reload::{HotReloadStrategy, Reload},
32};
33
34#[derive(Debug, Default)]
36pub struct Allocator {
37 store_count: AtomicUsize,
38}
39
40impl Allocator {
41 pub fn next_id(&self) -> usize {
43 self.store_count.fetch_add(1, Ordering::Relaxed)
44 }
45}
46
47pub struct AssetStorage<A: Asset> {
50 assets: VecStorage<(A, u32)>,
51 bitset: BitSet,
52 handles: Vec<Handle<A>>,
53 handle_alloc: Allocator,
54 pub(crate) processed: Arc<SegQueue<Processed<A>>>,
55 reloads: Vec<(WeakHandle<A>, Box<dyn Reload<A::Data>>)>,
56 unused_handles: SegQueue<Handle<A>>,
57}
58
59pub enum ProcessingState<A>
61where
62 A: Asset,
63{
64 Loading(A::Data),
66 Loaded(A),
68}
69
70impl<A: Asset> AssetStorage<A> {
71 pub fn new() -> Self {
73 Default::default()
74 }
75
76 pub(crate) fn allocate(&self) -> Handle<A> {
78 self.unused_handles
79 .pop()
80 .unwrap_or_else(|_| self.allocate_new())
81 }
82
83 fn allocate_new(&self) -> Handle<A> {
84 let id = self.handle_alloc.next_id() as u32;
85 Handle {
86 id: Arc::new(id),
87 marker: PhantomData,
88 }
89 }
90
91 pub fn unload_all(&mut self) {
94 unsafe { self.assets.clean(&self.bitset) }
95 self.bitset.clear();
96 }
97
98 pub fn clone_asset(&mut self, handle: &Handle<A>) -> Option<Handle<A>>
104 where
105 A: Clone,
106 {
107 if let Some(asset) = self.get(handle).map(A::clone) {
108 let h = self.allocate();
109
110 let id = h.id();
111 self.bitset.add(id);
112 self.handles.push(h.clone());
113
114 unsafe {
115 self.assets.insert(id, (asset, 0));
116 }
117
118 Some(h)
119 } else {
120 None
121 }
122 }
123
124 pub fn get(&self, handle: &Handle<A>) -> Option<&A> {
126 if self.bitset.contains(handle.id()) {
127 Some(unsafe { &self.assets.get(handle.id()).0 })
128 } else {
129 None
130 }
131 }
132
133 pub fn get_version(&self, handle: &Handle<A>) -> Option<u32> {
135 if self.bitset.contains(handle.id()) {
136 Some(unsafe { self.assets.get(handle.id()).1 })
137 } else {
138 None
139 }
140 }
141
142 pub fn get_with_version(&self, handle: &Handle<A>) -> Option<&(A, u32)> {
144 if self.bitset.contains(handle.id()) {
145 Some(unsafe { self.assets.get(handle.id()) })
146 } else {
147 None
148 }
149 }
150
151 pub fn get_by_id(&self, id: u32) -> Option<&A> {
153 if self.bitset.contains(id) {
154 Some(unsafe { &self.assets.get(id).0 })
155 } else {
156 None
157 }
158 }
159
160 pub fn replace(&mut self, handle: &Handle<A>, asset: A) -> A {
163 if self.bitset.contains(handle.id()) {
164 let data = unsafe { self.assets.get_mut(handle.id()) };
165 data.1 += 1;
166 std::mem::replace(&mut data.0, asset)
167 } else {
168 panic!("Trying to replace not loaded asset");
169 }
170 }
171
172 pub fn insert(&mut self, asset: A) -> Handle<A> {
179 let handle = self.allocate();
180 let id = handle.id();
181 self.bitset.add(id);
182 self.handles.push(handle.clone());
183 unsafe {
184 self.assets.insert(id, (asset, 0));
185 }
186 handle
187 }
188
189 pub fn contains(&self, handle: &Handle<A>) -> bool {
191 self.bitset.contains(handle.id())
192 }
193
194 pub fn contains_id(&self, id: u32) -> bool {
196 self.bitset.contains(id)
197 }
198
199 pub unsafe fn get_by_id_unchecked(&self, id: u32) -> &A {
207 &self.assets.get(id).0
208 }
209
210 pub fn get_mut(&mut self, handle: &Handle<A>) -> Option<&mut A> {
212 if self.bitset.contains(handle.id()) {
213 Some(unsafe { &mut self.assets.get_mut(handle.id()).0 })
214 } else {
215 None
216 }
217 }
218
219 pub fn process<F>(
221 &mut self,
222 f: F,
223 frame_number: u64,
224 pool: &ThreadPool,
225 strategy: Option<&HotReloadStrategy>,
226 ) where
227 F: FnMut(A::Data) -> Result<ProcessingState<A>, Error>,
228 {
229 self.process_custom_drop(f, |_| {}, frame_number, pool, strategy);
230 }
231
232 pub fn process_custom_drop<F, D>(
235 &mut self,
236 mut f: F,
237 mut drop_fn: D,
238 frame_number: u64,
239 pool: &ThreadPool,
240 strategy: Option<&HotReloadStrategy>,
241 ) where
242 D: FnMut(A),
243 F: FnMut(A::Data) -> Result<ProcessingState<A>, Error>,
244 {
245 {
246 let mut requeue = Vec::new();
247 while let Ok(processed) = self.processed.pop() {
248 let assets = &mut self.assets;
249 let bitset = &mut self.bitset;
250 let handles = &mut self.handles;
251 let reloads = &mut self.reloads;
252
253 let f = &mut f;
254 let (reload_obj, handle) = match processed {
255 Processed::NewAsset {
256 data,
257 handle,
258 name,
259 tracker,
260 } => {
261 let (asset, reload_obj) = match data
262 .map(|FormatValue { data, reload }| (data, reload))
263 .and_then(|(d, rel)| f(d).map(|a| (a, rel)))
264 .with_context(|_| error::Error::Asset(name.clone()))
265 {
266 Ok((ProcessingState::Loaded(x), r)) => {
267 debug!(
268 "{:?}: Asset {:?} (handle id: {:?}) has been loaded successfully",
269 A::NAME,
270 name,
271 handle,
272 );
273 if handle.is_unique() {
277 warn!(
278 "Loading unnecessary asset. Handle {} is unique ",
279 handle.id()
280 );
281 tracker.fail(
282 handle.id(),
283 A::NAME,
284 name,
285 Error::from(error::Error::UnusedHandle),
286 );
287 } else {
288 tracker.success();
289 }
290
291 (x, r)
292 }
293 Ok((ProcessingState::Loading(x), r)) => {
294 debug!(
295 "{:?}: Asset {:?} (handle id: {:?}) is not complete, readding to queue",
296 A::NAME,
297 name,
298 handle,
299 );
300 requeue.push(Processed::NewAsset {
301 data: Ok(FormatValue { data: x, reload: r }),
302 handle,
303 name,
304 tracker,
305 });
306 continue;
307 }
308 Err(e) => {
309 error!(
310 "{:?}: Asset {:?} (handle id: {:?}) could not be loaded: {}",
311 A::NAME,
312 name,
313 handle,
314 e,
315 );
316 tracker.fail(handle.id(), A::NAME, name, e);
317
318 continue;
319 }
320 };
321
322 let id = handle.id();
323 bitset.add(id);
324 handles.push(handle.clone());
325
326 unsafe {
329 assets.insert(id, (asset, 0));
330 }
331
332 (reload_obj, handle)
333 }
334 Processed::HotReload {
335 data,
336 handle,
337 name,
338 old_reload,
339 } => {
340 let (asset, reload_obj) = match data
341 .map(|FormatValue { data, reload }| (data, reload))
342 .and_then(|(d, rel)| f(d).map(|a| (a, rel)))
343 .with_context(|_| error::Error::Asset(name.clone()))
344 {
345 Ok((ProcessingState::Loaded(x), r)) => (x, r),
346 Ok((ProcessingState::Loading(x), r)) => {
347 debug!(
348 "{:?}: Asset {:?} (handle id: {:?}) is not complete, readding to queue",
349 A::NAME,
350 name,
351 handle,
352 );
353 requeue.push(Processed::HotReload {
354 data: Ok(FormatValue { data: x, reload: r }),
355 handle,
356 name,
357 old_reload,
358 });
359 continue;
360 }
361 Err(e) => {
362 error!(
363 "{:?}: Failed to hot-reload asset {:?} (handle id: {:?}): {}\n\
364 Falling back to old reload object.",
365 A::NAME,
366 name,
367 handle,
368 e,
369 );
370
371 reloads.push((handle.downgrade(), old_reload));
372
373 continue;
374 }
375 };
376
377 let id = handle.id();
378 assert!(
379 bitset.contains(id),
380 "Expected handle {:?} to be valid, but the asset storage says otherwise",
381 handle,
382 );
383 let data = unsafe { self.assets.get_mut(id) };
384 data.1 += 1;
385 drop_fn(std::mem::replace(&mut data.0, asset));
386
387 (reload_obj, handle)
388 }
389 };
390
391 if let Some(reload_obj) = reload_obj {
393 reloads.push((handle.downgrade(), reload_obj));
394 }
395 }
396
397 for p in requeue.drain(..) {
398 self.processed.push(p);
399 }
400 }
401
402 let mut count = 0;
403 let mut skip = 0;
404 while let Some(i) = self.handles.iter().skip(skip).position(Handle::is_unique) {
405 count += 1;
406 let i = skip + i;
408 skip = i;
409 let handle = self.handles.swap_remove(i);
410 let id = handle.id();
411 unsafe {
412 let (asset, _) = self.assets.remove(id);
413 drop_fn(asset);
414 }
415 self.bitset.remove(id);
416
417 self.unused_handles.push(Handle {
420 id: Arc::new(id),
421 marker: PhantomData,
422 });
423 }
424 if count != 0 {
425 debug!("{:?}: Freed {} handle ids", A::NAME, count,);
426 }
427
428 if strategy
429 .map(|s| s.needs_reload(frame_number))
430 .unwrap_or(false)
431 {
432 trace!("{:?}: Testing for asset reloads..", A::NAME);
433 self.hot_reload(pool);
434 }
435 }
436
437 fn hot_reload(&mut self, pool: &ThreadPool) {
438 self.reloads.retain(|&(ref handle, _)| !handle.is_dead());
439 while let Some(p) = self
440 .reloads
441 .iter()
442 .position(|&(_, ref rel)| rel.needs_reload())
443 {
444 let (handle, rel): (WeakHandle<_>, Box<dyn Reload<_>>) = self.reloads.swap_remove(p);
445
446 let name = rel.name();
447 let format = rel.format();
448 let handle = handle.upgrade();
449
450 debug!(
451 "{:?}: Asset {:?} (handle id: {:?}) needs a reload using format {:?}",
452 A::NAME,
453 name,
454 handle,
455 format,
456 );
457
458 if let Some(handle) = handle {
459 let processed = self.processed.clone();
460 pool.spawn(move || {
461 let old_reload = rel.clone();
462 let data = rel.reload().with_context(|_| error::Error::Format(format));
463
464 let p = Processed::HotReload {
465 data,
466 name,
467 handle,
468 old_reload,
469 };
470 processed.push(p);
471 });
472 }
473 }
474 }
475}
476
477impl<A: Asset> Default for AssetStorage<A> {
478 fn default() -> Self {
479 AssetStorage {
480 assets: Default::default(),
481 bitset: Default::default(),
482 handles: Default::default(),
483 handle_alloc: Default::default(),
484 processed: Arc::new(SegQueue::new()),
485 reloads: Default::default(),
486 unused_handles: SegQueue::new(),
487 }
488 }
489}
490
491impl<A: Asset> Drop for AssetStorage<A> {
492 fn drop(&mut self) {
493 let bitset = &self.bitset;
494 unsafe { self.assets.clean(bitset) }
495 }
496}
497
498#[derive(Default)]
505pub struct Processor<A> {
506 marker: PhantomData<A>,
507}
508
509impl<A> Processor<A> {
510 pub fn new() -> Self {
513 Processor {
514 marker: PhantomData,
515 }
516 }
517}
518
519impl<'a, 'b, A> SystemDesc<'a, 'b, Processor<A>> for Processor<A>
520where
521 A: Asset + ProcessableAsset,
522{
523 fn build(self, world: &mut World) -> Processor<A> {
524 <Processor<A> as System<'_>>::SystemData::setup(world);
525 self
526 }
527}
528
529impl<'a, A> System<'a> for Processor<A>
530where
531 A: Asset + ProcessableAsset,
532{
533 type SystemData = (
534 Write<'a, AssetStorage<A>>,
535 ReadExpect<'a, Arc<ThreadPool>>,
536 Read<'a, Time>,
537 Option<Read<'a, HotReloadStrategy>>,
538 );
539
540 fn run(&mut self, (mut storage, pool, time, strategy): Self::SystemData) {
541 #[cfg(feature = "profiler")]
542 profile_scope!("processor_system");
543
544 storage.process(
545 ProcessableAsset::process,
546 time.frame_number(),
547 &**pool,
548 strategy.as_deref(),
549 );
550 }
551}
552
553#[derive(Derivative)]
557#[derivative(
558 Clone(bound = ""),
559 Eq(bound = ""),
560 Hash(bound = ""),
561 PartialEq(bound = ""),
562 Debug(bound = "")
563)]
564pub struct Handle<A: ?Sized> {
565 id: Arc<u32>,
566 #[derivative(Debug = "ignore")]
567 marker: PhantomData<A>,
568}
569
570impl<A> Handle<A> {
571 pub fn id(&self) -> u32 {
573 *self.id.as_ref()
574 }
575
576 pub fn downgrade(&self) -> WeakHandle<A> {
578 let id = Arc::downgrade(&self.id);
579
580 WeakHandle {
581 id,
582 marker: PhantomData,
583 }
584 }
585
586 fn is_unique(&self) -> bool {
588 Arc::strong_count(&self.id) == 1
589 }
590}
591
592impl<A> Component for Handle<A>
593where
594 A: Asset,
595{
596 type Storage = A::HandleStorage;
597}
598
599pub(crate) enum Processed<A: Asset> {
600 NewAsset {
601 data: Result<FormatValue<A::Data>, Error>,
602 handle: Handle<A>,
603 name: String,
604 tracker: Box<dyn Tracker>,
605 },
606 HotReload {
607 data: Result<FormatValue<A::Data>, Error>,
608 handle: Handle<A>,
609 name: String,
610 old_reload: Box<dyn Reload<A::Data>>,
611 },
612}
613
614#[derive(Derivative)]
617#[derivative(Clone(bound = ""), Debug(bound = ""))]
618pub struct WeakHandle<A> {
619 id: Weak<u32>,
620 #[derivative(Debug = "ignore")]
621 marker: PhantomData<A>,
622}
623
624impl<A> WeakHandle<A> {
625 #[inline]
627 pub fn upgrade(&self) -> Option<Handle<A>> {
628 self.id.upgrade().map(|id| Handle {
629 id,
630 marker: PhantomData,
631 })
632 }
633
634 #[inline]
636 pub fn is_dead(&self) -> bool {
637 self.id.upgrade().is_none()
638 }
639}