1use crate::error::{CoreError, CoreResult, ErrorContext};
20use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TryRecvError, TrySendError};
21use std::sync::{Arc, Mutex};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
/// Errors produced by the distributed work-queue / worker-pool layer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DistributedError {
    /// The bounded work queue has no free capacity.
    QueueFull,
    /// The other end of a channel was dropped.
    Disconnected,
    /// A worker thread panicked; the payload carries the panic message.
    WorkerPanic(String),
    /// An operation did not complete within its deadline.
    Timeout,
    /// A caller supplied an invalid argument (message explains which).
    InvalidArgument(String),
    /// A mutex guarding shared state was poisoned by a panic.
    PoisonedLock,
}
45
46impl std::fmt::Display for DistributedError {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 match self {
49 DistributedError::QueueFull => write!(f, "work queue is full"),
50 DistributedError::Disconnected => write!(f, "channel disconnected"),
51 DistributedError::WorkerPanic(msg) => write!(f, "worker panicked: {msg}"),
52 DistributedError::Timeout => write!(f, "operation timed out"),
53 DistributedError::InvalidArgument(msg) => write!(f, "invalid argument: {msg}"),
54 DistributedError::PoisonedLock => write!(f, "mutex lock poisoned"),
55 }
56 }
57}
58
// Marker impl: `DistributedError` carries no wrapped source error, so the
// default `std::error::Error` provided methods suffice.
impl std::error::Error for DistributedError {}
60
61impl From<DistributedError> for CoreError {
62 fn from(err: DistributedError) -> Self {
63 CoreError::ComputationError(ErrorContext::new(err.to_string()))
64 }
65}
66
/// Bounded multi-producer work queue backed by a `std::sync::mpsc` sync channel.
///
/// Cloning a `WorkQueue` yields another producer handle for the same channel.
#[derive(Debug, Clone)]
pub struct WorkQueue<T: Send + 'static> {
    // Sending half of the bounded channel; `send` blocks when the buffer is full.
    sender: SyncSender<T>,
    // Approximate item count, shared with the matching `WorkReceiver`.
    // NOTE(review): the counter is updated after send/recv completes, so it
    // can briefly lag the channel's true occupancy under concurrency.
    len: Arc<Mutex<usize>>,
    // Capacity the channel was created with.
    capacity: usize,
}
100
/// Consuming half of a [`WorkQueue`]; receives items pushed by producers.
pub struct WorkReceiver<T: Send + 'static> {
    // Receiving half of the bounded channel.
    receiver: Receiver<T>,
    // Shared approximate length counter, decremented on each successful receive.
    len: Arc<Mutex<usize>>,
}
109
110impl<T: Send + 'static> WorkQueue<T> {
111 pub fn new(capacity: usize) -> Result<(Self, WorkReceiver<T>), DistributedError> {
119 if capacity == 0 {
120 return Err(DistributedError::InvalidArgument(
121 "capacity must be > 0".to_string(),
122 ));
123 }
124 let (tx, rx) = mpsc::sync_channel::<T>(capacity);
125 let len = Arc::new(Mutex::new(0usize));
126 let queue = WorkQueue {
127 sender: tx,
128 len: Arc::clone(&len),
129 capacity,
130 };
131 let receiver = WorkReceiver { receiver: rx, len };
132 Ok((queue, receiver))
133 }
134
135 pub fn push(&self, task: T) -> Result<(), DistributedError> {
141 self.sender
142 .send(task)
143 .map_err(|_| DistributedError::Disconnected)?;
144 if let Ok(mut guard) = self.len.lock() {
145 *guard = guard.saturating_add(1);
146 }
147 Ok(())
148 }
149
150 pub fn try_push(&self, task: T) -> Result<bool, DistributedError> {
160 match self.sender.try_send(task) {
161 Ok(()) => {
162 if let Ok(mut guard) = self.len.lock() {
163 *guard = guard.saturating_add(1);
164 }
165 Ok(true)
166 }
167 Err(TrySendError::Full(_)) => Ok(false),
168 Err(TrySendError::Disconnected(_)) => Err(DistributedError::Disconnected),
169 }
170 }
171
172 pub fn len(&self) -> usize {
174 self.len.lock().map(|g| *g).unwrap_or(0)
175 }
176
177 pub fn is_empty(&self) -> bool {
179 self.len() == 0
180 }
181
182 pub fn capacity(&self) -> usize {
184 self.capacity
185 }
186}
187
188impl<T: Send + 'static> WorkReceiver<T> {
189 pub fn recv(&self) -> Option<T> {
191 match self.receiver.recv() {
192 Ok(item) => {
193 if let Ok(mut guard) = self.len.lock() {
194 *guard = guard.saturating_sub(1);
195 }
196 Some(item)
197 }
198 Err(_) => None,
199 }
200 }
201
202 pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
204 match self.receiver.recv_timeout(timeout) {
205 Ok(item) => {
206 if let Ok(mut guard) = self.len.lock() {
207 *guard = guard.saturating_sub(1);
208 }
209 Some(item)
210 }
211 Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => None,
212 }
213 }
214
215 pub fn try_recv(&self) -> Option<T> {
217 match self.receiver.try_recv() {
218 Ok(item) => {
219 if let Ok(mut guard) = self.len.lock() {
220 *guard = guard.saturating_sub(1);
221 }
222 Some(item)
223 }
224 Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
225 }
226 }
227}
228
/// Fixed-size pool of worker threads that pull tasks from a shared bounded
/// channel and push results onto an unbounded result channel.
///
/// `T` is the task type, `R` the per-task result type. A task value of
/// `None` is the internal shutdown sentinel (sent by `shutdown`).
pub struct WorkerPool<T: Send + 'static, R: Send + 'static> {
    // Number of worker threads spawned.
    n_workers: usize,
    // Join handles for every worker, joined during `shutdown`.
    handles: Vec<JoinHandle<()>>,
    // Bounded task channel; `None` tells a worker to exit.
    task_sender: SyncSender<Option<T>>,
    // Unbounded channel carrying results back to the pool owner.
    result_receiver: Receiver<R>,
}
257
impl<T: Send + 'static, R: Send + 'static> WorkerPool<T, R> {
    /// Spawns `n_workers` threads that each repeatedly take a task from the
    /// shared channel, run `worker_fn` on it, and send the result back.
    ///
    /// # Errors
    /// Returns `InvalidArgument` when `n_workers` is zero.
    pub fn new<F>(n_workers: usize, worker_fn: F) -> Result<Self, DistributedError>
    where
        F: Fn(T) -> R + Send + Clone + 'static,
    {
        if n_workers == 0 {
            return Err(DistributedError::InvalidArgument(
                "n_workers must be > 0".to_string(),
            ));
        }

        // Small bounded buffer so submitters can run slightly ahead of the
        // workers without unbounded memory growth.
        let buffer = n_workers.saturating_mul(4).max(4);
        let (task_tx, task_rx) = mpsc::sync_channel::<Option<T>>(buffer);
        let (result_tx, result_rx) = mpsc::channel::<R>();

        // mpsc receivers are single-consumer; the workers share this one
        // behind a mutex and take turns receiving.
        let shared_rx = Arc::new(Mutex::new(task_rx));

        let mut handles = Vec::with_capacity(n_workers);
        for _ in 0..n_workers {
            let shared_rx = Arc::clone(&shared_rx);
            let result_tx = result_tx.clone();
            let fn_clone = worker_fn.clone();

            let handle = thread::spawn(move || loop {
                // Take the next task while holding the lock; the guard is
                // dropped at the end of this block, BEFORE the task runs, so
                // other workers can receive concurrently.
                let task = {
                    let guard = match shared_rx.lock() {
                        Ok(g) => g,
                        Err(_) => break, // lock poisoned: give up
                    };
                    match guard.recv() {
                        Ok(Some(t)) => t,
                        // `None` is the shutdown sentinel; `Err` means every
                        // sender is gone. Either way, exit the loop.
                        Ok(None) | Err(_) => break,
                    }
                };
                let result = fn_clone(task);
                if result_tx.send(result).is_err() {
                    // Pool owner dropped the result receiver; stop working.
                    break;
                }
            });
            handles.push(handle);
        }

        Ok(WorkerPool {
            n_workers,
            handles,
            task_sender: task_tx,
            result_receiver: result_rx,
        })
    }

