polars_core/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![cfg_attr(feature = "simd", feature(portable_simd))]
3#![allow(ambiguous_glob_reexports)]
4#![cfg_attr(
5    feature = "allow_unused",
6    allow(unused, dead_code, irrefutable_let_patterns)
)] // May be caused by some feature
// combinations
9#![cfg_attr(feature = "nightly", allow(clippy::non_canonical_partial_ord_impl))] // remove once stable
10extern crate core;
11
12#[macro_use]
13pub mod utils;
14pub mod chunked_array;
15pub mod config;
16pub mod datatypes;
17pub mod error;
18pub mod fmt;
19pub mod frame;
20pub mod functions;
21pub mod hashing;
22mod named_from;
23pub mod prelude;
24#[cfg(feature = "random")]
25pub mod random;
26pub mod scalar;
27pub mod schema;
28#[cfg(feature = "serde")]
29pub mod serde;
30pub mod series;
31pub mod testing;
32#[cfg(test)]
33mod tests;
34
35use std::cell::{Cell, RefCell};
36use std::panic::AssertUnwindSafe;
37use std::sync::{LazyLock, Mutex};
38use std::time::{SystemTime, UNIX_EPOCH};
39
40pub use datatypes::SchemaExtPl;
41pub use hashing::IdBuildHasher;
42use rayon::{ThreadPool, ThreadPoolBuilder};
43
/// Identifier computed once, on first access, as the number of nanoseconds
/// between the UNIX epoch and the moment of that first access.
///
/// Note that this is derived from the wall clock, not from the operating
/// system's process id.
///
/// # Panics
///
/// Panics on first access if the system clock reports a time earlier than the
/// UNIX epoch.
pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_nanos()
});
50
/// Zero-sized handle through which work is submitted to a rayon thread pool.
///
/// On targets where a pool is available, its methods run on the global
/// `THREAD_POOL`, or on a thread-local single-threaded pool while threading
/// has been disabled for the calling thread via `POOL::without_threading`.
/// On non-emscripten wasm targets the methods run the closure directly or
/// forward to rayon's free functions.
pub struct POOL;
52
53// Thread locals to allow disabling threading for specific threads.
54#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
55thread_local! {
56    static ALLOW_THREADS: Cell<bool> = const { Cell::new(true) };
57    static NOOP_POOL: RefCell<ThreadPool> = RefCell::new(
58        ThreadPoolBuilder::new()
59            .use_current_thread()
60            .num_threads(1)
61            .build()
62            .expect("could not create no-op thread pool")
63    );
64}
65
66impl POOL {
67    pub fn install<OP, R>(&self, op: OP) -> R
68    where
69        OP: FnOnce() -> R + Send,
70        R: Send,
71    {
72        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
73        {
74            op()
75        }
76
77        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
78        {
79            self.with(|p| p.install(op))
80        }
81    }
82
83    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
84    where
85        A: FnOnce() -> RA + Send,
86        B: FnOnce() -> RB + Send,
87        RA: Send,
88        RB: Send,
89    {
90        self.install(|| rayon::join(oper_a, oper_b))
91    }
92
93    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
94    where
95        OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
96        R: Send,
97    {
98        self.install(|| rayon::scope(op))
99    }
100
101    pub fn spawn<OP>(&self, op: OP)
102    where
103        OP: FnOnce() + Send + 'static,
104    {
105        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
106        {
107            rayon::spawn(op)
108        }
109
110        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
111        {
112            self.with(|p| {
113                p.spawn(op);
114                if p.current_num_threads() == 1 {
115                    p.yield_now();
116                }
117            })
118        }
119    }
120
121    pub fn spawn_fifo<OP>(&self, op: OP)
122    where
123        OP: FnOnce() + Send + 'static,
124    {
125        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
126        {
127            rayon::spawn_fifo(op)
128        }
129
130        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
131        {
132            self.with(|p| {
133                p.spawn_fifo(op);
134                if p.current_num_threads() == 1 {
135                    p.yield_now();
136                }
137            })
138        }
139    }
140
141    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
142        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
143        {
144            None
145        }
146
147        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
148        {
149            self.with(|p| p.current_thread_has_pending_tasks())
150        }
151    }
152
153    pub fn current_thread_index(&self) -> Option<usize> {
154        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
155        {
156            rayon::current_thread_index()
157        }
158
159        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
160        {
161            self.with(|p| p.current_thread_index())
162        }
163    }
164
165    pub fn current_num_threads(&self) -> usize {
166        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
167        {
168            rayon::current_num_threads()
169        }
170
171        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
172        {
173            self.with(|p| p.current_num_threads())
174        }
175    }
176
177    #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
178    pub fn with<OP, R>(&self, op: OP) -> R
179    where
180        OP: FnOnce(&ThreadPool) -> R + Send,
181        R: Send,
182    {
183        if ALLOW_THREADS.get() || THREAD_POOL.current_thread_index().is_some() {
184            op(&THREAD_POOL)
185        } else {
186            NOOP_POOL.with(|v| op(&v.borrow()))
187        }
188    }
189
190    pub fn without_threading<R>(&self, op: impl FnOnce() -> R) -> R {
191        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
192        {
193            op()
194        }
195
196        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
197        {
198            // This can only be done from threads that are not in the main threadpool.
199            if THREAD_POOL.current_thread_index().is_some() {
200                op()
201            } else {
202                let prev = ALLOW_THREADS.replace(false);
203                // @Q? Should this catch_unwind?
204                let result = std::panic::catch_unwind(AssertUnwindSafe(op));
205                ALLOW_THREADS.set(prev);
206                match result {
207                    Ok(v) => v,
208                    Err(p) => std::panic::resume_unwind(p),
209                }
210            }
211        }
212    }
213}
214
215// this is re-exported in utils for polars child crates
216#[cfg(not(target_family = "wasm"))] // only use this on non wasm targets
217pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
218    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
219    ThreadPoolBuilder::new()
220        .num_threads(
221            std::env::var("POLARS_MAX_THREADS")
222                .map(|s| s.parse::<usize>().expect("integer"))
223                .unwrap_or_else(|_| {
224                    std::thread::available_parallelism()
225                        .unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
226                        .get()
227                }),
228        )
229        .thread_name(move |i| format!("{thread_name}-{i}"))
230        .build()
231        .expect("could not spawn threads")
232});
233
/// Global rayon thread pool for emscripten, built on first access with a
/// single thread and `use_current_thread` enabled.
///
/// # Panics
///
/// Panics on first access when the pool cannot be built.
#[cfg(all(target_os = "emscripten", target_family = "wasm"))] // Use 1 rayon thread on emscripten
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let builder = ThreadPoolBuilder::new().use_current_thread().num_threads(1);
    builder.build().expect("could not create pool")
});
242
// utility for the tests to ensure a single thread can execute
/// Process-wide mutex guarding no data; holding its guard keeps any other
/// holder out until the guard is dropped.
pub static SINGLE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(Mutex::default);
245
/// Length used by a `.head()` call when the caller does not specify one.
pub(crate) const HEAD_DEFAULT_LENGTH: usize = 10;
/// Length used by a `.tail()` call when the caller does not specify one.
pub(crate) const TAIL_DEFAULT_LENGTH: usize = 10;
/// NOTE(review): usage is not visible in this file; presumably the size limit
/// up to which hashing a series is considered cheap — confirm against callers.
pub const CHEAP_SERIES_HASH_LIMIT: usize = 1_000;