io_engine/scheduler/
tasks.rs1use std::os::fd::RawFd;
24use std::{
25 fmt,
26 sync::atomic::{AtomicI32, Ordering},
27};
28
29use nix::errno::Errno;
30
31use super::embedded_list::*;
32use super::{aio, callback_worker::*};
33use crate::buffer::Buffer;
34
/// Direction of an I/O request.
///
/// The numeric discriminant is significant: `IOEventTaskSlot::fill_slot`
/// casts it to `u16` and stores it in `aio_lio_opcode`, so the values must
/// stay in sync with the kernel opcodes (`IOCB_CMD_PREAD` = 0,
/// `IOCB_CMD_PWRITE` = 1).
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum IOAction {
    /// Read from the file into the event buffer.
    Read = 0,
    /// Write the event buffer to the file.
    Write = 1,
}
40
41pub trait IOCallbackCustom: Sized + 'static + Send + Unpin {
43 fn call(self, _event: Box<IOEvent<Self>>);
44}
45
46pub struct ClosureCb(pub Box<dyn FnOnce(Box<IOEvent<Self>>) + Send + Sync + 'static>);
48
49impl IOCallbackCustom for ClosureCb {
50 fn call(self, event: Box<IOEvent<Self>>) {
51 (self.0)(event)
52 }
53}
54
55#[repr(C)]
57pub struct IOEvent<C: IOCallbackCustom> {
58 pub(crate) node: EmbeddedListNode,
61 pub buf: Option<Buffer>,
62 pub offset: i64,
63 pub action: IOAction,
64 pub fd: RawFd,
65 res: AtomicI32,
66 cb: Option<C>,
67 sub_tasks: Option<EmbeddedList>,
68}
69
70impl<C: IOCallbackCustom> fmt::Debug for IOEvent<C> {
71 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72 if let Some(sub_tasks) = self.sub_tasks.as_ref() {
73 write!(
74 f,
75 "offset={} {:?} sub_tasks {} ",
76 self.offset,
77 self.action,
78 sub_tasks.get_length()
79 )
80 } else {
81 write!(f, "offset={} {:?}", self.offset, self.action)
82 }
83 }
84}
85
86impl<C: IOCallbackCustom> IOEvent<C> {
87 #[inline]
88 pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Box<Self> {
89 log_assert!(
90 buf.len() > 0,
91 "{:?} offset={}, buffer size == 0",
92 action,
93 offset
94 );
95 Box::new(Self {
96 buf: Some(buf),
97 fd,
98 action,
99 offset,
100 res: AtomicI32::new(0),
101 cb: None,
102 sub_tasks: None,
103 node: Default::default(),
104 })
105 }
106
107 #[inline(always)]
109 pub fn set_callback(&mut self, cb: C) {
110 self.cb = Some(cb);
111 }
112
113 #[inline(always)]
114 pub fn get_size(&self) -> usize {
115 self.buf.as_ref().unwrap().len()
116 }
117
118 #[inline(always)]
119 pub fn push_to_list(mut self: Box<Self>, events: &mut EmbeddedList) {
120 events.push_back(&mut self.node);
121 let _ = Box::leak(self);
122 }
123
124 #[inline(always)]
125 pub fn pop_from_list(events: &mut EmbeddedList) -> Option<Box<Self>> {
126 if let Some(event) = events.pop_front::<Self>() {
127 Some(unsafe { Box::from_raw(event) })
128 } else {
129 None
130 }
131 }
132
133 #[inline(always)]
134 pub(crate) fn set_subtasks(&mut self, sub_tasks: EmbeddedList) {
135 self.sub_tasks = Some(sub_tasks)
136 }
137
138 #[inline(always)]
139 pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
140 self.buf.as_ref().unwrap().as_ref()
141 }
142
143 #[inline(always)]
144 pub fn is_done(&self) -> bool {
145 self.res.load(Ordering::Acquire) != 0
146 }
147
148 #[inline]
149 pub fn get_result(&mut self) -> Result<Buffer, Errno> {
150 let res = self.res.load(Ordering::Acquire);
151 if res > 0 {
152 return Ok(self.buf.take().unwrap());
153 } else if res == 0 {
154 panic!("IOEvent get_result before it's done");
155 } else {
156 return Err(Errno::from_raw(-res));
157 }
158 }
159
160 #[inline(always)]
161 pub fn _get_result(&mut self) -> Result<Buffer, i32> {
162 let res = self.res.load(Ordering::Acquire);
163 if res > 0 {
164 return Ok(self.buf.take().unwrap());
165 } else if res == 0 {
166 panic!("IOEvent get_result before it's done");
167 } else {
168 return Err(res);
169 }
170 }
171
172 #[inline(always)]
173 pub(crate) fn set_error(&self, mut errno: i32) {
174 if errno == 0 {
175 errno = Errno::EINVAL as i32;
178 }
179 if errno > 0 {
180 errno = -errno;
181 }
182 self.res.store(errno, Ordering::Release);
183 }
184
185 #[inline(always)]
186 pub(crate) fn set_ok(&self) {
187 self.res.store(1, Ordering::Release);
188 }
189
190 #[inline(always)]
191 pub(crate) fn callback(mut self: Box<Self>) {
192 match self.cb.take() {
193 Some(cb) => {
194 cb.call(self);
195 }
196 None => return,
197 }
198 }
199
200 #[inline(always)]
201 pub(crate) fn callback_merged(mut self: Box<Self>) {
202 if let Some(mut tasks) = self.sub_tasks.take() {
203 match self._get_result() {
204 Ok(buffer) => {
205 if self.action == IOAction::Read {
206 let mut offset: usize = 0;
207 let b = buffer.as_ref();
208 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
209 let sub_buf = event.buf.as_mut().unwrap();
210 let sub_size = sub_buf.len();
211 sub_buf.copy_from(0, &b[offset..offset + sub_size]);
212 offset += sub_size;
213 event.set_ok();
214 event.callback();
215 }
216 } else {
217 while let Some(event) = Self::pop_from_list(&mut tasks) {
218 event.set_ok();
219 event.callback();
220 }
221 }
222 }
223 Err(errno) => {
224 while let Some(event) = Self::pop_from_list(&mut tasks) {
225 event.set_error(errno);
226 event.callback();
227 }
228 }
229 }
230 } else {
231 self.callback();
232 }
233 }
234}
235
236pub(crate) struct IOEventTaskSlot<C: IOCallbackCustom> {
237 pub(crate) iocb: aio::iocb,
238 pub(crate) event: Option<Box<IOEvent<C>>>,
239}
240
241impl<C: IOCallbackCustom> IOEventTaskSlot<C> {
242 pub(crate) fn new(slot_id: u64) -> Self {
243 Self {
244 iocb: aio::iocb {
245 aio_data: slot_id,
246 aio_reqprio: 1,
247 ..Default::default()
248 },
249 event: None,
250 }
251 }
252
253 #[inline(always)]
254 pub(crate) fn fill_slot(&mut self, event: Box<IOEvent<C>>, slot_id: u16) {
255 let iocb = &mut self.iocb;
256 iocb.aio_data = slot_id as libc::__u64;
257 iocb.aio_fildes = event.fd as libc::__u32;
258 let buf = event.buf.as_ref().unwrap();
259 iocb.aio_lio_opcode = event.action as u16;
260 iocb.aio_buf = buf.get_raw() as u64;
261 iocb.aio_nbytes = buf.len() as u64;
262 iocb.aio_offset = event.offset;
263 self.event.replace(event);
264 }
265
266 #[inline(always)]
267 pub(crate) fn set_written(&mut self, written: usize, cb: &IOWorkers<C>) -> bool {
268 if self.iocb.aio_nbytes <= written as u64 {
269 if let Some(event) = self.event.take() {
270 event.set_ok();
271 cb.send(event);
272 }
273 return true;
274 }
275 self.iocb.aio_nbytes -= written as u64;
276 self.iocb.aio_buf += written as u64;
277 return false;
278 }
279
280 #[inline(always)]
281 pub(crate) fn set_error(&mut self, errno: i32, cb: &IOWorkers<C>) {
282 if let Some(event) = self.event.take() {
283 event.set_error(errno);
284 cb.send(event);
285 }
286 }
287}