rafx_assets/assets/
load_queue_hydrate.rs

1use crate::resource_loader::RafxLoadEventHandler;
2use crate::resource_loader::RafxResourceLoadResult;
3use crossbeam_channel::{Receiver, Sender};
4use hydrate_base::LoadHandle;
5use hydrate_loader::storage::ArtifactLoadOp;
6use std::marker::PhantomData;
7use type_uuid::TypeUuid;
8
//
// Message handling for asset load/commit/free events
//
12pub struct LoadRequest<AssetDataT, AssetT> {
13    pub load_handle: LoadHandle,
14    pub load_op: ArtifactLoadOp,
15    pub result_tx: Sender<AssetT>,
16    pub asset: AssetDataT,
17}
18
19pub struct CommitRequest<T> {
20    pub load_handle: LoadHandle,
21    phantom_data: PhantomData<T>,
22}
23
24pub struct FreeRequest<T> {
25    pub load_handle: LoadHandle,
26    phantom_data: PhantomData<T>,
27}
28
29pub struct LoadQueuesTx<AssetDataT, AssetT> {
30    load_request_tx: Sender<LoadRequest<AssetDataT, AssetT>>,
31    commit_request_tx: Sender<CommitRequest<AssetDataT>>,
32    free_request_tx: Sender<FreeRequest<AssetDataT>>,
33}
34
35impl<AssetDataT, AssetT> Clone for LoadQueuesTx<AssetDataT, AssetT> {
36    fn clone(&self) -> Self {
37        LoadQueuesTx {
38            load_request_tx: self.load_request_tx.clone(),
39            commit_request_tx: self.commit_request_tx.clone(),
40            free_request_tx: self.free_request_tx.clone(),
41        }
42    }
43}
44
45pub struct LoadQueuesRx<AssetDataT, AssetT> {
46    load_request_rx: Receiver<LoadRequest<AssetDataT, AssetT>>,
47    commit_request_rx: Receiver<CommitRequest<AssetDataT>>,
48    free_request_rx: Receiver<FreeRequest<AssetDataT>>,
49}
50
51pub struct LoadQueues<AssetDataT, AssetT> {
52    tx: LoadQueuesTx<AssetDataT, AssetT>,
53    rx: LoadQueuesRx<AssetDataT, AssetT>,
54}
55
56impl<AssetDataT, AssetT> LoadQueues<AssetDataT, AssetT> {
57    pub fn take_load_requests(&mut self) -> Vec<LoadRequest<AssetDataT, AssetT>> {
58        self.rx.load_request_rx.try_iter().collect()
59    }
60
61    pub fn take_commit_requests(&mut self) -> Vec<CommitRequest<AssetDataT>> {
62        self.rx.commit_request_rx.try_iter().collect()
63    }
64
65    pub fn take_free_requests(&mut self) -> Vec<FreeRequest<AssetDataT>> {
66        self.rx.free_request_rx.try_iter().collect()
67    }
68}
69
70impl<AssetDataT, AssetT> LoadQueues<AssetDataT, AssetT>
71where
72    AssetDataT: for<'a> serde::Deserialize<'a> + 'static + Send + Clone,
73    AssetT: TypeUuid + 'static + Send,
74{
75    pub fn create_loader(&self) -> RafxGenericLoadEventHandler<AssetDataT, AssetT> {
76        RafxGenericLoadEventHandler {
77            load_queues: self.tx.clone(),
78        }
79    }
80}
81
82impl<AssetDataT, AssetT> Default for LoadQueues<AssetDataT, AssetT> {
83    fn default() -> Self {
84        let (load_request_tx, load_request_rx) = crossbeam_channel::unbounded();
85        let (commit_request_tx, commit_request_rx) = crossbeam_channel::unbounded();
86        let (free_request_tx, free_request_rx) = crossbeam_channel::unbounded();
87
88        let tx = LoadQueuesTx {
89            load_request_tx,
90            commit_request_tx,
91            free_request_tx,
92        };
93
94        let rx = LoadQueuesRx {
95            load_request_rx,
96            commit_request_rx,
97            free_request_rx,
98        };
99
100        LoadQueues { tx, rx }
101    }
102}
103
//
// A generic load handler that routes load/commit/free events to the load queues owned by
// asset type handlers
//
108pub struct RafxGenericLoadEventHandler<AssetDataT, AssetT>
109where
110    AssetDataT: for<'a> serde::Deserialize<'a> + 'static + Send,
111    AssetT: TypeUuid + 'static + Send,
112{
113    load_queues: LoadQueuesTx<AssetDataT, AssetT>,
114}
115
116impl<AssetDataT, AssetT> RafxLoadEventHandler<AssetDataT, AssetT>
117    for RafxGenericLoadEventHandler<AssetDataT, AssetT>
118where
119    AssetDataT: for<'a> serde::Deserialize<'a> + 'static + Send,
120    AssetT: TypeUuid + 'static + Send,
121{
122    fn update_asset(
123        &mut self,
124        load_handle: LoadHandle,
125        load_op: ArtifactLoadOp,
126        asset: AssetDataT,
127    ) -> RafxResourceLoadResult<AssetT> {
128        log::trace!(
129            "GenericLoader update_asset {} {:?}",
130            core::any::type_name::<AssetDataT>(),
131            load_handle
132        );
133
134        let (result_tx, result_rx) = crossbeam_channel::bounded(1);
135
136        let request = LoadRequest {
137            load_handle,
138            load_op,
139            result_tx,
140            asset,
141        };
142
143        self.load_queues.load_request_tx.send(request).unwrap();
144        RafxResourceLoadResult::new(result_rx)
145    }
146
147    fn commit_asset_version(
148        &mut self,
149        load_handle: LoadHandle,
150    ) {
151        log::trace!(
152            "GenericLoader commit_asset_version {} {:?}",
153            core::any::type_name::<AssetDataT>(),
154            load_handle
155        );
156        let request = CommitRequest {
157            load_handle,
158            phantom_data: Default::default(),
159        };
160
161        self.load_queues.commit_request_tx.send(request).unwrap();
162    }
163
164    fn free(
165        &mut self,
166        load_handle: LoadHandle,
167    ) {
168        log::trace!(
169            "GenericLoader free {} {:?}",
170            core::any::type_name::<AssetDataT>(),
171            load_handle
172        );
173        let request = FreeRequest {
174            load_handle,
175            phantom_data: Default::default(),
176        };
177
178        self.load_queues.free_request_tx.send(request).unwrap();
179    }
180}