io_engine/
tasks.rs

1use std::cell::UnsafeCell;
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::os::fd::RawFd;
5
6use nix::errno::Errno;
7
8use embed_collections::dlist::{DLinkedList, DListItem, DListNode};
9use io_buffer::{Buffer, safe_copy};
10
/// Direction of an I/O request carried by an [`IOEvent`].
///
/// The discriminants are explicit (`Read = 0`, `Write = 1`), so a value can be
/// converted to an integer with an `as` cast.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum IOAction {
    /// Read request.
    Read = 0,
    /// Write request.
    Write = 1,
}
16
17/// Define your callback with this trait
18pub trait IOCallback: Sized + 'static + Send + Unpin {
19    fn call(self, _event: IOEvent<Self>);
20}
21
22/// Closure callback for IOEvent
23pub struct ClosureCb(pub Box<dyn FnOnce(IOEvent<Self>) + Send + Sync + 'static>);
24
25impl IOCallback for ClosureCb {
26    fn call(self, event: IOEvent<Self>) {
27        (self.0)(event)
28    }
29}
30
31pub struct IOEvent<C: IOCallback>(pub Box<IOEvent_<C>>);
32
33impl<C: IOCallback> Deref for IOEvent<C> {
34    type Target = IOEvent_<C>;
35    fn deref(&self) -> &Self::Target {
36        &self.0
37    }
38}
39
40impl<C: IOCallback> DerefMut for IOEvent<C> {
41    fn deref_mut(&mut self) -> &mut Self::Target {
42        &mut self.0
43    }
44}
45
46impl<C: IOCallback> fmt::Debug for IOEvent<C> {
47    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48        self.0.fmt(f)
49    }
50}
51
52// Carries the information of read/write event
53#[repr(C)]
54pub struct IOEvent_<C: IOCallback> {
55    /// make sure DListNode always in the front.
56    /// This is for putting sub_tasks in the link list, without additional allocation.
57    pub(crate) node: UnsafeCell<DListNode<Self, ()>>,
58    pub buf: Option<Buffer>,
59    pub offset: i64,
60    pub action: IOAction,
61    pub fd: RawFd,
62    /// Result of the IO operation.
63    /// Initialized to i32::MIN.
64    /// >= 0: Accumulated bytes transferred (used for partial IO retries).
65    /// < 0: Error code (negative errno).
66    pub(crate) res: i32,
67    cb: Option<C>,
68    sub_tasks: DLinkedList<Box<Self>, ()>,
69}
70
71// Implement DListItem for IOEvent_ to allow it to be linked
72unsafe impl<C: IOCallback> DListItem<()> for IOEvent_<C> {
73    fn get_node(&self) -> &mut DListNode<Self, ()> {
74        unsafe { &mut *self.node.get() }
75    }
76}
77
78impl<C: IOCallback> fmt::Debug for IOEvent_<C> {
79    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80        write!(f, "offset={} {:?} sub_tasks {} ", self.offset, self.action, self.sub_tasks.len())
81    }
82}
83
84impl<C: IOCallback> IOEvent<C> {
85    #[inline]
86    pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> IOEvent<C> {
87        log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
88        IOEvent(Box::new(IOEvent_ {
89            buf: Some(buf),
90            fd,
91            action,
92            offset,
93            res: i32::MIN,
94            cb: None,
95            sub_tasks: DLinkedList::new(),
96            node: UnsafeCell::new(DListNode::default()),
97        }))
98    }
99
100    /// Set callback for IOEvent, might be closure or a custom struct
101    #[inline(always)]
102    pub fn set_callback(&mut self, cb: C) {
103        self.cb = Some(cb);
104    }
105
106    #[inline(always)]
107    pub fn get_size(&self) -> usize {
108        self.buf.as_ref().unwrap().len()
109    }
110
111    #[inline(always)]
112    pub(crate) fn push_to_list(self, events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) {
113        events.push_back(self.0);
114    }
115
116    #[inline(always)]
117    pub(crate) fn pop_from_list(events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) -> Option<Self> {
118        events.pop_front().map(IOEvent)
119    }
120
121    #[inline(always)]
122    pub(crate) fn set_subtasks(&mut self, sub_tasks: DLinkedList<Box<IOEvent_<C>>, ()>) {
123        self.sub_tasks = sub_tasks;
124    }
125
126    #[inline(always)]
127    pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
128        self.buf.as_ref().unwrap().as_ref()
129    }
130
131    #[inline(always)]
132    pub fn is_done(&self) -> bool {
133        self.res != i32::MIN
134    }
135
136    #[inline(always)]
137    pub fn get_write_result(self) -> Result<(), Errno> {
138        let res = self.res;
139        if res >= 0 {
140            return Ok(());
141        } else if res == i32::MIN {
142            panic!("IOEvent get_result before it's done");
143        } else {
144            return Err(Errno::from_raw(-res));
145        }
146    }
147
148    /// Get the result of the IO operation (bytes read/written or error).
149    /// Returns the number of bytes successfully transferred.
150    #[inline(always)]
151    pub fn get_result(&self) -> Result<usize, Errno> {
152        let res = self.res;
153        if res >= 0 {
154            return Ok(res as usize);
155        } else if res == i32::MIN {
156            panic!("IOEvent get_result before it's done");
157        } else {
158            return Err(Errno::from_raw(-res));
159        }
160    }
161
162    /// Get the buffer from a read operation.
163    /// Note: The buffer length is NOT modified. Use `get_result()` to get actual bytes read.
164    #[inline(always)]
165    pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
166        let res = self.res;
167        if res >= 0 {
168            let buf = self.buf.take().unwrap();
169            // Do NOT modify buffer length - caller should use get_result() to know actual bytes read
170            return Ok(buf);
171        } else if res == i32::MIN {
172            panic!("IOEvent get_result before it's done");
173        } else {
174            return Err(Errno::from_raw(-res));
175        }
176    }
177
178    #[inline(always)]
179    pub(crate) fn set_error(&mut self, mut errno: i32) {
180        if errno == 0 {
181            // XXX: EOF does not have code to represent,
182            // also when offset is not align to 4096, may return result 0,
183            errno = Errno::EINVAL as i32;
184        }
185        if errno > 0 {
186            errno = -errno;
187        }
188        self.res = errno;
189    }
190
191    #[inline(always)]
192    pub(crate) fn set_copied(&mut self, len: usize) {
193        if self.res == i32::MIN {
194            self.res = len as i32;
195        } else {
196            self.res += len as i32;
197        }
198    }
199
200    /// Trigger the callback for this IOEvent.
201    /// This consumes the event and calls the associated callback.
202    #[inline(always)]
203    pub(crate) fn callback(mut self) {
204        match self.cb.take() {
205            Some(cb) => {
206                cb.call(self);
207            }
208            None => return,
209        }
210    }
211
212    /// For writing custom callback workers
213    ///
214    /// Callback worker should always call this function on receiving IOEvent from Driver
215    #[inline(always)]
216    pub fn callback_merged(mut self) {
217        if !self.sub_tasks.is_empty() {
218            let res = self.res;
219            if res >= 0 {
220                if self.action == IOAction::Read {
221                    let buffer = self.buf.take().unwrap();
222                    let mut b = buffer.as_ref();
223                    for event_box in self.sub_tasks.drain() {
224                        let mut event = IOEvent(event_box);
225                        let sub_buf = event.buf.as_mut().unwrap();
226                        if b.len() == 0 {
227                            // short read
228                            event.set_copied(0);
229                        } else {
230                            let copied = safe_copy(sub_buf, b);
231                            event.set_copied(copied);
232                            b = &b[copied..];
233                        }
234                        event.callback();
235                    }
236                } else {
237                    let l = self.buf.as_ref().unwrap().len();
238                    for event_box in self.sub_tasks.drain() {
239                        let mut event = IOEvent(event_box);
240                        let mut sub_len = event.get_size();
241                        if sub_len > l {
242                            // short write
243                            sub_len = l;
244                        }
245                        event.set_copied(sub_len);
246                        event.callback();
247                    }
248                }
249            } else {
250                let errno = -res;
251                for event_box in self.sub_tasks.drain() {
252                    let mut event = IOEvent(event_box);
253                    event.set_error(errno);
254                    event.callback();
255                }
256            }
257        } else {
258            self.callback();
259        }
260    }
261
262    // New constructor for exit signal events
263    pub(crate) fn new_exit_signal(fd: RawFd) -> Self {
264        // Exit signal wraps a IOEvent
265        Self(Box::new(IOEvent_ {
266            node: UnsafeCell::new(DListNode::default()),
267            buf: None,
268            offset: 0,
269            action: IOAction::Read, // Exit signal is a read
270            fd,
271            res: i32::MIN,
272            cb: None, // No callback for exit signal
273            sub_tasks: DLinkedList::new(),
274        }))
275    }
276}