lance_core/utils/
tokio.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::LazyLock;
5use std::time::Duration;
6
7use crate::Result;
8
9use futures::{Future, FutureExt};
10use tokio::runtime::{Builder, Runtime};
11use tracing::Span;
12
/// Cached count of CPUs to devote to compute-intensive tasks.
///
/// We cache the call to num_cpus::get() because:
///
/// 1. It shouldn't change during the lifetime of the program
/// 2. It's a relatively expensive call (requires opening several files and examining them)
static NUM_COMPUTE_INTENSIVE_CPUS: LazyLock<usize> =
    LazyLock::new(calculate_num_compute_intensive_cpus);
19
/// Returns the (cached) number of CPUs available for compute-intensive tasks.
///
/// Computed once on first call; see [`NUM_COMPUTE_INTENSIVE_CPUS`].
pub fn get_num_compute_intensive_cpus() -> usize {
    *NUM_COMPUTE_INTENSIVE_CPUS
}
23
24fn calculate_num_compute_intensive_cpus() -> usize {
25    if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
26        return user_specified.parse().unwrap();
27    }
28
29    let cpus = num_cpus::get();
30
31    if cpus <= *IO_CORE_RESERVATION {
32        // If the user is not setting a custom value for LANCE_IO_CORE_RESERVATION then we don't emit
33        // a warning because they're just on a small machine and there isn't much they can do about it.
34        if cpus > 2 {
35            log::warn!(
36                "Number of CPUs is less than or equal to the number of IO core reservations. \
37                This is not a supported configuration. using 1 CPU for compute intensive tasks."
38            );
39        }
40        return 1;
41    }
42
43    num_cpus::get() - *IO_CORE_RESERVATION
44}
45
/// Number of CPU cores reserved for I/O work.
///
/// Defaults to 2; override with the `LANCE_IO_CORE_RESERVATION` environment
/// variable. Panics at first use if the variable is set to a non-integer.
pub static IO_CORE_RESERVATION: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_IO_CORE_RESERVATION")
        // Only parse when the variable is actually set; this also avoids the
        // unconditional "2".to_string() allocation of the previous version.
        .map(|value| {
            value
                .parse()
                .expect("LANCE_IO_CORE_RESERVATION must be a non-negative integer")
        })
        .unwrap_or(2)
});
52
/// Dedicated tokio runtime for CPU-intensive work, separate from the main
/// runtime so heavy computation never starves cheap I/O / control tasks.
///
/// Tasks are submitted via `spawn_blocking` (see [`spawn_cpu`]), so the pool
/// size that matters is `max_blocking_threads`; only a single ordinary worker
/// thread is needed to drive the runtime itself.
pub static CPU_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
    Builder::new_multi_thread()
        .thread_name("lance-cpu")
        // Cap blocking threads at the compute CPU budget.
        .max_blocking_threads(get_num_compute_intensive_cpus())
        .worker_threads(1)
        // keep the thread alive "forever"
        .thread_keep_alive(Duration::from_secs(u64::MAX))
        .build()
        .unwrap()
});
63
64/// Spawn a CPU intensive task
65///
66/// This task will be put onto a thread pool dedicated for CPU-intensive work
67/// This keeps the tokio thread pool free so that we can always be ready to service
68/// cheap I/O & control requests.
69///
70/// This can also be used to convert a big chunk of synchronous work into a future
71/// so that it can be run in parallel with something like StreamExt::buffered()
72pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
73    func: F,
74) -> impl Future<Output = Result<R>> {
75    let (send, recv) = tokio::sync::oneshot::channel();
76    // Propagate the current span into the task
77    let span = Span::current();
78    CPU_RUNTIME.spawn_blocking(move || {
79        let _span_guard = span.enter();
80        let result = func();
81        let _ = send.send(result);
82    });
83    recv.map(|res| res.unwrap())
84}