use crate::callback_worker::IOWorkers;
use crate::driver::aio::AioDriver;
use crate::driver::uring::UringDriver;
use crate::tasks::{IOEvent, IoCallback};
use crossfire::BlockingRxTrait;
use std::{
    io,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

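/// Selects the kernel I/O backend (Linux AIO or io_uring) that drives an [`IOContext`].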
pub enum IoEngineType {
    Aio,
    Uring,
}

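/// State shared between an [`IOContext`] handle and its backend driver:
/// the configured queue depth, the event queue, the callback workers, and a
/// counter of free submission slots.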
pub struct IoCtxShared<C: IoCallback, Q> {
    pub depth: usize,
    pub queue: Q,
    pub cb_workers: IOWorkers<C>,
    pub free_slots_count: AtomicUsize,
}

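// SAFETY: manually asserted. Sharing `IoCtxShared` across threads is assumed
// sound as long as `Q: Send`, on the assumption that the queue is only drained
// by the backend driver and the remaining fields are atomics or otherwise Sync.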
unsafe impl<C: IoCallback, Q: Send> Send for IoCtxShared<C, Q> {}
unsafe impl<C: IoCallback, Q: Send> Sync for IoCtxShared<C, Q> {}

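/// Public handle to a running I/O context, backed by a shared [`IoCtxShared`].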
pub struct IOContext<C: IoCallback, Q> {
    pub(crate) inner: Arc<IoCtxShared<C, Q>>,
}

impl<C: IoCallback, Q> IOContext<C, Q>
where
    Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static,
{
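    /// Creates an I/O context with `depth` submission slots, starts the
    /// selected backend driver, and returns the shared handle.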
    pub fn new(
        depth: usize,
        queue: Q,
        cbs: &IOWorkers<C>,
        driver_type: IoEngineType,
    ) -> Result<Arc<Self>, io::Error> {
        let inner = Arc::new(IoCtxShared {
            depth,
            queue,
            cb_workers: cbs.clone(),
            free_slots_count: AtomicUsize::new(depth),
        });

        match driver_type {
            IoEngineType::Aio => AioDriver::start(inner.clone())?,
            IoEngineType::Uring => UringDriver::start(inner.clone())?,
        }

        Ok(Arc::new(Self { inner }))
    }

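    /// Returns the configured queue depth (maximum number of in-flight requests).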
    #[inline]
    pub fn get_depth(&self) -> usize {
        self.inner.depth
    }

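    /// Estimates the number of requests currently in flight from a snapshot of
    /// the free-slot counter, clamping to zero if the counter momentarily
    /// exceeds `depth`.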
    pub fn running_count(&self) -> usize {
        let inner = self.inner.as_ref();
        let free = inner.free_slots_count.load(Ordering::SeqCst);
        if free > inner.depth { 0 } else { inner.depth - free }
    }
}