1use std::os::fd::RawFd;
2use std::{fmt, u64};
3
4use embed_collections::SegList;
5use io_buffer::{Buffer, safe_copy};
6use nix::errno::Errno;
7
8#[derive(Copy, Clone, PartialEq, Debug)]
9#[repr(u8)]
10pub enum IOAction {
11 Read = 0,
12 Write = 1,
13 Alloc = 2,
14 Fsync = 3,
15}
16
17pub trait IOCallback: Sized + 'static + Send + Unpin {
19 fn call(self, offset: i64, res: Result<Option<Buffer>, Errno>);
20}
21
22pub struct ClosureCb(pub Box<dyn FnOnce(i64, Result<Option<Buffer>, Errno>) + Send>);
24
25impl IOCallback for ClosureCb {
26 fn call(self, offset: i64, res: Result<Option<Buffer>, Errno>) {
27 (self.0)(offset, res)
28 }
29}
30
31pub struct IOEvent<C: IOCallback> {
33 pub action: IOAction,
34 pub(crate) res: i32,
39 buf_or_len: BufOrLen,
42 pub offset: i64,
43 pub fd: RawFd,
44 cb: TaskCallback<C>,
45}
46
47enum TaskCallback<C: IOCallback> {
48 None,
49 Callback(C),
50 Merged(SegList<IOEventMerged<C>>),
51}
52
53enum BufOrLen {
54 Buffer(Buffer),
55 Len(u64),
57}
58
59pub(crate) struct IOEventMerged<C: IOCallback> {
60 pub buf: Buffer,
61 pub cb: Option<C>,
62}
63
64impl<C: IOCallback> fmt::Debug for IOEvent<C> {
65 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66 if let TaskCallback::Merged(sub_tasks) = &self.cb {
67 write!(f, "offset={} {:?} merged {}", self.offset, self.action, sub_tasks.len())
68 } else {
69 write!(f, "offset={} {:?}", self.offset, self.action)
70 }
71 }
72}
73
74impl<C: IOCallback> IOEvent<C> {
75 #[inline]
76 pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Self {
77 log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
78 Self {
79 buf_or_len: BufOrLen::Buffer(buf),
80 fd,
81 action,
82 offset,
83 res: i32::MIN,
84 cb: TaskCallback::None,
85 }
86 }
87
88 #[inline]
89 pub fn new_no_buf(fd: RawFd, action: IOAction, offset: i64, len: u64) -> Self {
90 Self {
91 buf_or_len: BufOrLen::Len(len), fd,
93 action,
94 offset,
95 res: i32::MIN,
96 cb: TaskCallback::None,
97 }
98 }
99
100 #[inline(always)]
101 pub fn set_fd(&mut self, fd: RawFd) {
102 self.fd = fd;
103 }
104
105 #[inline(always)]
107 pub fn set_callback(&mut self, cb: C) {
108 self.cb = TaskCallback::Callback(cb);
109 }
110
111 #[inline(always)]
112 pub fn get_size(&self) -> u64 {
113 match &self.buf_or_len {
114 BufOrLen::Buffer(buf) => buf.len() as u64,
115 BufOrLen::Len(l) => *l,
116 }
117 }
118
119 #[inline(always)]
121 pub(crate) fn set_merged_tasks(
122 &mut self, merged_buf: Buffer, sub_tasks: SegList<IOEventMerged<C>>,
123 ) {
124 self.buf_or_len = BufOrLen::Buffer(merged_buf);
125 self.cb = TaskCallback::Merged(sub_tasks);
126 }
127
128 #[inline(always)]
131 pub(crate) fn into_merged(mut self) -> IOEventMerged<C> {
132 let buf = match std::mem::replace(&mut self.buf_or_len, BufOrLen::Len(0)) {
133 BufOrLen::Buffer(buf) => buf,
134 BufOrLen::Len(_) => panic!("into_merged called on IOEvent with no buffer"),
135 };
136 let cb = match std::mem::replace(&mut self.cb, TaskCallback::None) {
137 TaskCallback::Callback(cb) => Some(cb),
138 _ => None,
139 };
140 IOEventMerged { buf, cb }
141 }
142
143 #[inline(always)]
146 pub(crate) fn extract_merged(&mut self) -> IOEventMerged<C> {
147 let buf = match std::mem::replace(&mut self.buf_or_len, BufOrLen::Len(0)) {
148 BufOrLen::Buffer(buf) => buf,
149 BufOrLen::Len(_) => panic!("extract_merged called on IOEvent with no buffer"),
150 };
151 let cb = match std::mem::replace(&mut self.cb, TaskCallback::None) {
152 TaskCallback::Callback(cb) => Some(cb),
153 _ => None,
154 };
155 IOEventMerged { buf, cb }
156 }
157
158 #[inline(always)]
160 pub(crate) fn get_param_for_io(&mut self) -> (u64, *mut u8, u32) {
161 if let BufOrLen::Buffer(buf) = &mut self.buf_or_len {
162 let mut offset = self.offset as u64;
163 let mut p = buf.get_raw_mut();
164 let mut l = buf.len() as u32;
165 if self.res > 0 {
166 offset += self.res as u64;
168 p = unsafe { p.add(self.res as usize) };
169 l += self.res as u32;
170 }
171 (offset, p, l)
172 } else {
173 panic!("get_buf_raw called on IOEvent with no buffer");
174 }
175 }
176
177 #[inline(always)]
178 pub fn get_write_result(self) -> Result<(), Errno> {
179 let res = self.res;
180 if res >= 0 {
181 return Ok(());
182 } else if res == i32::MIN {
183 panic!("IOEvent get_result before it's done");
184 } else {
185 return Err(Errno::from_raw(-res));
186 }
187 }
188
189 #[inline(always)]
192 pub fn get_result(&self) -> Result<usize, Errno> {
193 let res = self.res;
194 if res >= 0 {
195 return Ok(res as usize);
196 } else if res == i32::MIN {
197 panic!("IOEvent get_result before it's done");
198 } else {
199 return Err(Errno::from_raw(-res));
200 }
201 }
202
203 #[inline(always)]
206 pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
207 let res = self.res;
208 if res >= 0 {
209 let buf_or_len = std::mem::replace(&mut self.buf_or_len, BufOrLen::Len(0));
211 if let BufOrLen::Buffer(buf) = buf_or_len {
212 return Ok(buf);
214 } else {
215 panic!("get_read_result called on IOEvent with no buffer");
216 }
217 } else if res == i32::MIN {
218 panic!("IOEvent get_result before it's done");
219 } else {
220 return Err(Errno::from_raw(-res));
221 }
222 }
223
224 #[inline(always)]
225 pub(crate) fn set_error(&mut self, mut errno: i32) {
226 if errno == 0 {
227 errno = Errno::EINVAL as i32;
230 }
231 if errno > 0 {
232 errno = -errno;
233 }
234 self.res = errno;
235 }
236
237 #[inline(always)]
238 pub(crate) fn set_copied(&mut self, len: usize) {
239 if self.res == i32::MIN {
240 self.res = len as i32;
242 } else {
243 self.res += len as i32;
245 }
246 }
247
248 #[inline(always)]
258 pub fn callback<F>(mut self: Box<Self>, check_short_read: F) -> Result<(), Box<Self>>
259 where
260 F: FnOnce(u64) -> bool,
261 {
262 if self.res >= 0 {
263 if let BufOrLen::Buffer(buf) = &mut self.buf_or_len {
264 if buf.len() > self.res as usize {
265 if self.action == IOAction::Read {
266 if check_short_read(self.offset as u64 + self.res as u64) {
267 return Err(self);
268 } else {
269 buf.set_len(self.res as usize);
271 }
272 } else {
273 return Err(self);
275 }
276 }
277 }
278 }
279 self.callback_unchecked(false);
280 Ok(())
281 }
282
283 #[inline(always)]
294 pub fn callback_unchecked(mut self, to_fix_short_io: bool) {
295 match std::mem::replace(&mut self.cb, TaskCallback::None) {
296 TaskCallback::None => {}
297 TaskCallback::Callback(cb) => {
298 let res: Result<Option<Buffer>, Errno> = if self.res >= 0 {
299 match self.buf_or_len {
300 BufOrLen::Buffer(mut buf) => {
301 if to_fix_short_io && buf.len() > self.res as usize {
302 buf.set_len(self.res as usize);
303 }
304 Ok(Some(buf))
305 }
306 BufOrLen::Len(_) => Ok(None),
307 }
308 } else {
309 Err(Errno::from_raw(-self.res))
310 };
311 cb.call(self.offset, res);
312 }
313 TaskCallback::Merged(sub_tasks) => {
314 if self.res >= 0 {
315 let mut offset = self.offset;
316 if self.action == IOAction::Read {
317 if let BufOrLen::Buffer(parent_buf) = &self.buf_or_len {
318 let mut b: &[u8] = &parent_buf[0..self.res as usize];
319 for IOEventMerged { mut buf, cb } in sub_tasks {
320 if let Some(_cb) = cb {
321 let copied = safe_copy(&mut buf, b);
322 if copied < buf.len() {
323 buf.set_len(copied); }
325 _cb.call(offset, Ok(Some(buf)));
326 b = &b[copied..];
327 offset += copied as i64
328 }
329 }
330 }
331 } else if self.action == IOAction::Write {
332 let mut l = self.res as usize;
333 for IOEventMerged { mut buf, cb } in sub_tasks {
334 let mut copied = buf.len();
335 if copied > l {
336 copied = l;
338 buf.set_len(l);
339 }
340 if let Some(_cb) = cb {
341 _cb.call(offset, Ok(Some(buf)));
342 }
343 l -= copied;
344 offset += copied as i64;
345 }
346 }
347 } else {
348 let mut offset = self.offset;
349 for IOEventMerged { buf, cb } in sub_tasks {
350 let _l = buf.len() as i64;
351 if let Some(_cb) = cb {
352 _cb.call(offset, Err(Errno::from_raw(-self.res)));
353 }
354 offset += _l;
355 }
356 }
357 }
358 }
359 }
360}
361
362#[cfg(test)]
363mod tests {
364
365 use super::*;
366 use io_buffer::Buffer;
367 use nix::errno::Errno;
368 use std::mem::size_of;
369 use std::sync::Arc;
370
371 #[test]
372 fn test_ioevent_size() {
373 println!("IOEvent size {}", size_of::<IOEvent<ClosureCb>>());
374 println!("BufOrLen size {}", size_of::<crate::tasks::BufOrLen>());
375 println!("IOEventMerged size {}", size_of::<IOEventMerged<ClosureCb>>());
376 }
377
378 #[test]
380 fn test_callback_normal() {
381 let buffer = Buffer::alloc(4096).unwrap();
382 let mut event = IOEvent::<ClosureCb>::new(0, buffer, IOAction::Write, 1024);
383
384 let result = Arc::new(std::sync::Mutex::new(None));
385 let result_clone = result.clone();
386
387 event.set_callback(ClosureCb(Box::new(move |offset, res| {
388 *result_clone.lock().unwrap() = Some((offset, res));
389 })));
390
391 event.set_copied(4096);
392 event.callback_unchecked(true);
393
394 let (offset, res) = result.lock().unwrap().take().unwrap();
395 assert_eq!(offset, 1024);
396 assert!(res.is_ok());
397 assert!(res.unwrap().is_some());
398 }
399
400 #[test]
402 fn test_callback_merged_read() {
403 use std::sync::atomic::{AtomicI64, Ordering};
404
405 let offsets = Arc::new([AtomicI64::new(0), AtomicI64::new(0), AtomicI64::new(0)]);
406 let offsets_clone = offsets.clone();
407
408 let mut sub_tasks = SegList::new();
410
411 let buf1 = Buffer::alloc(16).unwrap();
413 sub_tasks.push(IOEventMerged {
414 buf: buf1,
415 cb: Some(ClosureCb(Box::new(move |offset, res| {
416 offsets_clone[0].store(offset, Ordering::SeqCst);
417 assert!(res.is_ok());
418 assert!(res.unwrap().is_some());
419 }))),
420 });
421
422 let buf2 = Buffer::alloc(16).unwrap();
424 let offsets_clone2 = offsets.clone();
425 sub_tasks.push(IOEventMerged {
426 buf: buf2,
427 cb: Some(ClosureCb(Box::new(move |offset, res| {
428 offsets_clone2[1].store(offset, Ordering::SeqCst);
429 assert!(res.is_ok());
430 assert!(res.unwrap().is_some());
431 }))),
432 });
433
434 let buf3 = Buffer::alloc(16).unwrap();
436 let offsets_clone3 = offsets.clone();
437 sub_tasks.push(IOEventMerged {
438 buf: buf3,
439 cb: Some(ClosureCb(Box::new(move |offset, res| {
440 offsets_clone3[2].store(offset, Ordering::SeqCst);
441 assert!(res.is_ok());
442 assert!(res.unwrap().is_some());
443 }))),
444 });
445
446 let parent_buf = Buffer::alloc(48).unwrap();
448 let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Read, 1000);
449 event.set_copied(48); let parent_buf = match std::mem::replace(&mut event.buf_or_len, BufOrLen::Len(0)) {
453 BufOrLen::Buffer(buf) => buf,
454 BufOrLen::Len(_) => panic!("expected buffer"),
455 };
456 let mut parent_buf = parent_buf;
457 parent_buf.copy_from(0, b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()");
458
459 event.set_merged_tasks(parent_buf, sub_tasks);
460 event.callback_unchecked(true); assert_eq!(offsets[0].load(Ordering::SeqCst), 1000);
464 assert_eq!(offsets[1].load(Ordering::SeqCst), 1016);
465 assert_eq!(offsets[2].load(Ordering::SeqCst), 1032);
466 }
467
468 #[test]
470 fn test_callback_merged_write() {
471 let parent_buf = Buffer::alloc(4096).unwrap();
473
474 let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Write, 2000);
475 event.set_copied(48); let mut sub_tasks = SegList::new();
478
479 sub_tasks.push(IOEventMerged {
481 buf: Buffer::alloc(16).unwrap(),
482 cb: Some(ClosureCb(Box::new(move |offset, res| {
483 assert_eq!(offset, 2000, "first write callback offset");
484 assert!(res.is_ok());
485 assert!(res.unwrap().is_some());
486 }))),
487 });
488
489 sub_tasks.push(IOEventMerged {
491 buf: Buffer::alloc(16).unwrap(),
492 cb: Some(ClosureCb(Box::new(move |offset, res| {
493 assert_eq!(offset, 2016, "second write callback offset");
494 assert!(res.is_ok());
495 assert!(res.unwrap().is_some());
496 }))),
497 });
498
499 sub_tasks.push(IOEventMerged {
501 buf: Buffer::alloc(16).unwrap(),
502 cb: Some(ClosureCb(Box::new(move |offset, res| {
503 assert_eq!(offset, 2032, "third write callback offset");
504 assert!(res.is_ok());
505 assert!(res.unwrap().is_some());
506 }))),
507 });
508
509 event.set_merged_tasks(Buffer::alloc(4096).unwrap(), sub_tasks);
510 event.callback_unchecked(true); }
512
513 #[test]
515 fn test_callback_merged_error() {
516 let parent_buf = Buffer::alloc(4096).unwrap();
517 let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Read, 3000);
518 event.set_error(Errno::EIO as i32); let mut sub_tasks = SegList::new();
521
522 sub_tasks.push(IOEventMerged {
524 buf: Buffer::alloc(16).unwrap(),
525 cb: Some(ClosureCb(Box::new(move |offset, res| {
526 assert_eq!(offset, 3000, "error callback offset");
527 assert!(res.is_err());
528 assert_eq!(res.err().unwrap(), Errno::EIO);
529 }))),
530 });
531
532 sub_tasks.push(IOEventMerged {
534 buf: Buffer::alloc(16).unwrap(),
535 cb: Some(ClosureCb(Box::new(move |offset, res| {
536 assert_eq!(offset, 3016, "error callback offset 2");
537 assert!(res.is_err());
538 }))),
539 });
540
541 event.set_merged_tasks(Buffer::alloc(48).unwrap(), sub_tasks);
542 event.callback_unchecked(true);
543 }
544
545 #[test]
547 fn test_callback_merged_short_read() {
548 use std::sync::atomic::{AtomicI64, Ordering};
549
550 let offsets = Arc::new([AtomicI64::new(0), AtomicI64::new(0)]);
551 let offsets_clone = offsets.clone();
552
553 let mut sub_tasks = SegList::new();
554
555 sub_tasks.push(IOEventMerged {
557 buf: Buffer::alloc(16).unwrap(),
558 cb: Some(ClosureCb(Box::new(move |offset, res| {
559 offsets_clone[0].store(offset, Ordering::SeqCst);
560 assert!(res.is_ok());
561 assert!(res.unwrap().is_some());
562 }))),
563 });
564
565 let offsets_clone2 = offsets.clone();
567 sub_tasks.push(IOEventMerged {
568 buf: Buffer::alloc(16).unwrap(),
569 cb: Some(ClosureCb(Box::new(move |offset, res| {
570 offsets_clone2[1].store(offset, Ordering::SeqCst);
571 assert!(res.is_ok());
572 assert!(res.unwrap().is_some());
573 }))),
574 });
575
576 let parent_buf = Buffer::alloc(32).unwrap();
578 let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Read, 4000);
579 event.set_copied(24); let parent_buf = match std::mem::replace(&mut event.buf_or_len, BufOrLen::Len(0)) {
583 BufOrLen::Buffer(buf) => buf,
584 BufOrLen::Len(_) => panic!("expected buffer"),
585 };
586
587 event.set_merged_tasks(parent_buf, sub_tasks);
588 event.callback_unchecked(true);
589
590 assert_eq!(offsets[0].load(Ordering::SeqCst), 4000);
592 assert_eq!(offsets[1].load(Ordering::SeqCst), 4016);
593 }
594}