hydrate_pipeline/thumbnails/
thumbnail_system.rs

1use crate::thumbnails::thumbnail_thread_pool::{
2    ThumbnailThreadPool, ThumbnailThreadPoolOutcome, ThumbnailThreadPoolRequest,
3    ThumbnailThreadPoolRequestRunJob,
4};
5use crate::thumbnails::ThumbnailProviderRegistry;
6use crate::{HydrateProjectConfiguration, ThumbnailApi, ThumbnailInputHash};
7use crossbeam_channel::Receiver;
8use hydrate_base::lru_cache::LruCache;
9use hydrate_base::AssetId;
10use hydrate_data::{DataSet, SchemaSet};
11use hydrate_schema::HashSet;
12use std::sync::{Arc, Mutex};
13
// Thumbnail providers are implemented per asset type:
// - Implement a gather method that runs on the main thread and can see asset data.
//   This method indicates all input data needed to produce the thumbnail.
// - Implement a render method that runs off the main thread. It can read import data and only
//   the data gathered from assets in gather().
// These providers are placed in a registry, which handles the type erasure so that we can
// operate on them as a set.
// The system keeps track of requested thumbnails, thumbnail state (not created, pending,
// created, etc.), and the registry.
// Thumbnails need to be invalidated; we use the metadata returned from gather() to determine this.
23
/// Capacity passed to the LRU cache that holds per-asset thumbnail state.
const THUMBNAIL_CACHE_SIZE: u32 = 1_024;
/// Interval, in milliseconds, used when comparing against the time of an entry's last
/// staleness check.
const STALENESS_CHECK_TIME_MILLISECONDS: u128 = 1_000;
26
/// A decoded thumbnail bitmap.
pub struct ThumbnailImage {
    /// Image width in pixels.
    pub width: u32,
    /// Image height in pixels.
    pub height: u32,
    /// Raw pixel bytes, 8 bits per channel.
    // NOTE(review): the original note said "Basic 8-bit RBGA"; presumably RGBA was meant
    // (the 1x1 default image elsewhere in this file uses 4 bytes) -- confirm channel order.
    pub pixel_data: Vec<u8>,
}
33
34#[derive(Clone)]
35pub struct ThumbnailImageWithHash {
36    pub image: Arc<ThumbnailImage>,
37    pub hash: ThumbnailInputHash,
38}
39
40#[derive(Default)]
41pub struct ThumbnailState {
42    // List of asset dependencies
43    // List of import data dependencies
44    image: Option<ThumbnailImageWithHash>,
45    // Set when the image is loaded
46    //current_input_hash: Option<ThumbnailInputHash>,
47    // Set when the image request is queued and cleared when it completes
48    queued_request_input_hash: Option<ThumbnailInputHash>,
49    failed_to_load: bool,
50    last_staleness_check: Option<std::time::Instant>,
51}
52
53struct ThumbnailSystemStateInner {
54    cache: LruCache<AssetId, ThumbnailState>,
55    refreshed_thumbnails: HashSet<AssetId>,
56}
57
58#[derive(Clone)]
59pub struct ThumbnailSystemState {
60    inner: Arc<Mutex<ThumbnailSystemStateInner>>,
61}
62
63impl Default for ThumbnailSystemState {
64    fn default() -> Self {
65        ThumbnailSystemState {
66            inner: Arc::new(Mutex::new(ThumbnailSystemStateInner {
67                cache: LruCache::new(THUMBNAIL_CACHE_SIZE),
68                refreshed_thumbnails: Default::default(),
69            })),
70        }
71    }
72}
73
74impl ThumbnailSystemState {
75    pub fn take_refreshed_thumbnails(&self) -> HashSet<AssetId> {
76        let mut refreshed_thumbnails = HashSet::default();
77        let mut inner = self.inner.lock().unwrap();
78        std::mem::swap(&mut inner.refreshed_thumbnails, &mut refreshed_thumbnails);
79        refreshed_thumbnails
80    }
81
82    pub fn request(
83        &self,
84        asset_id: AssetId,
85    ) -> Option<ThumbnailImageWithHash> {
86        let mut inner = self.inner.lock().unwrap();
87        if let Some(thumbnail_state) = inner.cache.get(&asset_id, true) {
88            thumbnail_state.image.clone()
89        } else {
90            inner.cache.insert(asset_id, ThumbnailState::default());
91            None
92        }
93    }
94
95    pub fn forget(
96        &self,
97        _asset_id: AssetId,
98    ) {
99    }
100
101    pub fn forget_all(&self) {}
102}
103
104pub struct ThumbnailSystem {
105    // Thumbnails that have been requested, created, etc.
106    thumbnail_system_state: ThumbnailSystemState,
107    thumbnail_provider_registry: ThumbnailProviderRegistry,
108    default_image: Arc<ThumbnailImage>,
109    thread_pool: Option<ThumbnailThreadPool>,
110    thread_pool_result_rx: Receiver<ThumbnailThreadPoolOutcome>,
111    current_requests: HashSet<ThumbnailInputHash>,
112}
113
114impl Drop for ThumbnailSystem {
115    fn drop(&mut self) {
116        let thread_pool = self.thread_pool.take().unwrap();
117        thread_pool.finish();
118    }
119}
120
121impl ThumbnailSystem {
122    pub fn system_state(&self) -> &ThumbnailSystemState {
123        &self.thumbnail_system_state
124    }
125
126    pub fn thumbnail_provider_registry(&self) -> &ThumbnailProviderRegistry {
127        &self.thumbnail_provider_registry
128    }
129
130    pub fn new(
131        hydrate_config: &HydrateProjectConfiguration,
132        thumbnail_provider_registry: ThumbnailProviderRegistry,
133        schema_set: &SchemaSet,
134    ) -> Self {
135        let default_image = ThumbnailImage {
136            width: 1,
137            height: 1,
138            pixel_data: vec![0, 0, 0, 255],
139        };
140
141        let thumbnail_api = ThumbnailApi::new(hydrate_config, schema_set);
142
143        let (thread_pool_result_tx, thread_pool_result_rx) = crossbeam_channel::unbounded();
144        let thread_pool = ThumbnailThreadPool::new(
145            thumbnail_provider_registry.clone(),
146            schema_set.clone(),
147            thumbnail_api.clone(),
148            4,
149            thread_pool_result_tx,
150        );
151
152        ThumbnailSystem {
153            thumbnail_system_state: ThumbnailSystemState::default(),
154            thumbnail_provider_registry,
155            default_image: Arc::new(default_image),
156            thread_pool: Some(thread_pool),
157            thread_pool_result_rx,
158            current_requests: Default::default(),
159        }
160    }
161
162    pub fn update(
163        &mut self,
164        data_set: &DataSet,
165        schema_set: &SchemaSet,
166    ) {
167        let now = std::time::Instant::now();
168        let mut state = self.thumbnail_system_state.inner.lock().unwrap();
169
170        let mut refreshed_thumbnails = vec![];
171
172        for (asset_id, thumbnail_state) in state
173            .cache
174            .pairs_mut()
175            .iter_mut()
176            .filter_map(|x| x.as_mut())
177        {
178            // No more than 50 requests in flight at a time
179            if self.current_requests.len() > 50 {
180                break;
181            }
182
183            // See if we already have a thumbnail loaded for the asset
184            // if thumbnail_state.image.is_some() {
185            //     continue;
186            // }
187
188            let asset_id = *asset_id;
189
190            // See if is already queued to load
191            if thumbnail_state.queued_request_input_hash.is_some() {
192                continue;
193            }
194
195            if thumbnail_state.failed_to_load {
196                continue;
197            }
198
199            if let Some(last_staleness_check) = thumbnail_state.last_staleness_check {
200                if (now - last_staleness_check).as_millis() > STALENESS_CHECK_TIME_MILLISECONDS {
201                    continue;
202                }
203            }
204
205            // Try to find a registered provider
206            let Some(asset_schema) = data_set.asset_schema(asset_id) else {
207                thumbnail_state.failed_to_load = true;
208                continue;
209            };
210
211            let Some(provider) = self
212                .thumbnail_provider_registry
213                .provider_for_asset(asset_schema.fingerprint())
214            else {
215                let old_thumbnail_hash = thumbnail_state.image.as_ref().map(|x| x.hash);
216                let new_thumbnail_hash = ThumbnailInputHash::null();
217                thumbnail_state.image = Some(ThumbnailImageWithHash {
218                    image: self.default_image.clone(),
219                    hash: new_thumbnail_hash,
220                });
221                if old_thumbnail_hash != Some(new_thumbnail_hash) {
222                    refreshed_thumbnails.push(asset_id);
223                }
224                continue;
225            };
226
227            // Calculate the current input hash
228            let dependencies = provider
229                .gather_inner(asset_id, data_set, schema_set)
230                .unwrap();
231            if self
232                .current_requests
233                .contains(&dependencies.thumbnail_input_hash)
234            {
235                continue;
236            }
237
238            // Check if the image we loaded is stale
239            if thumbnail_state.image.as_ref().map(|x| x.hash)
240                == Some(dependencies.thumbnail_input_hash)
241            {
242                continue;
243            }
244
245            // Kick off the request
246            self.current_requests
247                .insert(dependencies.thumbnail_input_hash);
248            thumbnail_state.queued_request_input_hash = Some(dependencies.thumbnail_input_hash);
249            log::trace!("Generate thumbnail for {:?}", asset_id);
250            self.thread_pool
251                .as_ref()
252                .unwrap()
253                .add_request(ThumbnailThreadPoolRequest::RunJob(
254                    ThumbnailThreadPoolRequestRunJob {
255                        asset_id,
256                        asset_type: asset_schema.fingerprint(),
257                        dependencies: Arc::new(dependencies),
258                    },
259                ));
260        }
261
262        while let Ok(result) = self.thread_pool_result_rx.try_recv() {
263            match result {
264                ThumbnailThreadPoolOutcome::RunJobComplete(msg) => {
265                    self.current_requests
266                        .remove(&msg.request.dependencies.thumbnail_input_hash);
267                    if let Some(thumbnail_state) = state.cache.get_mut(&msg.request.asset_id, false)
268                    {
269                        match msg.result {
270                            Ok(image) => {
271                                let old_thumbnail_hash =
272                                    thumbnail_state.image.as_ref().map(|x| x.hash);
273                                let new_thumbnail_hash =
274                                    msg.request.dependencies.thumbnail_input_hash;
275
276                                thumbnail_state.queued_request_input_hash = None;
277                                thumbnail_state.image = Some(ThumbnailImageWithHash {
278                                    image: Arc::new(image),
279                                    hash: msg.request.dependencies.thumbnail_input_hash,
280                                });
281
282                                if old_thumbnail_hash != Some(new_thumbnail_hash) {
283                                    refreshed_thumbnails.push(msg.request.asset_id);
284                                }
285                            }
286                            Err(e) => {
287                                log::warn!("Thumbnail creation failed: {:?}", e);
288                                thumbnail_state.failed_to_load = true;
289                            }
290                        }
291                    }
292                }
293            }
294        }
295
296        for refreshed_thumbnail in refreshed_thumbnails {
297            state.refreshed_thumbnails.insert(refreshed_thumbnail);
298        }
299    }
300}