    /// Number of worker threads in the pool.
    pub fn n_workers(&self) -> usize {
        self.n_workers
    }

    /// Submits one task; blocks while the task buffer is full.
    ///
    /// # Errors
    /// Returns `Disconnected` when every worker has already exited.
    pub fn submit(&self, task: T) -> Result<(), DistributedError> {
        self.task_sender
            .send(Some(task))
            .map_err(|_| DistributedError::Disconnected)
    }

    /// Waits for a single result. With `None` it blocks indefinitely; with
    /// `Some(d)` it returns `None` after `d` elapses or once the workers
    /// have all disconnected.
    pub fn collect_result(&self, timeout: Option<Duration>) -> Option<R> {
        match timeout {
            None => self.result_receiver.recv().ok(),
            Some(d) => match self.result_receiver.recv_timeout(d) {
                Ok(r) => Some(r),
                Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => None,
            },
        }
    }

    /// Collects up to `expected` results, applying `timeout` to each wait
    /// individually. Stops early on the first timeout/disconnect, so the
    /// returned vector may be shorter than `expected`.
    pub fn collect_all(&self, expected: usize, timeout: Duration) -> Vec<R> {
        let mut results = Vec::with_capacity(expected);
        for _ in 0..expected {
            match self.collect_result(Some(timeout)) {
                Some(r) => results.push(r),
                None => break,
            }
        }
        results
    }

