smithay 0.7.0

Smithay is a library for writing wayland compositors.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
//! Module for [dmabuf](https://docs.kernel.org/driver-api/dma-buf.html) buffers.
//!
//! `Dmabuf`s act like smart pointers and can be freely cloned and passed around.
//! Once the last `Dmabuf` reference is dropped, its file descriptors are closed and
//! the underlying resources are freed.
//!
//! If you want to hold on to a potentially alive dmabuf without preventing the underlying
//! resources from being freed, you may downgrade a `Dmabuf` reference to a `WeakDmabuf`
//! by calling [`Dmabuf::weak`].
//!
//! This can be especially useful in resources where other parts of the stack should decide upon
//! the lifetime of the buffer. E.g. when you are only caching associated resources for a dmabuf.

use calloop::generic::Generic;
use calloop::{EventSource, Interest, Mode, PostAction};
use rustix::ioctl::Setter;

use super::{Allocator, Buffer, Format, Fourcc, Modifier};
#[cfg(feature = "backend_drm")]
use crate::backend::drm::DrmNode;
use crate::utils::{Buffer as BufferCoords, Size};
#[cfg(feature = "wayland_frontend")]
use crate::wayland::compositor::{Blocker, BlockerState};
use std::hash::{Hash, Hasher};
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "backend_drm")]
use std::sync::Mutex;
use std::sync::{Arc, Weak};
use std::{error, fmt};

/// Maximum amount of planes this implementation supports
///
/// [`DmabufBuilder::add_plane`] refuses to add a plane once this many planes are attached.
pub const MAX_PLANES: usize = 4;

/// Shared state behind a [`Dmabuf`] handle.
///
/// Every clone of a [`Dmabuf`] points at the same reference-counted instance of this struct;
/// the plane file descriptors are owned here and closed when the last strong reference is dropped.
#[derive(Debug)]
pub(crate) struct DmabufInternal {
    /// The submitted planes
    pub planes: Vec<Plane>,
    /// The size of this buffer
    pub size: Size<i32, BufferCoords>,
    /// The format in use
    pub format: Fourcc,
    /// Format modifier
    pub modifier: Modifier,
    /// The flags applied to it
    ///
    /// This is a bitflag set, to be compared with the [`DmabufFlags`] constants defined in this module.
    pub flags: DmabufFlags,
    /// Presumably compatible device for buffer import
    ///
    /// This is inferred from client apis, however there is no kernel api or guarantee this is correct.
    /// Kept behind a `Mutex` so it can be changed through a shared handle (see `Dmabuf::set_node`).
    #[cfg(feature = "backend_drm")]
    node: Mutex<Option<DrmNode>>,
}

/// A single plane of a dmabuf: the file descriptor plus the layout parameters submitted with it.
#[derive(Debug)]
pub(crate) struct Plane {
    /// File descriptor backing this plane; closed when the plane is dropped
    pub fd: OwnedFd,
    /// The plane index
    pub plane_idx: u32,
    /// Offset from the start of the Fd
    pub offset: u32,
    /// Stride for this plane
    pub stride: u32,
}

impl From<Plane> for OwnedFd {
    #[inline]
    fn from(plane: Plane) -> OwnedFd {
        plane.fd
    }
}

bitflags::bitflags! {
    /// Possible flags for a DMA buffer
    ///
    /// Stored alongside the buffer and queried through helpers such as `Dmabuf::y_inverted`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct DmabufFlags: u32 {
        /// The buffer content is Y-inverted
        const Y_INVERT = 1;
        /// The buffer content is interlaced
        const INTERLACED = 2;
        /// If the buffer content is interlaced, the bottom field comes first
        const BOTTOM_FIRST = 4;
    }
}

#[derive(Debug, Clone)]
/// Strong reference to a dmabuf handle
///
/// Cloning is cheap (it clones the inner `Arc`); equality and hashing are based on the
/// identity of the shared allocation, not on the buffer parameters.
pub struct Dmabuf(pub(crate) Arc<DmabufInternal>);

#[derive(Debug, Clone)]
/// Weak reference to a dmabuf handle
///
/// Does not keep the underlying buffer alive; use `WeakDmabuf::upgrade` to try to
/// obtain a strong [`Dmabuf`] again.
pub struct WeakDmabuf(pub(crate) Weak<DmabufInternal>);

