// lance_core/utils/tokio.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

4use std::sync::atomic::Ordering;
5use std::sync::{LazyLock, atomic};
6use std::time::Duration;
7
8use futures::{Future, FutureExt};
9use tokio::runtime::{Builder, Runtime};
10use tracing::Span;
11
/// We cache the call to num_cpus::get() because:
///
/// 1. It shouldn't change during the lifetime of the program
/// 2. It's a relatively expensive call (requires opening several files and examining them)
///
/// Computed lazily on first access via `calculate_num_compute_intensive_cpus`.
static NUM_COMPUTE_INTENSIVE_CPUS: LazyLock<usize> =
    LazyLock::new(calculate_num_compute_intensive_cpus);
18
19pub fn get_num_compute_intensive_cpus() -> usize {
20    *NUM_COMPUTE_INTENSIVE_CPUS
21}
22
23fn calculate_num_compute_intensive_cpus() -> usize {
24    if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
25        return user_specified.parse().unwrap();
26    }
27
28    let cpus = num_cpus::get();
29
30    if cpus <= *IO_CORE_RESERVATION {
31        // If the user is not setting a custom value for LANCE_IO_CORE_RESERVATION then we don't emit
32        // a warning because they're just on a small machine and there isn't much they can do about it.
33        if cpus > 2 {
34            log::warn!(
35                "Number of CPUs is less than or equal to the number of IO core reservations. \
36                This is not a supported configuration. using 1 CPU for compute intensive tasks."
37            );
38        }
39        return 1;
40    }
41
42    num_cpus::get() - *IO_CORE_RESERVATION
43}
44
/// Number of cores reserved for I/O work, configurable via the
/// `LANCE_IO_CORE_RESERVATION` environment variable (defaults to 2).
///
/// Panics on first access if the variable is set to a non-integer value;
/// when the variable is absent no string is allocated for the default
/// (the original `unwrap_or("2".to_string())` allocated eagerly even when
/// the env var was present — clippy `or_fun_call`).
pub static IO_CORE_RESERVATION: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_IO_CORE_RESERVATION")
        .map(|value| {
            value
                .parse()
                .expect("LANCE_IO_CORE_RESERVATION must be a non-negative integer")
        })
        .unwrap_or(2)
});
51
52fn create_runtime() -> Runtime {
53    Builder::new_multi_thread()
54        .thread_name("lance-cpu")
55        .max_blocking_threads(get_num_compute_intensive_cpus())
56        .worker_threads(1)
57        // keep the thread alive "forever"
58        .thread_keep_alive(Duration::from_secs(u64::MAX))
59        .build()
60        .unwrap()
61}
62
// Pointer to the lazily-created CPU runtime. Null until first use, and reset
// to null in a forked child by `atfork_tokio_child` so the child rebuilds it.
static CPU_RUNTIME: atomic::AtomicPtr<Runtime> = atomic::AtomicPtr::new(std::ptr::null_mut());

// True while some thread holds the exclusive right to (re)create the runtime;
// cleared after fork so the child can create a fresh one.
static RUNTIME_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);

// Whether the pthread_atfork handler has been registered. Set at most once per
// process — fork handlers persist across forks, so it is never cleared.
static ATFORK_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
68
69fn global_cpu_runtime() -> &'static mut Runtime {
70    loop {
71        let ptr = CPU_RUNTIME.load(Ordering::SeqCst);
72        if !ptr.is_null() {
73            return unsafe { &mut *ptr };
74        }
75        if !RUNTIME_INSTALLED.fetch_or(true, Ordering::SeqCst) {
76            break;
77        }
78        std::thread::yield_now();
79    }
80    if !ATFORK_INSTALLED.fetch_or(true, Ordering::SeqCst) {
81        install_atfork();
82    }
83    let new_ptr = Box::into_raw(Box::new(create_runtime()));
84    CPU_RUNTIME.store(new_ptr, Ordering::SeqCst);
85    unsafe { &mut *new_ptr }
86}
87
/// After a fork() operation, force re-creation of the BackgroundExecutor. Note: this function
/// runs in "async-signal context" which means that we can't (safely) do much here.
extern "C" fn atfork_tokio_child() {
    // Forget the pre-fork runtime: its worker threads do not survive fork().
    // The old Runtime box is deliberately leaked — dropping it here would not
    // be async-signal-safe. Plain atomic stores are safe in this context.
    CPU_RUNTIME.store(std::ptr::null_mut(), Ordering::SeqCst);
    // Release the creation flag so the next global_cpu_runtime() call in the
    // child builds a fresh runtime.
    RUNTIME_INSTALLED.store(false, Ordering::SeqCst);
}
94
/// Register the post-fork child handler so the CPU runtime is rebuilt in a
/// forked child process instead of reusing dead worker threads.
#[cfg(not(windows))]
fn install_atfork() {
    // SAFETY: pthread_atfork only records the handler pointers; the child
    // handler itself performs nothing but atomic stores, which are
    // async-signal-safe.
    unsafe { libc::pthread_atfork(None, None, Some(atfork_tokio_child)) };
}
99
/// No-op on Windows: there is no fork(), so no post-fork handler is needed.
#[cfg(windows)]
fn install_atfork() {}
102
103/// Spawn a CPU intensive task
104///
105/// This task will be put onto a thread pool dedicated for CPU-intensive work
106/// This keeps the tokio thread pool free so that we can always be ready to service
107/// cheap I/O & control requests.
108///
109/// This can also be used to convert a big chunk of synchronous work into a future
110/// so that it can be run in parallel with something like StreamExt::buffered()
111pub fn spawn_cpu<
112    E: std::error::Error + Send + 'static,
113    F: FnOnce() -> std::result::Result<R, E> + Send + 'static,
114    R: Send + 'static,
115>(
116    func: F,
117) -> impl Future<Output = std::result::Result<R, E>> {
118    let (send, recv) = tokio::sync::oneshot::channel();
119    // Propagate the current span into the task
120    let span = Span::current();
121    global_cpu_runtime().spawn_blocking(move || {
122        let _span_guard = span.enter();
123        let result = func();
124        let _ = send.send(result);
125    });
126    recv.map(|res| res.unwrap())
127}