visor_nannou_wgpu/texture/
capturer.rs

1use crate as wgpu;
2use std::fmt;
3use std::future::Future;
4use std::sync::atomic::{self, AtomicU32};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use image::{GenericImage, GenericImageView};
9
10/// A type dedicated to capturing a texture as a non-linear sRGBA image that can be read on the
11/// CPU.
12///
13/// Calling **capture** will return a **Snapshot** that may be read after the given command encoder
14/// has been submitted. **Snapshot**s can be read on the current thread via **read** or on a thread
15/// pool via **read_threaded**.
16///
17/// If the **Capturer** is dropped while threaded callbacks are still being processed, the drop
18/// implementation will block the current thread.
19#[derive(Debug, Default)]
20pub struct Capturer {
21    converter_data_pair: Mutex<Option<ConverterDataPair>>,
22    thread_pool: Arc<Mutex<Option<Arc<ThreadPool>>>>,
23    workers: Option<u32>,
24    timeout: Option<Duration>,
25}
26
/// Book-keeping that limits how many `Snapshot::read` futures may be in flight at once.
///
/// This type does not own any threads itself: it only counts the futures it has spawned and
/// stores the limit and timeout that apply when waiting for that count to drop.
#[derive(Debug)]
struct ThreadPool {
    /// Number of spawned futures that have not completed yet.
    active_futures: Arc<AtomicU32>,
    /// Maximum number of futures allowed to be active at the same time.
    workers: u32,
    /// Maximum time to block while waiting on the counter. `None` waits indefinitely.
    timeout: Option<Duration>,
}
34
35/// A snapshot captured by a **Capturer**.
36///
37/// A snapshot is a thin wrapper around a **wgpu::BufferImage** that knows that the image format is
38/// specifically non-linear sRGBA8.
39pub struct Snapshot {
40    buffer: wgpu::RowPaddedBuffer,
41    thread_pool: Arc<Mutex<Option<Arc<ThreadPool>>>>,
42    workers: Option<u32>,
43    timeout: Option<Duration>,
44}
45
/// The error returned when waiting on the capture workers exceeded the configured timeout.
///
/// The payload is whatever the failed operation hands back to the caller: the future that could
/// not be spawned when waiting for a free worker, or `()` when waiting for all active futures to
/// finish.
pub struct AwaitWorkerTimeout<F>(pub F);
49
50#[derive(Debug)]
51struct ConverterDataPair {
52    src_descriptor: wgpu::TextureDescriptor<'static>,
53    reshaper: wgpu::TextureReshaper,
54    dst_texture: wgpu::Texture,
55}
56
57/// A wrapper around a slice of bytes representing a non-linear sRGBA image.
58///
59/// Can be read from a captured `Snapshot`.
60pub struct Rgba8AsyncMappedImageBuffer<'buffer>(wgpu::ImageReadMapping<'buffer>);
61
62impl ThreadPool {
63    /// Spawns the given future if a worker is available. Otherwise, blocks and waits for a worker
64    /// to become available before spawning the future.
65    fn spawn_when_worker_available<F>(&self, future: F) -> Result<(), AwaitWorkerTimeout<F>>
66    where
67        F: 'static + Future<Output = ()> + Send,
68    {
69        // Wait until the number of active futures is less than the number of threads.
70        // If we don't wait, the capture futures may quickly fall far behind the main
71        // swapchain thread resulting in an out of memory error.
72        let mut start = None;
73        let mut interval_us = 128;
74        while self.active_futures() >= self.workers() {
75            if let Some(timeout) = self.timeout {
76                let start = start.get_or_insert_with(instant::Instant::now);
77                if start.elapsed() > timeout {
78                    return Err(AwaitWorkerTimeout(future));
79                }
80            }
81            let duration = Duration::from_micros(interval_us);
82            std::thread::sleep(duration);
83            interval_us *= 2;
84        }
85
86        // Wrap the future with the counter.
87        let active_futures = self.active_futures.clone();
88        let future = async move {
89            active_futures.fetch_add(1, atomic::Ordering::SeqCst);
90            future.await;
91            active_futures.fetch_sub(1, atomic::Ordering::SeqCst);
92        };
93
94        tokio::spawn(future);
95        Ok(())
96    }
97
98    fn active_futures(&self) -> u32 {
99        self.active_futures.load(atomic::Ordering::SeqCst)
100    }
101
102    fn workers(&self) -> u32 {
103        self.workers
104    }
105
106    /// Await for the completion of all active futures, polling the device as necessary until all
107    /// futures have completed.
108    fn await_active_futures(&self, device: &wgpu::Device) -> Result<(), AwaitWorkerTimeout<()>> {
109        let mut start = None;
110        let mut interval_us = 128;
111        while self.active_futures() > 0 {
112            if let Some(timeout) = self.timeout {
113                let start = start.get_or_insert_with(instant::Instant::now);
114                if start.elapsed() > timeout {
115                    return Err(AwaitWorkerTimeout(()));
116                }
117            }
118            device.poll(wgpu::Maintain::Wait);
119            let duration = Duration::from_micros(interval_us);
120            std::thread::sleep(duration);
121            interval_us *= 2;
122        }
123        Ok(())
124    }
125}
126
127impl Capturer {
128    /// The format to which textures will be converted before being mapped back to the CPU.
129    pub const DST_FORMAT: wgpu::TextureFormat = wgpu::TextureFormat::Rgba8UnormSrgb;
130
131    /// Create a new **TextureCapturer**.
132    ///
133    /// Note that a **TextureCapturer** must only be used with a single texture. If you require
134    /// capturing multiple textures, you may create multiple **TextureCapturers**.
135    ///
136    /// `workers` refers to the number of worker threads used to await GPU buffers to be mapped for
137    /// reading and for running user callbacks. If `None` is specified, a threadpool will be
138    /// spawned with a number of threads equal to the number of CPUs available on the system.
139    ///
140    /// `timeout` specifies how long to block and wait for an available worker in the case that all
141    /// workers are busy at the time a `Snapshot::read` occurs. If `None` is specified, calls to
142    /// `Snapshot::read` will never time out (the default behaviour).
143    ///
144    /// Note that the specified parameters are only relevant to calls to `Snapshot::read`. In the
145    /// case that the user uses `Snapshot::read_async`, it is the responsibility of the user to
146    /// execute the future.
147    pub fn new(workers: Option<u32>, timeout: Option<Duration>) -> Self {
148        Capturer {
149            converter_data_pair: Default::default(),
150            thread_pool: Default::default(),
151            workers,
152            timeout,
153        }
154    }
155
156    /// The number of futures currently running on the inner `ThreadPool`.
157    ///
158    /// Note that futures are only run on the threadpool when the `Snapshot::read` method is used.
159    /// In the case that `Snapshot::read_async` is used it is up to the user to track their
160    /// futures.
161    ///
162    /// If the inner thread pool mutex has been poisoned, or if the thread pool has not been
163    /// created due to no calls to `read`, this will return `0`.
164    pub fn active_snapshots(&self) -> u32 {
165        if let Ok(guard) = self.thread_pool.lock() {
166            if let Some(tp) = guard.as_ref() {
167                return tp.active_futures.load(atomic::Ordering::SeqCst);
168            }
169        }
170        0
171    }
172
173    /// The number of worker threads used to await GPU buffers to be mapped for reading and for
174    /// running user callbacks.
175    pub fn workers(&self) -> u32 {
176        if let Ok(guard) = self.thread_pool.lock() {
177            if let Some(tp) = guard.as_ref() {
178                return tp.workers();
179            }
180        }
181        self.workers.unwrap_or(num_cpus::get() as u32)
182    }
183
184    /// Capture the given texture at the state of the given command encoder.
185    pub fn capture(
186        &self,
187        device: &wgpu::Device,
188        encoder: &mut wgpu::CommandEncoder,
189        src_texture: &wgpu::Texture,
190    ) -> Snapshot {
191        let buffer = if src_texture.format() != Self::DST_FORMAT {
192            let mut converter_data_pair = self
193                .converter_data_pair
194                .lock()
195                .expect("failed to lock converter");
196
197            // Create converter and target texture if they don't exist.
198            let converter_data_pair = converter_data_pair
199                .get_or_insert_with(|| create_converter_data_pair(device, src_texture));
200
201            // If the texture has changed in some way, recreate the converter.
202            if !wgpu::texture_descriptor_eq(
203                src_texture.descriptor(),
204                &converter_data_pair.src_descriptor,
205            ) {
206                *converter_data_pair = create_converter_data_pair(device, src_texture);
207            }
208
209            // Encode the texture format conversion.
210            let dst_view = converter_data_pair.dst_texture.view();
211            converter_data_pair
212                .reshaper
213                .encode_render_pass(&dst_view.build(), encoder);
214
215            converter_data_pair.dst_texture.to_buffer(device, encoder)
216        } else {
217            src_texture.to_buffer(device, encoder)
218        };
219
220        Snapshot {
221            buffer,
222            thread_pool: self.thread_pool.clone(),
223            workers: self.workers,
224            timeout: self.timeout,
225        }
226    }
227
228    /// Await for the completion of all `Snapshot::read` active futures, polling the device as
229    /// necessary until all futures have reached completion or until a timeout is reached.
230    pub fn await_active_snapshots(
231        &self,
232        device: &wgpu::Device,
233    ) -> Result<(), AwaitWorkerTimeout<()>> {
234        if let Ok(guard) = self.thread_pool.lock() {
235            if let Some(tp) = guard.as_ref() {
236                return tp.await_active_futures(device);
237            }
238        }
239        Ok(())
240    }
241}
242
243impl Snapshot {
244    /// Reads the non-linear sRGBA image from mapped memory and convert it to an owned buffer.
245    pub async fn read_async<'buffer>(
246        &'buffer self,
247    ) -> Result<Rgba8AsyncMappedImageBuffer<'buffer>, wgpu::BufferAsyncError> {
248        let mapping = self.buffer.read().await?;
249        Ok(Rgba8AsyncMappedImageBuffer(mapping))
250    }
251
252    /// The same as `read_async`, but runs the resulting future on an inner threadpool and calls
253    /// the given callback with the mapped image buffer once complete.
254    ///
255    /// Note: The given callback will not be called until the memory is mapped and the device is
256    /// polled. You should not rely on the callback being called immediately.
257    ///
258    /// Note: The given callback will be called on the inner thread pool and will not be called on
259    /// the current thread.
260    ///
261    /// Note: **This method may block** if the associated `wgpu::TextureCapturer` has an
262    /// `active_futures` count that is greater than the number of worker threads with which it was
263    /// created. This is necessary in order to avoid "out of memory" errors resulting from an
264    /// accumulating queue of pending texture buffers waiting to be mapped. To avoid blocking, you
265    /// can try using a higher thread count, capturing a smaller texture, or using `read_async`
266    /// instead and running the resulting future on a custom runtime or threadpool.
267    pub fn read<F>(self, callback: F) -> Result<(), AwaitWorkerTimeout<impl Future<Output = ()>>>
268    where
269        F: 'static + Send + FnOnce(Result<Rgba8AsyncMappedImageBuffer, wgpu::BufferAsyncError>),
270    {
271        let thread_pool = self.thread_pool();
272        let read_future = async move {
273            let res = self.read_async().await;
274            callback(res);
275        };
276        thread_pool.spawn_when_worker_available(read_future)
277    }
278
279    fn thread_pool(&self) -> Arc<ThreadPool> {
280        let mut guard = self
281            .thread_pool
282            .lock()
283            .expect("failed to acquire thread handle");
284        let thread_pool = guard.get_or_insert_with(|| {
285            let workers = self.workers.unwrap_or(num_cpus::get() as u32);
286            let thread_pool = ThreadPool {
287                active_futures: Arc::new(AtomicU32::new(0)),
288                workers,
289                timeout: self.timeout,
290            };
291            Arc::new(thread_pool)
292        });
293        thread_pool.clone()
294    }
295}
296
297impl<'b> Rgba8AsyncMappedImageBuffer<'b> {
298    pub fn as_image(&self) -> image::SubImage<wgpu::ImageHolder<image::Rgba<u8>>> {
299        // safe: we know it's Rgba<u8>
300        unsafe { self.0.as_image::<image::Rgba<u8>>() }
301    }
302    /// Convert the mapped image buffer to an owned buffer.
303    pub fn to_owned(&self) -> image::ImageBuffer<image::Rgba<u8>, Vec<u8>> {
304        let view = self.as_image();
305        let mut result = image::ImageBuffer::new(view.width(), view.height());
306        result
307            .copy_from(&view, 0, 0)
308            .expect("nannou internal error: image copy failed");
309        result
310    }
311}
312
313impl<T> std::error::Error for AwaitWorkerTimeout<T> {}
314
315impl<T> fmt::Debug for AwaitWorkerTimeout<T> {
316    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317        f.debug_struct("AwaitWorkerTimeout").finish()
318    }
319}
320
321impl<T> fmt::Display for AwaitWorkerTimeout<T> {
322    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
323        f.debug_struct("AwaitWorkerTimeout").finish()
324    }
325}
326
327// Create the format converter and the target texture.
328fn create_converter_data_pair(
329    device: &wgpu::Device,
330    src_texture: &wgpu::Texture,
331) -> ConverterDataPair {
332    // Create the destination format texture.
333    let dst_texture = wgpu::TextureBuilder::from(src_texture.descriptor.clone())
334        .sample_count(1)
335        .format(Capturer::DST_FORMAT)
336        .usage(wgpu::TextureUsages::RENDER_ATTACHMENT | wgpu::TextureUsages::COPY_SRC)
337        .build(device);
338
339    // Create the converter.
340    let src_sample_count = src_texture.sample_count();
341    let src_sample_type = src_texture.sample_type();
342    let src_view = src_texture.create_view(&wgpu::TextureViewDescriptor::default());
343    let dst_sample_count = 1;
344    let dst_format = dst_texture.format();
345    let reshaper = wgpu::TextureReshaper::new(
346        device,
347        &src_view,
348        src_sample_count,
349        src_sample_type,
350        dst_sample_count,
351        dst_format,
352    );
353
354    // Keep track of the `src_descriptor` to check if we need to recreate the converter.
355    let src_descriptor = src_texture.descriptor.clone();
356
357    ConverterDataPair {
358        src_descriptor,
359        reshaper,
360        dst_texture,
361    }
362}