// scirs2_core/parallel/partitioning.rs

//! Custom partitioning strategies for different data distributions
//!
//! This module provides advanced partitioning strategies that adapt to
//! various data distributions for optimal load balancing in parallel processing.
//! It includes support for uniform, skewed, Gaussian, and custom distributions.
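//!
//! A minimal end-to-end sketch (illustrative; it assumes this module is
//! reachable as `scirs2_core::parallel::partitioning`):
//!
//! ```ignore
//! use scirs2_core::parallel::partitioning::{DataPartitioner, PartitionerConfig};
//!
//! let partitioner = DataPartitioner::<f64>::with_default_config();
//! let data: Vec<f64> = (0..10_000).map(|i| i as f64).collect();
//!
//! // Analyze the data, derive a strategy from the detected distribution,
//! // then partition the data accordingly.
//! let distribution = partitioner.analyze_distribution(&data);
//! let strategy = partitioner
//!     .create_strategy(&distribution, data.len())
//!     .expect("failed to create strategy");
//! let partitions = partitioner
//!     .partition(&data, &strategy)
//!     .expect("failed to partition");
//! ```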

use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use crate::parallel_ops::*;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::time::Duration;

/// Data distribution types that affect partitioning strategy
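///
/// A small construction sketch (illustrative only):
///
/// ```ignore
/// use scirs2_core::parallel::partitioning::DataDistribution;
///
/// // A right-skewed distribution with skewness factor 2.5.
/// let skewed = DataDistribution::Skewed { skewness: 2.5 };
///
/// // A normal distribution with mean 10.0 and standard deviation 2.0.
/// let gaussian = DataDistribution::Gaussian { mean: 10.0, std_dev: 2.0 };
/// ```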
#[derive(Debug, Clone, PartialEq)]
pub enum DataDistribution {
    /// Uniform distribution - data is evenly distributed
    Uniform,
    /// Skewed distribution - data is concentrated in certain regions
    Skewed {
        /// Skewness factor (0.0 = no skew, positive = right skew, negative = left skew)
        skewness: f64,
    },
    /// Gaussian/Normal distribution
    Gaussian {
        /// Mean of the distribution
        mean: f64,
        /// Standard deviation
        std_dev: f64,
    },
    /// Power law distribution (e.g., Zipf distribution)
    PowerLaw {
        /// Exponent parameter
        alpha: f64,
    },
    /// Bimodal distribution - two peaks
    Bimodal {
        /// First peak mean
        mean1: f64,
        /// Second peak mean
        mean2: f64,
        /// Mixing ratio (0.0 to 1.0)
        mix_ratio: f64,
    },
    /// Custom distribution, identified by a name or description
    Custom {
        /// Name or description of the distribution
        name: String,
    },
}

/// Partitioning strategy for dividing work among threads
#[derive(Debug, Clone)]
pub enum PartitionStrategy {
    /// Equal-sized partitions (traditional approach)
    EqualSize,
    /// Weighted partitions based on data distribution
    Weighted {
        /// Weights for each partition
        weights: Vec<f64>,
    },
    /// Dynamic partitioning that adjusts at runtime
    Dynamic {
        /// Initial partition sizes
        initial_sizes: Vec<usize>,
        /// Whether to allow stealing between partitions
        allow_stealing: bool,
    },
    /// Hierarchical partitioning for nested parallelism
    Hierarchical {
        /// Number of levels in the hierarchy
        levels: usize,
        /// Branching factor at each level
        branching_factor: usize,
    },
    /// Range-based partitioning for sorted data
    RangeBased {
        /// Boundary values for each partition
        boundaries: Vec<f64>,
    },
    /// Hash-based partitioning for key-value data
    HashBased {
        /// Number of hash buckets
        num_buckets: usize,
    },
}

/// Configuration for the partitioner
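///
/// Individual fields can be overridden with struct update syntax (a usage
/// sketch):
///
/// ```ignore
/// use scirs2_core::parallel::partitioning::PartitionerConfig;
///
/// let config = PartitionerConfig {
///     num_partitions: 4,
///     ..Default::default()
/// };
/// ```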
#[derive(Debug, Clone)]
pub struct PartitionerConfig {
    /// Number of partitions (usually the number of threads)
    pub num_partitions: usize,
    /// Minimum partition size
    pub min_partition_size: usize,
    /// Maximum partition size (0 for unlimited)
    pub max_partition_size: usize,
    /// Whether to enable load balancing
    pub enable_load_balancing: bool,
    /// Target load imbalance factor (1.0 = perfect balance)
    pub target_imbalance_factor: f64,
    /// Whether to consider NUMA topology
    pub numa_aware: bool,
    /// Whether to enable work stealing
    pub enable_work_stealing: bool,
}

impl Default for PartitionerConfig {
    fn default() -> Self {
        Self {
            num_partitions: num_threads(),
            min_partition_size: 1000,
            max_partition_size: 0,
            enable_load_balancing: true,
            target_imbalance_factor: 1.1,
            numa_aware: false,
            enable_work_stealing: true,
        }
    }
}

/// Partitioner for dividing data based on distribution characteristics
pub struct DataPartitioner<T> {
    config: PartitionerConfig,
    phantom: PhantomData<T>,
}

impl<T> DataPartitioner<T>
where
    T: Send + Sync + Clone,
{
    /// Create a new partitioner with the given configuration
    pub fn new(config: PartitionerConfig) -> Self {
        Self {
            config,
            phantom: PhantomData,
        }
    }

    /// Create a partitioner with the default configuration
    pub fn with_default_config() -> Self {
        Self::new(PartitionerConfig::default())
    }

    /// Analyze data to determine its distribution
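    ///
    /// Classification is based on the sample skewness and excess kurtosis of
    /// the data. A usage sketch:
    ///
    /// ```ignore
    /// let partitioner = DataPartitioner::<f64>::with_default_config();
    /// let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
    /// let distribution = partitioner.analyze_distribution(&data);
    /// ```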
    pub fn analyze_distribution(&self, data: &[T]) -> DataDistribution
    where
        T: Into<f64> + Copy,
    {
        if data.is_empty() {
            return DataDistribution::Uniform;
        }

        // Convert data to float values for analysis
        let values: Vec<f64> = data.iter().map(|&x| x.into()).collect();

        // Calculate basic statistics
        let n = values.len() as f64;
        let mean = values.iter().copied().sum::<f64>() / n;

        // Calculate variance and standard deviation
        let variance = values
            .iter()
            .map(|&x| {
                let diff = x - mean;
                diff * diff
            })
            .sum::<f64>()
            / n;
        let std_dev = variance.sqrt();

        // Calculate sample skewness
        let skewness = if std_dev > 0.0 {
            let sum_cubed = values
                .iter()
                .map(|&x| {
                    let z = (x - mean) / std_dev;
                    z * z * z
                })
                .sum::<f64>();
            sum_cubed / n
        } else {
            0.0
        };

        // Calculate excess kurtosis to help detect uniformity and bimodality
        let kurtosis = if std_dev > 0.0 {
            let sum_fourth = values
                .iter()
                .map(|&x| {
                    let z = (x - mean) / std_dev;
                    z * z * z * z
                })
                .sum::<f64>();
            sum_fourth / n - 3.0
        } else {
            0.0
        };

        // Determine the distribution type from the statistics
        if skewness.abs() < 0.5 && kurtosis > -1.5 && kurtosis < -0.8 {
            // A uniform distribution has excess kurtosis around -1.2
            DataDistribution::Uniform
        } else if skewness.abs() < 0.5 && kurtosis.abs() < 1.0 {
            // Approximately normal
            DataDistribution::Gaussian { mean, std_dev }
        } else if skewness.abs() > 2.0 {
            // Heavily skewed
            DataDistribution::Skewed { skewness }
        } else if kurtosis < -1.5 {
            // Strongly negative excess kurtosis may indicate bimodality.
            // Simple bimodal detection - in practice more sophisticated
            // methods would be used.
            DataDistribution::Bimodal {
                mean1: mean - std_dev,
                mean2: mean + std_dev,
                mix_ratio: 0.5,
            }
        } else {
            // Default to uniform if no clear pattern emerges
            DataDistribution::Uniform
        }
    }

    /// Create a partitioning strategy based on data distribution
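    ///
    /// A usage sketch (the chosen strategy depends on the detected
    /// distribution):
    ///
    /// ```ignore
    /// let partitioner = DataPartitioner::<f64>::with_default_config();
    /// let distribution = DataDistribution::Skewed { skewness: 2.5 };
    /// let strategy = partitioner
    ///     .create_strategy(&distribution, 10_000)
    ///     .expect("failed to create strategy");
    /// ```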
    pub fn create_strategy(
        &self,
        distribution: &DataDistribution,
        data_size: usize,
    ) -> CoreResult<PartitionStrategy> {
        let num_partitions = self.config.num_partitions;

        match distribution {
            DataDistribution::Uniform => {
                // Equal-sized partitions work well for uniform data
                Ok(PartitionStrategy::EqualSize)
            }

            DataDistribution::Skewed { skewness } => {
                // Create weighted partitions based on skewness
                let weights = self.calculate_skewed_weights(*skewness, num_partitions)?;
                Ok(PartitionStrategy::Weighted { weights })
            }

            DataDistribution::Gaussian { mean, std_dev } => {
                // Create range-based partitions using quantiles
                let boundaries =
                    self.calculate_gaussian_boundaries(*mean, *std_dev, num_partitions)?;
                Ok(PartitionStrategy::RangeBased { boundaries })
            }

            DataDistribution::PowerLaw { alpha } => {
                // Use logarithmic partitioning for power-law data
                let weights = self.calculate_power_law_weights(*alpha, num_partitions)?;
                Ok(PartitionStrategy::Weighted { weights })
            }

            DataDistribution::Bimodal {
                mean1,
                mean2,
                mix_ratio,
            } => {
                // Create partitions around the two modes
                let boundaries =
                    self.calculate_bimodal_boundaries(*mean1, *mean2, *mix_ratio, num_partitions)?;
                Ok(PartitionStrategy::RangeBased { boundaries })
            }

            DataDistribution::Custom { .. } => {
                // For custom distributions, use dynamic partitioning
                let initial_sizes = vec![data_size / num_partitions; num_partitions];
                Ok(PartitionStrategy::Dynamic {
                    initial_sizes,
                    allow_stealing: self.config.enable_work_stealing,
                })
            }
        }
    }

    /// Partition data according to the given strategy
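    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let partitioner = DataPartitioner::<i32>::with_default_config();
    /// let data: Vec<i32> = (0..100_000).collect();
    /// let partitions = partitioner
    ///     .partition(&data, &PartitionStrategy::EqualSize)
    ///     .expect("partitioning failed");
    /// ```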
    pub fn partition(&self, data: &[T], strategy: &PartitionStrategy) -> CoreResult<Vec<Vec<T>>> {
        let data_size = data.len();
        let num_partitions = self.config.num_partitions;

        if data_size < num_partitions * self.config.min_partition_size {
            // Not enough data to partition effectively
            return Ok(vec![data.to_vec()]);
        }

        match strategy {
            PartitionStrategy::EqualSize => self.partition_equal_size(data),

            PartitionStrategy::Weighted { weights } => self.partition_weighted(data, weights),

            PartitionStrategy::Dynamic { initial_sizes, .. } => {
                self.partition_dynamic(data, initial_sizes)
            }

            PartitionStrategy::Hierarchical {
                levels,
                branching_factor,
            } => self.partition_hierarchical(data, *levels, *branching_factor),

            PartitionStrategy::RangeBased { boundaries: _ } => {
                // Range-based partitioning requires specific trait bounds
                // (see `partition_range_based`). Return an error here because
                // T is not guaranteed to meet those requirements.
                Err(CoreError::InvalidArgument(
                    ErrorContext::new(
                        "Range-based partitioning requires PartialOrd + Into<f64> + Copy traits"
                            .to_string(),
                    )
                    .with_location(ErrorLocation::new(file!(), line!())),
                ))
            }

            PartitionStrategy::HashBased { num_buckets: _ } => {
                // Hash-based partitioning requires the Hash trait
                // (see `partition_hash_based`).
                Err(CoreError::InvalidArgument(
                    ErrorContext::new("Hash-based partitioning requires Hash trait".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                ))
            }
        }
    }

    /// Partition data into equal-sized chunks
    fn partition_equal_size(&self, data: &[T]) -> CoreResult<Vec<Vec<T>>> {
        let chunk_size = data.len().div_ceil(self.config.num_partitions);
        let mut partitions = Vec::with_capacity(self.config.num_partitions);

        for chunk in data.chunks(chunk_size) {
            partitions.push(chunk.to_vec());
        }

        Ok(partitions)
    }

    /// Partition data according to weights
    fn partition_weighted(&self, data: &[T], weights: &[f64]) -> CoreResult<Vec<Vec<T>>> {
        if weights.len() != self.config.num_partitions {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Weight count does not match partition count".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        let total_weight: f64 = weights.iter().sum();
        if total_weight <= 0.0 {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Total weight must be positive".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        let mut partitions = Vec::with_capacity(self.config.num_partitions);
        let mut start = 0;

        for weight in weights {
            let size = ((weight / total_weight) * data.len() as f64) as usize;
            let end = (start + size).min(data.len());

            if start < data.len() {
                partitions.push(data[start..end].to_vec());
            }

            start = end;
        }

        // Add any elements left over from rounding to the last partition
        if start < data.len() && !partitions.is_empty() {
            if let Some(last) = partitions.last_mut() {
                last.extend_from_slice(&data[start..]);
            }
        }

        Ok(partitions)
    }

    /// Dynamic partitioning with runtime adjustment
    fn partition_dynamic(&self, data: &[T], initial_sizes: &[usize]) -> CoreResult<Vec<Vec<T>>> {
        // For now, use the initial sizes as-is. A full implementation would
        // monitor progress and adjust partition sizes at runtime.
        let mut partitions = Vec::with_capacity(initial_sizes.len());
        let mut start = 0;

        for &size in initial_sizes {
            let end = (start + size).min(data.len());
            if start < data.len() {
                partitions.push(data[start..end].to_vec());
            }
            start = end;
        }

        Ok(partitions)
    }

    /// Hierarchical partitioning for nested parallelism
    fn partition_hierarchical(
        &self,
        data: &[T],
        levels: usize,
        branching_factor: usize,
    ) -> CoreResult<Vec<Vec<T>>> {
        if levels == 0 || branching_factor == 0 {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Invalid hierarchical parameters".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        // Calculate the total number of leaf partitions
        let num_leaves = branching_factor.pow(levels as u32);
        let chunk_size = data.len().div_ceil(num_leaves);

        let mut partitions = Vec::with_capacity(num_leaves);
        for chunk in data.chunks(chunk_size) {
            partitions.push(chunk.to_vec());
        }

        Ok(partitions)
    }

    /// Range-based partitioning for sorted data
    #[allow(dead_code)]
    fn partition_range_based(&self, data: &[T], boundaries: &[f64]) -> CoreResult<Vec<Vec<T>>>
    where
        T: PartialOrd + Into<f64> + Copy,
    {
        let mut partitions = vec![Vec::new(); boundaries.len() + 1];

        for &item in data {
            let value: f64 = item.into();
            // Find the first boundary the value does not exceed; values above
            // all boundaries go into the last partition.
            let mut partition_idx = boundaries.len();

            for (i, &boundary) in boundaries.iter().enumerate() {
                if value <= boundary {
                    partition_idx = i;
                    break;
                }
            }

            partitions[partition_idx].push(item);
        }

        Ok(partitions)
    }

    /// Hash-based partitioning
    #[allow(dead_code)]
    fn partition_hash_based(&self, data: &[T], num_buckets: usize) -> CoreResult<Vec<Vec<T>>>
    where
        T: std::hash::Hash,
    {
        use std::collections::hash_map::DefaultHasher;
        // `Hash` must be in scope for the `item.hash(...)` method call below
        use std::hash::{Hash, Hasher};

        let mut partitions = vec![Vec::new(); num_buckets];

        for item in data {
            let mut hasher = DefaultHasher::new();
            item.hash(&mut hasher);
            let hash = hasher.finish();
            let bucket = (hash % num_buckets as u64) as usize;
            partitions[bucket].push(item.clone());
        }

        Ok(partitions)
    }

    /// Calculate weights for skewed distribution
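    ///
    /// A worked sketch (illustrative): with `skewness = 2.0` and 4 partitions,
    /// `base = 1.2` and the weights are `base³, base², base¹, base⁰`
    /// ≈ `[1.73, 1.44, 1.2, 1.0]`, so earlier partitions receive more data.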
    fn calculate_skewed_weights(
        &self,
        skewness: f64,
        num_partitions: usize,
    ) -> CoreResult<Vec<f64>> {
        let mut weights = Vec::with_capacity(num_partitions);

        // Use exponential weights for skewed distributions
        let base = 1.0 + skewness.abs() / 10.0;

        for i in 0..num_partitions {
            let weight = if skewness > 0.0 {
                // Right skew - more weight on early partitions
                // (exponent decreases from num_partitions - 1 down to 0)
                base.powf((num_partitions - 1 - i) as f64)
            } else {
                // Left skew - more weight on later partitions
                base.powf(i as f64)
            };
            weights.push(weight);
        }

        Ok(weights)
    }

    /// Calculate boundaries for Gaussian distribution
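    ///
    /// Boundaries are placed at the `i / num_partitions` quantiles. The
    /// z-scores come from a crude inverse-CDF approximation based on the
    /// normal tail bound `P(Z < z) ≈ exp(-z²/2) / 2` for `z < 0`, which
    /// gives `z ≈ -sqrt(-2 ln(2q))` for a left-tail quantile `q < 0.5`
    /// (mirrored for the right tail).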
    fn calculate_gaussian_boundaries(
        &self,
        mean: f64,
        std_dev: f64,
        num_partitions: usize,
    ) -> CoreResult<Vec<f64>> {
        let mut boundaries = Vec::with_capacity(num_partitions.saturating_sub(1));

        // Use a simplified quantile approximation for the normal distribution.
        // For more accuracy, a proper inverse normal CDF implementation would
        // be used instead.
        for i in 1..num_partitions {
            let quantile = i as f64 / num_partitions as f64;
            let z_score = if quantile == 0.5 {
                0.0
            } else if quantile < 0.5 {
                // Left tail: z ≈ -sqrt(-2 ln(2q))
                -(-2.0 * (2.0 * quantile).ln()).sqrt()
            } else {
                // Right tail: mirror of the left-tail approximation
                (-2.0 * (2.0 * (1.0 - quantile)).ln()).sqrt()
            };
            let boundary = mean + z_score * std_dev;
            boundaries.push(boundary);
        }

        Ok(boundaries)
    }

    /// Calculate weights for power law distribution
    fn calculate_power_law_weights(
        &self,
        alpha: f64,
        num_partitions: usize,
    ) -> CoreResult<Vec<f64>> {
        let mut weights = Vec::with_capacity(num_partitions);

        for i in 0..num_partitions {
            // Power law: weight ∝ (i+1)^(-alpha)
            let weight = ((i + 1) as f64).powf(-alpha);
            weights.push(weight);
        }

        Ok(weights)
    }

    /// Calculate boundaries for bimodal distribution
    fn calculate_bimodal_boundaries(
        &self,
        mean1: f64,
        mean2: f64,
        mix_ratio: f64,
        num_partitions: usize,
    ) -> CoreResult<Vec<f64>> {
        if num_partitions < 2 {
            // Fewer than two partitions need no boundaries
            return Ok(Vec::new());
        }

        let mut boundaries = Vec::with_capacity(num_partitions - 1);

        // Split partitions between the two modes, keeping at least one per
        // mode so the boundary count stays at num_partitions - 1
        let partitions_mode1 =
            (((num_partitions as f64) * mix_ratio) as usize).clamp(1, num_partitions - 1);
        let partitions_mode2 = num_partitions - partitions_mode1;

        // Create boundaries around the first mode
        let range = (mean2 - mean1).abs() * 0.5;
        for i in 1..partitions_mode1 {
            let boundary = mean1 - range * 0.5 + (range / partitions_mode1 as f64) * i as f64;
            boundaries.push(boundary);
        }

        // Boundary between the modes
        boundaries.push((mean1 + mean2) / 2.0);

        // Create boundaries around the second mode
        for i in 1..partitions_mode2 {
            let boundary = mean2 - range * 0.5 + (range / partitions_mode2 as f64) * i as f64;
            boundaries.push(boundary);
        }

        boundaries.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
        Ok(boundaries)
    }
}

/// Load balancer for runtime adjustment of partitions
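///
/// A usage sketch: record per-partition execution times, then rebalance.
///
/// ```ignore
/// use std::time::Duration;
///
/// let mut balancer = LoadBalancer::new(3, 1.2);
/// balancer.record_execution_time(0, Duration::from_millis(100));
/// balancer.record_execution_time(1, Duration::from_millis(200));
/// balancer.record_execution_time(2, Duration::from_millis(150));
///
/// // Weights shrink for slow partitions and grow for fast ones.
/// let new_weights = balancer.rebalance();
/// ```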
pub struct LoadBalancer {
    /// Target imbalance factor
    target_imbalance: f64,
    /// History of partition execution times
    execution_times: Vec<Vec<Duration>>,
    /// Current partition weights
    weights: Vec<f64>,
}

impl LoadBalancer {
    /// Create a new load balancer
    pub fn new(num_partitions: usize, target_imbalance: f64) -> Self {
        Self {
            target_imbalance,
            execution_times: vec![Vec::new(); num_partitions],
            weights: vec![1.0; num_partitions],
        }
    }

    /// Record execution time for a partition
    pub fn record_execution_time(&mut self, partition_id: usize, duration: Duration) {
        if partition_id < self.execution_times.len() {
            self.execution_times[partition_id].push(duration);

            // Keep only recent history (the last 10 measurements)
            if self.execution_times[partition_id].len() > 10 {
                self.execution_times[partition_id].remove(0);
            }
        }
    }

    /// Calculate new weights based on execution history
    pub fn rebalance(&mut self) -> Vec<f64> {
        let mut avg_times = Vec::with_capacity(self.weights.len());

        // Calculate the average execution time for each partition
        for times in &self.execution_times {
            if times.is_empty() {
                avg_times.push(1.0);
            } else {
                let sum: Duration = times.iter().sum();
                let avg = sum.as_secs_f64() / times.len() as f64;
                avg_times.push(avg);
            }
        }

        // Calculate the mean of the per-partition averages
        let total: f64 = avg_times.iter().sum();
        let mean_time = total / avg_times.len() as f64;

        // Adjust weights inversely proportional to execution time
        for (i, &avg_time) in avg_times.iter().enumerate() {
            if avg_time > mean_time * self.target_imbalance {
                // This partition is too slow, so reduce its share of work
                self.weights[i] *= 0.9;
            } else if avg_time < mean_time / self.target_imbalance {
                // This partition is too fast, so increase its share of work
                self.weights[i] *= 1.1;
            }

            // Keep weights within reasonable bounds
            self.weights[i] = self.weights[i].clamp(0.1, 10.0);
        }

        self.weights.clone()
    }

    /// Get the current load imbalance factor
    pub fn get_imbalance_factor(&self) -> f64 {
        let mut min_time = f64::MAX;
        let mut max_time = 0.0f64;

        for times in &self.execution_times {
            if !times.is_empty() {
                let avg: f64 =
                    times.iter().map(|d| d.as_secs_f64()).sum::<f64>() / times.len() as f64;
                min_time = min_time.min(avg);
                max_time = max_time.max(avg);
            }
        }

        if min_time > 0.0 && min_time < f64::MAX {
            max_time / min_time
        } else {
            // No measurements recorded yet; report perfect balance
            1.0
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_uniform_distribution_detection() {
        let partitioner = DataPartitioner::<f64>::with_default_config();
        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let distribution = partitioner.analyze_distribution(&data);
        match distribution {
            DataDistribution::Uniform | DataDistribution::Gaussian { .. } => {
                // A uniform sequence might be detected as either
            }
            _ => panic!("Expected uniform or gaussian distribution"),
        }
    }

    #[test]
    fn test_skewed_distribution_detection() {
        let partitioner = DataPartitioner::<f64>::with_default_config();
        // Create heavily skewed data
        let mut data = vec![1.0; 900];
        data.extend(vec![100.0; 100]);

        let distribution = partitioner.analyze_distribution(&data);
        match distribution {
            DataDistribution::Skewed { skewness } => {
                assert!(skewness > 2.0, "Expected high positive skewness");
            }
            _ => panic!("Expected skewed distribution"),
        }
    }

    #[test]
    fn test_equal_size_partitioning() {
        let config = PartitionerConfig {
            num_partitions: 4,
            ..Default::default()
        };
        let partitioner = DataPartitioner::<i32>::new(config);
        let data: Vec<i32> = (0..100).collect();

        let partitions = partitioner
            .partition_equal_size(&data)
            .expect("Operation failed");
        assert_eq!(partitions.len(), 4);
        assert_eq!(partitions[0].len(), 25);
        assert_eq!(partitions[3].len(), 25);
    }

    #[test]
    fn test_weighted_partitioning() {
        let config = PartitionerConfig {
            num_partitions: 3,
            ..Default::default()
        };
        let partitioner = DataPartitioner::<i32>::new(config);
        let data: Vec<i32> = (0..90).collect();
        let weights = vec![1.0, 2.0, 3.0];

        let partitions = partitioner
            .partition_weighted(&data, &weights)
            .expect("Operation failed");
        assert_eq!(partitions.len(), 3);
        assert_eq!(partitions[0].len(), 15); // 1/6 of 90
        assert_eq!(partitions[1].len(), 30); // 2/6 of 90
        assert_eq!(partitions[2].len(), 45); // 3/6 of 90
    }

    #[test]
    fn test_load_balancer() {
        let mut balancer = LoadBalancer::new(3, 1.2);

        // Record some execution times
        use std::time::Duration;
        balancer.record_execution_time(0, Duration::from_millis(100));
        balancer.record_execution_time(1, Duration::from_millis(200));
        balancer.record_execution_time(2, Duration::from_millis(150));

        let new_weights = balancer.rebalance();
        assert_eq!(new_weights.len(), 3);

        // Partition 1 was the slowest, so its weight should be reduced
        assert!(new_weights[1] < new_weights[0]);
    }
}