    /// Sends one shutdown sentinel per worker, then joins every thread.
    /// Send/join errors are ignored: they only mean a worker already exited.
    pub fn shutdown(self) {
        for _ in 0..self.n_workers {
            let _ = self.task_sender.send(None);
        }
        for handle in self.handles {
            let _ = handle.join();
        }
    }
}
384
385pub fn distributed_map<T, R, F>(data: Vec<T>, map_fn: F, n_workers: usize) -> Vec<R>
407where
408 T: Send + 'static,
409 R: Send + 'static,
410 F: Fn(T) -> R + Send + Clone + 'static,
411{
412 let workers = n_workers.max(1);
413 let n = data.len();
414 if n == 0 {
415 return Vec::new();
416 }
417
418 let pool: WorkerPool<(usize, T), (usize, R)> =
420 WorkerPool::new(workers, move |(idx, item)| (idx, map_fn(item))).unwrap_or_else(|_| {
421 panic!("internal error: WorkerPool::new failed with workers >= 1")
423 });
424
425 for (idx, item) in data.into_iter().enumerate() {
426 if pool.submit((idx, item)).is_err() {
427 break; }
429 }
430
431 let raw = pool.collect_all(n, Duration::from_secs(120));
434 pool.shutdown();
435
436 let mut results: Vec<Option<R>> = (0..n).map(|_| None).collect();
438 for (idx, result) in raw {
439 if idx < results.len() {
440 results[idx] = Some(result);
441 }
442 }
443
444 results.into_iter().flatten().collect()
445}
446
447pub fn distributed_map_reduce<T, R, S, F, G>(
462 data: Vec<T>,
463 map_fn: F,
464 reduce_fn: G,
465 initial: S,
466 n_workers: usize,
467) -> S
468where
469 T: Send + 'static,
470 R: Send + 'static,
471 S: Send + Clone + 'static,
472 F: Fn(T) -> R + Send + Clone + 'static,
473 G: Fn(S, R) -> S + Send + Clone + 'static,
474{
475 let mapped = distributed_map(data, map_fn, n_workers);
476 mapped.into_iter().fold(initial, reduce_fn)
477}
478
479pub fn chunked_parallel_process<T, R, F>(
509 data: &[T],
510 process_fn: F,
511 chunk_size: usize,
512 n_workers: usize,
513) -> Vec<R>
514where
515 T: Send + Sync + Clone + 'static,
516 R: Send + 'static,
517 F: Fn(&[T]) -> Vec<R> + Send + Clone + 'static,
518{
519 let effective_chunk = chunk_size.max(1);
520 let effective_workers = n_workers.max(1);
521
522 if data.is_empty() {
523 return Vec::new();
524 }
525
526 let chunks: Vec<Arc<Vec<T>>> = data
530 .chunks(effective_chunk)
531 .map(|c| Arc::new(c.to_vec()))
532 .collect();
533
534 let n_chunks = chunks.len();
535
536 type TaskItem<T> = (usize, Arc<Vec<T>>);
538 type ResultItem<R> = (usize, Vec<R>);
539
540 let pool: WorkerPool<TaskItem<T>, ResultItem<R>> =
541 WorkerPool::new(effective_workers, move |task: TaskItem<T>| {
542 let (idx, chunk) = task;
543 (idx, process_fn(&chunk))
544 })
545 .unwrap_or_else(|_| panic!("internal error: WorkerPool::new failed with workers >= 1"));
546
547 for (idx, chunk) in chunks.into_iter().enumerate() {
548 if pool.submit((idx, chunk)).is_err() {
549 break;
550 }
551 }
552
553 let raw = pool.collect_all(n_chunks, Duration::from_secs(120));
554 pool.shutdown();
555
556 let mut results: Vec<Option<Vec<R>>> = (0..n_chunks).map(|_| None).collect();
558 for (idx, chunk_result) in raw {
559 if idx < results.len() {
560 results[idx] = Some(chunk_result);
561 }
562 }
563
564 results.into_iter().flatten().flatten().collect()
565}
566
/// Lightweight, static view of host resources used to size worker pools.
///
/// Thresholds and the CPU count are captured at construction time; nothing
/// here polls live CPU or memory usage.
#[derive(Debug, Clone)]
pub struct ResourceMonitor {
    // Fraction of logical CPUs (clamped to 0.0..=1.0) the caller wants to use.
    cpu_threshold: f64,
    // Memory budget in bytes. NOTE(review): stored and exposed via a getter
    // but not consulted by any method here — presumably reserved for future
    // admission control; confirm intended use.
    memory_threshold: usize,
    // Logical CPU count detected at construction.
    logical_cpus: usize,
}
601
602impl ResourceMonitor {
603 pub fn new(cpu_threshold: f64, memory_threshold: usize) -> Self {
610 let logical_cpus = std::thread::available_parallelism()
611 .map(|n| n.get())
612 .unwrap_or(1);
613 ResourceMonitor {
614 cpu_threshold: cpu_threshold.clamp(0.0, 1.0),
615 memory_threshold,
616 logical_cpus,
617 }
618 }
619
620 pub fn logical_cpu_count(&self) -> usize {
622 self.logical_cpus
623 }
624
625 pub fn available_workers(&self) -> usize {
629 let n = (self.cpu_threshold * self.logical_cpus as f64).floor() as usize;
630 n.max(1)
631 }
632
633 pub fn recommended_chunk_size(&self, total_work: usize) -> usize {
638 if total_work == 0 {
639 return 64;
640 }
641 let workers = self.available_workers();
642 let target_chunks = workers.saturating_mul(4).max(1);
643 (total_work / target_chunks).max(64)
644 }
645
646 pub fn can_submit(&self) -> bool {
653 true
654 }
655
656 pub fn cpu_threshold(&self) -> f64 {
658 self.cpu_threshold
659 }
660
661 pub fn memory_threshold(&self) -> usize {
663 self.memory_threshold
664 }
665}
666
/// Extension trait adding chunked parallel processing directly on slices.
pub trait DistributedSliceExt<T> {
    /// Processes `self` in chunks of `chunk_size` on `n_workers` threads and
    /// concatenates the per-chunk results in input order.
    fn distributed_process<R, F>(
        &self,
        process_fn: F,
        chunk_size: usize,
        n_workers: usize,
    ) -> Vec<R>
    where
        T: Send + Sync + Clone + 'static,
        R: Send + 'static,
        F: Fn(&[T]) -> Vec<R> + Send + Clone + 'static;
}
686
// Blanket implementation for all slices: delegates directly to
// `chunked_parallel_process`.
impl<T: Send + Sync + Clone + 'static> DistributedSliceExt<T> for [T] {
    fn distributed_process<R, F>(
        &self,
        process_fn: F,
        chunk_size: usize,
        n_workers: usize,
    ) -> Vec<R>
    where
        R: Send + 'static,
        F: Fn(&[T]) -> Vec<R> + Send + Clone + 'static,
    {
        chunked_parallel_process(self, process_fn, chunk_size, n_workers)
    }
}
701
702pub fn try_distributed_map<T, R, F>(data: Vec<T>, map_fn: F, n_workers: usize) -> CoreResult<Vec<R>>
711where
712 T: Send + 'static,
713 R: Send + 'static,
714 F: Fn(T) -> R + Send + Clone + 'static,
715{
716 Ok(distributed_map(data, map_fn, n_workers))
717}
718
719pub fn try_distributed_map_reduce<T, R, S, F, G>(
723 data: Vec<T>,
724 map_fn: F,
725 reduce_fn: G,
726 initial: S,
727 n_workers: usize,
728) -> CoreResult<S>
729where
730 T: Send + 'static,
731 R: Send + 'static,
732 S: Send + Clone + 'static,
733 F: Fn(T) -> R + Send + Clone + 'static,
734 G: Fn(S, R) -> S + Send + Clone + 'static,
735{
736 Ok(distributed_map_reduce(
737 data, map_fn, reduce_fn, initial, n_workers,
738 ))
739}
740
#[cfg(test)]
mod tests {
    //! Unit tests covering the work queue, worker pool, map/map-reduce
    //! helpers, chunked processing, resource monitor, slice extension,
    //! and error types.
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // ---- WorkQueue / WorkReceiver ----

