linux_io_uring/
cqueue.rs

1//! Completion Queue
2
3use std::sync::atomic;
4use linux_io_uring_sys as sys;
5use crate::util::{ Mmap, unsync_load };
6use crate::mmap_offset;
7
8
9pub struct CompletionQueue {
10    pub(crate) head: *const atomic::AtomicU32,
11    pub(crate) tail: *const atomic::AtomicU32,
12    pub(crate) ring_mask: *const u32,
13    pub(crate) ring_entries: *const u32,
14
15    overflow: *const atomic::AtomicU32,
16
17    pub(crate) cqes: *const sys::io_uring_cqe
18}
19
20/// Completion Entry
21#[repr(transparent)]
22#[derive(Clone)]
23pub struct Entry(pub(crate) sys::io_uring_cqe);
24
25pub struct AvailableQueue<'a> {
26    head: u32,
27    tail: u32,
28    ring_mask: u32,
29    ring_entries: u32,
30
31    queue: &'a mut CompletionQueue
32}
33
34impl CompletionQueue {
35    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> CompletionQueue {
36        mmap_offset!{
37            let head            = cq_mmap + p.cq_off.head           => *const atomic::AtomicU32;
38            let tail            = cq_mmap + p.cq_off.tail           => *const atomic::AtomicU32;
39            let ring_mask       = cq_mmap + p.cq_off.ring_mask      => *const u32;
40            let ring_entries    = cq_mmap + p.cq_off.ring_entries   => *const u32;
41            let overflow        = cq_mmap + p.cq_off.overflow       => *const atomic::AtomicU32;
42            let cqes            = cq_mmap + p.cq_off.cqes           => *const sys::io_uring_cqe;
43        }
44
45        CompletionQueue {
46            head, tail,
47            ring_mask, ring_entries,
48            overflow,
49            cqes
50        }
51    }
52
53    /// If queue is full, the new event maybe dropped.
54    /// This value records number of dropped events.
55    pub fn overflow(&self) -> u32 {
56        unsafe {
57            (*self.overflow).load(atomic::Ordering::Acquire)
58        }
59    }
60
61    pub fn capacity(&self) -> usize {
62        unsafe {
63            self.ring_entries.read_volatile() as usize
64        }
65    }
66
67    pub fn len(&self) -> usize {
68        let head = unsafe { unsync_load(self.head) };
69        let tail = unsafe { (*self.tail).load(atomic::Ordering::Acquire) };
70
71        tail.wrapping_sub(head) as usize
72    }
73
74    pub fn is_empty(&self) -> bool {
75        let head = unsafe { unsync_load(self.head) };
76        let tail = unsafe { (*self.tail).load(atomic::Ordering::Acquire) };
77
78        head == tail
79    }
80
81    pub fn is_full(&self) -> bool {
82        self.len() == self.capacity()
83    }
84
85    /// Get currently available completion queue
86    pub fn available(&mut self) -> AvailableQueue<'_> {
87        unsafe {
88            AvailableQueue {
89                head: unsync_load(self.head),
90                tail: (*self.tail).load(atomic::Ordering::Acquire),
91                ring_mask: self.ring_mask.read_volatile(),
92                ring_entries: self.ring_entries.read_volatile(),
93                queue: self
94            }
95        }
96    }
97}
98
99impl AvailableQueue<'_> {
100    /// Sync queue
101    pub fn sync(&mut self) {
102        unsafe {
103            (*self.queue.head).store(self.head, atomic::Ordering::Release);
104            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
105        }
106    }
107
108    pub fn capacity(&self) -> usize {
109        self.ring_entries as usize
110    }
111
112    pub fn is_full(&self) -> bool {
113        self.len() == self.capacity()
114    }
115}
116
117impl ExactSizeIterator for AvailableQueue<'_> {
118    fn len(&self) -> usize {
119        self.tail.wrapping_sub(self.head) as usize
120    }
121}
122
123impl<'a> Iterator for AvailableQueue<'a> {
124    type Item = Entry;
125
126    fn next(&mut self) -> Option<Self::Item> {
127        if self.head != self.tail {
128            unsafe {
129                let entry = self.queue.cqes.add((self.head & self.ring_mask) as usize);
130                self.head = self.head.wrapping_add(1);
131                Some(Entry(*entry))
132            }
133        } else {
134            None
135        }
136    }
137}
138
139impl Drop for AvailableQueue<'_> {
140    fn drop(&mut self) {
141        unsafe {
142            (*self.queue.head).store(self.head, atomic::Ordering::Release);
143        }
144    }
145}
146
147impl Entry {
148    /// Result value
149    pub fn result(&self) -> i32 {
150        self.0.res
151    }
152
153    /// User Data
154    ///
155    /// See [Entry::user_data](crate::squeue::Entry::user_data).
156    pub fn user_data(&self) -> u64 {
157        self.0.user_data
158    }
159}