1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![cfg_attr(feature = "simd", feature(portable_simd))]
3#![allow(ambiguous_glob_reexports)]
4#![cfg_attr(
5 feature = "allow_unused",
6 allow(unused, dead_code, irrefutable_let_patterns)
7)] #![cfg_attr(feature = "nightly", allow(clippy::non_canonical_partial_ord_impl))] extern crate core;
11
12#[macro_use]
13pub mod utils;
14pub mod chunked_array;
15pub mod config;
16pub mod datatypes;
17pub mod error;
18pub mod fmt;
19pub mod frame;
20pub mod functions;
21pub mod hashing;
22mod named_from;
23pub mod prelude;
24#[cfg(feature = "random")]
25pub mod random;
26pub mod scalar;
27pub mod schema;
28#[cfg(feature = "serde")]
29pub mod serde;
30pub mod series;
31pub mod testing;
32#[cfg(test)]
33mod tests;
34
35use std::cell::{Cell, RefCell};
36use std::panic::AssertUnwindSafe;
37use std::sync::{LazyLock, Mutex};
38use std::time::{SystemTime, UNIX_EPOCH};
39
40pub use datatypes::SchemaExtPl;
41pub use hashing::IdBuildHasher;
42use rayon::{ThreadPool, ThreadPoolBuilder};
43
/// Lazily initialised identifier for the current process.
///
/// The value is the number of nanoseconds between the UNIX epoch and the moment this static
/// is first dereferenced; it stays constant for the rest of the process lifetime.
///
/// # Panics
///
/// The first access panics if the system clock reports a time before the UNIX epoch.
pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_nanos()
});
50
/// Zero-sized handle through which callers reach the crate's rayon thread pool.
///
/// All functionality lives in the inherent `impl POOL` block; the struct itself carries no
/// state, so `POOL` can be used directly as a value (e.g. `POOL.install(..)`).
pub struct POOL;
52
53#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
55thread_local! {
56 static ALLOW_THREADS: Cell<bool> = const { Cell::new(true) };
57 static NOOP_POOL: RefCell<ThreadPool> = RefCell::new(
58 ThreadPoolBuilder::new()
59 .use_current_thread()
60 .num_threads(1)
61 .build()
62 .expect("could not create no-op thread pool")
63 );
64}
65
66impl POOL {
67 pub fn install<OP, R>(&self, op: OP) -> R
68 where
69 OP: FnOnce() -> R + Send,
70 R: Send,
71 {
72 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
73 {
74 op()
75 }
76
77 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
78 {
79 self.with(|p| p.install(op))
80 }
81 }
82
83 pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
84 where
85 A: FnOnce() -> RA + Send,
86 B: FnOnce() -> RB + Send,
87 RA: Send,
88 RB: Send,
89 {
90 self.install(|| rayon::join(oper_a, oper_b))
91 }
92
93 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
94 where
95 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
96 R: Send,
97 {
98 self.install(|| rayon::scope(op))
99 }
100
101 pub fn spawn<OP>(&self, op: OP)
102 where
103 OP: FnOnce() + Send + 'static,
104 {
105 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
106 {
107 rayon::spawn(op)
108 }
109
110 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
111 {
112 self.with(|p| {
113 p.spawn(op);
114 if p.current_num_threads() == 1 {
115 p.yield_now();
116 }
117 })
118 }
119 }
120
121 pub fn spawn_fifo<OP>(&self, op: OP)
122 where
123 OP: FnOnce() + Send + 'static,
124 {
125 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
126 {
127 rayon::spawn_fifo(op)
128 }
129
130 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
131 {
132 self.with(|p| {
133 p.spawn_fifo(op);
134 if p.current_num_threads() == 1 {
135 p.yield_now();
136 }
137 })
138 }
139 }
140
141 pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
142 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
143 {
144 None
145 }
146
147 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
148 {
149 self.with(|p| p.current_thread_has_pending_tasks())
150 }
151 }
152
153 pub fn current_thread_index(&self) -> Option<usize> {
154 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
155 {
156 rayon::current_thread_index()
157 }
158
159 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
160 {
161 self.with(|p| p.current_thread_index())
162 }
163 }
164
165 pub fn current_num_threads(&self) -> usize {
166 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
167 {
168 rayon::current_num_threads()
169 }
170
171 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
172 {
173 self.with(|p| p.current_num_threads())
174 }
175 }
176
177 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
178 pub fn with<OP, R>(&self, op: OP) -> R
179 where
180 OP: FnOnce(&ThreadPool) -> R + Send,
181 R: Send,
182 {
183 if ALLOW_THREADS.get() || THREAD_POOL.current_thread_index().is_some() {
184 op(&THREAD_POOL)
185 } else {
186 NOOP_POOL.with(|v| op(&v.borrow()))
187 }
188 }
189
190 pub fn without_threading<R>(&self, op: impl FnOnce() -> R) -> R {
191 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
192 {
193 op()
194 }
195
196 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
197 {
198 if THREAD_POOL.current_thread_index().is_some() {
200 op()
201 } else {
202 let prev = ALLOW_THREADS.replace(false);
203 let result = std::panic::catch_unwind(AssertUnwindSafe(op));
205 ALLOW_THREADS.set(prev);
206 match result {
207 Ok(v) => v,
208 Err(p) => std::panic::resume_unwind(p),
209 }
210 }
211 }
212 }
213}
214
215#[cfg(not(target_family = "wasm"))] pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
218 let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
219 ThreadPoolBuilder::new()
220 .num_threads(
221 std::env::var("POLARS_MAX_THREADS")
222 .map(|s| s.parse::<usize>().expect("integer"))
223 .unwrap_or_else(|_| {
224 std::thread::available_parallelism()
225 .unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
226 .get()
227 }),
228 )
229 .thread_name(move |i| format!("{thread_name}-{i}"))
230 .build()
231 .expect("could not spawn threads")
232});
233
/// Global thread pool for emscripten builds: a single-threaded pool that uses the thread on
/// which it is first accessed as its only worker.
///
/// # Panics
///
/// The first access panics if the pool cannot be built.
#[cfg(all(target_os = "emscripten", target_family = "wasm"))]
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
    builder.build().expect("could not create pool")
});
242
/// Process-wide mutex guarding no data; holding its guard excludes every other holder.
///
/// NOTE(review): presumably used to serialise code (e.g. tests) that must not run
/// concurrently; confirm at the call sites.
pub static SINGLE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(Mutex::default);
245
/// Default number of rows for "head"-style operations.
/// NOTE(review): meaning inferred from the name; confirm at the call sites.
pub(crate) const HEAD_DEFAULT_LENGTH: usize = 10;

/// Default number of rows for "tail"-style operations.
/// NOTE(review): meaning inferred from the name; confirm at the call sites.
pub(crate) const TAIL_DEFAULT_LENGTH: usize = 10;

/// Size threshold related to hashing a series.
/// NOTE(review): presumably the length up to which hashing a series is considered cheap;
/// confirm at the call sites.
pub const CHEAP_SERIES_HASH_LIMIT: usize = 1_000;