    #[test]
    fn test_work_queue_basic_push_recv() {
        let (queue, receiver) = WorkQueue::<i32>::new(8).expect("queue creation failed");
        queue.push(42).expect("push failed");
        let item = receiver.recv().expect("recv returned None");
        assert_eq!(item, 42);
    }

    #[test]
    fn test_work_queue_zero_capacity_is_error() {
        let result = WorkQueue::<i32>::new(0);
        assert!(matches!(result, Err(DistributedError::InvalidArgument(_))));
    }

    #[test]
    fn test_work_queue_try_push_full() {
        // Capacity 2: the third try_push must report "full" without blocking.
        let (queue, _receiver) = WorkQueue::<i32>::new(2).expect("queue creation failed");
        let r1 = queue.try_push(1).expect("try_push 1 failed");
        let r2 = queue.try_push(2).expect("try_push 2 failed");
        let r3 = queue
            .try_push(3)
            .expect("try_push 3 should return false when full");
        assert!(r1, "first slot should be accepted");
        assert!(r2, "second slot should be accepted");
        assert!(!r3, "queue is full — should return false");
    }

    #[test]
    fn test_work_queue_len_and_is_empty() {
        let (queue, receiver) = WorkQueue::<u64>::new(16).expect("queue creation failed");
        assert!(queue.is_empty(), "newly created queue must be empty");
        queue.push(10).expect("push 10 failed");
        queue.push(20).expect("push 20 failed");
        assert_eq!(queue.len(), 2, "queue len should be 2 after two pushes");
        // Drain one item; the returned value is intentionally ignored.
        receiver.recv();
        assert_eq!(queue.len(), 1, "queue len should be 1 after one recv");
    }

