lance_core/utils/
tokio.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::time::Duration;
5
6use crate::Result;
7
8use futures::{Future, FutureExt};
9use tokio::runtime::{Builder, Runtime};
10use tracing::Span;
11
12pub fn get_num_compute_intensive_cpus() -> usize {
13    if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
14        return user_specified.parse().unwrap();
15    }
16
17    let cpus = num_cpus::get();
18
19    if cpus <= *IO_CORE_RESERVATION {
20        // If the user is not setting a custom value for LANCE_IO_CORE_RESERVATION then we don't emit
21        // a warning because they're just on a small machine and there isn't much they can do about it.
22        if cpus > 2 {
23            log::warn!(
24                "Number of CPUs is less than or equal to the number of IO core reservations. \
25                This is not a supported configuration. using 1 CPU for compute intensive tasks."
26            );
27        }
28        return 1;
29    }
30
31    num_cpus::get() - *IO_CORE_RESERVATION
32}
33
34lazy_static::lazy_static! {
35    pub static ref IO_CORE_RESERVATION: usize = std::env::var("LANCE_IO_CORE_RESERVATION").unwrap_or("2".to_string()).parse().unwrap();
36
37    pub static ref CPU_RUNTIME: Runtime = Builder::new_multi_thread()
38        .thread_name("lance-cpu")
39        .max_blocking_threads(get_num_compute_intensive_cpus())
40        .worker_threads(1)
41        // keep the thread alive "forever"
42        .thread_keep_alive(Duration::from_secs(u64::MAX))
43        .build()
44        .unwrap();
45}
46
47/// Spawn a CPU intensive task
48///
49/// This task will be put onto a thread pool dedicated for CPU-intensive work
50/// This keeps the tokio thread pool free so that we can always be ready to service
51/// cheap I/O & control requests.
52///
53/// This can also be used to convert a big chunk of synchronous work into a future
54/// so that it can be run in parallel with something like StreamExt::buffered()
55pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
56    func: F,
57) -> impl Future<Output = Result<R>> {
58    let (send, recv) = tokio::sync::oneshot::channel();
59    // Propagate the current span into the task
60    let span = Span::current();
61    CPU_RUNTIME.spawn_blocking(move || {
62        let _span_guard = span.enter();
63        let result = func();
64        let _ = send.send(result);
65    });
66    recv.map(|res| res.unwrap())
67}