lance_core/utils/
tokio.rs1use std::sync::LazyLock;
5use std::time::Duration;
6
7use crate::Result;
8
9use futures::{Future, FutureExt};
10use tokio::runtime::{Builder, Runtime};
11use tracing::Span;
12
13static NUM_COMPUTE_INTENSIVE_CPUS: LazyLock<usize> =
18 LazyLock::new(calculate_num_compute_intensive_cpus);
19
20pub fn get_num_compute_intensive_cpus() -> usize {
21 *NUM_COMPUTE_INTENSIVE_CPUS
22}
23
24fn calculate_num_compute_intensive_cpus() -> usize {
25 if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
26 return user_specified.parse().unwrap();
27 }
28
29 let cpus = num_cpus::get();
30
31 if cpus <= *IO_CORE_RESERVATION {
32 if cpus > 2 {
35 log::warn!(
36 "Number of CPUs is less than or equal to the number of IO core reservations. \
37 This is not a supported configuration. using 1 CPU for compute intensive tasks."
38 );
39 }
40 return 1;
41 }
42
43 num_cpus::get() - *IO_CORE_RESERVATION
44}
45
46pub static IO_CORE_RESERVATION: LazyLock<usize> = LazyLock::new(|| {
47 std::env::var("LANCE_IO_CORE_RESERVATION")
48 .unwrap_or("2".to_string())
49 .parse()
50 .unwrap()
51});
52
53pub static CPU_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
54 Builder::new_multi_thread()
55 .thread_name("lance-cpu")
56 .max_blocking_threads(get_num_compute_intensive_cpus())
57 .worker_threads(1)
58 .thread_keep_alive(Duration::from_secs(u64::MAX))
60 .build()
61 .unwrap()
62});
63
64pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
73 func: F,
74) -> impl Future<Output = Result<R>> {
75 let (send, recv) = tokio::sync::oneshot::channel();
76 let span = Span::current();
78 CPU_RUNTIME.spawn_blocking(move || {
79 let _span_guard = span.enter();
80 let result = func();
81 let _ = send.send(result);
82 });
83 recv.map(|res| res.unwrap())
84}