asset_container/
assets.rs

1use std::borrow::Cow;
2use std::ops::Deref;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
6use std::task::Poll;
7
8use futures::{Future, FutureExt, Stream, StreamExt};
9
10pub use self::flags::AssetFlags;
11use crate::Error;
12mod flags;
13
14#[derive(Debug)]
15pub struct AssetRef<'a, T>
16where
17  T: Asset + Clone,
18{
19  parent_flags: AtomicU32,
20  asset: Cow<'a, T>,
21}
22
23impl<'a, T> Clone for AssetRef<'a, T>
24where
25  T: Asset + Clone,
26  T: AssetManager<Asset = T>,
27{
28  fn clone(&self) -> Self {
29    Self::new(self.asset.clone(), self.get_asset_flags())
30  }
31}
32
33impl<'a, T> Asset for AssetRef<'a, T>
34where
35  T: Asset + Clone,
36  T: AssetManager<Asset = T>,
37{
38  type Options = T::Options;
39
40  fn fetch_with_progress(&self, options: Self::Options) -> Pin<Box<dyn Stream<Item = Progress> + Send + '_>> {
41    self.asset.fetch_with_progress(options)
42  }
43
44  fn fetch(&self, options: Self::Options) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + Sync + '_>> {
45    self.asset.fetch(options)
46  }
47
48  fn name(&self) -> &str {
49    self.asset.name()
50  }
51
52  fn update_baseurl(&self, baseurl: &Path) {
53    self.asset.update_baseurl(baseurl);
54  }
55}
56
57impl<'a, T> Deref for AssetRef<'a, T>
58where
59  T: Asset + Clone,
60  T: AssetManager<Asset = T>,
61{
62  type Target = T;
63
64  fn deref(&self) -> &Self::Target {
65    &self.asset
66  }
67}
68
69impl<'a, T> AssetManager for AssetRef<'a, T>
70where
71  T: Asset + AssetManager<Asset = T> + Clone,
72{
73  type Asset = T;
74
75  fn assets(&self) -> Assets<Self::Asset> {
76    self.asset.assets()
77  }
78
79  fn set_baseurl(&self, baseurl: &Path) {
80    self.asset.set_baseurl(baseurl);
81  }
82
83  fn get_asset_flags(&self) -> u32 {
84    self.parent_flags.load(Ordering::Relaxed) | self.asset.get_asset_flags()
85  }
86}
87
88impl<'a, T> AssetRef<'a, T>
89where
90  T: Asset + Clone,
91{
92  pub const fn new(asset: Cow<'a, T>, flags: u32) -> Self {
93    Self {
94      parent_flags: AtomicU32::new(flags),
95      asset,
96    }
97  }
98
99  pub fn into_owned(self) -> T {
100    self.asset.into_owned()
101  }
102}
103
104#[derive(Debug)]
105#[must_use]
106pub struct Assets<'a, T>
107where
108  T: Asset + Clone,
109  T: AssetManager<Asset = T>,
110{
111  list: Vec<AssetRef<'a, T>>,
112  flags: u32,
113}
114
115impl<'a, T> Assets<'a, T>
116where
117  T: Asset + Clone,
118  T: AssetManager<Asset = T>,
119{
120  pub fn new(list: Vec<Cow<'a, T>>, parent_flags: u32) -> Self {
121    Self {
122      list: list
123        .into_iter()
124        .map(|asset| AssetRef::new(asset, parent_flags))
125        .collect(),
126      flags: parent_flags,
127    }
128  }
129
130  #[must_use]
131  pub fn to_owned(self) -> Vec<T> {
132    self.list.into_iter().map(|asset| asset.into_owned()).collect()
133  }
134
135  pub fn set_baseurl(&self, baseurl: &Path) {
136    self.list.iter().for_each(|asset| asset.update_baseurl(baseurl));
137  }
138
139  pub fn iter(&self) -> impl Iterator<Item = &AssetRef<T>> {
140    self.list.iter().map(|r| {
141      r.parent_flags.fetch_or(self.flags, Ordering::Relaxed);
142      r
143    })
144  }
145
146  #[must_use]
147  pub fn len(&self) -> usize {
148    self.list.len()
149  }
150
151  #[must_use]
152  pub fn is_empty(&self) -> bool {
153    self.list.is_empty()
154  }
155
156  pub fn push(&mut self, asset: &'a T) {
157    self.list.push(AssetRef::new(Cow::Borrowed(asset), self.flags));
158  }
159
160  pub fn push_owned(&mut self, asset: T) {
161    self.list.push(AssetRef::new(Cow::Owned(asset), self.flags));
162  }
163
164  #[allow(clippy::needless_pass_by_value)]
165  pub fn pull(&'a mut self, options: T::Options) -> AssetPull<'a> {
166    AssetPull::new(self, &options)
167  }
168
169  #[allow(clippy::needless_pass_by_value)]
170  pub fn pull_with_progress(&'a mut self, options: T::Options) -> AssetPullWithProgress<'a> {
171    AssetPullWithProgress::new(self, &options)
172  }
173
174  pub fn extend(&mut self, other: Self) {
175    for asset in other.list {
176      asset.parent_flags.fetch_or(self.flags, Ordering::Relaxed);
177      self.list.push(asset);
178    }
179  }
180}
181
182#[must_use]
183pub struct AssetProgress<'a> {
184  name: String,
185  progress: Pin<Box<dyn Stream<Item = Progress> + Send + 'a>>,
186}
187
188impl std::fmt::Debug for AssetProgress<'_> {
189  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190    f.debug_struct("AssetProgress").field("name", &self.name).finish()
191  }
192}
193
194#[must_use]
195pub struct AssetFut<'a> {
196  name: String,
197  progress: Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>>,
198  finished: AtomicBool,
199}
200
201impl std::fmt::Debug for AssetFut<'_> {
202  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203    f.debug_struct("AssetFut").field("name", &self.name).finish()
204  }
205}
206
207#[must_use]
208pub struct CompleteAsset {
209  name: String,
210  result: Result<Vec<u8>, Error>,
211}
212
213impl std::fmt::Debug for CompleteAsset {
214  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215    f.debug_struct("CompleteAsset").field("name", &self.name).finish()
216  }
217}
218
219impl CompleteAsset {
220  #[must_use]
221  pub fn name(&self) -> &str {
222    &self.name
223  }
224
225  pub const fn result(&self) -> &Result<Vec<u8>, Error> {
226    &self.result
227  }
228
229  #[allow(clippy::missing_const_for_fn)]
230  pub fn into_bytes(self) -> Result<Vec<u8>, Error> {
231    self.result
232  }
233}
234
235#[must_use]
236#[allow(missing_debug_implementations)]
237pub struct AssetPull<'a> {
238  assets: Vec<AssetFut<'a>>,
239  finished: AtomicBool,
240}
241
242impl<'a> AssetPull<'a> {
243  pub fn new<T>(assets: &'a mut Assets<'a, T>, options: &T::Options) -> Self
244  where
245    T: Asset + Clone,
246    T: AssetManager<Asset = T>,
247  {
248    let assets = assets
249      .iter()
250      .map(|asset| AssetFut {
251        name: asset.name().to_owned(),
252        progress: asset.fetch(options.clone()),
253        finished: AtomicBool::new(false),
254      })
255      .collect();
256    Self {
257      assets,
258      finished: AtomicBool::new(false),
259    }
260  }
261}
262
263impl<'a> Future for AssetPull<'a> {
264  type Output = Result<Vec<CompleteAsset>, Error>;
265
266  fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
267    let mut all_done = true;
268    let mut results = Vec::new();
269    let this = self.get_mut();
270    for asset in &mut this.assets {
271      let name = &asset.name;
272      let fut = &mut asset.progress;
273      if asset.finished.load(std::sync::atomic::Ordering::Relaxed) {
274        continue;
275      }
276      match fut.poll_unpin(cx) {
277        Poll::Ready(result) => {
278          asset.finished.store(true, std::sync::atomic::Ordering::Relaxed);
279          results.push(CompleteAsset {
280            name: name.clone(),
281            result,
282          });
283        }
284        Poll::Pending => {
285          all_done = false;
286        }
287      }
288    }
289    if this.finished.load(std::sync::atomic::Ordering::Relaxed) {
290      Poll::Ready(Ok(results))
291    } else if all_done {
292      this.finished.store(true, std::sync::atomic::Ordering::Relaxed);
293      std::task::Poll::Ready(Ok(results))
294    } else {
295      std::task::Poll::Pending
296    }
297  }
298}
299
300#[must_use]
301#[allow(missing_debug_implementations)]
302pub struct AssetPullWithProgress<'a> {
303  assets: Vec<AssetProgress<'a>>,
304  finished: AtomicBool,
305}
306
307impl<'a> AssetPullWithProgress<'a> {
308  pub fn new<T>(assets: &'a mut Assets<'a, T>, options: &T::Options) -> Self
309  where
310    T: Asset + Clone,
311    T: AssetManager<Asset = T>,
312  {
313    let assets = assets
314      .iter()
315      .map(|asset| AssetProgress {
316        name: asset.name().to_owned(),
317        progress: asset.fetch_with_progress(options.clone()),
318      })
319      .collect();
320    Self {
321      assets,
322      finished: AtomicBool::new(false),
323    }
324  }
325}
326
327impl<'a> Stream for AssetPullWithProgress<'a> {
328  type Item = Vec<Progress>;
329
330  fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
331    let mut all_done = true;
332    let mut progress = Vec::new();
333    for asset in &mut self.assets {
334      let name = &asset.name;
335      let stream = &mut asset.progress;
336      match stream.poll_next_unpin(cx) {
337        Poll::Ready(Some(p)) => {
338          if !matches!(
339            p.status,
340            Status::AssetComplete(_) | Status::PullFinished | Status::Error(_)
341          ) {
342            all_done = false;
343          }
344          progress.push(p);
345        }
346        Poll::Ready(None) => {
347          progress.push(Progress {
348            status: Status::PullFinished,
349            asset: name.clone(),
350          });
351        }
352        Poll::Pending => {
353          all_done = false;
354        }
355      }
356    }
357    if self.finished.load(std::sync::atomic::Ordering::Relaxed) {
358      Poll::Ready(None)
359    } else if all_done {
360      self.finished.store(true, std::sync::atomic::Ordering::Relaxed);
361      std::task::Poll::Ready(Some(progress))
362    } else if progress.is_empty() {
363      cx.waker().wake_by_ref();
364      std::task::Poll::Pending
365    } else {
366      std::task::Poll::Ready(Some(progress))
367    }
368  }
369}
370
371#[derive(Debug, Clone)]
372#[non_exhaustive]
373#[must_use]
374pub struct Progress {
375  pub status: Status,
376  pub asset: String,
377}
378
379impl Progress {
380  pub fn new<T: Into<String>>(asset: T, status: Status) -> Self {
381    Self {
382      status,
383      asset: asset.into(),
384    }
385  }
386}
387
/// The kinds of update a progress stream can report.
#[derive(Debug, Clone)]
#[allow(clippy::exhaustive_enums)]
pub enum Status {
  /// An error, carried as a message string.
  Error(String),
  /// Completion of an asset, carrying a byte buffer.
  AssetComplete(Vec<u8>),
  /// Emitted by `AssetPullWithProgress` when an asset's stream has ended.
  PullFinished,
  /// Intermediate progress: `num` out of `total`.
  Progress { num: usize, total: usize },
}
396
397pub trait AssetManager {
398  type Asset: AssetManager<Asset = Self::Asset> + Asset + Clone;
399  fn assets(&self) -> Assets<Self::Asset>;
400  fn set_baseurl(&self, baseurl: &Path);
401  fn get_asset_flags(&self) -> u32 {
402    AssetFlags::empty().bits()
403  }
404}
405
406pub trait Asset: AssetManager {
407  type Options: Clone;
408  fn fetch_with_progress(&self, options: Self::Options) -> Pin<Box<dyn Stream<Item = Progress> + Send + '_>>;
409  fn fetch(&self, options: Self::Options) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + Sync + '_>>;
410  fn name(&self) -> &str;
411  fn update_baseurl(&self, baseurl: &Path);
412}
413
414impl<T, O> Asset for Option<T>
415where
416  T: Asset<Options = O> + Send + Sync + 'static,
417  O: Clone + Send + Sync + 'static,
418{
419  type Options = T::Options;
420
421  fn fetch_with_progress(&self, options: Self::Options) -> Pin<Box<dyn Stream<Item = Progress> + Send + '_>> {
422    self.as_ref().map_or_else(
423      || {
424        let stream = futures::stream::once(async move { Progress::new("None", Status::Error("None".to_owned())) });
425        let a = stream.boxed();
426        a
427      },
428      |a| {
429        let a = a.fetch_with_progress(options);
430        a
431      },
432    )
433  }
434
435  fn fetch(&self, options: Self::Options) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + Sync + '_>> {
436    Box::pin(async move {
437      match self {
438        Some(a) => a.fetch(options).await,
439        None => Err(Error::FileNotFound("None".to_owned())),
440      }
441    })
442  }
443
444  fn name(&self) -> &str {
445    self.as_ref().map_or("", |a| a.name())
446  }
447
448  fn update_baseurl(&self, baseurl: &Path) {
449    if let Some(asset) = self.as_ref() {
450      asset.update_baseurl(baseurl);
451    }
452  }
453}
454
455impl<T> AssetManager for Option<T>
456where
457  T: AssetManager,
458{
459  type Asset = T::Asset;
460
461  fn set_baseurl(&self, baseurl: &Path) {
462    if let Some(asset) = self.as_ref() {
463      asset.set_baseurl(baseurl);
464    }
465  }
466
467  fn assets(&self) -> Assets<Self::Asset> {
468    self
469      .as_ref()
470      .map_or_else(|| Assets::new(vec![], self.get_asset_flags()), |a| a.assets())
471  }
472
473  fn get_asset_flags(&self) -> u32 {
474    self.as_ref().map_or(0, |a| a.get_asset_flags())
475  }
476}
477
478impl<K, T> AssetManager for std::collections::HashMap<K, T>
479where
480  T: AssetManager,
481{
482  type Asset = T::Asset;
483
484  fn set_baseurl(&self, baseurl: &Path) {
485    for asset in self.values() {
486      asset.set_baseurl(baseurl);
487    }
488  }
489
490  fn assets(&self) -> Assets<Self::Asset> {
491    let mut assets = Assets::new(vec![], self.get_asset_flags());
492    for (_, asset) in self.iter() {
493      assets.extend(asset.assets());
494    }
495    assets
496  }
497
498  fn get_asset_flags(&self) -> u32 {
499    self.values().fold(0, |flags, asset| flags | asset.get_asset_flags())
500  }
501}
502
503impl<T> AssetManager for Vec<T>
504where
505  T: AssetManager,
506{
507  type Asset = T::Asset;
508
509  fn set_baseurl(&self, baseurl: &Path) {
510    for asset in self.iter() {
511      asset.set_baseurl(baseurl);
512    }
513  }
514
515  fn assets(&self) -> Assets<Self::Asset> {
516    let mut assets = Assets::new(vec![], self.get_asset_flags());
517    for asset in self.iter() {
518      assets.extend(asset.assets());
519    }
520    assets
521  }
522
523  fn get_asset_flags(&self) -> u32 {
524    self.iter().fold(0, |flags, asset| flags | asset.get_asset_flags())
525  }
526}