1use std::cell::UnsafeCell;
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::os::fd::RawFd;
5
6use nix::errno::Errno;
7
8use embed_collections::dlist::{DLinkedList, DListItem, DListNode};
9use io_buffer::{Buffer, safe_copy};
10
11#[derive(Copy, Clone, PartialEq, Debug)]
12pub enum IOAction {
13 Read = 0,
14 Write = 1,
15}
16
17pub trait IOCallback: Sized + 'static + Send + Unpin {
19 fn call(self, _event: IOEvent<Self>);
20}
21
22pub struct ClosureCb(pub Box<dyn FnOnce(IOEvent<Self>) + Send + Sync + 'static>);
24
25impl IOCallback for ClosureCb {
26 fn call(self, event: IOEvent<Self>) {
27 (self.0)(event)
28 }
29}
30
31pub struct IOEvent<C: IOCallback>(pub Box<IOEvent_<C>>);
32
33impl<C: IOCallback> Deref for IOEvent<C> {
34 type Target = IOEvent_<C>;
35 fn deref(&self) -> &Self::Target {
36 &self.0
37 }
38}
39
40impl<C: IOCallback> DerefMut for IOEvent<C> {
41 fn deref_mut(&mut self) -> &mut Self::Target {
42 &mut self.0
43 }
44}
45
46impl<C: IOCallback> fmt::Debug for IOEvent<C> {
47 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48 self.0.fmt(f)
49 }
50}
51
52#[repr(C)]
54pub struct IOEvent_<C: IOCallback> {
55 pub(crate) node: UnsafeCell<DListNode<Self, ()>>,
58 pub buf: Option<Buffer>,
59 pub offset: i64,
60 pub action: IOAction,
61 pub fd: RawFd,
62 pub(crate) res: i32,
67 cb: Option<C>,
68 sub_tasks: DLinkedList<Box<Self>, ()>,
69}
70
71unsafe impl<C: IOCallback> DListItem<()> for IOEvent_<C> {
73 fn get_node(&self) -> &mut DListNode<Self, ()> {
74 unsafe { &mut *self.node.get() }
75 }
76}
77
78impl<C: IOCallback> fmt::Debug for IOEvent_<C> {
79 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
80 write!(f, "offset={} {:?} sub_tasks {} ", self.offset, self.action, self.sub_tasks.len())
81 }
82}
83
84impl<C: IOCallback> IOEvent<C> {
85 #[inline]
86 pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> IOEvent<C> {
87 log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
88 IOEvent(Box::new(IOEvent_ {
89 buf: Some(buf),
90 fd,
91 action,
92 offset,
93 res: i32::MIN,
94 cb: None,
95 sub_tasks: DLinkedList::new(),
96 node: UnsafeCell::new(DListNode::default()),
97 }))
98 }
99
100 #[inline(always)]
102 pub fn set_callback(&mut self, cb: C) {
103 self.cb = Some(cb);
104 }
105
106 #[inline(always)]
107 pub fn get_size(&self) -> usize {
108 self.buf.as_ref().unwrap().len()
109 }
110
111 #[inline(always)]
112 pub fn push_to_list(self, events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) {
113 events.push_back(self.0);
114 }
115
116 #[inline(always)]
117 pub fn pop_from_list(events: &mut DLinkedList<Box<IOEvent_<C>>, ()>) -> Option<Self> {
118 events.pop_front().map(IOEvent)
119 }
120
121 #[inline(always)]
122 pub fn set_subtasks(&mut self, sub_tasks: DLinkedList<Box<IOEvent_<C>>, ()>) {
123 self.sub_tasks = sub_tasks;
124 }
125
126 #[inline(always)]
127 pub fn get_buf_ref<'a>(&'a self) -> &'a [u8] {
128 self.buf.as_ref().unwrap().as_ref()
129 }
130
131 #[inline(always)]
132 pub fn is_done(&self) -> bool {
133 self.res != i32::MIN
134 }
135
136 #[inline(always)]
137 pub fn get_write_result(self) -> Result<(), Errno> {
138 let res = self.res;
139 if res >= 0 {
140 return Ok(());
141 } else if res == i32::MIN {
142 panic!("IOEvent get_result before it's done");
143 } else {
144 return Err(Errno::from_raw(-res));
145 }
146 }
147
148 #[inline(always)]
149 pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
150 let res = self.res;
151 if res >= 0 {
152 let mut buf = self.buf.take().unwrap();
153 buf.set_len(res as usize);
154 return Ok(buf);
155 } else if res == i32::MIN {
156 panic!("IOEvent get_result before it's done");
157 } else {
158 return Err(Errno::from_raw(-res));
159 }
160 }
161
162 #[inline(always)]
163 pub(crate) fn set_error(&mut self, mut errno: i32) {
164 if errno == 0 {
165 errno = Errno::EINVAL as i32;
168 }
169 if errno > 0 {
170 errno = -errno;
171 }
172 self.res = errno;
173 }
174
175 #[inline(always)]
176 pub(crate) fn set_copied(&mut self, len: usize) {
177 if self.res == i32::MIN {
178 self.res = len as i32;
179 } else {
180 self.res += len as i32;
181 }
182 }
183
184 #[inline(always)]
185 pub(crate) fn callback(mut self) {
186 match self.cb.take() {
187 Some(cb) => {
188 cb.call(self);
189 }
190 None => return,
191 }
192 }
193
194 #[inline(always)]
195 pub(crate) fn callback_merged(mut self) {
196 let mut tasks = std::mem::replace(&mut self.sub_tasks, DLinkedList::new());
201
202 if !tasks.is_empty() {
203 let res = self.res;
204 if res >= 0 {
205 if self.action == IOAction::Read {
206 let buffer = self.buf.take().unwrap();
207 let mut b = buffer.as_ref();
208 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
209 let sub_buf = event.buf.as_mut().unwrap();
210 if b.len() == 0 {
211 event.set_copied(0);
213 } else {
214 let copied = safe_copy(sub_buf, b);
215 event.set_copied(copied);
216 b = &b[copied..];
217 }
218 event.callback();
219 }
220 } else {
221 let l = self.buf.as_ref().unwrap().len();
222 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
223 let mut sub_len = event.get_size();
224 if sub_len > l {
225 sub_len = l;
227 }
228 event.set_copied(sub_len);
229 event.callback();
230 }
231 }
232 } else {
233 let errno = -res;
234 while let Some(mut event) = Self::pop_from_list(&mut tasks) {
235 event.set_error(errno);
236 event.callback();
237 }
238 }
239 } else {
240 self.callback();
241 }
242 }
243
244 pub(crate) fn new_exit_signal(fd: RawFd) -> Self {
246 Self(Box::new(IOEvent_ {
248 node: UnsafeCell::new(DListNode::default()),
249 buf: None,
250 offset: 0,
251 action: IOAction::Read, fd,
253 res: i32::MIN,
254 cb: None, sub_tasks: DLinkedList::new(),
256 }))
257 }
258}