// A reference to a particular dmabuf plane fd, so it can be used as a calloop source.
//
// Holds a strong `Dmabuf` handle, so the plane's file descriptor stays open for as long
// as this reference exists.
#[derive(Debug)]
struct PlaneRef {
    // Keeps the buffer (and thereby the fd) alive
    dmabuf: Dmabuf,
    // Position of the referenced plane inside the dmabuf's plane list
    idx: usize,
}

impl AsFd for PlaneRef {
    /// Borrows the file descriptor of the referenced plane.
    ///
    /// Panics if `idx` does not name an existing plane of the dmabuf.
    #[inline]
    fn as_fd(&self) -> BorrowedFd<'_> {
        let plane = &self.dmabuf.0.planes[self.idx];
        plane.fd.as_fd()
    }
}

impl PartialEq for Dmabuf {
    /// Two handles are equal when they point at the same shared allocation.
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.0, &other.0);
        Arc::ptr_eq(lhs, rhs)
    }
}
impl Eq for Dmabuf {}

impl PartialEq for WeakDmabuf {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        Weak::ptr_eq(&self.0, &other.0)
    }
}
impl Eq for WeakDmabuf {}

impl Hash for Dmabuf {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        Arc::as_ptr(&self.0).hash(state)
    }
}
impl Hash for WeakDmabuf {
    /// Hashes the address the weak handle points at, matching the identity-based `PartialEq`.
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        let address = self.0.as_ptr();
        address.hash(state);
    }
}

impl Buffer for Dmabuf {
    /// Size the buffer was created with
    #[inline]
    fn size(&self) -> Size<i32, BufferCoords> {
        let internal = &*self.0;
        internal.size
    }

    /// Fourcc code and modifier the buffer was created with
    #[inline]
    fn format(&self) -> Format {
        let internal = &*self.0;
        let code = internal.format;
        let modifier = internal.modifier;
        Format { code, modifier }
    }
}

/// Builder for Dmabufs
///
/// Obtained from `Dmabuf::builder` or `Dmabuf::builder_from_buffer`; planes are attached with
/// `add_plane` and the finished buffer is produced by `build`.
#[derive(Debug)]
pub struct DmabufBuilder {
    /// The state that becomes the shared inner value of the built `Dmabuf`
    internal: DmabufInternal,
}

impl DmabufBuilder {
    /// Add a plane to the constructed Dmabuf
    ///
    /// *Note*: Each Dmabuf needs at least one plane.
    /// MAX_PLANES notes the maximum amount of planes any format may use with this implementation.
    pub fn add_plane(&mut self, fd: OwnedFd, idx: u32, offset: u32, stride: u32) -> bool {
        if self.internal.planes.len() == MAX_PLANES {
            return false;
        }
        self.internal.planes.push(Plane {
            fd,
            plane_idx: idx,
            offset,
            stride,
        });

        true
    }

    /// Sets a known good node to import the dmabuf with.
    ///
    /// While this is only a strong hint and no guarantee, implementations
    /// should avoid setting this at all, if they can't be reasonably certain.
    #[cfg(feature = "backend_drm")]
    pub fn set_node(&mut self, node: DrmNode) {
        self.internal.node = Mutex::new(Some(node));
    }

    /// Build a `Dmabuf` out of the provided parameters and planes
    ///
    /// Returns `None` if the builder has no planes attached.
    pub fn build(mut self) -> Option<Dmabuf> {
        if self.internal.planes.is_empty() {
            return None;
        }

        self.internal.planes.sort_by_key(|plane| plane.plane_idx);
        Some(Dmabuf(Arc::new(self.internal)))
    }
}

impl Dmabuf {
    /// Create a new Dmabuf builder by initializing it with values from an existing buffer
    ///
    /// Note: the `src` buffer is only used as a reference for size and format.
    /// The contents are determined by the file descriptors added to the builder, which
    /// do not need to refer to the same buffer `src` does.
    pub fn builder_from_buffer(src: &impl Buffer, flags: DmabufFlags) -> DmabufBuilder {
        let size = src.size();
        let format = src.format();
        Self::builder(size, format.code, format.modifier, flags)
    }

