io_engine/
context.rs

1// Copyright (c) 2025 NaturalIO
2
3use crate::callback_worker::IOWorkers;
4use crate::driver::aio::AioDriver;
5use crate::driver::uring::UringDriver; // Import UringDriver
6use crate::tasks::{IOEvent, IoCallback};
7use crossfire::BlockingRxTrait;
8use std::{
9    io,
10    sync::{
11        Arc,
12        atomic::{AtomicUsize, Ordering},
13    },
14};
15
/// Selects which driver backend [`IOContext::new`] starts for a context.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoEngineType {
    /// Start the context with `AioDriver`.
    Aio,
    /// Start the context with `UringDriver`.
    Uring,
}
20
21pub struct IoCtxShared<C: IoCallback, Q> {
22    pub depth: usize,
23    pub queue: Q,
24    pub cb_workers: IOWorkers<C>,
25    pub free_slots_count: AtomicUsize,
26}
27
28unsafe impl<C: IoCallback, Q: Send> Send for IoCtxShared<C, Q> {}
29unsafe impl<C: IoCallback, Q: Send> Sync for IoCtxShared<C, Q> {}
30
31pub struct IOContext<C: IoCallback, Q> {
32    pub(crate) inner: Arc<IoCtxShared<C, Q>>,
33}
34
35impl<C: IoCallback, Q> IOContext<C, Q>
36where
37    Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static,
38{
39    pub fn new(
40        depth: usize,
41        queue: Q,
42        cbs: &IOWorkers<C>,
43        driver_type: IoEngineType, // New parameter
44    ) -> Result<Arc<Self>, io::Error> {
45        let inner = Arc::new(IoCtxShared {
46            depth,
47            queue,
48            cb_workers: cbs.clone(),
49            free_slots_count: AtomicUsize::new(depth),
50        });
51
52        match driver_type {
53            IoEngineType::Aio => AioDriver::start(inner.clone())?,
54            IoEngineType::Uring => UringDriver::start(inner.clone())?,
55        }
56
57        Ok(Arc::new(Self { inner }))
58    }
59
60    #[inline]
61    pub fn get_depth(&self) -> usize {
62        self.inner.depth
63    }
64
65    pub fn running_count(&self) -> usize {
66        let inner = self.inner.as_ref();
67        let free = inner.free_slots_count.load(Ordering::SeqCst);
68        if free > inner.depth { 0 } else { inner.depth - free }
69    }
70}