io_engine/
tasks.rs

1use std::cell::UnsafeCell;
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::os::fd::RawFd;
5
6use nix::errno::Errno;
7
8use embed_collections::dlist::{DLinkedList, DListItem, DListNode};
9use io_buffer::{Buffer, safe_copy};
10
/// Direction of an IO request.
///
/// The explicit discriminants (`Read = 0`, `Write = 1`) are part of the type's
/// definition and are kept stable so the value can be cast to an integer.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum IOAction {
    /// Data is transferred from the file descriptor into the event's buffer.
    Read = 0,
    /// Data is transferred from the event's buffer to the file descriptor.
    Write = 1,
}
16
17/// Define your callback with this trait
18pub trait IOCallback: Sized + 'static + Send + Unpin {
19    fn call(self, _event: IOEvent<Self>);
20}
21
22/// Closure callback for IOEvent
23pub struct ClosureCb(pub Box<dyn FnOnce(IOEvent<Self>) + Send + Sync + 'static>);
24
25impl IOCallback for ClosureCb {
26    fn call(self, event: IOEvent<Self>) {
27        (self.0)(event)
28    }
29}
30
31pub struct IOEvent<C: IOCallback>(pub Box<IOEvent_<C>>);
32
33impl<C: IOCallback> Deref for IOEvent<C> {
34    type Target = IOEvent_<C>;
35    fn deref(&self) -> &Self::Target {
36        &self.0
37    }
38}
39
40impl<C: IOCallback> DerefMut for IOEvent<C> {
41    fn deref_mut(&mut self) -> &mut Self::Target {
42        &mut self.0
43    }
44}
45
46impl<C: IOCallback> fmt::Debug for IOEvent<C> {
47    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48        self.0.fmt(f)
49    }
50}
51
52// Carries the information of read/write event
53#[repr(C)]
54pub struct IOEvent_<C: IOCallback> {
55    /// make sure DListNode always in the front.
56    /// This is for putting sub_tasks in the link list, without additional allocation.
57    pub(crate) node: UnsafeCell<DListNode<Self, ()>>,
58    pub buf: Option<Buffer>,
59    pub offset: i64,
60    pub action: IOAction,
61    pub fd: RawFd,
62    /// Result of the IO operation.
63    /// Initialized to i32::MIN.
64    /// >= 0: Accumulated bytes transferred (used for partial IO retries).
65    /// < 0: Error code (negative errno).
66    pub(crate) res: i32,
67    cb: Option<C>,
68    sub_tasks: DLinkedList<Box<Self>, ()>,
69}
70
71// Implement DListItem for IOEvent_ to allow it to be linked
72unsafe impl<C: IOCallback> DListItem<()> for IOEvent_<C> {
73    fn get_node(&self) -> &mut DListNode<Self, ()> {
74        unsafe { &mut *self.node.get() }
75    }
76}
77
78impl<C: IOCallback> fmt::Debug for IOEvent_<C> {
79    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80        write!(f, "offset={} {:?} sub_tasks {} ", self.offset, self.action, self.sub_tasks.len())
81    }
82}
83
84impl<C: IOCallback> IOEvent<C> {
85    #[inline]
86    pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> IOEvent<C> {
87        log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
88        IOEvent(Box::new(IOEvent_ {
89            buf: Some(buf),
90            fd,
91            action,
92            offset,
93            res: i32::MIN,
94            cb: None,
95            sub_tasks: DLinkedList::new(),
96            node: UnsafeCell::new(DListNode::default()),
97        }))
98    }
99
100    /// Set callback for IOEvent, might be closure or a custom struct
101    #[inline(always)]
102    pub fn set_callback(&mut self, cb: C) {
103        self.cb = Some(cb);
104    }
105
106    #[inline(always)]
107    pub fn get_size(&self) -> usize {
108        self.buf.as_ref().unwrap().len()
109    }
110
111    #[inline(always)]
112    pub fn push_to_list(self, events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) {
113        events.push_back(self.0);
114    }
115
116    #[inline(always)]
117    pub fn pop_from_list(events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) -> Option<Self> {
118        events.pop_front().map(IOEvent)
119    }
120
121    #[inline(always)]
122    pub fn set_subtasks(&mut self, sub_tasks: DLinkedList<Box<IOEvent_<C>>, ()>) {
123        self.sub_tasks = sub_tasks;
124    }
125
126    #[inline(always)]
127    pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
128        self.buf.as_ref().unwrap().as_ref()
129    }
130
131    #[inline(always)]
132    pub fn is_done(&self) -> bool {
133        self.res != i32::MIN
134    }
135
136    #[inline(always)]
137    pub fn get_write_result(self) -> Result<(), Errno> {
138        let res = self.res;
139        if res >= 0 {
140            return Ok(());
141        } else if res == i32::MIN {
142            panic!("IOEvent get_result before it's done");
143        } else {
144            return Err(Errno::from_raw(-res));
145        }
146    }
147
148    #[inline(always)]
149    pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
150        let res = self.res;
151        if res >= 0 {
152            let mut buf = self.buf.take().unwrap();
153            buf.set_len(res as usize);
154            return Ok(buf);
155        } else if res == i32::MIN {
156            panic!("IOEvent get_result before it's done");
157        } else {
158            return Err(Errno::from_raw(-res));
159        }
160    }
161
162    #[inline(always)]
163    pub(crate) fn set_error(&mut self, mut errno: i32) {
164        if errno == 0 {
165            // XXX: EOF does not have code to represent,
166            // also when offset is not align to 4096, may return result 0,
167            errno = Errno::EINVAL as i32;
168        }
169        if errno > 0 {
170            errno = -errno;
171        }
172        self.res = errno;
173    }
174
175    #[inline(always)]
176    pub(crate) fn set_copied(&mut self, len: usize) {
177        if self.res == i32::MIN {
178            self.res = len as i32;
179        } else {
180            self.res += len as i32;
181        }
182    }
183
184    #[inline(always)]
185    pub(crate) fn callback(mut self) {
186        match self.cb.take() {
187            Some(cb) => {
188                cb.call(self);
189            }
190            None => return,
191        }
192    }
193
194    #[inline(always)]
195    pub(crate) fn callback_merged(mut self) {
196        // sub_tasks is not Option anymore, but DLinkedList.
197        // We can check if it's empty.
198        // Use std::mem::take if DLinkedList implements Default (it usually refers to new which is empty).
199        // Or check if it is empty. DLinkedList::new() creates empty list.
200        let mut tasks = std::mem::replace(&mut self.sub_tasks, DLinkedList::new());
201
202        if !tasks.is_empty() {
203            let res = self.res;
204            if res >= 0 {
205                if self.action == IOAction::Read {
206                    let buffer = self.buf.take().unwrap();
207                    let mut b = buffer.as_ref();
208                    while let Some(mut event) = Self::pop_from_list(&mut tasks) {
209                        let sub_buf = event.buf.as_mut().unwrap();
210                        if b.len() == 0 {
211                            // short read
212                            event.set_copied(0);
213                        } else {
214                            let copied = safe_copy(sub_buf, b);
215                            event.set_copied(copied);
216                            b = &b[copied..];
217                        }
218                        event.callback();
219                    }
220                } else {
221                    let l = self.buf.as_ref().unwrap().len();
222                    while let Some(mut event) = Self::pop_from_list(&mut tasks) {
223                        let mut sub_len = event.get_size();
224                        if sub_len > l {
225                            // short write
226                            sub_len = l;
227                        }
228                        event.set_copied(sub_len);
229                        event.callback();
230                    }
231                }
232            } else {
233                let errno = -res;
234                while let Some(mut event) = Self::pop_from_list(&mut tasks) {
235                    event.set_error(errno);
236                    event.callback();
237                }
238            }
239        } else {
240            self.callback();
241        }
242    }
243
244    // New constructor for exit signal events
245    pub(crate) fn new_exit_signal(fd: RawFd) -> Self {
246        // Exit signal wraps a IOEvent
247        Self(Box::new(IOEvent_ {
248            node: UnsafeCell::new(DListNode::default()),
249            buf: None,
250            offset: 0,
251            action: IOAction::Read, // Exit signal is a read
252            fd,
253            res: i32::MIN,
254            cb: None, // No callback for exit signal
255            sub_tasks: DLinkedList::new(),
256        }))
257    }
258}