    /// Create a new Dmabuf builder
    ///
    /// The builder starts without planes and without a node hint.
    pub fn builder(
        size: impl Into<Size<i32, BufferCoords>>,
        format: Fourcc,
        modifier: Modifier,
        flags: DmabufFlags,
    ) -> DmabufBuilder {
        DmabufBuilder {
            internal: DmabufInternal {
                planes: Vec::with_capacity(MAX_PLANES),
                size: size.into(),
                format,
                modifier,
                flags,
                #[cfg(feature = "backend_drm")]
                node: Mutex::new(None),
            },
        }
    }

    /// The amount of planes this Dmabuf has
    pub fn num_planes(&self) -> usize {
        self.0.planes.len()
    }

    /// Returns raw handles of the planes of this buffer
    pub fn handles(&self) -> impl Iterator<Item = BorrowedFd<'_>> + '_ {
        self.0.planes.iter().map(|p| p.fd.as_fd())
    }

    /// Returns offsets for the planes of this buffer
    pub fn offsets(&self) -> impl Iterator<Item = u32> + '_ {
        self.0.planes.iter().map(|p| p.offset)
    }

    /// Returns strides for the planes of this buffer
    pub fn strides(&self) -> impl Iterator<Item = u32> + '_ {
        self.0.planes.iter().map(|p| p.stride)
    }

    /// Returns `true` if this buffer uses an explicit, non-linear modifier
    ///
    /// Returns `false` if the modifier is implicit ([`Modifier::Invalid`]) or [`Modifier::Linear`].
    pub fn has_modifier(&self) -> bool {
        self.0.modifier != Modifier::Invalid && self.0.modifier != Modifier::Linear
    }

    /// Returns if the buffer is stored inverted on the y-axis
    pub fn y_inverted(&self) -> bool {
        self.0.flags.contains(DmabufFlags::Y_INVERT)
    }

    /// Create a weak reference to this dmabuf
    pub fn weak(&self) -> WeakDmabuf {
        WeakDmabuf(Arc::downgrade(&self.0))
    }

    /// Presumably compatible device for buffer import
    ///
    /// This is inferred from client apis, as there is no kernel api or other guarantee this is correct,
    /// so it should only be treated as a hint.
    #[cfg(feature = "backend_drm")]
    pub fn node(&self) -> Option<DrmNode> {
        *self.0.node.lock().unwrap()
    }

    /// Sets or unsets any node set for this dmabuf (see [`Dmabuf::node`]).
    ///
    /// May alter behavior of other parts of smithay using this as a hint.
    #[cfg(feature = "backend_drm")]
    pub fn set_node(&self, node: impl Into<Option<DrmNode>>) {
        *self.0.node.lock().unwrap() = node.into();
    }

    /// Create an [`calloop::EventSource`] and [`Blocker`] for this [`Dmabuf`].
    ///
    /// Usually used to block applying surface state on the readiness of an attached dmabuf.
    ///
    /// Returns [`AlreadyReady`] under the same conditions as [`DmabufSource::new`].
    #[cfg(feature = "wayland_frontend")]
    #[profiling::function]
    pub fn generate_blocker(
        &self,
        interest: Interest,
    ) -> Result<(DmabufBlocker, DmabufSource), AlreadyReady> {
        let source = DmabufSource::new(self.clone(), interest)?;
        // The blocker shares the source's flag, so it is released once the source has fired.
        let blocker = DmabufBlocker(source.signal.clone());
        Ok((blocker, source))
    }

    /// Map the plane at specified index with the specified mode
    ///
    /// The mapping starts at the plane's offset and extends to the end of the plane's
    /// file descriptor. The size is determined by seeking the descriptor, which is
    /// rewound to its start afterwards.
    ///
    /// Returns `Err` if the plane with the specified index does not exist, if the
    /// plane's offset lies beyond the end of its file descriptor, or if mmap failed
    pub fn map_plane(
        &self,
        idx: usize,
        mode: DmabufMappingMode,
    ) -> Result<DmabufMapping, DmabufMappingFailed> {
        let plane = self
            .0
            .planes
            .get(idx)
            .ok_or(DmabufMappingFailed::PlaneIndexOutOfBound)?;

        let size = rustix::fs::seek(&plane.fd, rustix::fs::SeekFrom::End(0)).map_err(std::io::Error::from)?;
        rustix::fs::seek(&plane.fd, rustix::fs::SeekFrom::Start(0)).map_err(std::io::Error::from)?;

        // The offset is supplied together with the fd and is not validated before this point,
        // so guard the subtraction (and the narrowing to `usize`) instead of letting it
        // panic or wrap around into a bogus mapping length.
        let offset = u64::from(plane.offset);
        let len = size
            .checked_sub(offset)
            .and_then(|remaining| usize::try_from(remaining).ok())
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "plane offset lies beyond the end of the dmabuf",
                )
            })?;

        // SAFETY: a null address hint without a fixed-placement flag leaves the choice of
        // address to the kernel, so no existing mapping is replaced. The fd is borrowed
        // from `plane` and stays open for the duration of the call.
        let ptr = unsafe {
            rustix::mm::mmap(
                std::ptr::null_mut(),
                len,
                mode.into(),
                rustix::mm::MapFlags::SHARED,
                &plane.fd,
                offset,
            )
        }
        .map_err(std::io::Error::from)?;
        Ok(DmabufMapping { len, ptr })
    }

    /// Synchronize access for the plane at the specified index
    ///
    /// Returns `Err` if the plane with the specified index does not exist or
    /// the dmabuf_sync ioctl failed
    pub fn sync_plane(&self, idx: usize, flags: DmabufSyncFlags) -> Result<(), DmabufSyncFailed> {
        let plane = self
            .0
            .planes
            .get(idx)
            .ok_or(DmabufSyncFailed::PlaneIndexOutOfBound)?;
        // SAFETY: `DMA_BUF_SYNC` is declared in this module as a write ioctl carrying a
        // `dma_buf_sync`, which is exactly the argument type handed to the setter here.
        unsafe { rustix::ioctl::ioctl(&plane.fd, Setter::<DMA_BUF_SYNC, _>::new(dma_buf_sync { flags })) }
            .map_err(std::io::Error::from)?;
        Ok(())
    }
}

