1use std::env;
9use std::sync::OnceLock;
10
11use rayon::{ThreadPool, ThreadPoolBuilder};
12
13const ENV_MAX_THREADS: &str = "LLKV_MAX_THREADS";
14
15fn detected_thread_count() -> usize {
16 match std::thread::available_parallelism() {
17 Ok(nz) => nz.get(),
18 Err(_) => 1,
19 }
20}
21
22fn configured_thread_count() -> usize {
23 let default = detected_thread_count();
24 match env::var(ENV_MAX_THREADS) {
25 Ok(raw) => match raw.trim().parse::<usize>() {
26 Ok(value) if value > 0 => value,
27 _ => default,
28 },
29 Err(_) => default,
30 }
31}
32
33fn build_pool() -> ThreadPool {
34 let limit = configured_thread_count();
35 ThreadPoolBuilder::new()
36 .num_threads(limit)
37 .thread_name(|idx| format!("llkv-worker-{idx}"))
38 .build()
39 .unwrap_or_else(|_| {
40 ThreadPoolBuilder::new()
41 .build()
42 .expect("failed to build rayon thread pool")
43 })
44}
45
46fn pool() -> &'static ThreadPool {
47 static POOL: OnceLock<ThreadPool> = OnceLock::new();
48 POOL.get_or_init(build_pool)
49}
50
51fn log_pool_size_once() {
52 static LOGGED: OnceLock<()> = OnceLock::new();
53 LOGGED.get_or_init(|| {
54 let count = pool().current_num_threads();
55 tracing::debug!(
56 "[llkv-threading] Rayon pool initialized with {count} threads (LLKV_MAX_THREADS={})",
57 env::var(ENV_MAX_THREADS).unwrap_or_else(|_| "<unset>".into())
58 );
59 });
60}
61
62pub fn with_thread_pool<F, R>(f: F) -> R
76where
77 F: FnOnce() -> R + Send,
78 R: Send,
79{
80 log_pool_size_once();
81 pool().install(f)
82}
83
84pub fn current_thread_count() -> usize {
90 pool().current_num_threads()
91}
92
93#[cfg(test)]
94mod tests {
95 use super::*;
96
97 #[test]
98 fn env_override_zero_defaults() {
99 let prev = env::var(ENV_MAX_THREADS).ok();
100 unsafe {
102 env::set_var(ENV_MAX_THREADS, "0");
103 }
104 let count = configured_thread_count();
105 if let Some(prev) = prev {
106 unsafe {
107 env::set_var(ENV_MAX_THREADS, prev);
108 }
109 } else {
110 unsafe {
111 env::remove_var(ENV_MAX_THREADS);
112 }
113 }
114 assert!(count >= 1);
115 }
116}