buffet/
lib.rs

1use std::future::Future;
2
3mod roll;
4pub use roll::*;
5
6mod piece;
7pub use piece::*;
8
9pub mod bufpool;
10use bufpool::*;
11
12mod io;
13pub use io::*;
14
15pub mod net;
16
17#[cfg(all(target_os = "linux", feature = "uring"))]
18mod uring;
19
20#[cfg(all(target_os = "linux", feature = "uring"))]
21pub use uring::get_ring;
22
23/// Spawns a new asynchronous task, returning a [tokio::task::JoinHandle] for
24/// it.
25///
26/// Spawning a task enables the task to execute concurrently to other tasks.
27/// There is no guarantee that a spawned task will execute to completion. When a
28/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
29/// lifecycle of that task.
30///
31/// This must be executed from within a runtime created by [crate::start]
32pub fn spawn<T: Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
33    tokio::task::spawn_local(task)
34}
35
36/// Build a new current-thread runtime and runs the provided future on it
37#[cfg(all(target_os = "linux", feature = "uring"))]
38pub fn start<F: Future>(task: F) -> F::Output {
39    use luring::IoUringAsync;
40    use send_wrapper::SendWrapper;
41    use tokio::task::LocalSet;
42
43    let u = SendWrapper::new(uring::get_ring());
44    let rt = tokio::runtime::Builder::new_current_thread()
45        .enable_all()
46        .on_thread_park(move || {
47            u.submit().unwrap();
48        })
49        .build()
50        .unwrap();
51    let res = rt.block_on(async move {
52        crate::bufpool::initialize_allocator().unwrap();
53        let mut lset = LocalSet::new();
54        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
55        let listen_task = IoUringAsync::listen(get_ring());
56        lset.spawn_local(async move {
57            tokio::select! {
58                _ = listen_task => {
59                    tracing::trace!("IoUringAsync listen task finished");
60                },
61                _ = cancel_rx => {
62                    tracing::trace!("IoUringAsync listen task cancelled");
63                }
64            }
65        });
66
67        let res = lset.run_until(task).await;
68
69        tracing::debug!("waiting for local set (cancellations, cleanups etc.)");
70
71        // during this poll, the async cancellations get submitted
72        let cancel_submit_timeout = std::time::Duration::from_millis(0);
73        if (tokio::time::timeout(cancel_submit_timeout, &mut lset).await).is_err() {
74            drop(cancel_tx);
75
76            // during this second poll, the async cancellations hopefully finish
77            let cleanup_timeout = std::time::Duration::from_millis(500);
78            if (tokio::time::timeout(cleanup_timeout, lset).await).is_err() {
79                tracing::warn!(
80                    "🥲 timed out waiting for local set (async cancellations, cleanups etc.)"
81                );
82            }
83        }
84
85        res
86    });
87    rt.shutdown_timeout(std::time::Duration::from_millis(20));
88    res
89}
90
91/// Build a new current-thread runtime and runs the provided future on it
92#[cfg(not(all(target_os = "linux", feature = "uring")))]
93pub fn start<F: Future>(task: F) -> F::Output {
94    use tokio::task::LocalSet;
95
96    tokio::runtime::Builder::new_current_thread()
97        .enable_all()
98        .build()
99        .unwrap()
100        .block_on(async move {
101            crate::bufpool::initialize_allocator().unwrap();
102            let lset = LocalSet::new();
103            lset.run_until(task).await
104        })
105}