1use std::sync::{Arc, LazyLock};
2
3use polars_error::polars_warn;
4use polars_utils::relaxed_cell::RelaxedCell;
5use tokio::runtime::{Builder, Runtime};
6
7use crate::executor::{THREAD_SPAWNED_BY_POLARS_EXECUTOR, is_scheduling_polars_executor_thread};
8
9pub mod executor;
10pub mod primitives;
11
12pub struct RuntimeManager {
13 rt: Runtime,
14}
15
16impl RuntimeManager {
17 fn new() -> Self {
18 let n_threads = std::env::var("POLARS_ASYNC_THREAD_COUNT")
19 .map(|x| x.parse::<usize>().expect("integer"))
20 .unwrap_or(usize::min(polars_config::config().max_threads(), 32));
21
22 let max_blocking = std::env::var("POLARS_MAX_BLOCKING_THREAD_COUNT")
23 .map(|x| x.parse::<usize>().expect("integer"))
24 .unwrap_or(512);
25
26 if polars_config::config().verbose() {
27 eprintln!("async thread count: {n_threads}");
28 eprintln!("blocking thread count: {max_blocking}");
29 }
30
31 let max_total_threads = n_threads + max_blocking;
32 let warned = RelaxedCell::new_bool(false);
33 let tokio_thread_count_start = Arc::new(RelaxedCell::new_i64(0));
34 let tokio_thread_count_stop = tokio_thread_count_start.clone();
35
36 let rt = Builder::new_multi_thread()
37 .worker_threads(n_threads)
38 .max_blocking_threads(max_blocking)
39 .on_thread_start(move || {
40 if tokio_thread_count_start.fetch_add(1) + 1 >= (max_total_threads as i64) && !warned.load() {
41 warned.store(true);
42 polars_warn!("POLARS_MAX_BLOCKING_THREAD_COUNT reached ({max_blocking}), this may indicate a deadlock");
43 }
44 })
45 .on_thread_stop(move || { tokio_thread_count_stop.fetch_sub(1); })
46 .enable_io()
47 .enable_time()
48 .build()
49 .unwrap();
50
51 Self { rt }
52 }
53
54 pub fn block_in_place<R, F: FnOnce() -> R>(&self, f: F) -> R {
59 if THREAD_SPAWNED_BY_POLARS_EXECUTOR.get() {
60 executor::block_in_place(f)
61 } else {
62 tokio::task::block_in_place(f)
63 }
64 }
65
66 pub fn block_in_place_on<F>(&self, future: F) -> F::Output
75 where
76 F: Future,
77 {
78 if THREAD_SPAWNED_BY_POLARS_EXECUTOR.get() {
79 executor::block_in_place(|| self.rt.block_on(future))
80 } else {
81 tokio::task::block_in_place(|| self.rt.block_on(future))
82 }
83 }
84
85 pub fn block_on<F>(&self, future: F) -> F::Output
89 where
90 F: Future,
91 {
92 assert!(
93 !is_scheduling_polars_executor_thread(),
94 "block_on may not be called from within a polars async executor runtime worker thread"
95 );
96 self.rt.block_on(future)
97 }
98
99 pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
101 where
102 F: Future + Send + 'static,
103 F::Output: Send + 'static,
104 {
105 self.rt.spawn(future)
106 }
107
108 pub fn spawn_blocking<F, R>(&self, f: F) -> tokio::task::JoinHandle<R>
110 where
111 F: FnOnce() -> R + Send + 'static,
112 R: Send + 'static,
113 {
114 self.rt.spawn_blocking(f)
115 }
116}
117
118pub static ASYNC: LazyLock<RuntimeManager> = LazyLock::new(RuntimeManager::new);