    #[test]
    fn test_work_queue_capacity() {
        let (queue, _rx) = WorkQueue::<()>::new(32).expect("queue creation failed");
        assert_eq!(queue.capacity(), 32);
    }

    #[test]
    fn test_work_queue_disconnected_on_receiver_drop() {
        let (queue, receiver) = WorkQueue::<i32>::new(4).expect("queue creation failed");
        drop(receiver);
        let err = queue.push(1);
        assert!(matches!(err, Err(DistributedError::Disconnected)));
    }

    #[test]
    fn test_work_receiver_recv_timeout_returns_none() {
        let (_queue, receiver) = WorkQueue::<i32>::new(4).expect("queue creation failed");
        let result = receiver.recv_timeout(Duration::from_millis(20));
        assert!(
            result.is_none(),
            "should time out with nothing in the queue"
        );
    }

    #[test]
    fn test_work_receiver_try_recv_empty() {
        let (_queue, receiver) = WorkQueue::<i32>::new(4).expect("queue creation failed");
        assert!(
            receiver.try_recv().is_none(),
            "try_recv on empty queue must return None"
        );
    }

    #[test]
    fn test_work_queue_multiple_producers() {
        // Three producer threads push disjoint ranges; all items must arrive.
        let (queue, receiver) = WorkQueue::<i32>::new(128).expect("queue creation failed");
        let ranges: Vec<(i32, i32)> = vec![(0, 10), (10, 20), (20, 30)];
        let mut handles = Vec::new();
        for (start, end) in ranges {
            let q = queue.clone();
            handles.push(std::thread::spawn(move || {
                for i in start..end {
                    q.push(i).expect("push failed");
                }
            }));
        }
        for h in handles {
            h.join().expect("producer thread panicked");
        }
        let mut items: Vec<i32> = Vec::new();
        while let Some(x) = receiver.try_recv() {
            items.push(x);
        }
        // Safety net: pick up any stragglers with a short timed wait.
        while let Some(x) = receiver.recv_timeout(Duration::from_millis(10)) {
            items.push(x);
        }
        assert_eq!(
            items.len(),
            30,
            "expected 30 items from three producers, got {}",
            items.len()
        );
        items.sort_unstable();
        assert_eq!(items, (0..30).collect::<Vec<_>>());
    }

