1use std::fmt;
2use std::os::fd::RawFd;
3
4use nix::errno::Errno;
5
6use crate::embedded_list::*;
7use io_buffer::{Buffer, safe_copy};
8
9#[derive(Copy, Clone, PartialEq, Debug)]
10pub enum IOAction {
11 Read = 0,
12 Write = 1,
13}
14
15pub trait IoCallback: Sized + 'static + Send + Unpin {
17 fn call(self, _event: Box<IOEvent<Self>>);
18}
19
20pub struct ClosureCb(pub Box<dyn FnOnce(Box<IOEvent<Self>>) + Send + Sync + 'static>);
22
23impl IoCallback for ClosureCb {
24 fn call(self, event: Box<IOEvent<Self>>) {
25 (self.0)(event)
26 }
27}
28
29#[repr(C)]
31pub struct IOEvent<C: IoCallback> {
32 pub(crate) node: EmbeddedListNode,
35 pub buf: Option<Buffer>,
36 pub offset: i64,
37 pub action: IOAction,
38 pub fd: RawFd,
39 res: i32,
40 cb: Option<C>,
41 sub_tasks: Option<EmbeddedList>,
42}
43
44impl<C: IoCallback> fmt::Debug for IOEvent<C> {
45 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46 if let Some(sub_tasks) = self.sub_tasks.as_ref() {
47 write!(
48 f,
49 "offset={} {:?} sub_tasks {} ",
50 self.offset,
51 self.action,
52 sub_tasks.get_length()
53 )
54 } else {
55 write!(f, "offset={} {:?}", self.offset, self.action)
56 }
57 }
58}
59
60impl<C: IoCallback> IOEvent<C> {
61 #[inline]
62 pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Box<Self> {
63 log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
64 Box::new(Self {
65 buf: Some(buf),
66 fd,
67 action,
68 offset,
69 res: i32::MIN,
70 cb: None,
71 sub_tasks: None,
72 node: Default::default(),
73 })
74 }
75
76 #[inline(always)]
78 pub fn set_callback(&mut self, cb: C) {
79 self.cb = Some(cb);
80 }
81
82 #[inline(always)]
83 pub fn get_size(&self) -> usize {
84 self.buf.as_ref().unwrap().len()
85 }
86
87 #[inline(always)]
88 pub fn push_to_list(mut self: Box<Self>, events: &mut EmbeddedList) {
89 events.push_back(&mut self.node);
90 let _ = Box::leak(self);
91 }
92
93 #[inline(always)]
94 pub fn pop_from_list(events: &mut EmbeddedList) -> Option<Box<Self>> {
95 if let Some(event) = events.pop_front::<Self>() {
96 Some(unsafe { Box::from_raw(event) })
97 } else {
98 None
99 }
100 }
101
102 #[inline(always)]
103 pub fn set_subtasks(&mut self, sub_tasks: EmbeddedList) {
104 self.sub_tasks = Some(sub_tasks)
105 }
106
107 #[inline(always)]
108 pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
109 self.buf.as_ref().unwrap().as_ref()
110 }
111
112 #[inline(always)]
113 pub fn is_done(&self) -> bool {
114 self.res != i32::MIN
115 }
116
117 #[inline(always)]
118 pub fn get_write_result(self) -> Result<(), Errno> {
119 let res = self.res;
120 if res >= 0 {
121 return Ok(());
122 } else if res == i32::MIN {
123 panic!("IOEvent get_result before it's done");
124 } else {
125 return Err(Errno::from_raw(-res));
126 }
127 }
128
129 #[inline(always)]
130 pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
131 let res = self.res;
132 if res >= 0 {
133 let mut buf = self.buf.take().unwrap();
134 buf.set_len(res as usize);
135 return Ok(buf);
136 } else if res == i32::MIN {
137 panic!("IOEvent get_result before it's done");
138 } else {
139 return Err(Errno::from_raw(-res));
140 }
141 }
142
143 #[inline(always)]
144 pub(crate) fn set_error(&mut self, mut errno: i32) {
145 if errno == 0 {
146 errno = Errno::EINVAL as i32;
149 }
150 if errno > 0 {
151 errno = -errno;
152 }
153 self.res = errno;
154 }
155
156 #[inline(always)]
157 pub(crate) fn set_copied(&mut self, len: usize) {
158 if self.res == i32::MIN {
159 self.res = len as i32;
160 } else {
161 self.res += len as i32;
162 }
163 }
164
165 #[inline(always)]
166 pub(crate) fn callback(mut self: Box<Self>) {
167 match self.cb.take() {
168 Some(cb) => {
169 cb.call(self);
170 }
171 None => return,
172 }
173 }
174
175 #[inline(always)]
176 pub(crate) fn callback_merged(mut self: Box<Self>) {
177 if let Some(mut tasks) = self.sub_tasks.take() {
178 let res = self.res;
179 if res >= 0 {
180 if self.action == IOAction::Read {
181 let buffer = self.buf.take().unwrap();
182 let mut b = buffer.as_ref();
183 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
184 let sub_buf = event.buf.as_mut().unwrap();
185 if b.len() == 0 {
186 event.set_copied(0);
188 } else {
189 let copied = safe_copy(sub_buf, b);
190 event.set_copied(copied);
191 b = &b[copied..];
192 }
193 event.callback();
194 }
195 } else {
196 let l = self.buf.as_ref().unwrap().len();
197 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
198 let mut sub_len = event.get_size();
199 if sub_len > l {
200 sub_len = l;
202 }
203 event.set_copied(sub_len);
204 event.callback();
205 }
206 }
207 } else {
208 let errno = -res;
209 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
210 event.set_error(errno);
211 event.callback();
212 }
213 }
214 } else {
215 self.callback();
216 }
217 }
218
219 pub(crate) fn new_exit_signal(fd: RawFd) -> Box<Self> {
221 Box::new(Self {
222 node: Default::default(),
223 buf: None,
224 offset: 0,
225 action: IOAction::Read, fd,
227 res: i32::MIN,
228 cb: None, sub_tasks: None,
230 })
231 }
232}