Skip to main content

io_engine/
tasks.rs

1use std::os::fd::RawFd;
2use std::{fmt, u64};
3
4use embed_collections::SegList;
5use io_buffer::{Buffer, safe_copy};
6use nix::errno::Errno;
7
/// Kind of operation carried by an `IOEvent`.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so the numeric values are
/// part of the type's representation and must stay stable.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
#[repr(u8)]
pub enum IOAction {
    /// Read data into the event's buffer.
    Read = 0,
    /// Write data from the event's buffer.
    Write = 1,
    /// Space allocation; presumably the bufferless `fallocate` path, which carries a
    /// length instead of a buffer (verify against the driver).
    Alloc = 2,
    /// Flush file data to storage.
    Fsync = 3,
}
16
/// Completion callback for an `IOEvent`; implement this trait to define your own callback.
///
/// `call` consumes the callback. It receives the offset of the event together with the
/// outcome: `Ok(Some(buffer))` for an event that carried a buffer, `Ok(None)` for an
/// event without a buffer, or `Err(errno)` when the operation failed.
pub trait IOCallback: Sized + 'static + Send + Unpin {
    fn call(self, offset: i64, res: Result<Option<Buffer>, Errno>);
}
21
/// Closure callback for IOEvent.
///
/// Wraps a boxed `FnOnce(offset, result)` so a plain closure can be used wherever an
/// `IOCallback` is required. The closure must be `Send`.
pub struct ClosureCb(pub Box<dyn FnOnce(i64, Result<Option<Buffer>, Errno>) + Send>);
24
25impl IOCallback for ClosureCb {
26    fn call(self, offset: i64, res: Result<Option<Buffer>, Errno>) {
27        (self.0)(offset, res)
28    }
29}
30
/// Carries the information of a single I/O event (read, write, alloc or fsync).
pub struct IOEvent<C: IOCallback> {
    /// Which operation this event performs.
    pub action: IOAction,
    /// Result of the IO operation.
    ///
    /// - `i32::MIN`: initial value, the operation has not completed yet.
    /// - `>= 0`: accumulated bytes transferred (used for partial IO retries).
    /// - `< 0` (other than `i32::MIN`): error code (negative errno).
    pub(crate) res: i32,
    /// The data buffer for buffered I/O, or a plain length for events created without a
    /// buffer.
    // NOTE(review): the previous comment asked to keep an `SListNode` at the front of the
    // struct for linking sub-tasks; no such field exists in this definition.
    buf_or_len: BufOrLen,
    /// File offset the operation starts at.
    pub offset: i64,
    /// File descriptor the operation targets.
    pub fd: RawFd,
    /// Completion handler: nothing, a single callback, or the list of merged sub-tasks.
    cb: TaskCallback<C>,
}
46
/// Completion handler attached to an `IOEvent`.
enum TaskCallback<C: IOCallback> {
    /// No callback is set; completing the event notifies nobody.
    None,
    /// A single user callback for a non-merged event.
    Callback(C),
    /// The event is the master of a merge; each sub-task keeps its own buffer and
    /// optional callback.
    Merged(SegList<IOEventMerged<C>>),
}
52
/// Payload of an `IOEvent`: either a data buffer or just a length.
enum BufOrLen {
    /// Buffer that the operation transfers data through.
    Buffer(Buffer),
    /// for fallocate: the event has no buffer, only a length in bytes
    Len(u64),
}
58
/// One sub-task of a merged `IOEvent`: the original event's buffer and its callback.
pub(crate) struct IOEventMerged<C: IOCallback> {
    /// Buffer of the original (pre-merge) event.
    pub buf: Buffer,
    /// Callback of the original event, `None` when it had no plain callback set.
    pub cb: Option<C>,
}
63
64impl<C: IOCallback> fmt::Debug for IOEvent<C> {
65    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66        if let TaskCallback::Merged(sub_tasks) = &self.cb {
67            write!(f, "offset={} {:?} merged {}", self.offset, self.action, sub_tasks.len())
68        } else {
69            write!(f, "offset={} {:?}", self.offset, self.action)
70        }
71    }
72}
73
74impl<C: IOCallback> IOEvent<C> {
75    #[inline]
76    pub fn new(fd: RawFd, buf: Buffer, action: IOAction, offset: i64) -> Self {
77        log_assert!(buf.len() > 0, "{:?} offset={}, buffer size == 0", action, offset);
78        Self {
79            buf_or_len: BufOrLen::Buffer(buf),
80            fd,
81            action,
82            offset,
83            res: i32::MIN,
84            cb: TaskCallback::None,
85        }
86    }
87
88    #[inline]
89    pub fn new_no_buf(fd: RawFd, action: IOAction, offset: i64, len: u64) -> Self {
90        Self {
91            buf_or_len: BufOrLen::Len(len), // No buffer for this action
92            fd,
93            action,
94            offset,
95            res: i32::MIN,
96            cb: TaskCallback::None,
97        }
98    }
99
    /// Replace the file descriptor this event operates on.
    #[inline(always)]
    pub fn set_fd(&mut self, fd: RawFd) {
        self.fd = fd;
    }
104
    /// Set callback for IOEvent, might be closure or a custom struct.
    ///
    /// Any previously stored completion handler (including merged sub-tasks) is replaced.
    #[inline(always)]
    pub fn set_callback(&mut self, cb: C) {
        self.cb = TaskCallback::Callback(cb);
    }
110
111    #[inline(always)]
112    pub fn get_size(&self) -> u64 {
113        match &self.buf_or_len {
114            BufOrLen::Buffer(buf) => buf.len() as u64,
115            BufOrLen::Len(l) => *l,
116        }
117    }
118
    /// Set merged buffer and subtasks for the master event after merging.
    ///
    /// Replaces the event's payload with `merged_buf` and its completion handler with the
    /// `sub_tasks` list; whatever was stored before is dropped.
    #[inline(always)]
    pub(crate) fn set_merged_tasks(
        &mut self, merged_buf: Buffer, sub_tasks: SegList<IOEventMerged<C>>,
    ) {
        self.buf_or_len = BufOrLen::Buffer(merged_buf);
        self.cb = TaskCallback::Merged(sub_tasks);
    }
127
128    /// Convert this IOEvent into an IOEventMerged for storing in merge buffer.
129    /// Extracts the buffer and callback from the event.
130    #[inline(always)]
131    pub(crate) fn into_merged(mut self) -> IOEventMerged<C> {
132        let buf = match std::mem::replace(&mut self.buf_or_len, BufOrLen::Len(0)) {
133            BufOrLen::Buffer(buf) => buf,
134            BufOrLen::Len(_) => panic!("into_merged called on IOEvent with no buffer"),
135        };
136        let cb = match std::mem::replace(&mut self.cb, TaskCallback::None) {
137            TaskCallback::Callback(cb) => Some(cb),
138            _ => None,
139        };
140        IOEventMerged { buf, cb }
141    }
142
143    /// Extract buffer and callback to create IOEventMerged, leaving this event with empty buffer.
144    /// Used when moving first event to merged_events list.
145    #[inline(always)]
146    pub(crate) fn extract_merged(&mut self) -> IOEventMerged<C> {
147        let buf = match std::mem::replace(&mut self.buf_or_len, BufOrLen::Len(0)) {
148            BufOrLen::Buffer(buf) => buf,
149            BufOrLen::Len(_) => panic!("extract_merged called on IOEvent with no buffer"),
150        };
151        let cb = match std::mem::replace(&mut self.cb, TaskCallback::None) {
152            TaskCallback::Callback(cb) => Some(cb),
153            _ => None,
154        };
155        IOEventMerged { buf, cb }
156    }
157
158    /// return (offset, ptr, len)
159    #[inline(always)]
160    pub(crate) fn get_param_for_io(&mut self) -> (u64, *mut u8, u32) {
161        if let BufOrLen::Buffer(buf) = &mut self.buf_or_len {
162            let mut offset = self.offset as u64;
163            let mut p = buf.get_raw_mut();
164            let mut l = buf.len() as u32;
165            if self.res > 0 {
166                // resubmited I/O
167                offset += self.res as u64;
168                p = unsafe { p.add(self.res as usize) };
169                l += self.res as u32;
170            }
171            (offset, p, l)
172        } else {
173            panic!("get_buf_raw called on IOEvent with no buffer");
174        }
175    }
176
177    #[inline(always)]
178    pub fn get_write_result(self) -> Result<(), Errno> {
179        let res = self.res;
180        if res >= 0 {
181            return Ok(());
182        } else if res == i32::MIN {
183            panic!("IOEvent get_result before it's done");
184        } else {
185            return Err(Errno::from_raw(-res));
186        }
187    }
188
189    /// Get the result of the IO operation (bytes read/written or error).
190    /// Returns the number of bytes successfully transferred.
191    #[inline(always)]
192    pub fn get_result(&self) -> Result<usize, Errno> {
193        let res = self.res;
194        if res >= 0 {
195            return Ok(res as usize);
196        } else if res == i32::MIN {
197            panic!("IOEvent get_result before it's done");
198        } else {
199            return Err(Errno::from_raw(-res));
200        }
201    }
202
203    /// Get the buffer from a read operation.
204    /// Note: The buffer length is NOT modified. Use `get_result()` to get actual bytes read.
205    #[inline(always)]
206    pub fn get_read_result(mut self) -> Result<Buffer, Errno> {
207        let res = self.res;
208        if res >= 0 {
209            // XXX?
210            let buf_or_len = std::mem::replace(&mut self.buf_or_len, BufOrLen::Len(0));
211            if let BufOrLen::Buffer(buf) = buf_or_len {
212                // Do NOT modify buffer length - caller should use get_result() to know actual bytes read
213                return Ok(buf);
214            } else {
215                panic!("get_read_result called on IOEvent with no buffer");
216            }
217        } else if res == i32::MIN {
218            panic!("IOEvent get_result before it's done");
219        } else {
220            return Err(Errno::from_raw(-res));
221        }
222    }
223
224    #[inline(always)]
225    pub(crate) fn set_error(&mut self, mut errno: i32) {
226        if errno == 0 {
227            // XXX: EOF does not have code to represent,
228            // also when offset is not align to 4096, may return result 0,
229            errno = Errno::EINVAL as i32;
230        }
231        if errno > 0 {
232            errno = -errno;
233        }
234        self.res = errno;
235    }
236
237    #[inline(always)]
238    pub(crate) fn set_copied(&mut self, len: usize) {
239        if self.res == i32::MIN {
240            // the initial state
241            self.res = len as i32;
242        } else {
243            // resubmit for short I/O
244            self.res += len as i32;
245        }
246    }
247
248    /// For writing custom callback workers
249    ///
250    /// Callback worker should always call this function on receiving IOEvent from Driver
251    ///
252    /// parameter: `check_short_read(offset: u64)` should be checking the offset exceed file end.
253    /// If `check_short_read()` return true, the callback function will return Err(IOEvent) for I/O resubmit.
254    ///
255    /// NOTE: you should always use a weak reference in `check_short_read` closure and
256    /// re-submission.
257    #[inline(always)]
258    pub fn callback<F>(mut self: Box<Self>, check_short_read: F) -> Result<(), Box<Self>>
259    where
260        F: FnOnce(u64) -> bool,
261    {
262        if self.res >= 0 {
263            if let BufOrLen::Buffer(buf) = &mut self.buf_or_len {
264                if buf.len() > self.res as usize {
265                    if self.action == IOAction::Read {
266                        if check_short_read(self.offset as u64 + self.res as u64) {
267                            return Err(self);
268                        } else {
269                            // reach file ending
270                            buf.set_len(self.res as usize);
271                        }
272                    } else {
273                        // short write always need to resubmit
274                        return Err(self);
275                    }
276                }
277            }
278        }
279        self.callback_unchecked(false);
280        Ok(())
281    }
282
283    /// Perform callback on the IOEvent when cannot re-submit for short i/o
284    ///
285    /// # Arguments
286    ///
287    /// - to_fix_short_io: should always be true, fix the buffer len of short I/O
288    ///
289    /// # Safety
290    ///
291    /// Only for callback worker does not re-submit when short I/O.
292    /// Buffer::len() will changed to actual I/O copied size during callback.
293    #[inline(always)]
294    pub fn callback_unchecked(mut self, to_fix_short_io: bool) {
295        match std::mem::replace(&mut self.cb, TaskCallback::None) {
296            TaskCallback::None => {}
297            TaskCallback::Callback(cb) => {
298                let res: Result<Option<Buffer>, Errno> = if self.res >= 0 {
299                    match self.buf_or_len {
300                        BufOrLen::Buffer(mut buf) => {
301                            if to_fix_short_io && buf.len() > self.res as usize {
302                                buf.set_len(self.res as usize);
303                            }
304                            Ok(Some(buf))
305                        }
306                        BufOrLen::Len(_) => Ok(None),
307                    }
308                } else {
309                    Err(Errno::from_raw(-self.res))
310                };
311                cb.call(self.offset, res);
312            }
313            TaskCallback::Merged(sub_tasks) => {
314                if self.res >= 0 {
315                    let mut offset = self.offset;
316                    if self.action == IOAction::Read {
317                        if let BufOrLen::Buffer(parent_buf) = &self.buf_or_len {
318                            let mut b: &[u8] = &parent_buf[0..self.res as usize];
319                            for IOEventMerged { mut buf, cb } in sub_tasks {
320                                if let Some(_cb) = cb {
321                                    let copied = safe_copy(&mut buf, b);
322                                    if copied < buf.len() {
323                                        buf.set_len(copied); // short I/O
324                                    }
325                                    _cb.call(offset, Ok(Some(buf)));
326                                    b = &b[copied..];
327                                    offset += copied as i64
328                                }
329                            }
330                        }
331                    } else if self.action == IOAction::Write {
332                        let mut l = self.res as usize;
333                        for IOEventMerged { mut buf, cb } in sub_tasks {
334                            let mut copied = buf.len();
335                            if copied > l {
336                                // short write
337                                copied = l;
338                                buf.set_len(l);
339                            }
340                            if let Some(_cb) = cb {
341                                _cb.call(offset, Ok(Some(buf)));
342                            }
343                            l -= copied;
344                            offset += copied as i64;
345                        }
346                    }
347                } else {
348                    let mut offset = self.offset;
349                    for IOEventMerged { buf, cb } in sub_tasks {
350                        let _l = buf.len() as i64;
351                        if let Some(_cb) = cb {
352                            _cb.call(offset, Err(Errno::from_raw(-self.res)));
353                        }
354                        offset += _l;
355                    }
356                }
357            }
358        }
359    }
360}
361
362#[cfg(test)]
363mod tests {
364
365    use super::*;
366    use io_buffer::Buffer;
367    use nix::errno::Errno;
368    use std::mem::size_of;
369    use std::sync::Arc;
370
    /// Print the in-memory sizes of the core task types.
    ///
    /// Informational only: nothing is asserted.
    #[test]
    fn test_ioevent_size() {
        println!("IOEvent size {}", size_of::<IOEvent<ClosureCb>>());
        println!("BufOrLen size {}", size_of::<crate::tasks::BufOrLen>());
        println!("IOEventMerged size {}", size_of::<IOEventMerged<ClosureCb>>());
    }
377
378    /// Test normal callback (non-merged case)
379    #[test]
380    fn test_callback_normal() {
381        let buffer = Buffer::alloc(4096).unwrap();
382        let mut event = IOEvent::<ClosureCb>::new(0, buffer, IOAction::Write, 1024);
383
384        let result = Arc::new(std::sync::Mutex::new(None));
385        let result_clone = result.clone();
386
387        event.set_callback(ClosureCb(Box::new(move |offset, res| {
388            *result_clone.lock().unwrap() = Some((offset, res));
389        })));
390
391        event.set_copied(4096);
392        event.callback_unchecked(true);
393
394        let (offset, res) = result.lock().unwrap().take().unwrap();
395        assert_eq!(offset, 1024);
396        assert!(res.is_ok());
397        assert!(res.unwrap().is_some());
398    }
399
400    /// Test merged read callback - verifies offset correctness
401    #[test]
402    fn test_callback_merged_read() {
403        use std::sync::atomic::{AtomicI64, Ordering};
404
405        let offsets = Arc::new([AtomicI64::new(0), AtomicI64::new(0), AtomicI64::new(0)]);
406        let offsets_clone = offsets.clone();
407
408        // Create sub-tasks with their own buffers first
409        let mut sub_tasks = SegList::new();
410
411        // First sub-task: 16 bytes at offset 1000
412        let buf1 = Buffer::alloc(16).unwrap();
413        sub_tasks.push(IOEventMerged {
414            buf: buf1,
415            cb: Some(ClosureCb(Box::new(move |offset, res| {
416                offsets_clone[0].store(offset, Ordering::SeqCst);
417                assert!(res.is_ok());
418                assert!(res.unwrap().is_some());
419            }))),
420        });
421
422        // Second sub-task: 16 bytes at offset 1016
423        let buf2 = Buffer::alloc(16).unwrap();
424        let offsets_clone2 = offsets.clone();
425        sub_tasks.push(IOEventMerged {
426            buf: buf2,
427            cb: Some(ClosureCb(Box::new(move |offset, res| {
428                offsets_clone2[1].store(offset, Ordering::SeqCst);
429                assert!(res.is_ok());
430                assert!(res.unwrap().is_some());
431            }))),
432        });
433
434        // Third sub-task: 16 bytes at offset 1032
435        let buf3 = Buffer::alloc(16).unwrap();
436        let offsets_clone3 = offsets.clone();
437        sub_tasks.push(IOEventMerged {
438            buf: buf3,
439            cb: Some(ClosureCb(Box::new(move |offset, res| {
440                offsets_clone3[2].store(offset, Ordering::SeqCst);
441                assert!(res.is_ok());
442                assert!(res.unwrap().is_some());
443            }))),
444        });
445
446        // Create parent buffer and event
447        let parent_buf = Buffer::alloc(48).unwrap();
448        let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Read, 1000);
449        event.set_copied(48); // 48 bytes read
450
451        // Get the parent buffer back and fill with data
452        let parent_buf = match std::mem::replace(&mut event.buf_or_len, BufOrLen::Len(0)) {
453            BufOrLen::Buffer(buf) => buf,
454            BufOrLen::Len(_) => panic!("expected buffer"),
455        };
456        let mut parent_buf = parent_buf;
457        parent_buf.copy_from(0, b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()");
458
459        event.set_merged_tasks(parent_buf, sub_tasks);
460        event.callback_unchecked(true); // Should invoke all callbacks
461
462        // Verify offsets
463        assert_eq!(offsets[0].load(Ordering::SeqCst), 1000);
464        assert_eq!(offsets[1].load(Ordering::SeqCst), 1016);
465        assert_eq!(offsets[2].load(Ordering::SeqCst), 1032);
466    }
467
468    /// Test merged write callback - verifies offset correctness
469    #[test]
470    fn test_callback_merged_write() {
471        // For write, parent buffer contains data that was written
472        let parent_buf = Buffer::alloc(4096).unwrap();
473
474        let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Write, 2000);
475        event.set_copied(48); // All 48 bytes written
476
477        let mut sub_tasks = SegList::new();
478
479        // First sub-task: 16 bytes at offset 2000
480        sub_tasks.push(IOEventMerged {
481            buf: Buffer::alloc(16).unwrap(),
482            cb: Some(ClosureCb(Box::new(move |offset, res| {
483                assert_eq!(offset, 2000, "first write callback offset");
484                assert!(res.is_ok());
485                assert!(res.unwrap().is_some());
486            }))),
487        });
488
489        // Second sub-task: 16 bytes at offset 2016
490        sub_tasks.push(IOEventMerged {
491            buf: Buffer::alloc(16).unwrap(),
492            cb: Some(ClosureCb(Box::new(move |offset, res| {
493                assert_eq!(offset, 2016, "second write callback offset");
494                assert!(res.is_ok());
495                assert!(res.unwrap().is_some());
496            }))),
497        });
498
499        // Third sub-task: 16 bytes at offset 2032
500        sub_tasks.push(IOEventMerged {
501            buf: Buffer::alloc(16).unwrap(),
502            cb: Some(ClosureCb(Box::new(move |offset, res| {
503                assert_eq!(offset, 2032, "third write callback offset");
504                assert!(res.is_ok());
505                assert!(res.unwrap().is_some());
506            }))),
507        });
508
509        event.set_merged_tasks(Buffer::alloc(4096).unwrap(), sub_tasks);
510        event.callback_unchecked(true); // Should invoke all callbacks with correct offsets
511    }
512
513    /// Test merged callback with error result
514    #[test]
515    fn test_callback_merged_error() {
516        let parent_buf = Buffer::alloc(4096).unwrap();
517        let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Read, 3000);
518        event.set_error(Errno::EIO as i32); // IO error
519
520        let mut sub_tasks = SegList::new();
521
522        // First sub-task
523        sub_tasks.push(IOEventMerged {
524            buf: Buffer::alloc(16).unwrap(),
525            cb: Some(ClosureCb(Box::new(move |offset, res| {
526                assert_eq!(offset, 3000, "error callback offset");
527                assert!(res.is_err());
528                assert_eq!(res.err().unwrap(), Errno::EIO);
529            }))),
530        });
531
532        // Second sub-task
533        sub_tasks.push(IOEventMerged {
534            buf: Buffer::alloc(16).unwrap(),
535            cb: Some(ClosureCb(Box::new(move |offset, res| {
536                assert_eq!(offset, 3016, "error callback offset 2");
537                assert!(res.is_err());
538            }))),
539        });
540
541        event.set_merged_tasks(Buffer::alloc(48).unwrap(), sub_tasks);
542        event.callback_unchecked(true);
543    }
544
545    /// Test short read in merged callback
546    #[test]
547    fn test_callback_merged_short_read() {
548        use std::sync::atomic::{AtomicI64, Ordering};
549
550        let offsets = Arc::new([AtomicI64::new(0), AtomicI64::new(0)]);
551        let offsets_clone = offsets.clone();
552
553        let mut sub_tasks = SegList::new();
554
555        // First sub-task: 16 bytes (fully read)
556        sub_tasks.push(IOEventMerged {
557            buf: Buffer::alloc(16).unwrap(),
558            cb: Some(ClosureCb(Box::new(move |offset, res| {
559                offsets_clone[0].store(offset, Ordering::SeqCst);
560                assert!(res.is_ok());
561                assert!(res.unwrap().is_some());
562            }))),
563        });
564
565        // Second sub-task: 16 bytes but only 8 were read (short read)
566        let offsets_clone2 = offsets.clone();
567        sub_tasks.push(IOEventMerged {
568            buf: Buffer::alloc(16).unwrap(),
569            cb: Some(ClosureCb(Box::new(move |offset, res| {
570                offsets_clone2[1].store(offset, Ordering::SeqCst);
571                assert!(res.is_ok());
572                assert!(res.unwrap().is_some());
573            }))),
574        });
575
576        // Parent buffer with 32 bytes
577        let parent_buf = Buffer::alloc(32).unwrap();
578        let mut event = IOEvent::<ClosureCb>::new(0, parent_buf, IOAction::Read, 4000);
579        event.set_copied(24); // Short read: only 24 bytes (16 + 8)
580
581        // Get parent buffer back
582        let parent_buf = match std::mem::replace(&mut event.buf_or_len, BufOrLen::Len(0)) {
583            BufOrLen::Buffer(buf) => buf,
584            BufOrLen::Len(_) => panic!("expected buffer"),
585        };
586
587        event.set_merged_tasks(parent_buf, sub_tasks);
588        event.callback_unchecked(true);
589
590        // Verify
591        assert_eq!(offsets[0].load(Ordering::SeqCst), 4000);
592        assert_eq!(offsets[1].load(Ordering::SeqCst), 4016);
593    }
594}