bitflags::bitflags! {
    /// Modes of mapping a dmabuf plane
    ///
    /// Converted into the memory protection flags of the mapping by `Dmabuf::map_plane`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct DmabufMappingMode: u32 {
        /// Map the dmabuf readable
        const READ = 0b00000001;
        /// Map the dmabuf writable
        const WRITE = 0b00000010;
    }
}

impl From<DmabufMappingMode> for rustix::mm::ProtFlags {
    /// Translates the requested mapping mode into memory protection flags.
    ///
    /// Only `READ` and `WRITE` are translated; any other bits in `mode` are ignored.
    #[inline]
    fn from(mode: DmabufMappingMode) -> Self {
        let translation = [
            (DmabufMappingMode::READ, rustix::mm::ProtFlags::READ),
            (DmabufMappingMode::WRITE, rustix::mm::ProtFlags::WRITE),
        ];

        translation
            .iter()
            .filter(|(requested, _)| mode.contains(*requested))
            .fold(rustix::mm::ProtFlags::empty(), |prot, (_, flag)| prot | *flag)
    }
}

/// Dmabuf mapping errors
///
/// Returned by `Dmabuf::map_plane`.
#[derive(Debug, thiserror::Error)]
#[error("Mapping the dmabuf failed")]
pub enum DmabufMappingFailed {
    /// The supplied index for the plane is out of bounds
    #[error("The supplied index for the plane is out of bounds")]
    PlaneIndexOutOfBound,
    /// Io error during map operation
    // No message of its own; presumably falls back to the enum-level `#[error]` above — TODO confirm.
    Io(#[from] std::io::Error),
}

/// Dmabuf sync errors
///
/// Returned by `Dmabuf::sync_plane`.
#[derive(Debug, thiserror::Error)]
#[error("Sync of the dmabuf failed")]
pub enum DmabufSyncFailed {
    /// The supplied index for the plane is out of bounds
    #[error("The supplied index for the plane is out of bounds")]
    PlaneIndexOutOfBound,
    /// Io error during sync operation
    // No message of its own; presumably falls back to the enum-level `#[error]` above — TODO confirm.
    Io(#[from] std::io::Error),
}

bitflags::bitflags! {
    /// Flags for the [`Dmabuf::sync_plane`](Dmabuf::sync_plane) operation
    ///
    /// Combine one or both of `READ`/`WRITE` with either `START` or `END`.
    ///
    /// Note that `START` has the value zero: it is the absence of `END`, so
    /// `contains(DmabufSyncFlags::START)` is true for every value.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct DmabufSyncFlags: std::ffi::c_ulonglong {
        /// Read from the dmabuf
        const READ = 1 << 0;
        /// Write to the dmabuf
        // The redundant shift is kept so the constants line up visually as bit positions.
        #[allow(clippy::identity_op)]
        const WRITE = 2 << 0;
        /// Start of read/write
        const START = 0 << 2;
        /// End of read/write
        const END = 1 << 2;
    }
}

// Argument structure handed to the `DMA_BUF_SYNC` ioctl.
// `#[repr(C)]` keeps the layout C-compatible; the lower-case name is why the
// naming lint is silenced. Presumably mirrors the kernel's `struct dma_buf_sync` — TODO confirm.
#[repr(C)]
#[allow(non_camel_case_types)]
struct dma_buf_sync {
    // Sync flags, passed through unchanged from `Dmabuf::sync_plane`
    flags: DmabufSyncFlags,
}

// Opcode of the dmabuf sync ioctl: a "write" request of type `b'b'`, number 0,
// whose argument is a `dma_buf_sync`.
const DMA_BUF_SYNC: rustix::ioctl::Opcode = rustix::ioctl::opcode::write::<dma_buf_sync>(b'b', 0);

/// A mapping into a [`Dmabuf`]
///
/// Created by `Dmabuf::map_plane`. The mapped region is unmapped again when this value is dropped.
#[derive(Debug)]
pub struct DmabufMapping {
    /// Start address of the mapped region
    ptr: *mut std::ffi::c_void,
    /// Length of the mapped region, as passed to `mmap`
    len: usize,
}

impl DmabufMapping {
    /// Start address of the mapped region
    pub fn ptr(&self) -> *mut std::ffi::c_void {
        let Self { ptr, .. } = self;
        *ptr
    }

