Skip to main content

scirs2_core/distributed/
primitives.rs

1//! Distributed computing primitives for SciRS2 Core
2//!
3//! This module provides low-level, local distributed computing primitives that
4//! complement the cluster management infrastructure in the parent module.
5//!
6//! ## Overview
7//!
8//! - [`WorkQueue`] / [`WorkReceiver`]: Bounded, multi-producer channel-based task
9//!   distribution with backpressure
10//! - [`WorkerPool`]: Fixed-size thread pool for parallel task execution with result
11//!   collection
12//! - [`distributed_map`]: Parallel map over a `Vec<T>` using a worker pool
13//! - [`distributed_map_reduce`]: Parallel map followed by serial reduce
14//! - [`chunked_parallel_process`]: Slice-based chunked parallel processing
15//! - [`ResourceMonitor`]: Heuristic CPU/memory resource availability monitor
16//!
17//! All primitives use `std::sync::mpsc` channels; no async runtime is required.
18
19use crate::error::{CoreError, CoreResult, ErrorContext};
20use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TryRecvError, TrySendError};
21use std::sync::{Arc, Mutex};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25// ─────────────────────────────────────────────────────────────────────────────
26// Error type
27// ─────────────────────────────────────────────────────────────────────────────
28
/// Errors specific to distributed-primitive operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DistributedError {
    /// The work queue is full (bounded queue at capacity).
    QueueFull,
    /// The channel has been disconnected (all senders/receivers dropped).
    Disconnected,
    /// A worker thread panicked.
    WorkerPanic(String),
    /// The operation timed out.
    Timeout,
    /// Invalid argument (e.g. zero workers, zero chunk size).
    InvalidArgument(String),
    /// Internal mutex/lock poisoned.
    PoisonedLock,
}

impl std::fmt::Display for DistributedError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Static messages use `write_str`; payload-carrying variants
        // interpolate their message.  Output text is unchanged.
        match self {
            Self::QueueFull => f.write_str("work queue is full"),
            Self::Disconnected => f.write_str("channel disconnected"),
            Self::WorkerPanic(msg) => write!(f, "worker panicked: {msg}"),
            Self::Timeout => f.write_str("operation timed out"),
            Self::InvalidArgument(msg) => write!(f, "invalid argument: {msg}"),
            Self::PoisonedLock => f.write_str("mutex lock poisoned"),
        }
    }
}

impl std::error::Error for DistributedError {}
60
61impl From<DistributedError> for CoreError {
62    fn from(err: DistributedError) -> Self {
63        CoreError::ComputationError(ErrorContext::new(err.to_string()))
64    }
65}
66
67// ─────────────────────────────────────────────────────────────────────────────
68// WorkQueue / WorkReceiver
69// ─────────────────────────────────────────────────────────────────────────────
70
/// Bounded, multi-producer work queue (sending half).
///
/// Items sent to a `WorkQueue` may be received by exactly one [`WorkReceiver`].
/// The queue is backed by [`std::sync::mpsc::sync_channel`], providing natural
/// backpressure: blocking `push` waits until a slot is free; non-blocking
/// `try_push` returns `Ok(false)` when the queue is at capacity.
///
/// # Clone behaviour
///
/// Cloning a `WorkQueue` creates an additional sender to the same channel —
/// multiple threads may push concurrently.  `Clone` is implemented manually
/// so that it is available for **any** `T: Send`: a `#[derive(Clone)]` would
/// add a spurious `T: Clone` bound even though `SyncSender<T>` and `Arc` are
/// cloneable regardless of `T`.
///
/// # Example
///
/// ```rust
/// use scirs2_core::distributed::primitives::WorkQueue;
///
/// let (queue, receiver) = WorkQueue::<i32>::new(8).expect("queue creation failed");
/// queue.push(42).expect("push failed");
/// let item = receiver.recv().expect("recv returned None");
/// assert_eq!(item, 42);
/// ```
#[derive(Debug)]
pub struct WorkQueue<T: Send + 'static> {
    /// Sending half of the underlying bounded channel.
    sender: SyncSender<T>,
    /// Approximate live item count, maintained on every push/pop.
    len: Arc<Mutex<usize>>,
    /// Maximum number of buffered items.
    capacity: usize,
}

