fuse_backend_rs/common/
async_runtime.rs1use std::future::Future;
8
9use lazy_static::lazy_static;
10
11lazy_static! {
12 pub(crate) static ref RUNTIME_TYPE: RuntimeType = RuntimeType::new();
13}
14
/// Flavour of async runtime selected for this process.
///
/// The standard derives make the value cheap to copy out of the lazily initialised
/// static, comparable with `==`, and printable in log/debug output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RuntimeType {
    /// Plain tokio runtime; the fallback on every platform.
    Tokio,
    /// io_uring based runtime; the variant only exists on Linux builds.
    #[cfg(target_os = "linux")]
    Uring,
}
20
21impl RuntimeType {
22 fn new() -> Self {
23 #[cfg(target_os = "linux")]
24 {
25 if Self::probe_io_uring() {
26 return Self::Uring;
27 }
28 }
29 Self::Tokio
30 }
31
32 #[cfg(target_os = "linux")]
33 fn probe_io_uring() -> bool {
34 use io_uring::{opcode, IoUring, Probe};
35
36 let io_uring = match IoUring::new(1) {
37 Ok(io_uring) => io_uring,
38 Err(_) => {
39 return false;
40 }
41 };
42 let submitter = io_uring.submitter();
43
44 let mut probe = Probe::new();
45
46 if let Err(_) = submitter.register_probe(&mut probe) {
48 return false;
49 }
50
51 if !probe.is_supported(opcode::Fsync::CODE) {
53 return false;
54 }
55
56 if !probe.is_supported(opcode::Read::CODE) {
58 return false;
59 }
60
61 if !probe.is_supported(opcode::Write::CODE) {
63 return false;
64 }
65 return true;
66 }
67}
68
69pub enum Runtime {
71 Tokio(tokio::runtime::Runtime),
73 #[cfg(target_os = "linux")]
74 Uring(std::sync::Mutex<tokio_uring::Runtime>),
76}
77
78impl Runtime {
79 pub fn new() -> Self {
87 #[cfg(target_os = "linux")]
89 if matches!(*RUNTIME_TYPE, RuntimeType::Uring) {
90 if let Ok(rt) = tokio_uring::Runtime::new(&tokio_uring::builder()) {
91 return Runtime::Uring(std::sync::Mutex::new(rt));
92 }
93 }
94
95 let rt = tokio::runtime::Builder::new_current_thread()
97 .enable_all()
98 .build()
99 .expect("utils: failed to create tokio runtime for current thread");
100 Runtime::Tokio(rt)
101 }
102
103 pub fn block_on<F: Future>(&self, f: F) -> F::Output {
105 match self {
106 Runtime::Tokio(rt) => rt.block_on(f),
107 #[cfg(target_os = "linux")]
108 Runtime::Uring(rt) => rt.lock().unwrap().block_on(f),
109 }
110 }
111
112 pub fn spawn<T: std::future::Future + 'static>(
123 &self,
124 task: T,
125 ) -> tokio::task::JoinHandle<T::Output> {
126 match self {
127 Runtime::Tokio(_) => tokio::task::spawn_local(task),
128 #[cfg(target_os = "linux")]
129 Runtime::Uring(_) => tokio_uring::spawn(task),
130 }
131 }
132}
133
134pub fn start<F: Future>(future: F) -> F::Output {
136 Runtime::new().block_on(future)
137}
138
139impl Default for Runtime {
140 fn default() -> Self {
141 Runtime::new()
142 }
143}
144
145pub fn with_runtime<F, R>(f: F) -> R
147where
148 F: FnOnce(&Runtime) -> R,
149{
150 let rt = Runtime::new();
151 f(&rt)
152}
153
154pub fn block_on<F: Future>(f: F) -> F::Output {
156 Runtime::new().block_on(f)
157}
158
159pub fn spawn<T: std::future::Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
170 let rt = Runtime::new();
171 rt.spawn(task)
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// `with_runtime` hands a usable runtime to the callback, on repeated calls.
    #[test]
    fn test_with_runtime() {
        for &expected in &[1, 3] {
            let res = with_runtime(|rt| rt.block_on(async move { expected }));
            assert_eq!(res, expected);
        }
    }

    /// The free `block_on` returns the future's output, on repeated calls.
    #[test]
    fn test_block_on() {
        for &expected in &[1, 3] {
            let res = block_on(async move { expected });
            assert_eq!(res, expected);
        }
    }
}