    /// Length of the mapped region
    pub fn length(&self) -> usize {
        let Self { len, .. } = self;
        *len
    }
}

// `DmabufMapping` holds a raw pointer and is therefore neither `Send` nor `Sync`
// automatically; both are opted into explicitly here.
//
// SAFETY: The caller is responsible for accessing the data without assuming
// another process isn't mutating it, regardless of how many threads this is
// referenced in.
unsafe impl Send for DmabufMapping {}
unsafe impl Sync for DmabufMapping {}

impl Drop for DmabufMapping {
    /// Unmaps the region; a failure to unmap is deliberately ignored.
    fn drop(&mut self) {
        let Self { ptr, len } = self;
        // SAFETY: within this module a `DmabufMapping` is only built from the pointer and
        // length of a successful `mmap` call, and the region is unmapped nowhere else.
        let _ = unsafe { rustix::mm::munmap(*ptr, *len) };
    }
}

impl WeakDmabuf {
    /// Try to upgrade to a strong reference of this buffer.
    ///
    /// Fails if no strong references exist anymore and the handle was already closed.
    #[inline]
    pub fn upgrade(&self) -> Option<Dmabuf> {
        self.0.upgrade().map(Dmabuf)
    }

    /// Returns true if there are not any strong references remaining
    #[inline]
    pub fn is_gone(&self) -> bool {
        self.0.strong_count() == 0
    }
}

/// Buffer that can be exported as Dmabufs
///
/// Implemented by [`Dmabuf`] itself and required of the buffer type wrapped by [`DmabufAllocator`].
pub trait AsDmabuf {
    /// Error type returned, if exporting fails
    type Error: std::error::Error;

    /// Export this buffer as a new Dmabuf
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the buffer could not be exported.
    fn export(&self) -> Result<Dmabuf, Self::Error>;
}

impl AsDmabuf for Dmabuf {
    type Error = std::convert::Infallible;

    #[inline]
    fn export(&self) -> Result<Dmabuf, std::convert::Infallible> {
        Ok(self.clone())
    }
}

/// Type erased error
///
/// Wraps an arbitrary boxed error; used as the error type of [`DmabufAllocator`], which has
/// to report both allocation and export failures of the wrapped allocator.
#[derive(Debug)]
pub struct AnyError(Box<dyn error::Error + Send + Sync>);

impl fmt::Display for AnyError {
    /// Formats the error by delegating to the wrapped error's own `fmt`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): presumably this resolves to the inner error's `Display` impl — confirm,
        // and consider spelling it `fmt::Display::fmt(&self.0, f)` to make that explicit.
        self.0.fmt(f)
    }
}

