Skip to main content

adze_concurrency_plan_core/
lib.rs

1//! Pure policy helpers for bounded parallel partition planning.
2
3#![forbid(unsafe_op_in_unsafe_fn)]
4#![deny(missing_docs)]
5#![cfg_attr(feature = "strict_api", deny(unreachable_pub))]
6#![cfg_attr(not(feature = "strict_api"), warn(unreachable_pub))]
7#![cfg_attr(feature = "strict_docs", deny(missing_docs))]
8#![cfg_attr(not(feature = "strict_docs"), allow(missing_docs))]
9
10pub use adze_concurrency_normalize_core::{MIN_CONCURRENCY, normalized_concurrency};
11
12/// Workloads at or below `concurrency * DIRECT_PARALLEL_THRESHOLD_MULTIPLIER`
13/// prefer direct parallel iteration over chunk partitioning.
14pub const DIRECT_PARALLEL_THRESHOLD_MULTIPLIER: usize = 2;
15
16/// Planning metadata for bounded parallel partitioning.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub struct ParallelPartitionPlan {
19    /// Effective non-zero concurrency used by the plan.
20    pub concurrency: usize,
21    /// Chunk size to use for partitioned processing. Guaranteed to be at least `1`.
22    pub chunk_size: usize,
23    /// Whether direct parallel iteration is preferred over chunk partitioning.
24    pub use_direct_parallel_iter: bool,
25}
26
27impl ParallelPartitionPlan {
28    /// Build a partition plan for `item_count` items and requested concurrency.
29    #[must_use]
30    pub fn for_item_count(item_count: usize, requested_concurrency: usize) -> Self {
31        let concurrency = normalized_concurrency(requested_concurrency);
32        let use_direct_parallel_iter =
33            item_count <= concurrency.saturating_mul(DIRECT_PARALLEL_THRESHOLD_MULTIPLIER);
34        let chunk_size = if item_count == 0 {
35            1
36        } else {
37            item_count.div_ceil(concurrency)
38        };
39
40        Self {
41            concurrency,
42            chunk_size,
43            use_direct_parallel_iter,
44        }
45    }
46}
47
48#[cfg(test)]
49mod tests {
50    use super::*;
51
52    #[test]
53    fn normalized_concurrency_is_never_zero() {
54        assert_eq!(normalized_concurrency(0), 1);
55        assert_eq!(normalized_concurrency(1), 1);
56        assert_eq!(normalized_concurrency(8), 8);
57    }
58
59    #[test]
60    fn plan_for_empty_work_is_safe() {
61        let plan = ParallelPartitionPlan::for_item_count(0, 0);
62        assert_eq!(plan.concurrency, 1);
63        assert_eq!(plan.chunk_size, 1);
64        assert!(plan.use_direct_parallel_iter);
65    }
66
67    #[test]
68    fn plan_for_large_work_uses_chunking() {
69        let plan = ParallelPartitionPlan::for_item_count(257, 4);
70        assert_eq!(plan.concurrency, 4);
71        assert_eq!(plan.chunk_size, 65);
72        assert!(!plan.use_direct_parallel_iter);
73    }
74}