1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
use crate::Result;
use futures::{Future, FutureExt};
use tokio::runtime::{Builder, Runtime};
use tracing::Span;
lazy_static::lazy_static! {
    /// Process-wide Tokio runtime used by [`spawn_cpu`] for CPU-bound work.
    ///
    /// It is a multi-threaded runtime whose threads are named `lance-cpu` and
    /// whose blocking pool is capped at the number of CPUs reported by
    /// `num_cpus::get()`. The builder is not asked to enable any I/O or time
    /// drivers. Construction happens lazily on first access and panics if the
    /// runtime cannot be built.
    pub static ref CPU_RUNTIME: Runtime = {
        let mut runtime_builder = Builder::new_multi_thread();
        runtime_builder
            .thread_name("lance-cpu")
            .max_blocking_threads(num_cpus::get());
        runtime_builder.build().unwrap()
    };
}
/// Spawn a CPU intensive task
///
/// The closure is submitted to the blocking pool of [`CPU_RUNTIME`], a thread
/// pool dedicated to CPU-intensive work. This keeps the tokio thread pool free
/// so that we can always be ready to service cheap I/O & control requests.
///
/// This can also be used to convert a big chunk of synchronous work into a
/// future so that it can be run in parallel with something like
/// `StreamExt::buffered()`.
///
/// The work is submitted as soon as `spawn_cpu` is called, not when the
/// returned future is first polled. Dropping the returned future does not
/// cancel the work; the result is simply discarded.
///
/// The tracing span that is current at the call site is entered while `func`
/// runs, so events emitted by `func` are attributed to the caller's span.
///
/// # Panics
///
/// The returned future panics when polled if the task finished without
/// delivering a result, which happens when `func` itself panics or the task is
/// dropped before it runs.
pub fn spawn_cpu<F: FnOnce() -> Result<R> + Send + 'static, R: Send + 'static>(
    func: F,
) -> impl Future<Output = Result<R>> {
    let (send, recv) = tokio::sync::oneshot::channel();
    // Propagate the current span into the task
    let span = Span::current();
    CPU_RUNTIME.spawn_blocking(move || {
        let result = span.in_scope(func);
        // The receiver may already be gone if the caller dropped the future;
        // that is not an error, the result is just no longer wanted.
        let _ = send.send(result);
    });
    // A receive error means the sender was dropped without sending. Surface
    // that with an explicit message instead of a bare `RecvError`.
    recv.map(|res| {
        res.expect(
            "spawn_cpu task ended without sending a result (the task panicked or never ran)",
        )
    })
}