lance_core/utils/
tokio.rs1use std::sync::atomic::Ordering;
5use std::sync::{LazyLock, atomic};
6use std::time::Duration;
7
8use futures::{Future, FutureExt};
9use tokio::runtime::{Builder, Runtime};
10use tracing::Span;
11
12static NUM_COMPUTE_INTENSIVE_CPUS: LazyLock<usize> =
17 LazyLock::new(calculate_num_compute_intensive_cpus);
18
19pub fn get_num_compute_intensive_cpus() -> usize {
20 *NUM_COMPUTE_INTENSIVE_CPUS
21}
22
23fn calculate_num_compute_intensive_cpus() -> usize {
24 if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
25 return user_specified.parse().unwrap();
26 }
27
28 let cpus = num_cpus::get();
29
30 if cpus <= *IO_CORE_RESERVATION {
31 if cpus > 2 {
34 log::warn!(
35 "Number of CPUs is less than or equal to the number of IO core reservations. \
36 This is not a supported configuration. using 1 CPU for compute intensive tasks."
37 );
38 }
39 return 1;
40 }
41
42 num_cpus::get() - *IO_CORE_RESERVATION
43}
44
45pub static IO_CORE_RESERVATION: LazyLock<usize> = LazyLock::new(|| {
46 std::env::var("LANCE_IO_CORE_RESERVATION")
47 .unwrap_or("2".to_string())
48 .parse()
49 .unwrap()
50});
51
52fn create_runtime() -> Runtime {
53 Builder::new_multi_thread()
54 .thread_name("lance-cpu")
55 .max_blocking_threads(get_num_compute_intensive_cpus())
56 .worker_threads(1)
57 .thread_keep_alive(Duration::from_secs(u64::MAX))
59 .build()
60 .unwrap()
61}
62
63static CPU_RUNTIME: atomic::AtomicPtr<Runtime> = atomic::AtomicPtr::new(std::ptr::null_mut());
64
65static RUNTIME_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
66
67static ATFORK_INSTALLED: atomic::AtomicBool = atomic::AtomicBool::new(false);
68
69fn global_cpu_runtime() -> &'static mut Runtime {
70 loop {
71 let ptr = CPU_RUNTIME.load(Ordering::SeqCst);
72 if !ptr.is_null() {
73 return unsafe { &mut *ptr };
74 }
75 if !RUNTIME_INSTALLED.fetch_or(true, Ordering::SeqCst) {
76 break;
77 }
78 std::thread::yield_now();
79 }
80 if !ATFORK_INSTALLED.fetch_or(true, Ordering::SeqCst) {
81 install_atfork();
82 }
83 let new_ptr = Box::into_raw(Box::new(create_runtime()));
84 CPU_RUNTIME.store(new_ptr, Ordering::SeqCst);
85 unsafe { &mut *new_ptr }
86}
87
88extern "C" fn atfork_tokio_child() {
91 CPU_RUNTIME.store(std::ptr::null_mut(), Ordering::SeqCst);
92 RUNTIME_INSTALLED.store(false, Ordering::SeqCst);
93}
94
95#[cfg(not(windows))]
96fn install_atfork() {
97 unsafe { libc::pthread_atfork(None, None, Some(atfork_tokio_child)) };
98}
99
100#[cfg(windows)]
101fn install_atfork() {}
102
103pub fn spawn_cpu<
112 E: std::error::Error + Send + 'static,
113 F: FnOnce() -> std::result::Result<R, E> + Send + 'static,
114 R: Send + 'static,
115>(
116 func: F,
117) -> impl Future<Output = std::result::Result<R, E>> {
118 let (send, recv) = tokio::sync::oneshot::channel();
119 let span = Span::current();
121 global_cpu_runtime().spawn_blocking(move || {
122 let _span_guard = span.enter();
123 let result = func();
124 let _ = send.send(result);
125 });
126 recv.map(|res| res.unwrap())
127}