hydrate_pipeline/thumbnails/
thumbnail_system.rs1use crate::thumbnails::thumbnail_thread_pool::{
2 ThumbnailThreadPool, ThumbnailThreadPoolOutcome, ThumbnailThreadPoolRequest,
3 ThumbnailThreadPoolRequestRunJob,
4};
5use crate::thumbnails::ThumbnailProviderRegistry;
6use crate::{HydrateProjectConfiguration, ThumbnailApi, ThumbnailInputHash};
7use crossbeam_channel::Receiver;
8use hydrate_base::lru_cache::LruCache;
9use hydrate_base::AssetId;
10use hydrate_data::{DataSet, SchemaSet};
11use hydrate_schema::HashSet;
12use std::sync::{Arc, Mutex};
13
/// Capacity handed to `LruCache::new` when the shared thumbnail cache is created.
const THUMBNAIL_CACHE_SIZE: u32 = 1024;

/// Threshold, in milliseconds, compared against the time elapsed since a cache
/// entry's `last_staleness_check` in `ThumbnailSystem::update`.
///
/// Typed as `u128` so it can be compared directly with `Duration::as_millis()`.
const STALENESS_CHECK_TIME_MILLISECONDS: u128 = 1000;
26
/// A decoded thumbnail: its dimensions plus the raw pixel bytes.
pub struct ThumbnailImage {
    /// Image width (presumably in pixels — TODO confirm).
    pub width: u32,
    /// Image height (presumably in pixels — TODO confirm).
    pub height: u32,
    /// Raw pixel bytes.
    ///
    /// NOTE(review): the 1x1 placeholder built in `ThumbnailSystem::new` stores four
    /// bytes for its single pixel, so this is presumably RGBA8 — confirm with the
    /// thumbnail providers.
    pub pixel_data: Vec<u8>,
}
33
34#[derive(Clone)]
35pub struct ThumbnailImageWithHash {
36 pub image: Arc<ThumbnailImage>,
37 pub hash: ThumbnailInputHash,
38}
39
40#[derive(Default)]
41pub struct ThumbnailState {
42 image: Option<ThumbnailImageWithHash>,
45 queued_request_input_hash: Option<ThumbnailInputHash>,
49 failed_to_load: bool,
50 last_staleness_check: Option<std::time::Instant>,
51}
52
53struct ThumbnailSystemStateInner {
54 cache: LruCache<AssetId, ThumbnailState>,
55 refreshed_thumbnails: HashSet<AssetId>,
56}
57
58#[derive(Clone)]
59pub struct ThumbnailSystemState {
60 inner: Arc<Mutex<ThumbnailSystemStateInner>>,
61}
62
63impl Default for ThumbnailSystemState {
64 fn default() -> Self {
65 ThumbnailSystemState {
66 inner: Arc::new(Mutex::new(ThumbnailSystemStateInner {
67 cache: LruCache::new(THUMBNAIL_CACHE_SIZE),
68 refreshed_thumbnails: Default::default(),
69 })),
70 }
71 }
72}
73
74impl ThumbnailSystemState {
75 pub fn take_refreshed_thumbnails(&self) -> HashSet<AssetId> {
76 let mut refreshed_thumbnails = HashSet::default();
77 let mut inner = self.inner.lock().unwrap();
78 std::mem::swap(&mut inner.refreshed_thumbnails, &mut refreshed_thumbnails);
79 refreshed_thumbnails
80 }
81
82 pub fn request(
83 &self,
84 asset_id: AssetId,
85 ) -> Option<ThumbnailImageWithHash> {
86 let mut inner = self.inner.lock().unwrap();
87 if let Some(thumbnail_state) = inner.cache.get(&asset_id, true) {
88 thumbnail_state.image.clone()
89 } else {
90 inner.cache.insert(asset_id, ThumbnailState::default());
91 None
92 }
93 }
94
95 pub fn forget(
96 &self,
97 _asset_id: AssetId,
98 ) {
99 }
100
101 pub fn forget_all(&self) {}
102}
103
104pub struct ThumbnailSystem {
105 thumbnail_system_state: ThumbnailSystemState,
107 thumbnail_provider_registry: ThumbnailProviderRegistry,
108 default_image: Arc<ThumbnailImage>,
109 thread_pool: Option<ThumbnailThreadPool>,
110 thread_pool_result_rx: Receiver<ThumbnailThreadPoolOutcome>,
111 current_requests: HashSet<ThumbnailInputHash>,
112}
113
114impl Drop for ThumbnailSystem {
115 fn drop(&mut self) {
116 let thread_pool = self.thread_pool.take().unwrap();
117 thread_pool.finish();
118 }
119}
120
121impl ThumbnailSystem {
122 pub fn system_state(&self) -> &ThumbnailSystemState {
123 &self.thumbnail_system_state
124 }
125
126 pub fn thumbnail_provider_registry(&self) -> &ThumbnailProviderRegistry {
127 &self.thumbnail_provider_registry
128 }
129
130 pub fn new(
131 hydrate_config: &HydrateProjectConfiguration,
132 thumbnail_provider_registry: ThumbnailProviderRegistry,
133 schema_set: &SchemaSet,
134 ) -> Self {
135 let default_image = ThumbnailImage {
136 width: 1,
137 height: 1,
138 pixel_data: vec![0, 0, 0, 255],
139 };
140
141 let thumbnail_api = ThumbnailApi::new(hydrate_config, schema_set);
142
143 let (thread_pool_result_tx, thread_pool_result_rx) = crossbeam_channel::unbounded();
144 let thread_pool = ThumbnailThreadPool::new(
145 thumbnail_provider_registry.clone(),
146 schema_set.clone(),
147 thumbnail_api.clone(),
148 4,
149 thread_pool_result_tx,
150 );
151
152 ThumbnailSystem {
153 thumbnail_system_state: ThumbnailSystemState::default(),
154 thumbnail_provider_registry,
155 default_image: Arc::new(default_image),
156 thread_pool: Some(thread_pool),
157 thread_pool_result_rx,
158 current_requests: Default::default(),
159 }
160 }
161
162 pub fn update(
163 &mut self,
164 data_set: &DataSet,
165 schema_set: &SchemaSet,
166 ) {
167 let now = std::time::Instant::now();
168 let mut state = self.thumbnail_system_state.inner.lock().unwrap();
169
170 let mut refreshed_thumbnails = vec![];
171
172 for (asset_id, thumbnail_state) in state
173 .cache
174 .pairs_mut()
175 .iter_mut()
176 .filter_map(|x| x.as_mut())
177 {
178 if self.current_requests.len() > 50 {
180 break;
181 }
182
183 let asset_id = *asset_id;
189
190 if thumbnail_state.queued_request_input_hash.is_some() {
192 continue;
193 }
194
195 if thumbnail_state.failed_to_load {
196 continue;
197 }
198
199 if let Some(last_staleness_check) = thumbnail_state.last_staleness_check {
200 if (now - last_staleness_check).as_millis() > STALENESS_CHECK_TIME_MILLISECONDS {
201 continue;
202 }
203 }
204
205 let Some(asset_schema) = data_set.asset_schema(asset_id) else {
207 thumbnail_state.failed_to_load = true;
208 continue;
209 };
210
211 let Some(provider) = self
212 .thumbnail_provider_registry
213 .provider_for_asset(asset_schema.fingerprint())
214 else {
215 let old_thumbnail_hash = thumbnail_state.image.as_ref().map(|x| x.hash);
216 let new_thumbnail_hash = ThumbnailInputHash::null();
217 thumbnail_state.image = Some(ThumbnailImageWithHash {
218 image: self.default_image.clone(),
219 hash: new_thumbnail_hash,
220 });
221 if old_thumbnail_hash != Some(new_thumbnail_hash) {
222 refreshed_thumbnails.push(asset_id);
223 }
224 continue;
225 };
226
227 let dependencies = provider
229 .gather_inner(asset_id, data_set, schema_set)
230 .unwrap();
231 if self
232 .current_requests
233 .contains(&dependencies.thumbnail_input_hash)
234 {
235 continue;
236 }
237
238 if thumbnail_state.image.as_ref().map(|x| x.hash)
240 == Some(dependencies.thumbnail_input_hash)
241 {
242 continue;
243 }
244
245 self.current_requests
247 .insert(dependencies.thumbnail_input_hash);
248 thumbnail_state.queued_request_input_hash = Some(dependencies.thumbnail_input_hash);
249 log::trace!("Generate thumbnail for {:?}", asset_id);
250 self.thread_pool
251 .as_ref()
252 .unwrap()
253 .add_request(ThumbnailThreadPoolRequest::RunJob(
254 ThumbnailThreadPoolRequestRunJob {
255 asset_id,
256 asset_type: asset_schema.fingerprint(),
257 dependencies: Arc::new(dependencies),
258 },
259 ));
260 }
261
262 while let Ok(result) = self.thread_pool_result_rx.try_recv() {
263 match result {
264 ThumbnailThreadPoolOutcome::RunJobComplete(msg) => {
265 self.current_requests
266 .remove(&msg.request.dependencies.thumbnail_input_hash);
267 if let Some(thumbnail_state) = state.cache.get_mut(&msg.request.asset_id, false)
268 {
269 match msg.result {
270 Ok(image) => {
271 let old_thumbnail_hash =
272 thumbnail_state.image.as_ref().map(|x| x.hash);
273 let new_thumbnail_hash =
274 msg.request.dependencies.thumbnail_input_hash;
275
276 thumbnail_state.queued_request_input_hash = None;
277 thumbnail_state.image = Some(ThumbnailImageWithHash {
278 image: Arc::new(image),
279 hash: msg.request.dependencies.thumbnail_input_hash,
280 });
281
282 if old_thumbnail_hash != Some(new_thumbnail_hash) {
283 refreshed_thumbnails.push(msg.request.asset_id);
284 }
285 }
286 Err(e) => {
287 log::warn!("Thumbnail creation failed: {:?}", e);
288 thumbnail_state.failed_to_load = true;
289 }
290 }
291 }
292 }
293 }
294 }
295
296 for refreshed_thumbnail in refreshed_thumbnails {
297 state.refreshed_thumbnails.insert(refreshed_thumbnail);
298 }
299 }
300}