use crate::callback_worker::Worker;
use crate::driver::aio::AioDriver;
use crate::driver::uring::UringDriver;
use crate::tasks::{IOCallback, IOEvent};
use crossfire::BlockingRxTrait;
use std::{
    io,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

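/// Selects the kernel I/O backend that drives the context: Linux AIO or io_uring.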
pub enum Driver {
    Aio,
    Uring,
}

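/// State shared between the `IOContext` handle and the backend driver: the
/// queue depth, the blocking event queue, the callback workers, and a counter
/// of currently free slots.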
pub(crate) struct CtxShared<C: IOCallback, Q, W> {
    pub depth: usize,
    pub queue: Q,
    pub cb_workers: W,
    pub free_slots_count: AtomicUsize,
    pub _marker: std::marker::PhantomData<C>,
}

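// SAFETY: `C` appears only as `PhantomData`, so no value of type `C` is stored;
// the queue and worker fields are constrained to `Send`, and these impls assert
// that the shared state may be accessed from the driver and worker sides.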
unsafe impl<C: IOCallback, Q: Send, W: Send> Send for CtxShared<C, Q, W> {}
unsafe impl<C: IOCallback, Q: Send, W: Send> Sync for CtxShared<C, Q, W> {}

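/// Public handle to an I/O context; the same shared state is handed to the
/// backend driver when the context is created via [`IOContext::new`].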
pub struct IOContext<C: IOCallback, Q, W> {
    pub(crate) inner: Arc<CtxShared<C, Q, W>>,
}

impl<C: IOCallback, Q, W> IOContext<C, Q, W>
where
    Q: BlockingRxTrait<IOEvent<C>> + Send + 'static,
    W: Worker<C> + Send + 'static,
{
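    /// Builds the shared state with `depth` free slots and starts the selected
    /// backend driver; any error from starting the driver is returned.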
    pub fn new(
        depth: usize,
        queue: Q,
        cbs: W,
        driver_type: Driver,
    ) -> Result<Arc<Self>, io::Error> {
        let inner = Arc::new(CtxShared {
            depth,
            queue,
            cb_workers: cbs,
            free_slots_count: AtomicUsize::new(depth),
            _marker: std::marker::PhantomData,
        });

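        // Hand a clone of the shared state to the selected backend driver.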
        match driver_type {
            Driver::Aio => AioDriver::start(inner.clone())?,
            Driver::Uring => UringDriver::start(inner.clone())?,
        }

        Ok(Arc::new(Self { inner }))
    }

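    /// Returns the queue depth this context was created with.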
    #[inline]
    pub fn get_depth(&self) -> usize {
        self.inner.depth
    }

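    /// Returns the number of currently running operations, computed as `depth`
    /// minus the free-slot count and clamped at zero.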
    pub fn running_count(&self) -> usize {
        let inner = self.inner.as_ref();
        let free = inner.free_slots_count.load(Ordering::SeqCst);
        if free > inner.depth { 0 } else { inner.depth - free }
    }
}