impl error::Error for AnyError {
    /// Exposes the wrapped error as the source of this error; never returns `None`.
    #[inline]
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let inner: &(dyn error::Error + 'static) = &*self.0;
        Some(inner)
    }
}

/// Wrapper for Allocators, whose buffer types implement [`AsDmabuf`].
///
/// Implements `Allocator<Buffer=Dmabuf, Error=AnyError>`: every buffer created by the wrapped
/// allocator is exported as a [`Dmabuf`] right away.
#[derive(Debug)]
pub struct DmabufAllocator<A>(pub A)
where
    A: Allocator,
    <A as Allocator>::Buffer: AsDmabuf + 'static,
    <A as Allocator>::Error: 'static;

impl<A> Allocator for DmabufAllocator<A>
where
    A: Allocator,
    <A as Allocator>::Buffer: AsDmabuf + 'static,
    <A as Allocator>::Error: Send + Sync + 'static,
    <<A as Allocator>::Buffer as AsDmabuf>::Error: Send + Sync + 'static,
{
    type Buffer = Dmabuf;
    type Error = AnyError;

    /// Allocates a buffer with the wrapped allocator and exports it as a [`Dmabuf`].
    ///
    /// Failures of either step are reported as a type-erased [`AnyError`].
    #[profiling::function]
    fn create_buffer(
        &mut self,
        width: u32,
        height: u32,
        fourcc: Fourcc,
        modifiers: &[Modifier],
    ) -> Result<Self::Buffer, Self::Error> {
        let native = self
            .0
            .create_buffer(width, height, fourcc, modifiers)
            .map_err(|err| AnyError(err.into()))?;

        AsDmabuf::export(&native).map_err(|err| AnyError(err.into()))
    }
}

/// [`Blocker`] implementation for an accompanying [`DmabufSource`]
///
/// Shares a flag with the source; the blocker reports itself as released once the flag is set.
#[cfg(feature = "wayland_frontend")]
#[derive(Debug)]
pub struct DmabufBlocker(Arc<AtomicBool>);

#[cfg(feature = "wayland_frontend")]
impl Blocker for DmabufBlocker {
    /// `Released` once the shared flag has been set, `Pending` until then.
    fn state(&self) -> BlockerState {
        match self.0.load(Ordering::SeqCst) {
            true => BlockerState::Released,
            false => BlockerState::Pending,
        }
    }
}

// State of the event source slot for a single dmabuf plane.
#[derive(Debug)]
enum Subsource {
    // Still waiting for the plane to become ready
    Active(Generic<PlaneRef, std::io::Error>),
    // The plane became ready; the source is kept so it can still be unregistered
    Done(Generic<PlaneRef, std::io::Error>),
    // No source for this slot (no such plane, or it was ready from the start)
    Empty,
}

impl Subsource {
    // Marks this slot as finished: an `Active` source becomes `Done` (keeping the inner
    // source), while `Done` and `Empty` slots are left as they are.
    fn done(&mut self) {
        let previous = std::mem::replace(self, Subsource::Empty);
        *self = match previous {
            Subsource::Active(source) | Subsource::Done(source) => Subsource::Done(source),
            Subsource::Empty => Subsource::Empty,
        };
    }
}

/// [`Dmabuf`]-based event source. Can be used to monitor implicit fences of a dmabuf.
///
/// See [`DmabufSource::new`] for the exact semantics.
#[derive(Debug)]
pub struct DmabufSource {
    /// The monitored buffer; handed to the callback as metadata
    dmabuf: Dmabuf,
    /// Set to `true` once all planes became ready; shared with the accompanying blocker, if any
    signal: Arc<AtomicBool>,
    /// One slot per possible plane
    sources: [Subsource; 4],
}

#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[error("Dmabuf is already ready for the given interest")]
/// Dmabuf is already ready for the given interest
///
/// Returned instead of a [`DmabufSource`] when there is nothing to wait for.
pub struct AlreadyReady;

