1use std::future::Future;
2
3mod roll;
4pub use roll::*;
5
6mod piece;
7pub use piece::*;
8
9pub mod bufpool;
10use bufpool::*;
11
12mod io;
13pub use io::*;
14
15pub mod net;
16
17#[cfg(all(target_os = "linux", feature = "uring"))]
18mod uring;
19
20#[cfg(all(target_os = "linux", feature = "uring"))]
21pub use uring::get_ring;
22
23pub fn spawn<T: Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
33 tokio::task::spawn_local(task)
34}
35
/// Builds a current-thread tokio runtime hooked up to the io_uring instance
/// and runs `task` on it inside a [`tokio::task::LocalSet`], returning the
/// task's output.
///
/// Besides `task`, a background task driving `IoUringAsync::listen` is spawned
/// on the same local set. Once `task` completes, the local set is given a
/// bounded amount of time to finish outstanding work before the runtime is
/// shut down.
///
/// # Panics
///
/// Panics if the runtime cannot be built, if
/// `bufpool::initialize_allocator` fails, or if `submit()` on the ring fails
/// when the runtime thread parks.
#[cfg(all(target_os = "linux", feature = "uring"))]
pub fn start<F: Future>(task: F) -> F::Output {
    use std::time::Duration;

    use luring::IoUringAsync;
    use send_wrapper::SendWrapper;
    use tokio::{runtime::Builder, sync::oneshot, task::LocalSet, time::timeout};

    // Budget for the first poll of the local set after `task` has finished.
    const CANCEL_SUBMIT_TIMEOUT: Duration = Duration::from_millis(0);
    // Budget for the local set to drain once the listener was told to stop.
    const CLEANUP_TIMEOUT: Duration = Duration::from_millis(500);
    // Budget handed to the runtime for its own shutdown.
    const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(20);

    // The ring handle is moved into the park hook below; it is wrapped in a
    // `SendWrapper` for that purpose.
    // NOTE(review): presumably because the hook closure must be `Send` while
    // the ring handle is not — confirm against `uring::get_ring`'s return type.
    let parked_ring = SendWrapper::new(uring::get_ring());
    let runtime = Builder::new_current_thread()
        .enable_all()
        .on_thread_park(move || {
            // Submit to the ring every time the runtime thread is about to park.
            parked_ring.submit().unwrap();
        })
        .build()
        .unwrap();

    let output = runtime.block_on(async move {
        crate::bufpool::initialize_allocator().unwrap();

        let mut local_set = LocalSet::new();
        let (stop_listener, listener_stopped) = oneshot::channel::<()>();
        let ring_listener = IoUringAsync::listen(get_ring());

        // Background task: runs the ring listener until it ends by itself or
        // until `stop_listener` is dropped, which resolves `listener_stopped`.
        local_set.spawn_local(async move {
            tokio::select! {
                _ = ring_listener => {
                    tracing::trace!("IoUringAsync listen task finished");
                },
                _ = listener_stopped => {
                    tracing::trace!("IoUringAsync listen task cancelled");
                }
            }
        });

        let output = local_set.run_until(task).await;

        tracing::debug!("waiting for local set (cancellations, cleanups etc.)");

        // First pass: let the local set make progress without waiting around.
        // NOTE(review): presumably this poll is what gets pending async
        // cancellations submitted to the ring — confirm.
        let still_busy = timeout(CANCEL_SUBMIT_TIMEOUT, &mut local_set)
            .await
            .is_err();
        if still_busy {
            // Stop the listener task so the local set is able to run dry.
            drop(stop_listener);

            // Second pass: wait (bounded) for the remaining tasks to finish.
            let gave_up = timeout(CLEANUP_TIMEOUT, local_set).await.is_err();
            if gave_up {
                tracing::warn!(
                    "🥲 timed out waiting for local set (async cancellations, cleanups etc.)"
                );
            }
        }

        output
    });

    runtime.shutdown_timeout(RUNTIME_SHUTDOWN_TIMEOUT);
    output
}
90
91#[cfg(not(all(target_os = "linux", feature = "uring")))]
93pub fn start<F: Future>(task: F) -> F::Output {
94 use tokio::task::LocalSet;
95
96 tokio::runtime::Builder::new_current_thread()
97 .enable_all()
98 .build()
99 .unwrap()
100 .block_on(async move {
101 crate::bufpool::initialize_allocator().unwrap();
102 let lset = LocalSet::new();
103 lset.run_until(task).await
104 })
105}