// Manual `Clone` instead of `#[derive(Clone)]`: the derive would require
// `T: Clone`, needlessly preventing `WorkQueue<T>` from being cloned for
// non-`Clone` task types.
impl<T: Send + 'static> Clone for WorkQueue<T> {
    fn clone(&self) -> Self {
        WorkQueue {
            sender: self.sender.clone(),
            len: Arc::clone(&self.len),
            capacity: self.capacity,
        }
    }
}
100
/// The consumer half of a [`WorkQueue`].
///
/// There can only be one receiver per queue.  All blocking and non-blocking
/// receive methods decrement the internal item counter on success.
pub struct WorkReceiver<T: Send + 'static> {
    /// Receiving end of the underlying `sync_channel`.
    receiver: Receiver<T>,
    /// Counter shared with the sending half; decremented on each successful receive.
    len: Arc<Mutex<usize>>,
}
109
110impl<T: Send + 'static> WorkQueue<T> {
111    /// Create a new bounded work queue with the given `capacity`.
112    ///
113    /// Returns `(WorkQueue, WorkReceiver)` on success.
114    ///
115    /// # Errors
116    ///
117    /// [`DistributedError::InvalidArgument`] when `capacity == 0`.
118    pub fn new(capacity: usize) -> Result<(Self, WorkReceiver<T>), DistributedError> {
119        if capacity == 0 {
120            return Err(DistributedError::InvalidArgument(
121                "capacity must be > 0".to_string(),
122            ));
123        }
124        let (tx, rx) = mpsc::sync_channel::<T>(capacity);
125        let len = Arc::new(Mutex::new(0usize));
126        let queue = WorkQueue {
127            sender: tx,
128            len: Arc::clone(&len),
129            capacity,
130        };
131        let receiver = WorkReceiver { receiver: rx, len };
132        Ok((queue, receiver))
133    }
134
135    /// Blocking push.  Waits until a slot is available.
136    ///
137    /// # Errors
138    ///
139    /// [`DistributedError::Disconnected`] if the [`WorkReceiver`] has been dropped.
140    pub fn push(&self, task: T) -> Result<(), DistributedError> {
141        self.sender
142            .send(task)
143            .map_err(|_| DistributedError::Disconnected)?;
144        if let Ok(mut guard) = self.len.lock() {
145            *guard = guard.saturating_add(1);
146        }
147        Ok(())
148    }
149
150    /// Non-blocking push.
151    ///
152    /// Returns:
153    /// - `Ok(true)` — the task was enqueued successfully.
154    /// - `Ok(false)` — the queue was full; the task was **not** enqueued.
155    ///
156    /// # Errors
157    ///
158    /// [`DistributedError::Disconnected`] if the [`WorkReceiver`] has been dropped.
159    pub fn try_push(&self, task: T) -> Result<bool, DistributedError> {
160        match self.sender.try_send(task) {
161            Ok(()) => {
162                if let Ok(mut guard) = self.len.lock() {
163                    *guard = guard.saturating_add(1);
164                }
165                Ok(true)
166            }
167            Err(TrySendError::Full(_)) => Ok(false),
168            Err(TrySendError::Disconnected(_)) => Err(DistributedError::Disconnected),
169        }
170    }
171
172    /// Approximate number of items currently in the queue.
173    pub fn len(&self) -> usize {
174        self.len.lock().map(|g| *g).unwrap_or(0)
175    }
176
177    /// Returns `true` if the queue contains no items.
178    pub fn is_empty(&self) -> bool {
179        self.len() == 0
180    }
181
182    /// Maximum capacity of this queue.
183    pub fn capacity(&self) -> usize {
184        self.capacity
185    }
186}
187
188impl<T: Send + 'static> WorkReceiver<T> {
189    /// Blocking receive.  Returns `None` when all senders have been dropped.
190    pub fn recv(&self) -> Option<T> {
191        match self.receiver.recv() {
192            Ok(item) => {
193                if let Ok(mut guard) = self.len.lock() {
194                    *guard = guard.saturating_sub(1);
195                }
196                Some(item)
197            }
198            Err(_) => None,
199        }
200    }
201
202    /// Receive with a timeout.  Returns `None` on timeout or disconnection.
203    pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
204        match self.receiver.recv_timeout(timeout) {
205            Ok(item) => {
206                if let Ok(mut guard) = self.len.lock() {
207                    *guard = guard.saturating_sub(1);
208                }
209                Some(item)
210            }
211            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => None,
212        }
213    }
214
215    /// Non-blocking receive.  Returns `None` if the queue is currently empty.
216    pub fn try_recv(&self) -> Option<T> {
217        match self.receiver.try_recv() {
218            Ok(item) => {
219                if let Ok(mut guard) = self.len.lock() {
220                    *guard = guard.saturating_sub(1);
221                }
222                Some(item)
223            }
224            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
225        }
226    }
227}
228
229// ─────────────────────────────────────────────────────────────────────────────
230// WorkerPool
231// ─────────────────────────────────────────────────────────────────────────────
232
/// A fixed-size thread pool that processes tasks of type `T` and emits
/// results of type `R`.
///
/// Tasks are distributed to worker threads via an internal `sync_channel`;
/// results are collected through a standard `mpsc::channel`.
///
/// # Example
///
/// ```rust
/// use scirs2_core::distributed::primitives::WorkerPool;
/// use std::time::Duration;
///
/// let pool = WorkerPool::new(4, |x: i32| x * x).expect("pool creation failed");
/// pool.submit(7).expect("submit failed");
/// let result = pool.collect_result(Some(Duration::from_secs(5)));
/// assert_eq!(result, Some(49));
/// pool.shutdown();
/// ```
pub struct WorkerPool<T: Send + 'static, R: Send + 'static> {
    /// Number of spawned worker threads (fixed at construction).
    n_workers: usize,
    /// Join handles for every worker; joined during `shutdown`.
    handles: Vec<JoinHandle<()>>,
    /// Sending half of the bounded task channel; `None` is the shutdown sentinel.
    task_sender: SyncSender<Option<T>>,
    /// Receiving half of the unbounded result channel.
    result_receiver: Receiver<R>,
}
257
258impl<T: Send + 'static, R: Send + 'static> WorkerPool<T, R> {
259    /// Create a new pool with `n_workers` threads.
260    ///
261    /// Each thread calls `worker_fn(task)` and sends the result to the
262    /// pool's internal result channel.
263    ///
264    /// A `None` sentinel in the task channel signals workers to stop; the pool
265    /// sends one sentinel per worker on [`shutdown`](WorkerPool::shutdown).
266    ///
267    /// # Errors
268    ///
269    /// [`DistributedError::InvalidArgument`] if `n_workers == 0`.
270    pub fn new<F>(n_workers: usize, worker_fn: F) -> Result<Self, DistributedError>
271    where
272        F: Fn(T) -> R + Send + Clone + 'static,
273    {
274        if n_workers == 0 {
275            return Err(DistributedError::InvalidArgument(
276                "n_workers must be > 0".to_string(),
277            ));
278        }
279
280        // Bounded task channel — provides natural backpressure to submitters.
281        let buffer = n_workers.saturating_mul(4).max(4);
282        let (task_tx, task_rx) = mpsc::sync_channel::<Option<T>>(buffer);
283        let (result_tx, result_rx) = mpsc::channel::<R>();
284
285        // Share the receiver so all workers compete for tasks (work-stealing
286        // within a single process).
287        let shared_rx = Arc::new(Mutex::new(task_rx));
288
289        let mut handles = Vec::with_capacity(n_workers);
290        for _ in 0..n_workers {
291            let shared_rx = Arc::clone(&shared_rx);
292            let result_tx = result_tx.clone();
293            let fn_clone = worker_fn.clone();
294
295            let handle = thread::spawn(move || loop {
296                let task = {
297                    // Hold the lock only while dequeuing, not while processing.
298                    let guard = match shared_rx.lock() {
299                        Ok(g) => g,
300                        Err(_) => break, // poisoned lock → exit
301                    };
302                    match guard.recv() {
303                        Ok(Some(t)) => t,
304                        Ok(None) | Err(_) => break, // shutdown sentinel or disconnect
305                    }
306                };
307                let result = fn_clone(task);
308                if result_tx.send(result).is_err() {
309                    break; // result channel closed → no point continuing
310                }
311            });
312            handles.push(handle);
313        }
314
315        Ok(WorkerPool {
316            n_workers,
317            handles,
318            task_sender: task_tx,
319            result_receiver: result_rx,
320        })
321    }
322
323    /// Number of worker threads in this pool.
324    pub fn n_workers(&self) -> usize {
325        self.n_workers
326    }
327
328    /// Submit a task to the pool.
329    ///
330    /// Blocks briefly if the internal task buffer (capacity `n_workers * 4`)
331    /// is full.
332    ///
333    /// # Errors
334    ///
335    /// [`DistributedError::Disconnected`] if all workers have exited.
336    pub fn submit(&self, task: T) -> Result<(), DistributedError> {
337        self.task_sender
338            .send(Some(task))
339            .map_err(|_| DistributedError::Disconnected)
340    }
341
342    /// Collect one result.
343    ///
344    /// - `timeout = None` → block indefinitely.
345    /// - `timeout = Some(d)` → return `None` if no result arrives within `d`.
346    pub fn collect_result(&self, timeout: Option<Duration>) -> Option<R> {
347        match timeout {
348            None => self.result_receiver.recv().ok(),
349            Some(d) => match self.result_receiver.recv_timeout(d) {
350                Ok(r) => Some(r),
351                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => None,
352            },
353        }
354    }
355
356    /// Collect up to `expected` results, waiting at most `timeout` per result.
357    ///
358    /// Returns fewer items if individual receives time out.
359    pub fn collect_all(&self, expected: usize, timeout: Duration) -> Vec<R> {
360        let mut results = Vec::with_capacity(expected);
361        for _ in 0..expected {
362            match self.collect_result(Some(timeout)) {
363                Some(r) => results.push(r),
364                None => break,
365            }
366        }
367        results
368    }
369
370    /// Signal all workers to stop and join their threads.
371    ///
372    /// In-flight tasks complete normally; tasks still in the buffer may be
373    /// discarded once a worker picks up its sentinel.
374    pub fn shutdown(self) {
375        for _ in 0..self.n_workers {
376            // Best-effort; ignore errors if workers have already exited.
377            let _ = self.task_sender.send(None);
378        }
379        for handle in self.handles {
380            let _ = handle.join();
381        }
382    }
383}
384
385// ─────────────────────────────────────────────────────────────────────────────
386// Distributed map / map-reduce / chunked processing
387// ─────────────────────────────────────────────────────────────────────────────
388
389/// Parallel map over `data`, applying `map_fn` using `n_workers` threads.
390///
391/// Results are returned in **input order** (not completion order).
392///
393/// # Fallback
394///
395/// If `n_workers == 0`, it is silently clamped to 1.
396///
397/// # Example
398///
399/// ```rust
400/// use scirs2_core::distributed::primitives::distributed_map;
401///
402/// let data: Vec<i32> = (1..=8).collect();
403/// let squares = distributed_map(data, |x| x * x, 4);
404/// assert_eq!(squares, vec![1, 4, 9, 16, 25, 36, 49, 64]);
405/// ```
406pub fn distributed_map<T, R, F>(data: Vec<T>, map_fn: F, n_workers: usize) -> Vec<R>
407where
408    T: Send + 'static,
409    R: Send + 'static,
410    F: Fn(T) -> R + Send + Clone + 'static,
411{
412    let workers = n_workers.max(1);
413    let n = data.len();
414    if n == 0 {
415        return Vec::new();
416    }
417
418    // We tag each item with its original index so we can reorder the output.
419    let pool: WorkerPool<(usize, T), (usize, R)> =
420        WorkerPool::new(workers, move |(idx, item)| (idx, map_fn(item))).unwrap_or_else(|_| {
421            // Unreachable: workers >= 1 guarantees success.
422            panic!("internal error: WorkerPool::new failed with workers >= 1")
423        });
424
425    for (idx, item) in data.into_iter().enumerate() {
426        if pool.submit((idx, item)).is_err() {
427            break; // workers exited early — stop submitting
428        }
429    }
430
431    // Collect with a generous per-result timeout; in practice results arrive
432    // quickly but we avoid hanging forever on a misbehaving function.
433    let raw = pool.collect_all(n, Duration::from_secs(120));
434    pool.shutdown();
435
436    // Reorder to match input order.
437    let mut results: Vec<Option<R>> = (0..n).map(|_| None).collect();
438    for (idx, result) in raw {
439        if idx < results.len() {
440            results[idx] = Some(result);
441        }
442    }
443
444    results.into_iter().flatten().collect()
445}
446
447/// Parallel map followed by serial reduce.
448///
449/// The map phase distributes `map_fn` across `n_workers` threads; reduction
450/// is performed sequentially on the collected (ordered) results.
451///
452/// # Example
453///
454/// ```rust
455/// use scirs2_core::distributed::primitives::distributed_map_reduce;
456///
457/// let data: Vec<i32> = (1..=100).collect();
458/// let sum = distributed_map_reduce(data, |x| x as i64, |acc, r| acc + r, 0i64, 4);
459/// assert_eq!(sum, 5050i64);
460/// ```
461pub fn distributed_map_reduce<T, R, S, F, G>(
462    data: Vec<T>,
463    map_fn: F,
464    reduce_fn: G,
465    initial: S,
466    n_workers: usize,
467) -> S
468where
469    T: Send + 'static,
470    R: Send + 'static,
471    S: Send + Clone + 'static,
472    F: Fn(T) -> R + Send + Clone + 'static,
473    G: Fn(S, R) -> S + Send + Clone + 'static,
474{
475    let mapped = distributed_map(data, map_fn, n_workers);
476    mapped.into_iter().fold(initial, reduce_fn)
477}
478
479/// Parallel processing of `data` divided into `chunk_size` slices.
480///
481/// Each chunk is processed by `process_fn` on one of `n_workers` threads.
482/// The flat results from all chunks are concatenated in input order.
483///
484/// # Type constraints
485///
486/// `T` must implement `Clone` because each chunk is cloned into an owned
487/// `Arc<Vec<T>>` before being sent to a worker thread.
488///
489/// # Argument clamping
490///
491/// - `chunk_size == 0` is clamped to 1.
492/// - `n_workers == 0` is clamped to 1.
493///
494/// # Example
495///
496/// ```rust
497/// use scirs2_core::distributed::primitives::chunked_parallel_process;
498///
499/// let data: Vec<i32> = (1..=12).collect();
500/// let doubled = chunked_parallel_process(
501///     &data,
502///     |chunk| chunk.iter().map(|&x| x * 2).collect(),
503///     4,
504///     3,
505/// );
506/// assert_eq!(doubled, vec![2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]);
507/// ```
508pub fn chunked_parallel_process<T, R, F>(
509    data: &[T],
510    process_fn: F,
511    chunk_size: usize,
512    n_workers: usize,
513) -> Vec<R>
514where
515    T: Send + Sync + Clone + 'static,
516    R: Send + 'static,
517    F: Fn(&[T]) -> Vec<R> + Send + Clone + 'static,
518{
519    let effective_chunk = chunk_size.max(1);
520    let effective_workers = n_workers.max(1);
521
522    if data.is_empty() {
523        return Vec::new();
524    }
525
526    // Build owned Vec chunks wrapped in Arc so they can be sent to workers.
527    // We clone the slice data once per chunk; this is necessary because we need
528    // 'static ownership to move into threads.
529    let chunks: Vec<Arc<Vec<T>>> = data
530        .chunks(effective_chunk)
531        .map(|c| Arc::new(c.to_vec()))
532        .collect();
533
534    let n_chunks = chunks.len();
535
536    // Workers receive (chunk_index, Arc<Vec<T>>) and return (chunk_index, Vec<R>).
537    type TaskItem<T> = (usize, Arc<Vec<T>>);
538    type ResultItem<R> = (usize, Vec<R>);
539
540    let pool: WorkerPool<TaskItem<T>, ResultItem<R>> =
541        WorkerPool::new(effective_workers, move |task: TaskItem<T>| {
542            let (idx, chunk) = task;
543            (idx, process_fn(&chunk))
544        })
545        .unwrap_or_else(|_| panic!("internal error: WorkerPool::new failed with workers >= 1"));
546
547    for (idx, chunk) in chunks.into_iter().enumerate() {
548        if pool.submit((idx, chunk)).is_err() {
549            break;
550        }
551    }
552
553    let raw = pool.collect_all(n_chunks, Duration::from_secs(120));
554    pool.shutdown();
555
556    // Reorder to match input order.
557    let mut results: Vec<Option<Vec<R>>> = (0..n_chunks).map(|_| None).collect();
558    for (idx, chunk_result) in raw {
559        if idx < results.len() {
560            results[idx] = Some(chunk_result);
561        }
562    }
563
564    results.into_iter().flatten().flatten().collect()
565}
566
567// ─────────────────────────────────────────────────────────────────────────────
568// ResourceMonitor
569// ─────────────────────────────────────────────────────────────────────────────
570
/// Heuristic resource availability monitor.
///
/// Uses [`std::thread::available_parallelism`] to detect the logical CPU count
/// and applies simple thresholds to calculate recommended parallelism.
///
/// This is a **local** resource monitor; it does not query remote cluster
/// nodes.  For cluster-level resource management see
/// [`super::ClusterManager`].
///
/// # Example
///
/// ```rust
/// use scirs2_core::distributed::primitives::ResourceMonitor;
///
/// let monitor = ResourceMonitor::new(0.8, 1_000_000_000);
/// let workers = monitor.available_workers();
/// assert!(workers >= 1);
/// let chunk = monitor.recommended_chunk_size(1_000_000);
/// assert!(chunk >= 64);
/// assert!(monitor.can_submit());
/// ```
#[derive(Debug, Clone)]
pub struct ResourceMonitor {
    /// Fraction of logical CPUs to use (0.0–1.0).
    cpu_threshold: f64,
    /// Memory threshold in bytes (reserved for future real-time monitoring).
    memory_threshold: usize,
    /// Cached logical CPU count from `available_parallelism`.
    logical_cpus: usize,
}

