Skip to main content

io_engine/
callback_worker.rs

1use crate::tasks::{IOCallback, IOEvent};
2use crossfire::{MTx, Tx, flavor::Flavor, mpmc};
3
4/// A trait for workers that accept IO events.
5///
6/// This allows using either `IOWorkers` (wrappers around channels) or direct channels
7/// (`MTx`, `Tx`) or any other sink, or even process inline.
8pub trait Worker<C: IOCallback>: Send + 'static {
9    fn done(&self, event: Box<IOEvent<C>>);
10}
11
12/// Example callback worker implement with crossfire::mpmc, can be shared among multiple driver instances.
13///
14/// # Safety
15///
16/// It does not check and resubmit short I/O
17pub struct IOWorkers<C: IOCallback>(pub(crate) MTx<mpmc::Array<Box<IOEvent<C>>>>);
18
19impl<C: IOCallback> IOWorkers<C> {
20    pub fn new(workers: usize) -> Self {
21        let (tx, rx) = mpmc::bounded_blocking::<Box<IOEvent<C>>>(100000);
22        for _i in 0..workers {
23            let _rx = rx.clone();
24            std::thread::spawn(move || {
25                loop {
26                    match _rx.recv() {
27                        Ok(event) => event.callback_unchecked(false),
28                        Err(_) => {
29                            debug!("IOWorkers exit");
30                            return;
31                        }
32                    }
33                }
34            });
35        }
36        Self(tx)
37    }
38}
39
40impl<C: IOCallback> Clone for IOWorkers<C> {
41    fn clone(&self) -> Self {
42        Self(self.0.clone())
43    }
44}
45
46impl<C: IOCallback> Worker<C> for IOWorkers<C> {
47    fn done(&self, item: Box<IOEvent<C>>) {
48        let _ = self.0.send(item);
49    }
50}
51
52// Implement Worker for Crossfire MTx (Multi-Producer)
53impl<C, F> Worker<C> for MTx<F>
54where
55    F: Flavor<Item = Box<IOEvent<C>>>,
56    C: IOCallback,
57{
58    fn done(&self, item: Box<IOEvent<C>>) {
59        let _ = self.send(item);
60    }
61}
62
63// Implement Worker for Crossfire Tx (Single-Producer)
64impl<C, F> Worker<C> for Tx<F>
65where
66    F: Flavor<Item = Box<IOEvent<C>>>,
67    C: IOCallback,
68{
69    fn done(&self, item: Box<IOEvent<C>>) {
70        let _ = self.send(item);
71    }
72}
73
74/// Example Inline worker that executes callbacks directly without spawning threads.
75/// Use this for very lightweight callback logic to avoid thread context switching overhead.
76///
77/// # Safety
78///
79/// It does not check and resubmit short I/O
80pub struct Inline;
81
82impl<C: IOCallback> Worker<C> for Inline {
83    fn done(&self, event: Box<IOEvent<C>>) {
84        event.callback_unchecked(false);
85    }
86}