1use std::cell::UnsafeCell;
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::os::fd::RawFd;
5
6use nix::errno::Errno;
7
8use embed_collections::dlist::{DLinkedList, DListItem, DListNode};
9use io_buffer::{Buffer, safe_copy};
10
11#[derive(Copy, Clone, PartialEq, Debug)]
12pub enum IOAction {
13 Read = 0,
14 Write = 1,
15}
16
17pub trait IOCallback: Sized + 'static + Send + Unpin {
19 fn call(self, _event: IOEvent<Self>);
20}
21
22pub struct ClosureCb(pub Box<dyn FnOnce(IOEvent<Self>) + Send + Sync + 'static>);
24
25impl IOCallback for ClosureCb {
26 fn call(self, event: IOEvent<Self>) {
27 (self.0)(event)
28 }
29}
30
31pub struct IOEvent<C: IOCallback>(pub Box<IOEvent_<C>>);
32
33impl<C: IOCallback> Deref for IOEvent<C> {
34 type Target = IOEvent_<C>;
35 fn deref(&self) -> &Self::Target {
36 &self.0
37 }
38}
39
40impl<C: IOCallback> DerefMut for IOEvent<C> {
41 fn deref_mut(&mut self) -> &mut Self::Target {
42 &mut self.0
43 }
44}
45
46impl<C: IOCallback> fmt::Debug for IOEvent<C> {
47 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48 self.0.fmt(f)
49 }
50}
51
52#[repr(C)]
54pub struct IOEvent_<C: IOCallback> {
55 pub(crate) node: UnsafeCell<DListNode<Self, ()>>,
58 pub buf: Option<Buffer>,
59 pub offset: i64,
60 pub action: IOAction,
61 pub fd: RawFd,
62 pub(crate) res: i32,
67 cb: Option<C>,
68 sub_tasks: DLinkedList<Box<Self>, ()>,
69}
70
71unsafe impl<C: IOCallback> DListItem<()> for IOEvent_<C> {
73 fn get_node(&self) -> &mut DListNode<Self, ()> {
74 unsafe { &mut *self.node.get() }
75 }
76}
77
78impl<C: IOCallback> fmt::Debug for IOEvent_<C> {
79 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80 write!(f, "offset={} {:?} sub_tasks {} ", self.offset, self.action, self.sub_tasks.len())
81 }
82}
83
84impl<C: IOCallback> IOEvent<C> {
85 #[inline]
86 pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> IOEvent<C> {
87 log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
88 IOEvent(Box::new(IOEvent_ {
89 buf: Some(buf),
90 fd,
91 action,
92 offset,
93 res: i32::MIN,
94 cb: None,
95 sub_tasks: DLinkedList::new(),
96 node: UnsafeCell::new(DListNode::default()),
97 }))
98 }
99
100 #[inline(always)]
102 pub fn set_callback(&mut self, cb: C) {
103 self.cb = Some(cb);
104 }
105
106 #[inline(always)]
107 pub fn get_size(&self) -> usize {
108 self.buf.as_ref().unwrap().len()
109 }
110
111 #[inline(always)]
112 pub(crate) fn push_to_list(self, events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) {
113 events.push_back(self.0);
114 }
115
116 #[inline(always)]
117 pub(crate) fn pop_from_list(events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) -> Option<Self> {
118 events.pop_front().map(IOEvent)
119 }
120
121 #[inline(always)]
122 pub(crate) fn set_subtasks(&mut self, sub_tasks: DLinkedList<Box<IOEvent_<C>>, ()>) {
123 self.sub_tasks = sub_tasks;
124 }
125
126 #[inline(always)]
127 pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
128 self.buf.as_ref().unwrap().as_ref()
129 }
130
131 #[inline(always)]
132 pub fn is_done(&self) -> bool {
133 self.res != i32::MIN
134 }
135
136 #[inline(always)]
137 pub fn get_write_result(self) -> Result<(), Errno> {
138 let res = self.res;
139 if res >= 0 {
140 return Ok(());
141 } else if res == i32::MIN {
142 panic!("IOEvent get_result before it's done");
143 } else {
144 return Err(Errno::from_raw(-res));
145 }
146 }
147
148 #[inline(always)]
151 pub fn get_result(&self) -> Result<usize, Errno> {
152 let res = self.res;
153 if res >= 0 {
154 return Ok(res as usize);
155 } else if res == i32::MIN {
156 panic!("IOEvent get_result before it's done");
157 } else {
158 return Err(Errno::from_raw(-res));
159 }
160 }
161
162 #[inline(always)]
165 pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
166 let res = self.res;
167 if res >= 0 {
168 let buf = self.buf.take().unwrap();
169 return Ok(buf);
171 } else if res == i32::MIN {
172 panic!("IOEvent get_result before it's done");
173 } else {
174 return Err(Errno::from_raw(-res));
175 }
176 }
177
178 #[inline(always)]
179 pub(crate) fn set_error(&mut self, mut errno: i32) {
180 if errno == 0 {
181 errno = Errno::EINVAL as i32;
184 }
185 if errno > 0 {
186 errno = -errno;
187 }
188 self.res = errno;
189 }
190
191 #[inline(always)]
192 pub(crate) fn set_copied(&mut self, len: usize) {
193 if self.res == i32::MIN {
194 self.res = len as i32;
195 } else {
196 self.res += len as i32;
197 }
198 }
199
200 #[inline(always)]
203 pub(crate) fn callback(mut self) {
204 match self.cb.take() {
205 Some(cb) => {
206 cb.call(self);
207 }
208 None => return,
209 }
210 }
211
212 #[inline(always)]
216 pub fn callback_merged(mut self) {
217 if !self.sub_tasks.is_empty() {
218 let res = self.res;
219 if res >= 0 {
220 if self.action == IOAction::Read {
221 let buffer = self.buf.take().unwrap();
222 let mut b = buffer.as_ref();
223 for event_box in self.sub_tasks.drain() {
224 let mut event = IOEvent(event_box);
225 let sub_buf = event.buf.as_mut().unwrap();
226 if b.len() == 0 {
227 event.set_copied(0);
229 } else {
230 let copied = safe_copy(sub_buf, b);
231 event.set_copied(copied);
232 b = &b[copied..];
233 }
234 event.callback();
235 }
236 } else {
237 let l = self.buf.as_ref().unwrap().len();
238 for event_box in self.sub_tasks.drain() {
239 let mut event = IOEvent(event_box);
240 let mut sub_len = event.get_size();
241 if sub_len > l {
242 sub_len = l;
244 }
245 event.set_copied(sub_len);
246 event.callback();
247 }
248 }
249 } else {
250 let errno = -res;
251 for event_box in self.sub_tasks.drain() {
252 let mut event = IOEvent(event_box);
253 event.set_error(errno);
254 event.callback();
255 }
256 }
257 } else {
258 self.callback();
259 }
260 }
261
262 pub(crate) fn new_exit_signal(fd: RawFd) -> Self {
264 Self(Box::new(IOEvent_ {
266 node: UnsafeCell::new(DListNode::default()),
267 buf: None,
268 offset: 0,
269 action: IOAction::Read, fd,
271 res: i32::MIN,
272 cb: None, sub_tasks: DLinkedList::new(),
274 }))
275 }
276}