use std::{ io, mem };
use std::sync::atomic;
use linux_io_uring_sys as sys;
use crate::util::{ Mmap, Fd, AtomicU32Ref };
use crate::mmap_offset;


/// Completion queue of an io_uring instance.
///
/// Holds raw pointers into the kernel-shared CQ ring; the mapping is kept
/// alive by `_cq_mmap` so those pointers remain valid.
pub struct CompletionQueue {
    _cq_mmap: Mmap,

    head: AtomicU32Ref,
    tail: *const atomic::AtomicU32,
    ring_mask: *const u32,

    #[allow(dead_code)]
    ring_entries: *const u32,

    overflow: *const atomic::AtomicU32,

    cqes: *const sys::io_uring_cqe
}

/// A completion queue event (CQE) reported by the kernel for a finished operation.
#[derive(Clone)]
pub struct Entry(sys::io_uring_cqe);

/// A snapshot of the completion entries currently ready to be read.
///
/// Entries consumed from the iterator are handed back to the kernel when
/// this value is dropped.
pub struct AvailableQueue<'a> {
    head: u32,
    tail: u32,
    ring_mask: u32,

    queue: &'a mut CompletionQueue
}

impl CompletionQueue {
    pub(crate) fn new(fd: &Fd, p: &sys::io_uring_params) -> io::Result<CompletionQueue> {
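        // Map the CQ ring: the shared header fields followed by the array of CQEs.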
        let cq_mmap = Mmap::new(
            &fd,
            sys::IORING_OFF_CQ_RING as _,
            p.cq_off.cqes as usize + p.cq_entries as usize * mem::size_of::<sys::io_uring_cqe>()
        )?;

        // Resolve raw pointers to the ring fields from the offsets the kernel reported.
        mmap_offset!{ unsafe
            let head            = cq_mmap + p.cq_off.head           => *const u32;
            let tail            = cq_mmap + p.cq_off.tail           => *const atomic::AtomicU32;
            let ring_mask       = cq_mmap + p.cq_off.ring_mask      => *const u32;
            let ring_entries    = cq_mmap + p.cq_off.ring_entries   => *const u32;
            let overflow        = cq_mmap + p.cq_off.overflow       => *const atomic::AtomicU32;
            let cqes            = cq_mmap + p.cq_off.cqes           => *const sys::io_uring_cqe;
        }

        Ok(CompletionQueue {
            _cq_mmap: cq_mmap,
            head: unsafe { AtomicU32Ref::new(head) },
            tail,
            ring_mask, ring_entries,
            overflow,
            cqes
        })
    }

    /// Number of completion events the kernel dropped because the queue was full.
    pub fn overflow(&self) -> u32 {
        unsafe {
            (*self.overflow).load(atomic::Ordering::Acquire)
        }
    }

    /// Get the entries currently available in the completion queue.
    ///
    /// The shared queue head is only advanced once the returned `AvailableQueue`
    /// is dropped.
    pub fn available(&mut self) -> AvailableQueue<'_> {
        unsafe {
            AvailableQueue {
                head: self.head.unsync_load(),
                tail: (*self.tail).load(atomic::Ordering::Acquire),
                ring_mask: *self.ring_mask,
                queue: self
            }
        }
    }
}

impl ExactSizeIterator for AvailableQueue<'_> {
    fn len(&self) -> usize {
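        // `head` and `tail` are free-running counters; wrapping subtraction gives
        // the number of unread entries even if the u32 counters overflow.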
        self.tail.wrapping_sub(self.head) as usize
    }
}

impl Iterator for AvailableQueue<'_> {
    type Item = Entry;

    fn next(&mut self) -> Option<Self::Item> {
        if self.head != self.tail {
            unsafe {
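                // Mask the free-running head into the ring to index the CQE array.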
                let entry = self.queue.cqes.add((self.head & self.ring_mask) as usize);
                self.head = self.head.wrapping_add(1);
                Some(Entry(*entry))
            }
        } else {
            None
        }
    }
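
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Keep the hint exact so that it agrees with the `ExactSizeIterator`
        // implementation above.
        let len = self.len();
        (len, Some(len))
    }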
}

impl Drop for AvailableQueue<'_> {
    fn drop(&mut self) {
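        // Publish the new head so the kernel can reuse the consumed entries.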
        self.queue.head.store(self.head, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Result of the operation: a syscall-style return value, where a negative
    /// value is `-errno`.
    pub fn result(&self) -> i32 {
        self.0.res
    }

    /// The `user_data` value that was attached to the corresponding submission entry.
    pub fn user_data(&self) -> u64 {
        self.0.user_data
    }
}