impl ResourceMonitor {
    /// Create a new `ResourceMonitor`.
    ///
    /// - `cpu_threshold`: fraction of CPUs (0.0–1.0) to allocate.
    ///   Values outside `[0, 1]` are clamped.
    /// - `memory_threshold`: maximum estimated memory usage (bytes) before
    ///   `can_submit` would return `false` in a future implementation.
    pub fn new(cpu_threshold: f64, memory_threshold: usize) -> Self {
        // Fall back to a single CPU when the OS query fails.
        let detected_cpus = match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 1,
        };
        ResourceMonitor {
            cpu_threshold: cpu_threshold.clamp(0.0, 1.0),
            memory_threshold,
            logical_cpus: detected_cpus,
        }
    }

    /// Number of logical CPUs detected on this machine.
    pub fn logical_cpu_count(&self) -> usize {
        self.logical_cpus
    }

    /// Recommended number of worker threads.
    ///
    /// Returns `max(1, floor(cpu_threshold × logical_cpus))`.
    pub fn available_workers(&self) -> usize {
        let cpu_budget = self.cpu_threshold * self.logical_cpus as f64;
        usize::max(cpu_budget.floor() as usize, 1)
    }

    /// Recommended chunk size for `total_work` items given the current worker count.
    ///
    /// Targets 4 chunks per worker (good for work-stealing granularity) with a
    /// minimum of 64 items per chunk.
    pub fn recommended_chunk_size(&self, total_work: usize) -> usize {
        if total_work == 0 {
            return 64;
        }
        let desired_chunks = self.available_workers().saturating_mul(4).max(1);
        usize::max(total_work / desired_chunks, 64)
    }

    /// Returns `true` when the system appears to have capacity headroom.
    ///
    /// The current implementation always returns `true` because querying
    /// OS-level CPU and memory metrics portably and without unsafe code
    /// requires platform-specific system calls beyond scope here.  The API
    /// is provided for future enhancement and testing.
    pub fn can_submit(&self) -> bool {
        true
    }

    /// CPU utilisation threshold (0.0–1.0).
    pub fn cpu_threshold(&self) -> f64 {
        self.cpu_threshold
    }

    /// Memory threshold in bytes.
    pub fn memory_threshold(&self) -> usize {
        self.memory_threshold
    }
}
666
667// ─────────────────────────────────────────────────────────────────────────────
668// Extension trait
669// ─────────────────────────────────────────────────────────────────────────────
670
/// Extension trait that adds distributed-aware parallel processing to slices.
pub trait DistributedSliceExt<T> {
    /// Divide the slice into chunks, process each chunk in parallel, and
    /// concatenate the results in input order.
    ///
    /// - `process_fn`: maps one chunk (`&[T]`) to a `Vec<R>` of results.
    /// - `chunk_size`: items per chunk (`0` is clamped to 1 by the implementation).
    /// - `n_workers`: worker threads to use (`0` is clamped to 1).
    fn distributed_process<R, F>(
        &self,
        process_fn: F,
        chunk_size: usize,
        n_workers: usize,
    ) -> Vec<R>
    where
        T: Send + Sync + Clone + 'static,
        R: Send + 'static,
        F: Fn(&[T]) -> Vec<R> + Send + Clone + 'static;
}
686
impl<T: Send + Sync + Clone + 'static> DistributedSliceExt<T> for [T] {
    // Thin forwarding impl: all behaviour (chunking, argument clamping,
    // result ordering) lives in `chunked_parallel_process`.
    fn distributed_process<R, F>(
        &self,
        process_fn: F,
        chunk_size: usize,
        n_workers: usize,
    ) -> Vec<R>
    where
        R: Send + 'static,
        F: Fn(&[T]) -> Vec<R> + Send + Clone + 'static,
    {
        chunked_parallel_process(self, process_fn, chunk_size, n_workers)
    }
}
701
702// ─────────────────────────────────────────────────────────────────────────────
703// CoreResult-returning wrappers
704// ─────────────────────────────────────────────────────────────────────────────
705
706/// Like [`distributed_map`] but returns [`CoreResult`]`<Vec<R>>`.
707///
708/// This wrapper always succeeds unless `n_workers == 0` (which is silently
709/// clamped to 1); it exists for consistent error-handling at call sites.
710pub fn try_distributed_map<T, R, F>(data: Vec<T>, map_fn: F, n_workers: usize) -> CoreResult<Vec<R>>
711where
712    T: Send + 'static,
713    R: Send + 'static,
714    F: Fn(T) -> R + Send + Clone + 'static,
715{
716    Ok(distributed_map(data, map_fn, n_workers))
717}
718
719/// Like [`distributed_map_reduce`] but returns [`CoreResult`]`<S>`.
720///
721/// This wrapper always succeeds; it exists for consistent error-handling.
722pub fn try_distributed_map_reduce<T, R, S, F, G>(
723    data: Vec<T>,
724    map_fn: F,
725    reduce_fn: G,
726    initial: S,
727    n_workers: usize,
728) -> CoreResult<S>
729where
730    T: Send + 'static,
731    R: Send + 'static,
732    S: Send + Clone + 'static,
733    F: Fn(T) -> R + Send + Clone + 'static,
734    G: Fn(S, R) -> S + Send + Clone + 'static,
735{
736    Ok(distributed_map_reduce(
737        data, map_fn, reduce_fn, initial, n_workers,
738    ))
739}
740
741// ─────────────────────────────────────────────────────────────────────────────
742// Tests (≥ 20 comprehensive cases)
743// ─────────────────────────────────────────────────────────────────────────────
744
745#[cfg(test)]
746mod tests {
747    use super::*;
748    use std::sync::atomic::{AtomicUsize, Ordering};
749    use std::sync::Arc;
750    use std::time::Duration;
751
752    // ── WorkQueue ─────────────────────────────────────────────────────────────
753
754    #[test]
755    fn test_work_queue_basic_push_recv() {
756        let (queue, receiver) = WorkQueue::<i32>::new(8).expect("queue creation failed");
757        queue.push(42).expect("push failed");
758        let item = receiver.recv().expect("recv returned None");
759        assert_eq!(item, 42);
760    }
761
762    #[test]
763    fn test_work_queue_zero_capacity_is_error() {
764        let result = WorkQueue::<i32>::new(0);
765        assert!(matches!(result, Err(DistributedError::InvalidArgument(_))));
766    }
767
768    #[test]
769    fn test_work_queue_try_push_full() {
770        let (queue, _receiver) = WorkQueue::<i32>::new(2).expect("queue creation failed");
771        let r1 = queue.try_push(1).expect("try_push 1 failed");
772        let r2 = queue.try_push(2).expect("try_push 2 failed");
773        let r3 = queue
774            .try_push(3)
775            .expect("try_push 3 should return false when full");
776        assert!(r1, "first slot should be accepted");
777        assert!(r2, "second slot should be accepted");
778        assert!(!r3, "queue is full — should return false");
779    }
780
781    #[test]
782    fn test_work_queue_len_and_is_empty() {
783        let (queue, receiver) = WorkQueue::<u64>::new(16).expect("queue creation failed");
784        assert!(queue.is_empty(), "newly created queue must be empty");
785        queue.push(10).expect("push 10 failed");
786        queue.push(20).expect("push 20 failed");
787        assert_eq!(queue.len(), 2, "queue len should be 2 after two pushes");
788        receiver.recv();
789        assert_eq!(queue.len(), 1, "queue len should be 1 after one recv");
790    }
791
792    #[test]
793    fn test_work_queue_capacity() {
794        let (queue, _rx) = WorkQueue::<()>::new(32).expect("queue creation failed");
795        assert_eq!(queue.capacity(), 32);
796    }
797
798    #[test]
799    fn test_work_queue_disconnected_on_receiver_drop() {
800        let (queue, receiver) = WorkQueue::<i32>::new(4).expect("queue creation failed");
801        drop(receiver);
802        let err = queue.push(1);
803        assert!(matches!(err, Err(DistributedError::Disconnected)));
804    }
805
806    #[test]
807    fn test_work_receiver_recv_timeout_returns_none() {
808        let (_queue, receiver) = WorkQueue::<i32>::new(4).expect("queue creation failed");
809        let result = receiver.recv_timeout(Duration::from_millis(20));
810        assert!(
811            result.is_none(),
812            "should time out with nothing in the queue"
813        );
814    }
815
816    #[test]
817    fn test_work_receiver_try_recv_empty() {
818        let (_queue, receiver) = WorkQueue::<i32>::new(4).expect("queue creation failed");
819        assert!(
820            receiver.try_recv().is_none(),
821            "try_recv on empty queue must return None"
822        );
823    }
824
    /// Three producer threads each push a disjoint range of 10 items; after
    /// all producers are joined, the consumer must observe all 30 items
    /// exactly once.
    #[test]
    fn test_work_queue_multiple_producers() {
        let (queue, receiver) = WorkQueue::<i32>::new(128).expect("queue creation failed");
        let ranges: Vec<(i32, i32)> = vec![(0, 10), (10, 20), (20, 30)];
        let mut handles = Vec::new();
        for (start, end) in ranges {
            // Each producer thread gets its own clone of the multi-producer handle.
            let q = queue.clone();
            handles.push(std::thread::spawn(move || {
                for i in start..end {
                    q.push(i).expect("push failed");
                }
            }));
        }
        for h in handles {
            h.join().expect("producer thread panicked");
        }
        // First drain with non-blocking receives.
        let mut items: Vec<i32> = Vec::new();
        while let Some(x) = receiver.try_recv() {
            items.push(x);
        }
        // NOTE(review): all producers were joined above, so every `push`
        // happened-before this drain; with plain mpsc semantics the try_recv
        // loop alone should already see all 30 items, making this grace-period
        // flush (which always costs one extra ~10 ms timeout on the final,
        // empty receive) removable. Confirm WorkReceiver::try_recv has no
        // counter/snapshot behavior before deleting it.
        while let Some(x) = receiver.recv_timeout(Duration::from_millis(10)) {
            items.push(x);
        }
        assert_eq!(
            items.len(),
            30,
            "expected 30 items from three producers, got {}",
            items.len()
        );
        items.sort_unstable();
        assert_eq!(items, (0..30).collect::<Vec<_>>());
    }
861
862    // ── WorkerPool ────────────────────────────────────────────────────────────
863
864    #[test]
865    fn test_worker_pool_basic_square() {
866        let pool = WorkerPool::new(2, |x: i32| x * 2).expect("pool creation failed");
867        pool.submit(3).expect("submit failed");
868        pool.submit(7).expect("submit failed");
869        let mut results = pool.collect_all(2, Duration::from_secs(5));
870        results.sort_unstable();
871        assert_eq!(results, vec![6, 14]);
872        pool.shutdown();
873    }
874
875    #[test]
876    fn test_worker_pool_zero_workers_is_error() {
877        let result = WorkerPool::<i32, i32>::new(0, |x| x);
878        assert!(
879            matches!(result, Err(DistributedError::InvalidArgument(_))),
880            "zero workers must be rejected"
881        );
882    }
883
884    #[test]
885    fn test_worker_pool_collect_result_none_on_timeout() {
886        let pool = WorkerPool::<i32, i32>::new(1, |x| x).expect("pool creation failed");
887        let result = pool.collect_result(Some(Duration::from_millis(30)));
888        assert!(result.is_none(), "nothing submitted → should timeout");
889        pool.shutdown();
890    }
891
892    #[test]
893    fn test_worker_pool_accumulates_correct_sum() {
894        let counter = Arc::new(AtomicUsize::new(0));
895        let counter_clone = Arc::clone(&counter);
896        let pool = WorkerPool::new(4, move |x: usize| {
897            counter_clone.fetch_add(x, Ordering::Relaxed);
898            x
899        })
900        .expect("pool creation failed");
901
902        for i in 0..20 {
903            pool.submit(i).expect("submit failed");
904        }
905        let _ = pool.collect_all(20, Duration::from_secs(5));
906        pool.shutdown();
907
908        // Sum 0..20 = 190
909        assert_eq!(counter.load(Ordering::Relaxed), 190);
910    }
911
912    #[test]
913    fn test_worker_pool_n_workers() {
914        let pool = WorkerPool::new(7, |x: i32| x).expect("pool creation failed");
915        assert_eq!(pool.n_workers(), 7);
916        pool.shutdown();
917    }
918
919    // ── distributed_map ───────────────────────────────────────────────────────
920
921    #[test]
922    fn test_distributed_map_empty_input() {
923        let result = distributed_map(Vec::<i32>::new(), |x| x * x, 4);
924        assert!(result.is_empty());
925    }
926
927    #[test]
928    fn test_distributed_map_preserves_order() {
929        let data: Vec<i32> = (1..=16).collect();
930        let result = distributed_map(data, |x| x * x, 4);
931        let expected: Vec<i32> = (1..=16).map(|x| x * x).collect();
932        assert_eq!(
933            result, expected,
934            "distributed_map must preserve input order"
935        );
936    }
937
938    #[test]
939    fn test_distributed_map_single_worker() {
940        let data: Vec<String> = (0..10).map(|i| format!("item-{i}")).collect();
941        let lens = distributed_map(data.clone(), |s| s.len(), 1);
942        let expected: Vec<usize> = data.iter().map(|s| s.len()).collect();
943        assert_eq!(lens, expected);
944    }
945
946    #[test]
947    fn test_distributed_map_zero_workers_clamped_to_one() {
948        let data: Vec<i32> = (0..5).collect();
949        // n_workers=0 must not panic; it is clamped to 1 internally.
950        let result = distributed_map(data, |x| x + 1, 0);
951        assert_eq!(result, vec![1, 2, 3, 4, 5]);
952    }
953
954    // ── distributed_map_reduce ────────────────────────────────────────────────
955
956    #[test]
957    fn test_distributed_map_reduce_sum() {
958        let data: Vec<i32> = (1..=100).collect();
959        let sum = distributed_map_reduce(data, |x| x as i64, |acc, r| acc + r, 0i64, 4);
960        assert_eq!(sum, 5050, "sum 1..100 must equal 5050");
961    }
962
963    #[test]
964    fn test_distributed_map_reduce_factorial_small() {
965        let data: Vec<u64> = (1..=5).collect();
966        let product = distributed_map_reduce(data, |x| x, |acc, r| acc * r, 1u64, 2);
967        assert_eq!(product, 120, "5! = 120");
968    }
969
970    #[test]
971    fn test_distributed_map_reduce_string_concat_order() {
972        let data: Vec<i32> = (0..5).collect();
973        let result = distributed_map_reduce(
974            data,
975            |x| x.to_string(),
976            |mut acc, r| {
977                acc.push_str(&r);
978                acc
979            },
980            String::new(),
981            2,
982        );
983        // Map phase orders by index; reduce is serial over ordered results.
984        assert_eq!(result, "01234");
985    }
986
987    // ── chunked_parallel_process ──────────────────────────────────────────────
988
989    #[test]
990    fn test_chunked_parallel_process_basic() {
991        let data: Vec<i32> = (1..=12).collect();
992        let doubled =
993            chunked_parallel_process(&data, |chunk| chunk.iter().map(|&x| x * 2).collect(), 4, 3);
994        assert_eq!(doubled, vec![2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]);
995    }
996
997    #[test]
998    fn test_chunked_parallel_process_empty_input() {
999        let data: Vec<f64> = Vec::new();
1000        let result = chunked_parallel_process(&data, |c| c.to_vec(), 4, 2);
1001        assert!(result.is_empty());
1002    }
1003
1004    #[test]
1005    fn test_chunked_parallel_process_chunk_larger_than_data() {
1006        let data: Vec<i32> = (0..5).collect();
1007        let result = chunked_parallel_process(
1008            &data,
1009            |chunk| chunk.iter().map(|&x| x + 1).collect(),
1010            100,
1011            2,
1012        );
1013        assert_eq!(result, vec![1, 2, 3, 4, 5]);
1014    }
1015
1016    #[test]
1017    fn test_chunked_parallel_process_chunk_size_one() {
1018        let data: Vec<i32> = (0..10).collect();
1019        let result = chunked_parallel_process(&data, |chunk| vec![chunk[0] * 3], 1, 4);
1020        let expected: Vec<i32> = (0..10).map(|x| x * 3).collect();
1021        assert_eq!(result, expected);
1022    }
1023
1024    // ── ResourceMonitor ───────────────────────────────────────────────────────
1025
1026    #[test]
1027    fn test_resource_monitor_available_workers_full_threshold() {
1028        let monitor = ResourceMonitor::new(1.0, usize::MAX);
1029        let workers = monitor.available_workers();
1030        assert!(workers >= 1);
1031        assert_eq!(workers, monitor.logical_cpu_count());
1032    }
1033
1034    #[test]
1035    fn test_resource_monitor_half_threshold() {
1036        let monitor = ResourceMonitor::new(0.5, usize::MAX);
1037        let cpus = monitor.logical_cpu_count();
1038        let workers = monitor.available_workers();
1039        let expected = ((0.5_f64 * cpus as f64).floor() as usize).max(1);
1040        assert_eq!(workers, expected);
1041    }
1042
1043    #[test]
1044    fn test_resource_monitor_zero_threshold_still_one_worker() {
1045        let monitor = ResourceMonitor::new(0.0, 0);
1046        assert_eq!(
1047            monitor.available_workers(),
1048            1,
1049            "must always return at least 1"
1050        );
1051    }
1052
1053    #[test]
1054    fn test_resource_monitor_recommended_chunk_size() {
1055        let monitor = ResourceMonitor::new(1.0, usize::MAX);
1056        let chunk = monitor.recommended_chunk_size(1_000_000);
1057        assert!(chunk >= 64, "chunk must be at least 64");
1058        let chunk_zero = monitor.recommended_chunk_size(0);
1059        assert_eq!(chunk_zero, 64, "zero total work → default 64");
1060    }
1061
1062    #[test]
1063    fn test_resource_monitor_can_submit() {
1064        let monitor = ResourceMonitor::new(0.8, 1_000_000_000);
1065        assert!(monitor.can_submit());
1066    }
1067
1068    #[test]
1069    fn test_resource_monitor_accessors() {
1070        let monitor = ResourceMonitor::new(0.75, 500_000);
1071        assert!((monitor.cpu_threshold() - 0.75).abs() < 1e-9);
1072        assert_eq!(monitor.memory_threshold(), 500_000);
1073    }
1074
1075    // ── DistributedSliceExt ───────────────────────────────────────────────────
1076
1077    #[test]
1078    fn test_distributed_slice_ext_double() {
1079        let data: Vec<i32> = (1..=20).collect();
1080        let result =
1081            data.distributed_process(|chunk| chunk.iter().map(|&x| x as i64 * 2).collect(), 5, 4);
1082        let expected: Vec<i64> = (1..=20).map(|x| x as i64 * 2).collect();
1083        assert_eq!(result, expected);
1084    }
1085
1086    // ── CoreResult wrappers ───────────────────────────────────────────────────
1087
1088    #[test]
1089    fn test_try_distributed_map() {
1090        let data: Vec<i32> = (1..=5).collect();
1091        let result = try_distributed_map(data, |x| x + 10, 2).expect("try_distributed_map failed");
1092        assert_eq!(result, vec![11, 12, 13, 14, 15]);
1093    }
1094
1095    #[test]
1096    fn test_try_distributed_map_reduce() {
1097        let data: Vec<i32> = (1..=10).collect();
1098        let result = try_distributed_map_reduce(data, |x| x as u32, |a, b| a + b, 0u32, 2)
1099            .expect("try_distributed_map_reduce failed");
1100        assert_eq!(result, 55, "sum 1..10 = 55");
1101    }
1102
1103    // ── DistributedError ──────────────────────────────────────────────────────
1104
1105    #[test]
1106    fn test_distributed_error_display_messages() {
1107        let cases: &[(DistributedError, &str)] = &[
1108            (DistributedError::QueueFull, "full"),
1109            (DistributedError::Disconnected, "disconnect"),
1110            (DistributedError::Timeout, "timed out"),
1111            (
1112                DistributedError::InvalidArgument("bad arg".into()),
1113                "bad arg",
1114            ),
1115            (DistributedError::WorkerPanic("boom".into()), "boom"),
1116            (DistributedError::PoisonedLock, "poison"),
1117        ];
1118        for (err, expected_fragment) in cases {
1119            let msg = err.to_string();
1120            assert!(
1121                msg.contains(expected_fragment),
1122                "error '{msg}' should contain '{expected_fragment}'"
1123            );
1124        }
1125    }
1126
1127    #[test]
1128    fn test_distributed_error_into_core_error() {
1129        let err: CoreError = DistributedError::QueueFull.into();
1130        // Verify it converts without panicking and produces a non-empty string.
1131        assert!(!err.to_string().is_empty());
1132    }
1133}