io_engine/scheduler/
tasks.rs

1/*
2Copyright (c) NaturalIO Contributors
3
4Permission is hereby granted, free of charge, to any person obtaining a copy
5of this software and associated documentation files (the "Software"), to deal
6in the Software without restriction, including without limitation the rights
7to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8copies of the Software, and to permit persons to whom the Software is
9furnished to do so, subject to the following conditions:
10
11The above copyright notice and this permission notice shall be included in all
12copies or substantial portions of the Software.
13
14THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20SOFTWARE.
21*/
22
23use std::os::fd::RawFd;
24use std::{
25    fmt,
26    sync::atomic::{AtomicI32, Ordering},
27};
28
29use nix::errno::Errno;
30
31use super::embedded_list::*;
32use super::{aio, callback_worker::*};
33use crate::buffer::Buffer;
34
35#[derive(Copy, Clone, PartialEq, Debug)]
36pub enum IOAction {
37    Read = 0,
38    Write = 1,
39}
40
41/// Define your callback with this trait
42pub trait IOCallbackCustom: Sized + 'static + Send + Unpin {
43    fn call(self, _event: Box<IOEvent<Self>>);
44}
45
46/// Closure callback for IOEvent
47pub struct ClosureCb(pub Box<dyn FnOnce(Box<IOEvent<Self>>) + Send + Sync + 'static>);
48
49impl IOCallbackCustom for ClosureCb {
50    fn call(self, event: Box<IOEvent<Self>>) {
51        (self.0)(event)
52    }
53}
54
55// Carries the infomation of read/write event
56#[repr(C)]
57pub struct IOEvent<C: IOCallbackCustom> {
58    /// make sure EmbeddedListNode always in the front.
59    /// This is for putting sub_tasks in the link list, without additional allocation.
60    pub(crate) node: EmbeddedListNode,
61    pub buf: Option<Buffer>,
62    pub offset: i64,
63    pub action: IOAction,
64    pub fd: RawFd,
65    res: AtomicI32,
66    cb: Option<C>,
67    sub_tasks: Option<EmbeddedList>,
68}
69
70impl<C: IOCallbackCustom> fmt::Debug for IOEvent<C> {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        if let Some(sub_tasks) = self.sub_tasks.as_ref() {
73            write!(
74                f,
75                "offset={} {:?} sub_tasks {} ",
76                self.offset,
77                self.action,
78                sub_tasks.get_length()
79            )
80        } else {
81            write!(f, "offset={} {:?}", self.offset, self.action)
82        }
83    }
84}
85
86impl<C: IOCallbackCustom> IOEvent<C> {
87    #[inline]
88    pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Box<Self> {
89        log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
90        Box::new(Self {
91            buf: Some(buf),
92            fd,
93            action,
94            offset,
95            res: AtomicI32::new(0),
96            cb: None,
97            sub_tasks: None,
98            node: Default::default(),
99        })
100    }
101
102    /// Set callback for IOEvent, might be closure or a custom struct
103    #[inline(always)]
104    pub fn set_callback(&mut self, cb: C) {
105        self.cb = Some(cb);
106    }
107
108    #[inline(always)]
109    pub fn get_size(&self) -> usize {
110        self.buf.as_ref().unwrap().len()
111    }
112
113    #[inline(always)]
114    pub fn push_to_list(mut self: Box<Self>, events: &mut EmbeddedList) {
115        events.push_back(&mut self.node);
116        let _ = Box::leak(self);
117    }
118
119    #[inline(always)]
120    pub fn pop_from_list(events: &mut EmbeddedList) -> Option<Box<Self>> {
121        if let Some(event) = events.pop_front::<Self>() {
122            Some(unsafe { Box::from_raw(event) })
123        } else {
124            None
125        }
126    }
127
128    #[inline(always)]
129    pub(crate) fn set_subtasks(&mut self, sub_tasks: EmbeddedList) {
130        self.sub_tasks = Some(sub_tasks)
131    }
132
133    #[inline(always)]
134    pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
135        self.buf.as_ref().unwrap().as_ref()
136    }
137
138    #[inline(always)]
139    pub fn is_done(&self) -> bool {
140        self.res.load(Ordering::Acquire) != 0
141    }
142
143    #[inline]
144    pub fn get_result(&mut self) -> Result<Buffer, Errno> {
145        let res = self.res.load(Ordering::Acquire);
146        if res > 0 {
147            return Ok(self.buf.take().unwrap());
148        } else if res == 0 {
149            panic!("IOEvent get_result before it's done");
150        } else {
151            return Err(Errno::from_raw(-res));
152        }
153    }
154
155    #[inline(always)]
156    pub fn _get_result(&mut self) -> Result<Buffer, i32> {
157        let res = self.res.load(Ordering::Acquire);
158        if res > 0 {
159            return Ok(self.buf.take().unwrap());
160        } else if res == 0 {
161            panic!("IOEvent get_result before it's done");
162        } else {
163            return Err(res);
164        }
165    }
166
167    #[inline(always)]
168    pub(crate) fn set_error(&self, mut errno: i32) {
169        if errno == 0 {
170            // XXX: EOF does not have code to represent,
171            // also when offset is not align to 4096, may return result 0,
172            errno = Errno::EINVAL as i32;
173        }
174        if errno > 0 {
175            errno = -errno;
176        }
177        self.res.store(errno, Ordering::Release);
178    }
179
180    #[inline(always)]
181    pub(crate) fn set_ok(&self) {
182        self.res.store(1, Ordering::Release);
183    }
184
185    #[inline(always)]
186    pub(crate) fn callback(mut self: Box<Self>) {
187        match self.cb.take() {
188            Some(cb) => {
189                cb.call(self);
190            }
191            None => return,
192        }
193    }
194
195    #[inline(always)]
196    pub(crate) fn callback_merged(mut self: Box<Self>) {
197        if let Some(mut tasks) = self.sub_tasks.take() {
198            match self._get_result() {
199                Ok(buffer) => {
200                    if self.action == IOAction::Read {
201                        let mut offset: usize = 0;
202                        let b = buffer.as_ref();
203                        while let Some(mut event) = Self::pop_from_list(&mut tasks) {
204                            let sub_buf = event.buf.as_mut().unwrap();
205                            let sub_size = sub_buf.len();
206                            sub_buf.copy_from(0, &b[offset..offset + sub_size]);
207                            offset += sub_size;
208                            event.set_ok();
209                            event.callback();
210                        }
211                    } else {
212                        while let Some(event) = Self::pop_from_list(&mut tasks) {
213                            event.set_ok();
214                            event.callback();
215                        }
216                    }
217                }
218                Err(errno) => {
219                    while let Some(event) = Self::pop_from_list(&mut tasks) {
220                        event.set_error(errno);
221                        event.callback();
222                    }
223                }
224            }
225        } else {
226            self.callback();
227        }
228    }
229}
230
231pub(crate) struct IOEventTaskSlot<C: IOCallbackCustom> {
232    pub(crate) iocb: aio::iocb,
233    pub(crate) event: Option<Box<IOEvent<C>>>,
234}
235
236impl<C: IOCallbackCustom> IOEventTaskSlot<C> {
237    pub(crate) fn new(slot_id: u64) -> Self {
238        Self {
239            iocb: aio::iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() },
240            event: None,
241        }
242    }
243
244    #[inline(always)]
245    pub(crate) fn fill_slot(&mut self, event: Box<IOEvent<C>>, slot_id: u16) {
246        let iocb = &mut self.iocb;
247        iocb.aio_data = slot_id as libc::__u64;
248        iocb.aio_fildes = event.fd as libc::__u32;
249        let buf = event.buf.as_ref().unwrap();
250        iocb.aio_lio_opcode = event.action as u16;
251        iocb.aio_buf = buf.get_raw() as u64;
252        iocb.aio_nbytes = buf.len() as u64;
253        iocb.aio_offset = event.offset;
254        self.event.replace(event);
255    }
256
257    #[inline(always)]
258    pub(crate) fn set_written(&mut self, written: usize, cb: &IOWorkers<C>) -> bool {
259        if self.iocb.aio_nbytes <= written as u64 {
260            if let Some(event) = self.event.take() {
261                event.set_ok();
262                cb.send(event);
263            }
264            return true;
265        }
266        self.iocb.aio_nbytes -= written as u64;
267        self.iocb.aio_buf += written as u64;
268        return false;
269    }
270
271    #[inline(always)]
272    pub(crate) fn set_error(&mut self, errno: i32, cb: &IOWorkers<C>) {
273        if let Some(event) = self.event.take() {
274            event.set_error(errno);
275            cb.send(event);
276        }
277    }
278}