    // ---- WorkerPool ----

    #[test]
    fn test_worker_pool_basic_square() {
        let pool = WorkerPool::new(2, |x: i32| x * 2).expect("pool creation failed");
        pool.submit(3).expect("submit failed");
        pool.submit(7).expect("submit failed");
        // Results can arrive out of order; sort before comparing.
        let mut results = pool.collect_all(2, Duration::from_secs(5));
        results.sort_unstable();
        assert_eq!(results, vec![6, 14]);
        pool.shutdown();
    }

    #[test]
    fn test_worker_pool_zero_workers_is_error() {
        let result = WorkerPool::<i32, i32>::new(0, |x| x);
        assert!(
            matches!(result, Err(DistributedError::InvalidArgument(_))),
            "zero workers must be rejected"
        );
    }

    #[test]
    fn test_worker_pool_collect_result_none_on_timeout() {
        let pool = WorkerPool::<i32, i32>::new(1, |x| x).expect("pool creation failed");
        let result = pool.collect_result(Some(Duration::from_millis(30)));
        assert!(result.is_none(), "nothing submitted → should timeout");
        pool.shutdown();
    }

    #[test]
    fn test_worker_pool_accumulates_correct_sum() {
        // The worker closure adds each task value into a shared counter;
        // sum(0..20) = 190.
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        let pool = WorkerPool::new(4, move |x: usize| {
            counter_clone.fetch_add(x, Ordering::Relaxed);
            x
        })
        .expect("pool creation failed");

        for i in 0..20 {
            pool.submit(i).expect("submit failed");
        }
        let _ = pool.collect_all(20, Duration::from_secs(5));
        pool.shutdown();

        assert_eq!(counter.load(Ordering::Relaxed), 190);
    }

