// io_engine/context.rs

1use crate::callback_worker::Worker;
2use crate::driver::aio::AioDriver;
3use crate::driver::uring::UringDriver; // Import UringDriver
4use crate::tasks::{IOCallback, IOEvent};
5use crossfire::BlockingRxTrait;
6use std::{
7    io,
8    sync::{
9        Arc,
10        atomic::{AtomicUsize, Ordering},
11    },
12};
13
/// Selects which IO backend an `IOContext` starts (see `IOContext::new`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Driver {
    /// Backend implemented by `AioDriver`.
    Aio,
    /// Backend implemented by `UringDriver`.
    Uring,
}
18
/// State shared between an `IOContext` handle and the driver it starts.
///
/// Note: the `C: IOCallback` bound was moved off the struct definition —
/// bounds on struct definitions are unidiomatic (they belong on the impls
/// that need them), and relaxing a bound is backward-compatible.
pub(crate) struct CtxShared<C, Q, W> {
    /// Fixed submission-queue depth (maximum in-flight operations).
    pub depth: usize,
    /// Submission queue of incoming `IOEvent`s consumed by the driver.
    pub queue: Q,
    /// Completion-callback worker(s) events are dispatched to.
    pub cb_workers: W,
    /// Number of currently unused submission slots; starts at `depth`.
    /// Read by `IOContext::running_count`.
    pub free_slots_count: AtomicUsize,
    /// Ties the callback type `C` to this struct without storing a value.
    pub _marker: std::marker::PhantomData<C>,
}
26
// SAFETY: all fields other than `queue` and `cb_workers` are plain data, and
// the `PhantomData<C>` holds no actual `C` value, so with `Q: Send` and
// `W: Send` the whole struct may be moved to another thread.
unsafe impl<C: IOCallback, Q: Send, W: Send> Send for CtxShared<C, Q, W> {}
// SAFETY: NOTE(review): this asserts `Sync` while requiring only `Q: Send`
// and `W: Send`. That is sound only if `queue`/`cb_workers` are never
// accessed through a shared `&CtxShared` from more than one thread at a time
// (e.g. each is used exclusively by the driver thread) — confirm against the
// drivers; otherwise these bounds should be `Q: Sync, W: Sync`.
unsafe impl<C: IOCallback, Q: Send, W: Send> Sync for CtxShared<C, Q, W> {}
29
/// IOContext manages the submission of IO tasks to the underlying driver.
///
/// It is generic over the callback type `C`, the submission queue `Q`, and the worker type `W`.
///
/// # Channel Selection for `W` (Worker)
///
/// When configuring the `IOContext` with a worker `W` (usually a channel sender `cb_workers`),
/// you should choose the `crossfire` channel type based on your sharing model:
///
/// * **Shared Worker (Multiple Contexts):** If you have multiple `IOContext` instances sharing the same
///   callback worker, use the [IOWorkers](crate::callback_worker::IOWorkers) struct,
///   or pass `crossfire::MTx` (from `mpsc` or `mpmc` channels) with your custom worker implementation.
///   This allows multiple producers (contexts) to send completion events to a single consumer (worker).
///
/// * **Single Instance (Dedicated Worker):** If you have a single `IOContext` with its own dedicated
///   callback worker, use `crossfire::Tx` (from `spsc` channels). This is more efficient for single-producer
///   scenarios.
///
/// * **Inline callback:** If your callback logic is very light, you can use
///   [Inline](crate::callback_worker::Inline) to run it directly instead of on a worker.
pub struct IOContext<C: IOCallback, Q, W> {
    // Shared with the driver started in `IOContext::new`; the driver holds
    // its own `Arc` clone of this state.
    pub(crate) inner: Arc<CtxShared<C, Q, W>>,
}
52
53impl<C: IOCallback, Q, W> IOContext<C, Q, W>
54where
55    Q: BlockingRxTrait<IOEvent<C>> + Send + 'static,
56    W: Worker<C> + Send + 'static,
57{
58    pub fn new(
59        depth: usize,
60        queue: Q,
61        cbs: W,
62        driver_type: Driver, // New parameter
63    ) -> Result<Arc<Self>, io::Error> {
64        let inner = Arc::new(CtxShared {
65            depth,
66            queue,
67            cb_workers: cbs,
68            free_slots_count: AtomicUsize::new(depth),
69            _marker: std::marker::PhantomData,
70        });
71
72        match driver_type {
73            Driver::Aio => AioDriver::start(inner.clone())?,
74            Driver::Uring => UringDriver::start(inner.clone())?,
75        }
76
77        Ok(Arc::new(Self { inner }))
78    }
79
80    #[inline]
81    pub fn get_depth(&self) -> usize {
82        self.inner.depth
83    }
84
85    pub fn running_count(&self) -> usize {
86        let inner = self.inner.as_ref();
87        let free = inner.free_slots_count.load(Ordering::SeqCst);
88        if free > inner.depth { 0 } else { inner.depth - free }
89    }
90}