io_engine/
tasks.rs

1use std::fmt;
2use std::os::fd::RawFd;
3
4use nix::errno::Errno;
5
6use crate::embedded_list::*;
7use io_buffer::{Buffer, safe_copy};
8
9#[derive(Copy, Clone, PartialEq, Debug)]
10pub enum IOAction {
11    Read = 0,
12    Write = 1,
13}
14
15/// Define your callback with this trait
16pub trait IoCallback: Sized + 'static + Send + Unpin {
17    fn call(self, _event: Box<IOEvent<Self>>);
18}
19
20/// Closure callback for IOEvent
21pub struct ClosureCb(pub Box<dyn FnOnce(Box<IOEvent<Self>>) + Send + Sync + 'static>);
22
23impl IoCallback for ClosureCb {
24    fn call(self, event: Box<IOEvent<Self>>) {
25        (self.0)(event)
26    }
27}
28
29// Carries the information of read/write event
30#[repr(C)]
31pub struct IOEvent<C: IoCallback> {
32    /// make sure EmbeddedListNode always in the front.
33    /// This is for putting sub_tasks in the link list, without additional allocation.
34    pub(crate) node: EmbeddedListNode,
35    pub buf: Option<Buffer>,
36    pub offset: i64,
37    pub action: IOAction,
38    pub fd: RawFd,
39    res: i32,
40    cb: Option<C>,
41    sub_tasks: Option<EmbeddedList>,
42}
43
44impl<C: IoCallback> fmt::Debug for IOEvent<C> {
45    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46        if let Some(sub_tasks) = self.sub_tasks.as_ref() {
47            write!(
48                f,
49                "offset={} {:?} sub_tasks {} ",
50                self.offset,
51                self.action,
52                sub_tasks.get_length()
53            )
54        } else {
55            write!(f, "offset={} {:?}", self.offset, self.action)
56        }
57    }
58}
59
60impl<C: IoCallback> IOEvent<C> {
61    #[inline]
62    pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Box<Self> {
63        log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
64        Box::new(Self {
65            buf: Some(buf),
66            fd,
67            action,
68            offset,
69            res: i32::MIN,
70            cb: None,
71            sub_tasks: None,
72            node: Default::default(),
73        })
74    }
75
76    /// Set callback for IOEvent, might be closure or a custom struct
77    #[inline(always)]
78    pub fn set_callback(&mut self, cb: C) {
79        self.cb = Some(cb);
80    }
81
82    #[inline(always)]
83    pub fn get_size(&self) -> usize {
84        self.buf.as_ref().unwrap().len()
85    }
86
87    #[inline(always)]
88    pub fn push_to_list(mut self: Box<Self>, events: &mut EmbeddedList) {
89        events.push_back(&mut self.node);
90        let _ = Box::leak(self);
91    }
92
93    #[inline(always)]
94    pub fn pop_from_list(events: &mut EmbeddedList) -> Option<Box<Self>> {
95        if let Some(event) = events.pop_front::<Self>() {
96            Some(unsafe { Box::from_raw(event) })
97        } else {
98            None
99        }
100    }
101
102    #[inline(always)]
103    pub fn set_subtasks(&mut self, sub_tasks: EmbeddedList) {
104        self.sub_tasks = Some(sub_tasks)
105    }
106
107    #[inline(always)]
108    pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
109        self.buf.as_ref().unwrap().as_ref()
110    }
111
112    #[inline(always)]
113    pub fn is_done(&self) -> bool {
114        self.res != i32::MIN
115    }
116
117    #[inline(always)]
118    pub fn get_write_result(self) -> Result<(), Errno> {
119        let res = self.res;
120        if res >= 0 {
121            return Ok(());
122        } else if res == i32::MIN {
123            panic!("IOEvent get_result before it's done");
124        } else {
125            return Err(Errno::from_raw(-res));
126        }
127    }
128
129    #[inline(always)]
130    pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
131        let res = self.res;
132        if res >= 0 {
133            let mut buf = self.buf.take().unwrap();
134            buf.set_len(res as usize);
135            return Ok(buf);
136        } else if res == i32::MIN {
137            panic!("IOEvent get_result before it's done");
138        } else {
139            return Err(Errno::from_raw(-res));
140        }
141    }
142
143    #[inline(always)]
144    pub(crate) fn set_error(&mut self, mut errno: i32) {
145        if errno == 0 {
146            // XXX: EOF does not have code to represent,
147            // also when offset is not align to 4096, may return result 0,
148            errno = Errno::EINVAL as i32;
149        }
150        if errno > 0 {
151            errno = -errno;
152        }
153        self.res = errno;
154    }
155
156    #[inline(always)]
157    pub(crate) fn set_copied(&mut self, len: usize) {
158        if self.res == i32::MIN {
159            self.res = len as i32;
160        } else {
161            self.res += len as i32;
162        }
163    }
164
165    #[inline(always)]
166    pub(crate) fn callback(mut self: Box<Self>) {
167        match self.cb.take() {
168            Some(cb) => {
169                cb.call(self);
170            }
171            None => return,
172        }
173    }
174
175    #[inline(always)]
176    pub(crate) fn callback_merged(mut self: Box<Self>) {
177        if let Some(mut tasks) = self.sub_tasks.take() {
178            let res = self.res;
179            if res >= 0 {
180                if self.action == IOAction::Read {
181                    let buffer = self.buf.take().unwrap();
182                    let mut b = buffer.as_ref();
183                    while let Some(mut event) = Self::pop_from_list(&mut tasks) {
184                        let sub_buf = event.buf.as_mut().unwrap();
185                        if b.len() == 0 {
186                            // short read
187                            event.set_copied(0);
188                        } else {
189                            let copied = safe_copy(sub_buf, b);
190                            event.set_copied(copied);
191                            b = &b[copied..];
192                        }
193                        event.callback();
194                    }
195                } else {
196                    let l = self.buf.as_ref().unwrap().len();
197                    while let Some(mut event) = Self::pop_from_list(&mut tasks) {
198                        let mut sub_len = event.get_size();
199                        if sub_len > l {
200                            // short write
201                            sub_len = l;
202                        }
203                        event.set_copied(sub_len);
204                        event.callback();
205                    }
206                }
207            } else {
208                let errno = -res;
209                while let Some(mut event) = Self::pop_from_list(&mut tasks) {
210                    event.set_error(errno);
211                    event.callback();
212                }
213            }
214        } else {
215            self.callback();
216        }
217    }
218
219    // New constructor for exit signal events
220    pub(crate) fn new_exit_signal(fd: RawFd) -> Box<Self> {
221        Box::new(Self {
222            node: Default::default(),
223            buf: None,
224            offset: 0,
225            action: IOAction::Read, // Exit signal is a read
226            fd,
227            res: i32::MIN,
228            cb: None, // No callback for exit signal
229            sub_tasks: None,
230        })
231    }
232}