llkv_threading/
lib.rs

1//! Helper utilities for Rayon thread-pool management shared across LLKV crates.
2//!
3//! Centralizing Rayon configuration ensures callers reuse a single bounded pool.
4//! This avoids opportunistic global overrides and keeps worker naming
5//! consistent. The helpers below expose a `with_thread_pool` entry point and a
6//! cheap accessor for the current thread count.
7
8use std::env;
9use std::sync::OnceLock;
10
11use rayon::{ThreadPool, ThreadPoolBuilder};
12
13const ENV_MAX_THREADS: &str = "LLKV_MAX_THREADS";
14
15fn detected_thread_count() -> usize {
16    match std::thread::available_parallelism() {
17        Ok(nz) => nz.get(),
18        Err(_) => 1,
19    }
20}
21
22fn configured_thread_count() -> usize {
23    let default = detected_thread_count();
24    match env::var(ENV_MAX_THREADS) {
25        Ok(raw) => match raw.trim().parse::<usize>() {
26            Ok(value) if value > 0 => value,
27            _ => default,
28        },
29        Err(_) => default,
30    }
31}
32
33fn build_pool() -> ThreadPool {
34    let limit = configured_thread_count();
35    ThreadPoolBuilder::new()
36        .num_threads(limit)
37        .thread_name(|idx| format!("llkv-worker-{idx}"))
38        .build()
39        .unwrap_or_else(|_| {
40            ThreadPoolBuilder::new()
41                .build()
42                .expect("failed to build rayon thread pool")
43        })
44}
45
46fn pool() -> &'static ThreadPool {
47    static POOL: OnceLock<ThreadPool> = OnceLock::new();
48    POOL.get_or_init(build_pool)
49}
50
51fn log_pool_size_once() {
52    static LOGGED: OnceLock<()> = OnceLock::new();
53    LOGGED.get_or_init(|| {
54        let count = pool().current_num_threads();
55        tracing::debug!(
56            "[llkv-threading] Rayon pool initialized with {count} threads (LLKV_MAX_THREADS={})",
57            env::var(ENV_MAX_THREADS).unwrap_or_else(|_| "<unset>".into())
58        );
59    });
60}
61
62/// Execute the provided closure within the shared Rayon thread pool.
63///
64/// This helper ensures all parallel work within the workspace shares the same
65/// worker configuration (thread count, naming). The pool size can be capped via
66/// the `LLKV_MAX_THREADS` environment variable; non-positive values fall back to
67/// the detected hardware parallelism.
68///
69/// # Arguments
70/// - `f`: Closure to run inside the pool.
71///
72/// # Panics
73/// Panics only if the global pool fails to initialize, which indicates a bug in
74/// the underlying Rayon builder.
75pub fn with_thread_pool<F, R>(f: F) -> R
76where
77    F: FnOnce() -> R + Send,
78    R: Send,
79{
80    log_pool_size_once();
81    pool().install(f)
82}
83
84/// Return the number of worker threads currently active in the shared pool.
85///
86/// This value reflects either hardware parallelism or the override supplied via
87/// `LLKV_MAX_THREADS`. The count may change if the environment variable differs
88/// between runs, but remains stable for the lifetime of the process.
89pub fn current_thread_count() -> usize {
90    pool().current_num_threads()
91}
92
93#[cfg(test)]
94mod tests {
95    use super::*;
96
97    #[test]
98    fn env_override_zero_defaults() {
99        let prev = env::var(ENV_MAX_THREADS).ok();
100        // Unsafe is fine for test-only env mutation in a single-threaded context.
101        unsafe {
102            env::set_var(ENV_MAX_THREADS, "0");
103        }
104        let count = configured_thread_count();
105        if let Some(prev) = prev {
106            unsafe {
107                env::set_var(ENV_MAX_THREADS, prev);
108            }
109        } else {
110            unsafe {
111                env::remove_var(ENV_MAX_THREADS);
112            }
113        }
114        assert!(count >= 1);
115    }
116}