io_engine/scheduler/
tasks.rs

1/*
2Copyright (c) NaturalIO Contributors
3
4Permission is hereby granted, free of charge, to any person obtaining a copy
5of this software and associated documentation files (the "Software"), to deal
6in the Software without restriction, including without limitation the rights
7to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8copies of the Software, and to permit persons to whom the Software is
9furnished to do so, subject to the following conditions:
10
11The above copyright notice and this permission notice shall be included in all
12copies or substantial portions of the Software.
13
14THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20SOFTWARE.
21*/
22
23use std::os::fd::RawFd;
24use std::{
25    fmt,
26    sync::atomic::{AtomicI32, Ordering},
27};
28
29use nix::errno::Errno;
30
31use super::embedded_list::*;
32use super::{aio, callback_worker::*};
33use crate::buffer::Buffer;
34
35#[derive(Copy, Clone, PartialEq, Debug)]
36pub enum IOAction {
37    Read = 0,
38    Write = 1,
39}
40
41/// Define your callback with this trait
42pub trait IOCallbackCustom: Sized + 'static + Send + Unpin {
43    fn call(self, _event: Box<IOEvent<Self>>);
44}
45
46/// Closure callback for IOEvent
47pub struct ClosureCb(pub Box<dyn FnOnce(Box<IOEvent<Self>>) + Send + Sync + 'static>);
48
49impl IOCallbackCustom for ClosureCb {
50    fn call(self, event: Box<IOEvent<Self>>) {
51        (self.0)(event)
52    }
53}
54
55// Carries the infomation of read/write event
56#[repr(C)]
57pub struct IOEvent<C: IOCallbackCustom> {
58    /// make sure EmbeddedListNode always in the front.
59    /// This is for putting sub_tasks in the link list, without additional allocation.
60    pub(crate) node: EmbeddedListNode,
61    pub buf: Option<Buffer>,
62    pub offset: i64,
63    pub action: IOAction,
64    pub fd: RawFd,
65    res: AtomicI32,
66    cb: Option<C>,
67    sub_tasks: Option<EmbeddedList>,
68}
69
70impl<C: IOCallbackCustom> fmt::Debug for IOEvent<C> {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        if let Some(sub_tasks) = self.sub_tasks.as_ref() {
73            write!(
74                f,
75                "offset={} {:?} sub_tasks {} ",
76                self.offset,
77                self.action,
78                sub_tasks.get_length()
79            )
80        } else {
81            write!(f, "offset={} {:?}", self.offset, self.action)
82        }
83    }
84}
85
86impl<C: IOCallbackCustom> IOEvent<C> {
87    #[inline]
88    pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Box<Self> {
89        log_assert!(
90            buf.len() > 0,
91            "{:?} offset={}, buffer size == 0",
92            action,
93            offset
94        );
95        Box::new(Self {
96            buf: Some(buf),
97            fd,
98            action,
99            offset,
100            res: AtomicI32::new(0),
101            cb: None,
102            sub_tasks: None,
103            node: Default::default(),
104        })
105    }
106
107    /// Set callback for IOEvent, might be closure or a custom struct
108    #[inline(always)]
109    pub fn set_callback(&mut self, cb: C) {
110        self.cb = Some(cb);
111    }
112
113    #[inline(always)]
114    pub fn get_size(&self) -> usize {
115        self.buf.as_ref().unwrap().len()
116    }
117
118    #[inline(always)]
119    pub fn push_to_list(mut self: Box<Self>, events: &mut EmbeddedList) {
120        events.push_back(&mut self.node);
121        let _ = Box::leak(self);
122    }
123
124    #[inline(always)]
125    pub fn pop_from_list(events: &mut EmbeddedList) -> Option<Box<Self>> {
126        if let Some(event) = events.pop_front::<Self>() {
127            Some(unsafe { Box::from_raw(event) })
128        } else {
129            None
130        }
131    }
132
133    #[inline(always)]
134    pub(crate) fn set_subtasks(&mut self, sub_tasks: EmbeddedList) {
135        self.sub_tasks = Some(sub_tasks)
136    }
137
138    #[inline(always)]
139    pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
140        self.buf.as_ref().unwrap().as_ref()
141    }
142
143    #[inline(always)]
144    pub fn is_done(&self) -> bool {
145        self.res.load(Ordering::Acquire) != 0
146    }
147
148    #[inline]
149    pub fn get_result(&mut self) -> Result<Buffer, Errno> {
150        let res = self.res.load(Ordering::Acquire);
151        if res > 0 {
152            return Ok(self.buf.take().unwrap());
153        } else if res == 0 {
154            panic!("IOEvent get_result before it's done");
155        } else {
156            return Err(Errno::from_raw(-res));
157        }
158    }
159
160    #[inline(always)]
161    pub fn _get_result(&mut self) -> Result<Buffer, i32> {
162        let res = self.res.load(Ordering::Acquire);
163        if res > 0 {
164            return Ok(self.buf.take().unwrap());
165        } else if res == 0 {
166            panic!("IOEvent get_result before it's done");
167        } else {
168            return Err(res);
169        }
170    }
171
172    #[inline(always)]
173    pub(crate) fn set_error(&self, mut errno: i32) {
174        if errno == 0 {
175            // XXX: EOF does not have code to represent,
176            // also when offset is not align to 4096, may return result 0,
177            errno = Errno::EINVAL as i32;
178        }
179        if errno > 0 {
180            errno = -errno;
181        }
182        self.res.store(errno, Ordering::Release);
183    }
184
185    #[inline(always)]
186    pub(crate) fn set_ok(&self) {
187        self.res.store(1, Ordering::Release);
188    }
189
190    #[inline(always)]
191    pub(crate) fn callback(mut self: Box<Self>) {
192        match self.cb.take() {
193            Some(cb) => {
194                cb.call(self);
195            }
196            None => return,
197        }
198    }
199
200    #[inline(always)]
201    pub(crate) fn callback_merged(mut self: Box<Self>) {
202        if let Some(mut tasks) = self.sub_tasks.take() {
203            match self._get_result() {
204                Ok(buffer) => {
205                    if self.action == IOAction::Read {
206                        let mut offset: usize = 0;
207                        let b = buffer.as_ref();
208                        while let Some(mut event) = Self::pop_from_list(&mut tasks) {
209                            let sub_buf = event.buf.as_mut().unwrap();
210                            let sub_size = sub_buf.len();
211                            sub_buf.copy_from(0, &b[offset..offset + sub_size]);
212                            offset += sub_size;
213                            event.set_ok();
214                            event.callback();
215                        }
216                    } else {
217                        while let Some(event) = Self::pop_from_list(&mut tasks) {
218                            event.set_ok();
219                            event.callback();
220                        }
221                    }
222                }
223                Err(errno) => {
224                    while let Some(event) = Self::pop_from_list(&mut tasks) {
225                        event.set_error(errno);
226                        event.callback();
227                    }
228                }
229            }
230        } else {
231            self.callback();
232        }
233    }
234}
235
236pub(crate) struct IOEventTaskSlot<C: IOCallbackCustom> {
237    pub(crate) iocb: aio::iocb,
238    pub(crate) event: Option<Box<IOEvent<C>>>,
239}
240
241impl<C: IOCallbackCustom> IOEventTaskSlot<C> {
242    pub(crate) fn new(slot_id: u64) -> Self {
243        Self {
244            iocb: aio::iocb {
245                aio_data: slot_id,
246                aio_reqprio: 1,
247                ..Default::default()
248            },
249            event: None,
250        }
251    }
252
253    #[inline(always)]
254    pub(crate) fn fill_slot(&mut self, event: Box<IOEvent<C>>, slot_id: u16) {
255        let iocb = &mut self.iocb;
256        iocb.aio_data = slot_id as libc::__u64;
257        iocb.aio_fildes = event.fd as libc::__u32;
258        let buf = event.buf.as_ref().unwrap();
259        iocb.aio_lio_opcode = event.action as u16;
260        iocb.aio_buf = buf.get_raw() as u64;
261        iocb.aio_nbytes = buf.len() as u64;
262        iocb.aio_offset = event.offset;
263        self.event.replace(event);
264    }
265
266    #[inline(always)]
267    pub(crate) fn set_written(&mut self, written: usize, cb: &IOWorkers<C>) -> bool {
268        if self.iocb.aio_nbytes <= written as u64 {
269            if let Some(event) = self.event.take() {
270                event.set_ok();
271                cb.send(event);
272            }
273            return true;
274        }
275        self.iocb.aio_nbytes -= written as u64;
276        self.iocb.aio_buf += written as u64;
277        return false;
278    }
279
280    #[inline(always)]
281    pub(crate) fn set_error(&mut self, errno: i32, cb: &IOWorkers<C>) {
282        if let Some(event) = self.event.take() {
283            event.set_error(errno);
284            cb.send(event);
285        }
286    }
287}