    #[test]
    fn test_worker_pool_n_workers() {
        let pool = WorkerPool::new(7, |x: i32| x).expect("pool creation failed");
        assert_eq!(pool.n_workers(), 7);
        pool.shutdown();
    }

    // ---- distributed_map ----

    #[test]
    fn test_distributed_map_empty_input() {
        let result = distributed_map(Vec::<i32>::new(), |x| x * x, 4);
        assert!(result.is_empty());
    }

    #[test]
    fn test_distributed_map_preserves_order() {
        let data: Vec<i32> = (1..=16).collect();
        let result = distributed_map(data, |x| x * x, 4);
        let expected: Vec<i32> = (1..=16).map(|x| x * x).collect();
        assert_eq!(
            result, expected,
            "distributed_map must preserve input order"
        );
    }

    #[test]
    fn test_distributed_map_single_worker() {
        let data: Vec<String> = (0..10).map(|i| format!("item-{i}")).collect();
        let lens = distributed_map(data.clone(), |s| s.len(), 1);
        let expected: Vec<usize> = data.iter().map(|s| s.len()).collect();
        assert_eq!(lens, expected);
    }

    #[test]
    fn test_distributed_map_zero_workers_clamped_to_one() {
        let data: Vec<i32> = (0..5).collect();
        let result = distributed_map(data, |x| x + 1, 0);
        assert_eq!(result, vec![1, 2, 3, 4, 5]);
    }

    // ---- distributed_map_reduce ----

    #[test]
    fn test_distributed_map_reduce_sum() {
        let data: Vec<i32> = (1..=100).collect();
        let sum = distributed_map_reduce(data, |x| x as i64, |acc, r| acc + r, 0i64, 4);
        assert_eq!(sum, 5050, "sum 1..100 must equal 5050");
    }

    #[test]
    fn test_distributed_map_reduce_factorial_small() {
        let data: Vec<u64> = (1..=5).collect();
        let product = distributed_map_reduce(data, |x| x, |acc, r| acc * r, 1u64, 2);
        assert_eq!(product, 120, "5! = 120");
    }

    #[test]
    fn test_distributed_map_reduce_string_concat_order() {
        // Concatenation is non-commutative, so this also checks that the
        // reduce phase sees mapped values in input order.
        let data: Vec<i32> = (0..5).collect();
        let result = distributed_map_reduce(
            data,
            |x| x.to_string(),
            |mut acc, r| {
                acc.push_str(&r);
                acc
            },
            String::new(),
            2,
        );
        assert_eq!(result, "01234");
    }

    // ---- chunked_parallel_process ----

    #[test]
    fn test_chunked_parallel_process_basic() {
        let data: Vec<i32> = (1..=12).collect();
        let doubled =
            chunked_parallel_process(&data, |chunk| chunk.iter().map(|&x| x * 2).collect(), 4, 3);
        assert_eq!(doubled, vec![2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]);
    }

    #[test]
    fn test_chunked_parallel_process_empty_input() {
        let data: Vec<f64> = Vec::new();
        let result = chunked_parallel_process(&data, |c| c.to_vec(), 4, 2);
        assert!(result.is_empty());
    }

