Skip to main content

polars_async/
lib.rs

1use std::sync::{Arc, LazyLock};
2
3use polars_error::polars_warn;
4use polars_utils::relaxed_cell::RelaxedCell;
5use tokio::runtime::{Builder, Runtime};
6
7use crate::executor::{THREAD_SPAWNED_BY_POLARS_EXECUTOR, is_scheduling_polars_executor_thread};
8
9pub mod executor;
10pub mod primitives;
11
12pub struct RuntimeManager {
13    rt: Runtime,
14}
15
16impl RuntimeManager {
17    fn new() -> Self {
18        let n_threads = std::env::var("POLARS_ASYNC_THREAD_COUNT")
19            .map(|x| x.parse::<usize>().expect("integer"))
20            .unwrap_or(usize::min(polars_config::config().max_threads(), 32));
21
22        let max_blocking = std::env::var("POLARS_MAX_BLOCKING_THREAD_COUNT")
23            .map(|x| x.parse::<usize>().expect("integer"))
24            .unwrap_or(512);
25
26        if polars_config::config().verbose() {
27            eprintln!("async thread count: {n_threads}");
28            eprintln!("blocking thread count: {max_blocking}");
29        }
30
31        let max_total_threads = n_threads + max_blocking;
32        let warned = RelaxedCell::new_bool(false);
33        let tokio_thread_count_start = Arc::new(RelaxedCell::new_i64(0));
34        let tokio_thread_count_stop = tokio_thread_count_start.clone();
35
36        let rt = Builder::new_multi_thread()
37            .worker_threads(n_threads)
38            .max_blocking_threads(max_blocking)
39            .on_thread_start(move || {
40                if tokio_thread_count_start.fetch_add(1) + 1 >= (max_total_threads as i64) && !warned.load() {
41                    warned.store(true);
42                    polars_warn!("POLARS_MAX_BLOCKING_THREAD_COUNT reached ({max_blocking}), this may indicate a deadlock");
43                }
44            })
45            .on_thread_stop(move || { tokio_thread_count_stop.fetch_sub(1); })
46            .enable_io()
47            .enable_time()
48            .build()
49            .unwrap();
50
51        Self { rt }
52    }
53
54    /// Runs the given function on this thread while allowing another thread to take
55    /// over this thread's task execution duties.
56    ///
57    /// Simply directly calls f() if this thread is not an async executor thread.
58    pub fn block_in_place<R, F: FnOnce() -> R>(&self, f: F) -> R {
59        if THREAD_SPAWNED_BY_POLARS_EXECUTOR.get() {
60            executor::block_in_place(f)
61        } else {
62            tokio::task::block_in_place(f)
63        }
64    }
65
66    /// Blocks this thread to evaluate the given future.
67    ///
68    /// This is more expensive than block_on when called from an async runtime
69    /// worker thread because other async tasks scheduled to run on this thread
70    /// have to be moved to a new thread.
71    ///
72    /// If more than POLARS_MAX_BLOCKING_THREAD_COUNT calls to this occur
73    /// simultaneously a deadlock may occur.
74    pub fn block_in_place_on<F>(&self, future: F) -> F::Output
75    where
76        F: Future,
77    {
78        if THREAD_SPAWNED_BY_POLARS_EXECUTOR.get() {
79            executor::block_in_place(|| self.rt.block_on(future))
80        } else {
81            tokio::task::block_in_place(|| self.rt.block_on(future))
82        }
83    }
84
85    /// Blocks this thread to evaluate the given future.
86    ///
87    /// Panics if the current thread is an async runtime worker thread.
88    pub fn block_on<F>(&self, future: F) -> F::Output
89    where
90        F: Future,
91    {
92        assert!(
93            !is_scheduling_polars_executor_thread(),
94            "block_on may not be called from within a polars async executor runtime worker thread"
95        );
96        self.rt.block_on(future)
97    }
98
99    /// Spawns a future onto the Tokio runtime (see [`tokio::runtime::Runtime::spawn`]).
100    pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
101    where
102        F: Future + Send + 'static,
103        F::Output: Send + 'static,
104    {
105        self.rt.spawn(future)
106    }
107
108    // See [`tokio::runtime::Runtime::spawn_blocking`].
109    pub fn spawn_blocking<F, R>(&self, f: F) -> tokio::task::JoinHandle<R>
110    where
111        F: FnOnce() -> R + Send + 'static,
112        R: Send + 'static,
113    {
114        self.rt.spawn_blocking(f)
115    }
116}
117
118/// The global Polars async runtime accessor.
119pub static ASYNC: LazyLock<RuntimeManager> = LazyLock::new(RuntimeManager::new);