io_engine/scheduler/
tasks.rs1use std::os::fd::RawFd;
24use std::{
25 fmt,
26 sync::atomic::{AtomicI32, Ordering},
27};
28
29use nix::errno::Errno;
30
31use super::embedded_list::*;
32use super::{aio, callback_worker::*};
33use crate::buffer::Buffer;
34
/// Direction of an I/O request.
///
/// The discriminants are spelled out because the value is cast to an integer
/// when a request is prepared for submission.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum IOAction {
    /// Read request (discriminant `0`).
    Read = 0,
    /// Write request (discriminant `1`).
    Write = 1,
}
40
41pub trait IOCallbackCustom: Sized + 'static + Send + Unpin {
43 fn call(self, _event: Box<IOEvent<Self>>);
44}
45
46pub struct ClosureCb(pub Box<dyn FnOnce(Box<IOEvent<Self>>) + Send + Sync + 'static>);
48
49impl IOCallbackCustom for ClosureCb {
50 fn call(self, event: Box<IOEvent<Self>>) {
51 (self.0)(event)
52 }
53}
54
55#[repr(C)]
57pub struct IOEvent<C: IOCallbackCustom> {
58 pub(crate) node: EmbeddedListNode,
61 pub buf: Option<Buffer>,
62 pub offset: i64,
63 pub action: IOAction,
64 pub fd: RawFd,
65 res: AtomicI32,
66 cb: Option<C>,
67 sub_tasks: Option<EmbeddedList>,
68}
69
70impl<C: IOCallbackCustom> fmt::Debug for IOEvent<C> {
71 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72 if let Some(sub_tasks) = self.sub_tasks.as_ref() {
73 write!(
74 f,
75 "offset={} {:?} sub_tasks {} ",
76 self.offset,
77 self.action,
78 sub_tasks.get_length()
79 )
80 } else {
81 write!(f, "offset={} {:?}", self.offset, self.action)
82 }
83 }
84}
85
86impl<C: IOCallbackCustom> IOEvent<C> {
87 #[inline]
88 pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Box<Self> {
89 log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
90 Box::new(Self {
91 buf: Some(buf),
92 fd,
93 action,
94 offset,
95 res: AtomicI32::new(0),
96 cb: None,
97 sub_tasks: None,
98 node: Default::default(),
99 })
100 }
101
102 #[inline(always)]
104 pub fn set_callback(&mut self, cb: C) {
105 self.cb = Some(cb);
106 }
107
108 #[inline(always)]
109 pub fn get_size(&self) -> usize {
110 self.buf.as_ref().unwrap().len()
111 }
112
113 #[inline(always)]
114 pub fn push_to_list(mut self: Box<Self>, events: &mut EmbeddedList) {
115 events.push_back(&mut self.node);
116 let _ = Box::leak(self);
117 }
118
119 #[inline(always)]
120 pub fn pop_from_list(events: &mut EmbeddedList) -> Option<Box<Self>> {
121 if let Some(event) = events.pop_front::<Self>() {
122 Some(unsafe { Box::from_raw(event) })
123 } else {
124 None
125 }
126 }
127
128 #[inline(always)]
129 pub(crate) fn set_subtasks(&mut self, sub_tasks: EmbeddedList) {
130 self.sub_tasks = Some(sub_tasks)
131 }
132
133 #[inline(always)]
134 pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
135 self.buf.as_ref().unwrap().as_ref()
136 }
137
138 #[inline(always)]
139 pub fn is_done(&self) -> bool {
140 self.res.load(Ordering::Acquire) != 0
141 }
142
143 #[inline]
144 pub fn get_result(&mut self) -> Result<Buffer, Errno> {
145 let res = self.res.load(Ordering::Acquire);
146 if res > 0 {
147 return Ok(self.buf.take().unwrap());
148 } else if res == 0 {
149 panic!("IOEvent get_result before it's done");
150 } else {
151 return Err(Errno::from_raw(-res));
152 }
153 }
154
155 #[inline(always)]
156 pub fn _get_result(&mut self) -> Result<Buffer, i32> {
157 let res = self.res.load(Ordering::Acquire);
158 if res > 0 {
159 return Ok(self.buf.take().unwrap());
160 } else if res == 0 {
161 panic!("IOEvent get_result before it's done");
162 } else {
163 return Err(res);
164 }
165 }
166
167 #[inline(always)]
168 pub(crate) fn set_error(&self, mut errno: i32) {
169 if errno == 0 {
170 errno = Errno::EINVAL as i32;
173 }
174 if errno > 0 {
175 errno = -errno;
176 }
177 self.res.store(errno, Ordering::Release);
178 }
179
180 #[inline(always)]
181 pub(crate) fn set_ok(&self) {
182 self.res.store(1, Ordering::Release);
183 }
184
185 #[inline(always)]
186 pub(crate) fn callback(mut self: Box<Self>) {
187 match self.cb.take() {
188 Some(cb) => {
189 cb.call(self);
190 }
191 None => return,
192 }
193 }
194
195 #[inline(always)]
196 pub(crate) fn callback_merged(mut self: Box<Self>) {
197 if let Some(mut tasks) = self.sub_tasks.take() {
198 match self._get_result() {
199 Ok(buffer) => {
200 if self.action == IOAction::Read {
201 let mut offset: usize = 0;
202 let b = buffer.as_ref();
203 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
204 let sub_buf = event.buf.as_mut().unwrap();
205 let sub_size = sub_buf.len();
206 sub_buf.copy_from(0, &b[offset..offset + sub_size]);
207 offset += sub_size;
208 event.set_ok();
209 event.callback();
210 }
211 } else {
212 while let Some(event) = Self::pop_from_list(&mut tasks) {
213 event.set_ok();
214 event.callback();
215 }
216 }
217 }
218 Err(errno) => {
219 while let Some(event) = Self::pop_from_list(&mut tasks) {
220 event.set_error(errno);
221 event.callback();
222 }
223 }
224 }
225 } else {
226 self.callback();
227 }
228 }
229}
230
231pub(crate) struct IOEventTaskSlot<C: IOCallbackCustom> {
232 pub(crate) iocb: aio::iocb,
233 pub(crate) event: Option<Box<IOEvent<C>>>,
234}
235
236impl<C: IOCallbackCustom> IOEventTaskSlot<C> {
237 pub(crate) fn new(slot_id: u64) -> Self {
238 Self {
239 iocb: aio::iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() },
240 event: None,
241 }
242 }
243
244 #[inline(always)]
245 pub(crate) fn fill_slot(&mut self, event: Box<IOEvent<C>>, slot_id: u16) {
246 let iocb = &mut self.iocb;
247 iocb.aio_data = slot_id as libc::__u64;
248 iocb.aio_fildes = event.fd as libc::__u32;
249 let buf = event.buf.as_ref().unwrap();
250 iocb.aio_lio_opcode = event.action as u16;
251 iocb.aio_buf = buf.get_raw() as u64;
252 iocb.aio_nbytes = buf.len() as u64;
253 iocb.aio_offset = event.offset;
254 self.event.replace(event);
255 }
256
257 #[inline(always)]
258 pub(crate) fn set_written(&mut self, written: usize, cb: &IOWorkers<C>) -> bool {
259 if self.iocb.aio_nbytes <= written as u64 {
260 if let Some(event) = self.event.take() {
261 event.set_ok();
262 cb.send(event);
263 }
264 return true;
265 }
266 self.iocb.aio_nbytes -= written as u64;
267 self.iocb.aio_buf += written as u64;
268 return false;
269 }
270
271 #[inline(always)]
272 pub(crate) fn set_error(&mut self, errno: i32, cb: &IOWorkers<C>) {
273 if let Some(event) = self.event.take() {
274 event.set_error(errno);
275 cb.send(event);
276 }
277 }
278}