fuse_backend_rs/common/
async_runtime.rs

1// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5//! `Runtime` to wrap over tokio current-thread `Runtime` and tokio-uring `Runtime`.
6
7use std::future::Future;
8
9use lazy_static::lazy_static;
10
11lazy_static! {
12    pub(crate) static ref RUNTIME_TYPE: RuntimeType = RuntimeType::new();
13}
14
15pub(crate) enum RuntimeType {
16    Tokio,
17    #[cfg(target_os = "linux")]
18    Uring,
19}
20
21impl RuntimeType {
22    fn new() -> Self {
23        #[cfg(target_os = "linux")]
24        {
25            if Self::probe_io_uring() {
26                return Self::Uring;
27            }
28        }
29        Self::Tokio
30    }
31
32    #[cfg(target_os = "linux")]
33    fn probe_io_uring() -> bool {
34        use io_uring::{opcode, IoUring, Probe};
35
36        let io_uring = match IoUring::new(1) {
37            Ok(io_uring) => io_uring,
38            Err(_) => {
39                return false;
40            }
41        };
42        let submitter = io_uring.submitter();
43
44        let mut probe = Probe::new();
45
46        // Check we can register a probe to validate supported operations.
47        if let Err(_) = submitter.register_probe(&mut probe) {
48            return false;
49        }
50
51        // Check IORING_OP_FSYNC is supported
52        if !probe.is_supported(opcode::Fsync::CODE) {
53            return false;
54        }
55
56        // Check IORING_OP_READ is supported
57        if !probe.is_supported(opcode::Read::CODE) {
58            return false;
59        }
60
61        // Check IORING_OP_WRITE is supported
62        if !probe.is_supported(opcode::Write::CODE) {
63            return false;
64        }
65        return true;
66    }
67}
68
69/// An adapter enum to support both tokio current-thread Runtime and tokio-uring Runtime.
70pub enum Runtime {
71    /// Tokio current thread Runtime.
72    Tokio(tokio::runtime::Runtime),
73    #[cfg(target_os = "linux")]
74    /// Tokio-uring Runtime.
75    Uring(std::sync::Mutex<tokio_uring::Runtime>),
76}
77
78impl Runtime {
79    /// Create a new instance of async Runtime.
80    ///
81    /// A `tokio-uring::Runtime` is create if io-uring is available, otherwise a tokio current
82    /// thread Runtime will be created.
83    ///
84    /// # Panic
85    /// Panic if failed to create the Runtime object.
86    pub fn new() -> Self {
87        // Check whether io-uring is available.
88        #[cfg(target_os = "linux")]
89        if matches!(*RUNTIME_TYPE, RuntimeType::Uring) {
90            if let Ok(rt) = tokio_uring::Runtime::new(&tokio_uring::builder()) {
91                return Runtime::Uring(std::sync::Mutex::new(rt));
92            }
93        }
94
95        // Create tokio runtime if io-uring is not supported.
96        let rt = tokio::runtime::Builder::new_current_thread()
97            .enable_all()
98            .build()
99            .expect("utils: failed to create tokio runtime for current thread");
100        Runtime::Tokio(rt)
101    }
102
103    /// Run a future to completion.
104    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
105        match self {
106            Runtime::Tokio(rt) => rt.block_on(f),
107            #[cfg(target_os = "linux")]
108            Runtime::Uring(rt) => rt.lock().unwrap().block_on(f),
109        }
110    }
111
112    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
113    ///
114    /// Spawning a task enables the task to execute concurrently to other tasks.
115    /// There is no guarantee that a spawned task will execute to completion. When a
116    /// runtime is shutdown, all outstanding tasks are dropped, regardless of the
117    /// lifecycle of that task.
118    ///
119    /// This function must be called from the context of a `tokio-uring` runtime.
120    ///
121    /// [`JoinHandle`]: tokio::task::JoinHandle
122    pub fn spawn<T: std::future::Future + 'static>(
123        &self,
124        task: T,
125    ) -> tokio::task::JoinHandle<T::Output> {
126        match self {
127            Runtime::Tokio(_) => tokio::task::spawn_local(task),
128            #[cfg(target_os = "linux")]
129            Runtime::Uring(_) => tokio_uring::spawn(task),
130        }
131    }
132}
133
134/// Start an async runtime.
135pub fn start<F: Future>(future: F) -> F::Output {
136    Runtime::new().block_on(future)
137}
138
139impl Default for Runtime {
140    fn default() -> Self {
141        Runtime::new()
142    }
143}
144
145/// Run a callback with the default `Runtime` object.
146pub fn with_runtime<F, R>(f: F) -> R
147where
148    F: FnOnce(&Runtime) -> R,
149{
150    let rt = Runtime::new();
151    f(&rt)
152}
153
154/// Run a future to completion with the default `Runtime` object.
155pub fn block_on<F: Future>(f: F) -> F::Output {
156    Runtime::new().block_on(f)
157}
158
159/// Spawns a new asynchronous task with the defualt `Runtime`, returning a [`JoinHandle`] for it.
160///
161/// Spawning a task enables the task to execute concurrently to other tasks.
162/// There is no guarantee that a spawned task will execute to completion. When a
163/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
164/// lifecycle of that task.
165///
166/// This will create a new Runtime to run spawn.
167///
168/// [`JoinHandle`]: tokio::task::JoinHandle
169pub fn spawn<T: std::future::Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
170    let rt = Runtime::new();
171    rt.spawn(task)
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177
178    #[test]
179    fn test_with_runtime() {
180        let res = with_runtime(|rt| rt.block_on(async { 1 }));
181        assert_eq!(res, 1);
182
183        let res = with_runtime(|rt| rt.block_on(async { 3 }));
184        assert_eq!(res, 3);
185    }
186
187    #[test]
188    fn test_block_on() {
189        let res = block_on(async { 1 });
190        assert_eq!(res, 1);
191
192        let res = block_on(async { 3 });
193        assert_eq!(res, 3);
194    }
195}