visor_nannou_wgpu/texture/
capturer.rs1use crate as wgpu;
2use std::fmt;
3use std::future::Future;
4use std::sync::atomic::{self, AtomicU32};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use image::{GenericImage, GenericImageView};
9
10#[derive(Debug, Default)]
20pub struct Capturer {
21 converter_data_pair: Mutex<Option<ConverterDataPair>>,
22 thread_pool: Arc<Mutex<Option<Arc<ThreadPool>>>>,
23 workers: Option<u32>,
24 timeout: Option<Duration>,
25}
26
27#[derive(Debug)]
29struct ThreadPool {
30 active_futures: Arc<AtomicU32>,
31 workers: u32,
32 timeout: Option<Duration>,
33}
34
35pub struct Snapshot {
40 buffer: wgpu::RowPaddedBuffer,
41 thread_pool: Arc<Mutex<Option<Arc<ThreadPool>>>>,
42 workers: Option<u32>,
43 timeout: Option<Duration>,
44}
45
46pub struct AwaitWorkerTimeout<F>(pub F);
49
50#[derive(Debug)]
51struct ConverterDataPair {
52 src_descriptor: wgpu::TextureDescriptor<'static>,
53 reshaper: wgpu::TextureReshaper,
54 dst_texture: wgpu::Texture,
55}
56
57pub struct Rgba8AsyncMappedImageBuffer<'buffer>(wgpu::ImageReadMapping<'buffer>);
61
62impl ThreadPool {
63 fn spawn_when_worker_available<F>(&self, future: F) -> Result<(), AwaitWorkerTimeout<F>>
66 where
67 F: 'static + Future<Output = ()> + Send,
68 {
69 let mut start = None;
73 let mut interval_us = 128;
74 while self.active_futures() >= self.workers() {
75 if let Some(timeout) = self.timeout {
76 let start = start.get_or_insert_with(instant::Instant::now);
77 if start.elapsed() > timeout {
78 return Err(AwaitWorkerTimeout(future));
79 }
80 }
81 let duration = Duration::from_micros(interval_us);
82 std::thread::sleep(duration);
83 interval_us *= 2;
84 }
85
86 let active_futures = self.active_futures.clone();
88 let future = async move {
89 active_futures.fetch_add(1, atomic::Ordering::SeqCst);
90 future.await;
91 active_futures.fetch_sub(1, atomic::Ordering::SeqCst);
92 };
93
94 tokio::spawn(future);
95 Ok(())
96 }
97
98 fn active_futures(&self) -> u32 {
99 self.active_futures.load(atomic::Ordering::SeqCst)
100 }
101
102 fn workers(&self) -> u32 {
103 self.workers
104 }
105
106 fn await_active_futures(&self, device: &wgpu::Device) -> Result<(), AwaitWorkerTimeout<()>> {
109 let mut start = None;
110 let mut interval_us = 128;
111 while self.active_futures() > 0 {
112 if let Some(timeout) = self.timeout {
113 let start = start.get_or_insert_with(instant::Instant::now);
114 if start.elapsed() > timeout {
115 return Err(AwaitWorkerTimeout(()));
116 }
117 }
118 device.poll(wgpu::Maintain::Wait);
119 let duration = Duration::from_micros(interval_us);
120 std::thread::sleep(duration);
121 interval_us *= 2;
122 }
123 Ok(())
124 }
125}
126
127impl Capturer {
128 pub const DST_FORMAT: wgpu::TextureFormat = wgpu::TextureFormat::Rgba8UnormSrgb;
130
131 pub fn new(workers: Option<u32>, timeout: Option<Duration>) -> Self {
148 Capturer {
149 converter_data_pair: Default::default(),
150 thread_pool: Default::default(),
151 workers,
152 timeout,
153 }
154 }
155
156 pub fn active_snapshots(&self) -> u32 {
165 if let Ok(guard) = self.thread_pool.lock() {
166 if let Some(tp) = guard.as_ref() {
167 return tp.active_futures.load(atomic::Ordering::SeqCst);
168 }
169 }
170 0
171 }
172
173 pub fn workers(&self) -> u32 {
176 if let Ok(guard) = self.thread_pool.lock() {
177 if let Some(tp) = guard.as_ref() {
178 return tp.workers();
179 }
180 }
181 self.workers.unwrap_or(num_cpus::get() as u32)
182 }
183
184 pub fn capture(
186 &self,
187 device: &wgpu::Device,
188 encoder: &mut wgpu::CommandEncoder,
189 src_texture: &wgpu::Texture,
190 ) -> Snapshot {
191 let buffer = if src_texture.format() != Self::DST_FORMAT {
192 let mut converter_data_pair = self
193 .converter_data_pair
194 .lock()
195 .expect("failed to lock converter");
196
197 let converter_data_pair = converter_data_pair
199 .get_or_insert_with(|| create_converter_data_pair(device, src_texture));
200
201 if !wgpu::texture_descriptor_eq(
203 src_texture.descriptor(),
204 &converter_data_pair.src_descriptor,
205 ) {
206 *converter_data_pair = create_converter_data_pair(device, src_texture);
207 }
208
209 let dst_view = converter_data_pair.dst_texture.view();
211 converter_data_pair
212 .reshaper
213 .encode_render_pass(&dst_view.build(), encoder);
214
215 converter_data_pair.dst_texture.to_buffer(device, encoder)
216 } else {
217 src_texture.to_buffer(device, encoder)
218 };
219
220 Snapshot {
221 buffer,
222 thread_pool: self.thread_pool.clone(),
223 workers: self.workers,
224 timeout: self.timeout,
225 }
226 }
227
228 pub fn await_active_snapshots(
231 &self,
232 device: &wgpu::Device,
233 ) -> Result<(), AwaitWorkerTimeout<()>> {
234 if let Ok(guard) = self.thread_pool.lock() {
235 if let Some(tp) = guard.as_ref() {
236 return tp.await_active_futures(device);
237 }
238 }
239 Ok(())
240 }
241}
242
243impl Snapshot {
244 pub async fn read_async<'buffer>(
246 &'buffer self,
247 ) -> Result<Rgba8AsyncMappedImageBuffer<'buffer>, wgpu::BufferAsyncError> {
248 let mapping = self.buffer.read().await?;
249 Ok(Rgba8AsyncMappedImageBuffer(mapping))
250 }
251
252 pub fn read<F>(self, callback: F) -> Result<(), AwaitWorkerTimeout<impl Future<Output = ()>>>
268 where
269 F: 'static + Send + FnOnce(Result<Rgba8AsyncMappedImageBuffer, wgpu::BufferAsyncError>),
270 {
271 let thread_pool = self.thread_pool();
272 let read_future = async move {
273 let res = self.read_async().await;
274 callback(res);
275 };
276 thread_pool.spawn_when_worker_available(read_future)
277 }
278
279 fn thread_pool(&self) -> Arc<ThreadPool> {
280 let mut guard = self
281 .thread_pool
282 .lock()
283 .expect("failed to acquire thread handle");
284 let thread_pool = guard.get_or_insert_with(|| {
285 let workers = self.workers.unwrap_or(num_cpus::get() as u32);
286 let thread_pool = ThreadPool {
287 active_futures: Arc::new(AtomicU32::new(0)),
288 workers,
289 timeout: self.timeout,
290 };
291 Arc::new(thread_pool)
292 });
293 thread_pool.clone()
294 }
295}
296
297impl<'b> Rgba8AsyncMappedImageBuffer<'b> {
298 pub fn as_image(&self) -> image::SubImage<wgpu::ImageHolder<image::Rgba<u8>>> {
299 unsafe { self.0.as_image::<image::Rgba<u8>>() }
301 }
302 pub fn to_owned(&self) -> image::ImageBuffer<image::Rgba<u8>, Vec<u8>> {
304 let view = self.as_image();
305 let mut result = image::ImageBuffer::new(view.width(), view.height());
306 result
307 .copy_from(&view, 0, 0)
308 .expect("nannou internal error: image copy failed");
309 result
310 }
311}
312
313impl<T> std::error::Error for AwaitWorkerTimeout<T> {}
314
315impl<T> fmt::Debug for AwaitWorkerTimeout<T> {
316 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317 f.debug_struct("AwaitWorkerTimeout").finish()
318 }
319}
320
321impl<T> fmt::Display for AwaitWorkerTimeout<T> {
322 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
323 f.debug_struct("AwaitWorkerTimeout").finish()
324 }
325}
326
327fn create_converter_data_pair(
329 device: &wgpu::Device,
330 src_texture: &wgpu::Texture,
331) -> ConverterDataPair {
332 let dst_texture = wgpu::TextureBuilder::from(src_texture.descriptor.clone())
334 .sample_count(1)
335 .format(Capturer::DST_FORMAT)
336 .usage(wgpu::TextureUsages::RENDER_ATTACHMENT | wgpu::TextureUsages::COPY_SRC)
337 .build(device);
338
339 let src_sample_count = src_texture.sample_count();
341 let src_sample_type = src_texture.sample_type();
342 let src_view = src_texture.create_view(&wgpu::TextureViewDescriptor::default());
343 let dst_sample_count = 1;
344 let dst_format = dst_texture.format();
345 let reshaper = wgpu::TextureReshaper::new(
346 device,
347 &src_view,
348 src_sample_count,
349 src_sample_type,
350 dst_sample_count,
351 dst_format,
352 );
353
354 let src_descriptor = src_texture.descriptor.clone();
356
357 ConverterDataPair {
358 src_descriptor,
359 reshaper,
360 dst_texture,
361 }
362}