    #[test]
    fn test_chunked_parallel_process_chunk_larger_than_data() {
        let data: Vec<i32> = (0..5).collect();
        let result = chunked_parallel_process(
            &data,
            |chunk| chunk.iter().map(|&x| x + 1).collect(),
            100,
            2,
        );
        assert_eq!(result, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_chunked_parallel_process_chunk_size_one() {
        let data: Vec<i32> = (0..10).collect();
        let result = chunked_parallel_process(&data, |chunk| vec![chunk[0] * 3], 1, 4);
        let expected: Vec<i32> = (0..10).map(|x| x * 3).collect();
        assert_eq!(result, expected);
    }

    // ---- ResourceMonitor ----

    #[test]
    fn test_resource_monitor_available_workers_full_threshold() {
        let monitor = ResourceMonitor::new(1.0, usize::MAX);
        let workers = monitor.available_workers();
        assert!(workers >= 1);
        assert_eq!(workers, monitor.logical_cpu_count());
    }

    #[test]
    fn test_resource_monitor_half_threshold() {
        let monitor = ResourceMonitor::new(0.5, usize::MAX);
        let cpus = monitor.logical_cpu_count();
        let workers = monitor.available_workers();
        let expected = ((0.5_f64 * cpus as f64).floor() as usize).max(1);
        assert_eq!(workers, expected);
    }

    #[test]
    fn test_resource_monitor_zero_threshold_still_one_worker() {
        let monitor = ResourceMonitor::new(0.0, 0);
        assert_eq!(
            monitor.available_workers(),
            1,
            "must always return at least 1"
        );
    }

    #[test]
    fn test_resource_monitor_recommended_chunk_size() {
        let monitor = ResourceMonitor::new(1.0, usize::MAX);
        let chunk = monitor.recommended_chunk_size(1_000_000);
        assert!(chunk >= 64, "chunk must be at least 64");
        let chunk_zero = monitor.recommended_chunk_size(0);
        assert_eq!(chunk_zero, 64, "zero total work → default 64");
    }

    #[test]
    fn test_resource_monitor_can_submit() {
        let monitor = ResourceMonitor::new(0.8, 1_000_000_000);
        assert!(monitor.can_submit());
    }

    #[test]
    fn test_resource_monitor_accessors() {
        let monitor = ResourceMonitor::new(0.75, 500_000);
        assert!((monitor.cpu_threshold() - 0.75).abs() < 1e-9);
        assert_eq!(monitor.memory_threshold(), 500_000);
    }

    // ---- DistributedSliceExt ----

    #[test]
    fn test_distributed_slice_ext_double() {
        let data: Vec<i32> = (1..=20).collect();
        let result =
            data.distributed_process(|chunk| chunk.iter().map(|&x| x as i64 * 2).collect(), 5, 4);
        let expected: Vec<i64> = (1..=20).map(|x| x as i64 * 2).collect();
        assert_eq!(result, expected);
    }

    // ---- try_* wrappers ----

    #[test]
    fn test_try_distributed_map() {
        let data: Vec<i32> = (1..=5).collect();
        let result = try_distributed_map(data, |x| x + 10, 2).expect("try_distributed_map failed");
        assert_eq!(result, vec![11, 12, 13, 14, 15]);
    }

    #[test]
    fn test_try_distributed_map_reduce() {
        let data: Vec<i32> = (1..=10).collect();
        let result = try_distributed_map_reduce(data, |x| x as u32, |a, b| a + b, 0u32, 2)
            .expect("try_distributed_map_reduce failed");
        assert_eq!(result, 55, "sum 1..10 = 55");
    }

    // ---- DistributedError ----

    #[test]
    fn test_distributed_error_display_messages() {
        // Each Display message must contain its characteristic fragment.
        let cases: &[(DistributedError, &str)] = &[
            (DistributedError::QueueFull, "full"),
            (DistributedError::Disconnected, "disconnect"),
            (DistributedError::Timeout, "timed out"),
            (
                DistributedError::InvalidArgument("bad arg".into()),
                "bad arg",
            ),
            (DistributedError::WorkerPanic("boom".into()), "boom"),
            (DistributedError::PoisonedLock, "poison"),
        ];
        for (err, expected_fragment) in cases {
            let msg = err.to_string();
            assert!(
                msg.contains(expected_fragment),
                "error '{msg}' should contain '{expected_fragment}'"
            );
        }
    }

    #[test]
    fn test_distributed_error_into_core_error() {
        let err: CoreError = DistributedError::QueueFull.into();
        assert!(!err.to_string().is_empty());
    }
}