impl DmabufSource {
    /// Creates a new [`DmabufSource`] from a [`Dmabuf`] and interest.
    ///
    /// This event source will monitor the implicit fences of the given dmabuf.
    /// Monitoring for READ-access will monitor the state of the most recent write or exclusive fence.
    /// Monitoring for WRITE-access, will monitor state of all attached fences, shared and exclusive ones.
    ///
    /// The event source is a one shot event source and will remove itself from the event loop after being triggered once.
    /// To monitor for new fences added at a later time a new DmabufSource needs to be created.
    ///
    /// Returns `AlreadyReady` if all corresponding fences are already signalled or if `interest` is empty.
    #[profiling::function]
    pub fn new(dmabuf: Dmabuf, interest: Interest) -> Result<Self, AlreadyReady> {
        if !interest.readable && !interest.writable {
            return Err(AlreadyReady);
        }

        let mut sources = [
            Subsource::Empty,
            Subsource::Empty,
            Subsource::Empty,
            Subsource::Empty,
        ];
        for (idx, handle) in dmabuf.handles().enumerate() {
            if matches!(
                rustix::event::poll(
                    &mut [rustix::event::PollFd::new(
                        &handle,
                        if interest.writable {
                            rustix::event::PollFlags::OUT
                        } else {
                            rustix::event::PollFlags::IN
                        },
                    )],
                    Some(&rustix::time::Timespec {
                        tv_sec: 0,
                        tv_nsec: 0
                    })
                ),
                Ok(1)
            ) {
                continue;
            }
            let fd = PlaneRef {
                dmabuf: dmabuf.clone(),
                idx,
            };
            sources[idx] = Subsource::Active(Generic::new(fd, interest, Mode::OneShot));
        }
        if sources
            .iter()
            .all(|x| matches!(x, Subsource::Done(_) | Subsource::Empty))
        {
            Err(AlreadyReady)
        } else {
            Ok(DmabufSource {
                dmabuf,
                sources,
                signal: Arc::new(AtomicBool::new(false)),
            })
        }
    }
}

impl EventSource for DmabufSource {
    type Event = ();
    type Metadata = Dmabuf;
    type Ret = Result<(), std::io::Error>;

    type Error = std::io::Error;

    /// Forwards the event to every plane that is still being waited on.
    ///
    /// Once no plane is pending anymore the shared flag is set, the callback is invoked once
    /// with the monitored dmabuf and the source asks to be removed; until then it asks to be
    /// re-registered.
    #[profiling::function]
    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        mut callback: F,
    ) -> Result<PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        for slot in self.sources.iter_mut() {
            let fired = match &mut *slot {
                // luckily Generic skips events for other tokens
                Subsource::Active(source) => matches!(
                    source.process_events(readiness, token, |_, _| Ok(PostAction::Remove)),
                    Ok(PostAction::Remove)
                ),
                Subsource::Done(_) | Subsource::Empty => false,
            };
            if fired {
                slot.done();
            }
        }

        let still_waiting = self
            .sources
            .iter()
            .any(|slot| matches!(slot, Subsource::Active(_)));
        if still_waiting {
            return Ok(PostAction::Reregister);
        }

        self.signal.store(true, Ordering::SeqCst);
        callback((), &mut self.dmabuf)?;
        Ok(PostAction::Remove)
    }

    /// Registers every plane that is still being waited on.
    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        for slot in self.sources.iter_mut() {
            if let Subsource::Active(source) = slot {
                source.register(poll, token_factory)?;
            }
        }
        Ok(())
    }

    /// Re-registers pending planes and drops the registration of finished ones.
    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        for slot in self.sources.iter_mut() {
            match slot {
                Subsource::Active(source) => {
                    source.reregister(poll, token_factory)?;
                }
                Subsource::Done(source) => {
                    // Best effort: a finished plane may not be registered anymore.
                    let _ = source.unregister(poll);
                }
                Subsource::Empty => {}
            }
        }
        Ok(())
    }

    /// Unregisters all planes; errors are only reported for planes that were still pending.
    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
        for slot in self.sources.iter_mut() {
            match slot {
                Subsource::Active(source) => {
                    source.unregister(poll)?;
                }
                Subsource::Done(source) => {
                    // Best effort: a finished plane may not be registered anymore.
                    let _ = source.unregister(poll);
                }
                Subsource::Empty => {}
            }
        }
        Ok(())
    }
}