keket 0.20.3

Database-like Asset management on top of ECS storage
Documentation
use crate::{
    database::{handle::AssetHandle, inspector::AssetInspector},
    protocol::AssetProtocol,
    store::AssetBytesAreReadyToStore,
};
use anput::{
    bundle::DynamicBundle, third_party::intuicio_data::managed::ManagedLazy, world::World,
};
use std::{
    collections::HashMap,
    error::Error,
    pin::Pin,
    sync::{Arc, RwLock},
    task::{Context, Poll, Waker},
};

pub struct AssetAwaitsAsyncProcessing;
pub struct AssetAwaitsAsyncProducing;

type AssetProtocolProcessFuture =
    Pin<Box<dyn Future<Output = Result<DynamicBundle, Box<dyn Error>>> + Send + Sync>>;
type AssetProtocolProduceFuture =
    Pin<Box<dyn Future<Output = Result<Vec<u8>, Box<dyn Error>>> + Send + Sync>>;

#[derive(Clone)]
pub struct FutureStorageAccess(Arc<RwLock<ManagedLazy<World>>>);

impl FutureStorageAccess {
    pub fn access(&self) -> Result<ManagedLazy<World>, Box<dyn Error>> {
        Ok(self.0.read().map_err(|error| format!("{error}"))?.clone())
    }
}

pub struct FutureAssetProtocol {
    name: String,
    #[allow(clippy::type_complexity)]
    process_future_spawner: Option<
        Box<
            dyn Fn(AssetHandle, FutureStorageAccess, Vec<u8>) -> AssetProtocolProcessFuture
                + Send
                + Sync,
        >,
    >,
    #[allow(clippy::type_complexity)]
    produce_future_spawner:
        Option<Box<dyn Fn(AssetInspector) -> AssetProtocolProduceFuture + Send + Sync>>,
    process_futures:
        RwLock<HashMap<AssetHandle, Option<(AssetProtocolProcessFuture, FutureStorageAccess)>>>,
    produce_futures: RwLock<HashMap<AssetHandle, Option<AssetProtocolProduceFuture>>>,
}

impl FutureAssetProtocol {
    pub fn new(name: impl ToString) -> Self {
        Self {
            name: name.to_string(),
            process_future_spawner: None,
            produce_future_spawner: None,
            process_futures: Default::default(),
            produce_futures: Default::default(),
        }
    }

    pub fn process<Fut>(
        mut self,
        future_spawner: impl Fn(AssetHandle, FutureStorageAccess, Vec<u8>) -> Fut
        + Send
        + Sync
        + 'static,
    ) -> Self
    where
        Fut: Future<Output = Result<DynamicBundle, Box<dyn Error>>> + Send + Sync + 'static,
    {
        self.process_future_spawner = Some(Box::new(move |handle, access, bytes| {
            Box::pin(future_spawner(handle, access, bytes))
        }));
        self
    }

    pub fn produce<Fut>(
        mut self,
        future_spawner: impl Fn(AssetInspector) -> Fut + Send + Sync + 'static,
    ) -> Self
    where
        Fut: Future<Output = Result<Vec<u8>, Box<dyn Error>>> + Send + Sync + 'static,
    {
        self.produce_future_spawner = Some(Box::new(move |inspector| {
            Box::pin(future_spawner(inspector))
        }));
        self
    }
}

impl AssetProtocol for FutureAssetProtocol {
    fn name(&self) -> &str {
        &self.name
    }

    fn process_bytes(
        &mut self,
        handle: AssetHandle,
        storage: &mut World,
        bytes: Vec<u8>,
    ) -> Result<(), Box<dyn Error>> {
        let Some(future_spawner) = self.process_future_spawner.as_ref() else {
            return Ok(());
        };
        let (lazy, _lifetime) = ManagedLazy::make(storage);
        let access = FutureStorageAccess(Arc::new(RwLock::new(lazy)));
        self.process_futures
            .write()
            .map_err(|error| format!("{error}"))?
            .insert(
                handle,
                Some(((future_spawner)(handle, access.clone(), bytes), access)),
            );
        storage.insert(handle.entity(), (AssetAwaitsAsyncProcessing,))?;
        Ok(())
    }

    fn produce_asset_bytes(
        &mut self,
        handle: AssetHandle,
        storage: &mut World,
    ) -> Result<(), Box<dyn Error>> {
        let Some(future_spawner) = self.produce_future_spawner.as_ref() else {
            return Err(format!(
                "Asset protocol `{}` does not support producing bytes.",
                self.name()
            )
            .into());
        };
        let inspector = AssetInspector::new_raw(storage, handle.entity());
        self.produce_futures
            .write()
            .map_err(|error| format!("{error}"))?
            .insert(handle, Some((future_spawner)(inspector)));
        storage.insert(handle.entity(), (AssetAwaitsAsyncProducing,))?;
        Ok(())
    }

    fn maintain(&mut self, storage: &mut World) -> Result<(), Box<dyn Error>> {
        let mut cx = Context::from_waker(Waker::noop());
        let mut futures = self
            .process_futures
            .write()
            .map_err(|error| format!("{error}"))?;
        for (handle, future) in futures.iter_mut() {
            if let Some((mut f, access)) = future.take() {
                let (lazy, _lifetime) = ManagedLazy::make(storage);
                *access.0.write().map_err(|error| format!("{error}"))? = lazy;
                match f.as_mut().poll(&mut cx) {
                    Poll::Ready(Ok(result)) => {
                        storage.remove::<(AssetAwaitsAsyncProcessing,)>(handle.entity())?;
                        storage.insert(handle.entity(), result)?;
                    }
                    Poll::Ready(Err(e)) => {
                        return Err(e);
                    }
                    Poll::Pending => {
                        *future = Some((f, access));
                    }
                }
            }
        }
        futures.retain(|_, v| v.is_some());
        let mut futures = self
            .produce_futures
            .write()
            .map_err(|error| format!("{error}"))?;
        for (handle, future) in futures.iter_mut() {
            if let Some(mut f) = future.take() {
                match f.as_mut().poll(&mut cx) {
                    Poll::Ready(Ok(result)) => {
                        storage.remove::<(AssetAwaitsAsyncProducing,)>(handle.entity())?;
                        storage.insert(handle.entity(), (AssetBytesAreReadyToStore(result),))?;
                    }
                    Poll::Ready(Err(e)) => {
                        return Err(e);
                    }
                    Poll::Pending => {
                        *future = Some(f);
                    }
                }
            }
        }
        futures.retain(|_, v| v.is_some());
        Ok(())
    }
}