lance_core/utils/
tokio.rs1use std::sync::atomic::Ordering;
5use std::sync::{atomic, LazyLock};
6use std::time::Duration;
7
8use crate::Result;
9
10use futures::{Future, FutureExt};
11use tokio::runtime::{Builder, Runtime};
12use tracing::Span;
13
14static NUM_COMPUTE_INTENSIVE_CPUS: LazyLock<usize> =
19 LazyLock::new(calculate_num_compute_intensive_cpus);
20
21pub fn get_num_compute_intensive_cpus() -> usize {
22 *NUM_COMPUTE_INTENSIVE_CPUS
23}
24
25fn calculate_num_compute_intensive_cpus() -> usize {
26 if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
27 return user_specified.parse().unwrap();
28 }
29
30 let cpus = num_cpus::get();
31
32 if cpus <= *IO_CORE_RESERVATION {
33 if cpus > 2 {
36 log::warn!(
37 "Number of CPUs is less than or equal to the number of IO core reservations. \
38 This is not a supported configuration. using 1 CPU for compute intensive tasks."
39 );
40 }
41 return 1;
42 }
43
44 num_cpus::get() - *IO_CORE_RESERVATION
45}
46
47pub static IO_CORE_RESERVATION: LazyLock<usize> = LazyLock::new(|| {
48 std::env::var("LANCE_IO_CORE_RESERVATION")
49 .unwrap_or("2".to_string())
50 .parse()
51 .unwrap()
52});
53
54fn create_runtime() -> Runtime {
55 Builder::new_multi_thread()
56 .thread_name("lance-cpu")
57 .max_blocking_threads(get_num_compute_intensive_cpus())
58 .worker_threads(1)
59 .thread_keep_alive(Duration::from_secs(u64::MAX))
61 .build()
62 .unwrap()
63}
64
65static CPU_RUNTIME: atomic::AtomicPtr<Runtime> = atomic::AtomicPtr::new(std::ptr::null_mut());
66
67static RUNTIME_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
68
69static ATFORK_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
70
71fn global_cpu_runtime() -> &'static mut Runtime {
72 loop {
73 let ptr = CPU_RUNTIME.load(Ordering::SeqCst);
74 if !ptr.is_null() {
75 return unsafe { &mut *ptr };
76 }
77 if !RUNTIME_INSTALLED.fetch_or(true, Ordering::SeqCst) {
78 break;
79 }
80 std::thread::yield_now();
81 }
82 if !ATFORK_INSTALLED.fetch_or(true, Ordering::SeqCst) {
83 install_atfork();
84 }
85 let new_ptr = Box::into_raw(Box::new(create_runtime()));
86 CPU_RUNTIME.store(new_ptr, Ordering::SeqCst);
87 unsafe { &mut *new_ptr }
88}
89
90extern "C" fn atfork_tokio_child() {
93 CPU_RUNTIME.store(std::ptr::null_mut(), Ordering::SeqCst);
94 RUNTIME_INSTALLED.store(false, Ordering::SeqCst);
95}
96
97#[cfg(not(windows))]
98fn install_atfork() {
99 unsafe { libc::pthread_atfork(None, None, Some(atfork_tokio_child)) };
100}
101
102#[cfg(windows)]
103fn install_atfork() {}
104
105pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
114 func: F,
115) -> impl Future<Output = Result<R>> {
116 let (send, recv) = tokio::sync::oneshot::channel();
117 let span = Span::current();
119 global_cpu_runtime().spawn_blocking(move || {
120 let _span_guard = span.enter();
121 let result = func();
122 let _ = send.send(result);
123 });
124 recv.map(|res| res.unwrap())
125}