1use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
8use crate::parallel_ops::*;
9use std::cmp::Ordering;
10use std::marker::PhantomData;
11use std::time::Duration;
12
/// Statistical shape detected in (or assumed for) a dataset; drives the
/// choice of partitioning strategy.
#[derive(Debug, Clone, PartialEq)]
pub enum DataDistribution {
    /// Values spread roughly evenly over their range.
    Uniform,
    /// Asymmetric distribution with one long tail.
    Skewed {
        /// Standardized third moment; positive means a right (high-value) tail.
        skewness: f64,
    },
    /// Bell-shaped (normal) distribution.
    Gaussian {
        /// Sample mean.
        mean: f64,
        /// Sample standard deviation.
        std_dev: f64,
    },
    /// Heavy-tailed power-law distribution.
    PowerLaw {
        /// Decay exponent.
        alpha: f64,
    },
    /// Mixture of two modes.
    Bimodal {
        /// Center of the first mode.
        mean1: f64,
        /// Center of the second mode.
        mean2: f64,
        /// Fraction of mass attributed to the first mode (expected 0.0..=1.0).
        mix_ratio: f64,
    },
    /// Caller-defined distribution; handled with dynamic partitioning.
    Custom {
        /// Identifier for the custom distribution.
        name: String,
    },
}
50
/// How a dataset is split into partitions.
#[derive(Debug, Clone)]
pub enum PartitionStrategy {
    /// All partitions get (almost) the same number of elements.
    EqualSize,
    /// Partition sizes proportional to the given weights.
    Weighted {
        /// One relative weight per partition; normalized by their sum.
        weights: Vec<f64>,
    },
    /// Start from fixed sizes and let the runtime rebalance.
    Dynamic {
        /// Initial element count per partition.
        initial_sizes: Vec<usize>,
        /// Whether idle workers may steal work from busy ones.
        allow_stealing: bool,
    },
    /// Tree-shaped partitioning with `branching_factor^levels` leaves.
    Hierarchical {
        /// Depth of the tree.
        levels: usize,
        /// Children per node.
        branching_factor: usize,
    },
    /// Elements routed by value range (needs ordered, numeric elements).
    RangeBased {
        /// Upper bounds; an element goes to the first range containing it.
        boundaries: Vec<f64>,
    },
    /// Elements routed by hash (needs `Hash` elements).
    HashBased {
        /// Number of hash buckets.
        num_buckets: usize,
    },
}
86
/// Tuning knobs for `DataPartitioner`.
#[derive(Debug, Clone)]
pub struct PartitionerConfig {
    /// Number of partitions to produce.
    pub num_partitions: usize,
    /// Below `num_partitions * min_partition_size` elements, the data is kept
    /// as a single partition.
    pub min_partition_size: usize,
    /// Upper bound on partition size; 0 presumably means "no cap". Not
    /// enforced anywhere in this module — TODO confirm intended semantics.
    pub max_partition_size: usize,
    /// Enable runtime load balancing.
    pub enable_load_balancing: bool,
    /// Acceptable ratio between slowest and fastest partition.
    pub target_imbalance_factor: f64,
    /// Prefer NUMA-local placement (not referenced in the code shown here).
    pub numa_aware: bool,
    /// Allow work stealing for `PartitionStrategy::Dynamic`.
    pub enable_work_stealing: bool,
}
105
impl Default for PartitionerConfig {
    /// Defaults: one partition per available thread, load balancing and work
    /// stealing enabled, no NUMA awareness.
    fn default() -> Self {
        Self {
            num_partitions: num_threads(),
            min_partition_size: 1000,
            max_partition_size: 0, // 0 appears to mean "no cap" — TODO confirm
            enable_load_balancing: true,
            target_imbalance_factor: 1.1,
            numa_aware: false,
            enable_work_stealing: true,
        }
    }
}
119
/// Splits datasets into partitions for parallel processing, choosing a
/// strategy based on the data's statistical distribution.
pub struct DataPartitioner<T> {
    /// Partitioning configuration.
    config: PartitionerConfig,
    /// Marks the element type; no `T` values are stored.
    phantom: PhantomData<T>,
}
125
126impl<T> DataPartitioner<T>
127where
128 T: Send + Sync + Clone,
129{
    /// Creates a partitioner with an explicit configuration.
    pub fn new(config: PartitionerConfig) -> Self {
        Self {
            config,
            phantom: PhantomData,
        }
    }
137
    /// Creates a partitioner using `PartitionerConfig::default()`.
    pub fn with_defaultconfig() -> Self {
        Self::new(PartitionerConfig::default())
    }
142
143 pub fn analyze_distribution(&self, data: &[T]) -> DataDistribution
145 where
146 T: Into<f64> + Copy,
147 {
148 if data.is_empty() {
149 return DataDistribution::Uniform;
150 }
151
152 let values: Vec<f64> = data.iter().map(|&x| x.into()).collect();
154
155 let n = values.len() as f64;
157 let mean = values.iter().copied().sum::<f64>() / n;
158
159 let variance = values
161 .iter()
162 .map(|&x| {
163 let diff = x - mean;
164 diff * diff
165 })
166 .sum::<f64>()
167 / n;
168 let std_dev = variance.sqrt();
169
170 let skewness = if std_dev > 0.0 {
172 let sum_cubed = values
173 .iter()
174 .map(|&x| {
175 let z = (x - mean) / std_dev;
176 z * z * z
177 })
178 .sum::<f64>();
179 sum_cubed / n
180 } else {
181 0.0
182 };
183
184 let kurtosis = if std_dev > 0.0 {
186 let sum_fourth = values
187 .iter()
188 .map(|&x| {
189 let z = (x - mean) / std_dev;
190 z * z * z * z
191 })
192 .sum::<f64>();
193 sum_fourth / n - 3.0
194 } else {
195 0.0
196 };
197
198 if skewness.abs() < 0.5 && kurtosis > -1.5 && kurtosis < -0.8 {
200 DataDistribution::Uniform
202 } else if skewness.abs() < 0.5 && kurtosis.abs() < 1.0 {
203 DataDistribution::Gaussian { mean, std_dev }
205 } else if skewness.abs() > 2.0 {
206 DataDistribution::Skewed { skewness }
208 } else if kurtosis < -1.5 {
209 DataDistribution::Bimodal {
212 mean1: mean - std_dev,
213 mean2: mean + std_dev,
214 mix_ratio: 0.5,
215 }
216 } else {
217 DataDistribution::Uniform
219 }
220 }
221
222 pub fn create_strategy(
224 &self,
225 distribution: &DataDistribution,
226 data_size: usize,
227 ) -> CoreResult<PartitionStrategy> {
228 let num_partitions = self.config.num_partitions;
229
230 match distribution {
231 DataDistribution::Uniform => {
232 Ok(PartitionStrategy::EqualSize)
234 }
235
236 DataDistribution::Skewed { skewness } => {
237 let weights = self.calculate_skewed_weights(*skewness, num_partitions)?;
239 Ok(PartitionStrategy::Weighted { weights })
240 }
241
242 DataDistribution::Gaussian { mean, std_dev } => {
243 let boundaries =
245 self.calculate_gaussian_boundaries(*mean, *std_dev, num_partitions)?;
246 Ok(PartitionStrategy::RangeBased { boundaries })
247 }
248
249 DataDistribution::PowerLaw { alpha } => {
250 let weights = self.calculate_power_law_weights(*alpha, num_partitions)?;
252 Ok(PartitionStrategy::Weighted { weights })
253 }
254
255 DataDistribution::Bimodal {
256 mean1,
257 mean2,
258 mix_ratio,
259 } => {
260 let boundaries =
262 self.calculate_bimodal_boundaries(*mean1, *mean2, *mix_ratio, num_partitions)?;
263 Ok(PartitionStrategy::RangeBased { boundaries })
264 }
265
266 DataDistribution::Custom { .. } => {
267 let initial_sizes = vec![data_size / num_partitions; num_partitions];
269 Ok(PartitionStrategy::Dynamic {
270 initial_sizes,
271 allow_stealing: self.config.enable_work_stealing,
272 })
273 }
274 }
275 }
276
277 pub fn partition(&self, data: &[T], strategy: &PartitionStrategy) -> CoreResult<Vec<Vec<T>>> {
279 let data_size = data.len();
280 let num_partitions = self.config.num_partitions;
281
282 if data_size < num_partitions * self.config.min_partition_size {
283 return Ok(vec![data.to_vec()]);
285 }
286
287 match strategy {
288 PartitionStrategy::EqualSize => self.partition_equal_size(data),
289
290 PartitionStrategy::Weighted { weights } => self.partition_weighted(data, weights),
291
292 PartitionStrategy::Dynamic { initial_sizes, .. } => {
293 self.partition_dynamic(data, initial_sizes)
294 }
295
296 PartitionStrategy::Hierarchical {
297 levels,
298 branching_factor,
299 } => self.partition_hierarchical(data, *levels, *branching_factor),
300
301 PartitionStrategy::RangeBased { boundaries: _ } => {
302 Err(CoreError::InvalidArgument(
305 ErrorContext::new(
306 "Range-based partitioning requires PartialOrd + Into<f64> + Copy traits"
307 .to_string(),
308 )
309 .with_location(ErrorLocation::new(file!(), line!())),
310 ))
311 }
312
313 PartitionStrategy::HashBased { num_buckets: _ } => {
314 Err(CoreError::InvalidArgument(
317 ErrorContext::new("Hash-based partitioning requires Hash trait".to_string())
318 .with_location(ErrorLocation::new(file!(), line!())),
319 ))
320 }
321 }
322 }
323
324 fn partition_equal_size(&self, data: &[T]) -> CoreResult<Vec<Vec<T>>> {
326 let chunk_size = data.len().div_ceil(self.config.num_partitions);
327 let mut partitions = Vec::with_capacity(self.config.num_partitions);
328
329 for chunk in data.chunks(chunk_size) {
330 partitions.push(chunk.to_vec());
331 }
332
333 Ok(partitions)
334 }
335
336 fn partition_weighted(&self, data: &[T], weights: &[f64]) -> CoreResult<Vec<Vec<T>>> {
338 if weights.len() != self.config.num_partitions {
339 return Err(CoreError::InvalidArgument(
340 ErrorContext::new("Weight count does not match partition count".to_string())
341 .with_location(ErrorLocation::new(file!(), line!())),
342 ));
343 }
344
345 let total_weight: f64 = weights.iter().sum();
346 if total_weight <= 0.0 {
347 return Err(CoreError::InvalidArgument(
348 ErrorContext::new("Total weight must be positive".to_string())
349 .with_location(ErrorLocation::new(file!(), line!())),
350 ));
351 }
352
353 let mut partitions = Vec::with_capacity(self.config.num_partitions);
354 let mut start = 0;
355
356 for weight in weights {
357 let size = ((weight / total_weight) * data.len() as f64) as usize;
358 let end = (start + size).min(data.len());
359
360 if start < data.len() {
361 partitions.push(data[start..end].to_vec());
362 }
363
364 start = end;
365 }
366
367 if start < data.len() && !partitions.is_empty() {
369 if let Some(last) = partitions.last_mut() {
370 last.extend_from_slice(&data[start..]);
371 }
372 }
373
374 Ok(partitions)
375 }
376
377 fn partition_dynamic(&self, data: &[T], initialsizes: &[usize]) -> CoreResult<Vec<Vec<T>>> {
379 let mut partitions = Vec::with_capacity(initialsizes.len());
382 let mut start = 0;
383
384 for &size in initialsizes {
385 let end = (start + size).min(data.len());
386 if start < data.len() {
387 partitions.push(data[start..end].to_vec());
388 }
389 start = end;
390 }
391
392 Ok(partitions)
393 }
394
395 fn partition_hierarchical(
397 &self,
398 data: &[T],
399 levels: usize,
400 branching_factor: usize,
401 ) -> CoreResult<Vec<Vec<T>>> {
402 if levels == 0 || branching_factor == 0 {
403 return Err(CoreError::InvalidArgument(
404 ErrorContext::new("Invalid hierarchical parameters".to_string())
405 .with_location(ErrorLocation::new(file!(), line!())),
406 ));
407 }
408
409 let num_leaves = branching_factor.pow(levels as u32);
411 let chunk_size = data.len().div_ceil(num_leaves);
412
413 let mut partitions = Vec::with_capacity(num_leaves);
414 for chunk in data.chunks(chunk_size) {
415 partitions.push(chunk.to_vec());
416 }
417
418 Ok(partitions)
419 }
420
421 #[allow(dead_code)]
423 fn partition_rangebased(&self, data: &[T], boundaries: &[f64]) -> CoreResult<Vec<Vec<T>>>
424 where
425 T: PartialOrd + Into<f64> + Copy,
426 {
427 let mut partitions = vec![Vec::new(); boundaries.len() + 1];
428
429 for &item in data {
430 let value: f64 = item.into();
431 let mut partition_idx = boundaries.len();
432
433 for (i, &boundary) in boundaries.iter().enumerate() {
434 if value <= boundary {
435 partition_idx = i;
436 break;
437 }
438 }
439
440 partitions[partition_idx].push(item);
441 }
442
443 Ok(partitions)
444 }
445
446 #[allow(dead_code)]
448 fn partition_hashbased(&self, data: &[T], numbuckets: usize) -> CoreResult<Vec<Vec<T>>>
449 where
450 T: std::hash::Hash,
451 {
452 use std::collections::hash_map::DefaultHasher;
453 use std::hash::Hasher;
454
455 let mut partitions = vec![Vec::new(); numbuckets];
456
457 for item in data {
458 let mut hasher = DefaultHasher::new();
459 item.hash(&mut hasher);
460 let hash = hasher.finish();
461 let bucket = (hash % numbuckets as u64) as usize;
462 partitions[bucket].push(item.clone());
463 }
464
465 Ok(partitions)
466 }
467
468 fn calculate_skewed_weights(
470 &self,
471 skewness: f64,
472 num_partitions: usize,
473 ) -> CoreResult<Vec<f64>> {
474 let mut weights = Vec::with_capacity(num_partitions);
475
476 let base = 1.0 + skewness.abs() / 10.0;
478
479 for i in 0..num_partitions {
480 let weight = if skewness > 0.0 {
481 base.powf((num_partitions - i.saturating_sub(1)) as f64)
483 } else {
484 base.powf(i as f64)
486 };
487 weights.push(weight);
488 }
489
490 Ok(weights)
491 }
492
493 fn calculate_gaussian_boundaries(
495 &self,
496 mean: f64,
497 std_dev: f64,
498 num_partitions: usize,
499 ) -> CoreResult<Vec<f64>> {
500 let mut boundaries = Vec::with_capacity(num_partitions - 1);
501
502 for i in 1..num_partitions {
505 let quantile = i as f64 / num_partitions as f64;
506 let z_score = if quantile == 0.5 {
509 0.0
510 } else if quantile < 0.5 {
511 -((1.0 - 2.0 * quantile).ln() * 2.0).sqrt()
513 } else {
514 ((2.0 * quantile - 1.0).ln() * 2.0).sqrt()
516 };
517 let boundary = mean + z_score * std_dev;
518 boundaries.push(boundary);
519 }
520
521 Ok(boundaries)
522 }
523
524 fn calculate_power_law_weights(
526 &self,
527 alpha: f64,
528 num_partitions: usize,
529 ) -> CoreResult<Vec<f64>> {
530 let mut weights = Vec::with_capacity(num_partitions);
531
532 for i in 0..num_partitions {
533 let weight = ((i + 1) as f64).powf(-alpha);
535 weights.push(weight);
536 }
537
538 Ok(weights)
539 }
540
541 fn calculate_bimodal_boundaries(
543 &self,
544 mean1: f64,
545 mean2: f64,
546 mix_ratio: f64,
547 num_partitions: usize,
548 ) -> CoreResult<Vec<f64>> {
549 let mut boundaries = Vec::with_capacity(num_partitions - 1);
550
551 let partitions_mode1 = ((num_partitions as f64) * mix_ratio) as usize;
553 let partitions_mode2 = num_partitions - partitions_mode1;
554
555 let range1 = (mean2 - mean1).abs() * 0.5;
557 for i in 1..partitions_mode1 {
558 let boundary = mean1 - range1 * 0.5 + (range1 / partitions_mode1 as f64) * i as f64;
559 boundaries.push(boundary);
560 }
561
562 boundaries.push((mean1 + mean2) / 2.0);
564
565 for i in 1..partitions_mode2 {
567 let boundary = mean2 - range1 * 0.5 + (range1 / partitions_mode2 as f64) * i as f64;
568 boundaries.push(boundary);
569 }
570
571 boundaries.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
572 Ok(boundaries)
573 }
574}
575
/// Tracks per-partition execution times and adjusts partition weights so
/// future work is distributed more evenly.
pub struct LoadBalancer {
    /// Acceptable slowest/fastest ratio before weights are adjusted.
    target_imbalance: f64,
    /// Sliding window of recent execution times per partition.
    execution_times: Vec<Vec<Duration>>,
    /// Current relative weight per partition.
    weights: Vec<f64>,
}
585
586impl LoadBalancer {
587 pub fn new(num_partitions: usize, targetimbalance: f64) -> Self {
589 Self {
590 target_imbalance: targetimbalance,
591 execution_times: vec![Vec::new(); num_partitions],
592 weights: vec![1.0; num_partitions],
593 }
594 }
595
596 pub fn recordexecution_time(&mut self, partitionid: usize, duration: Duration) {
598 if partitionid < self.execution_times.len() {
599 self.execution_times[partitionid].push(duration);
600
601 if self.execution_times[partitionid].len() > 10 {
603 self.execution_times[partitionid].remove(0);
604 }
605 }
606 }
607
608 pub fn rebalance(&mut self) -> Vec<f64> {
610 let mut avg_times = Vec::with_capacity(self.weights.len());
611
612 for times in &self.execution_times {
614 if times.is_empty() {
615 avg_times.push(1.0);
616 } else {
617 let sum: Duration = times.iter().sum();
618 let avg = sum.as_secs_f64() / times.len() as f64;
619 avg_times.push(avg);
620 }
621 }
622
623 let total_avg: f64 = avg_times.iter().sum();
625 let mean_time = total_avg / avg_times.len() as f64;
626
627 for (i, &avg_time) in avg_times.iter().enumerate() {
629 if avg_time > mean_time * self.target_imbalance {
630 self.weights[i] *= 0.9;
632 } else if avg_time < mean_time / self.target_imbalance {
633 self.weights[i] *= 1.1;
635 }
636
637 self.weights[i] = self.weights[i].clamp(0.1, 10.0);
639 }
640
641 self.weights.clone()
642 }
643
644 pub fn get_imbalance_factor(&self) -> f64 {
646 let mut min_time = f64::MAX;
647 let mut max_time = 0.0f64;
648
649 for times in &self.execution_times {
650 if !times.is_empty() {
651 let avg: f64 =
652 times.iter().map(|d| d.as_secs_f64()).sum::<f64>() / times.len() as f64;
653 min_time = min_time.min(avg);
654 max_time = max_time.max(avg);
655 }
656 }
657
658 if min_time > 0.0 {
659 max_time / min_time
660 } else {
661 1.0
662 }
663 }
664}
665
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_uniform_distribution_detection() {
        let partitioner = DataPartitioner::<f64>::with_defaultconfig();
        let samples: Vec<f64> = (0..1000).map(f64::from).collect();

        // A linear ramp is flat enough to register as uniform, though the
        // moment heuristics may also classify it as gaussian.
        let detected = partitioner.analyze_distribution(&samples);
        assert!(
            matches!(
                detected,
                DataDistribution::Uniform | DataDistribution::Gaussian { .. }
            ),
            "Expected uniform or gaussian distribution"
        );
    }

    #[test]
    fn test_skewed_distribution_detection() {
        let partitioner = DataPartitioner::<f64>::with_defaultconfig();
        // 90% small values plus a 10% outlier cluster: heavy right tail.
        let mut samples = vec![1.0; 900];
        samples.extend(std::iter::repeat(100.0).take(100));

        let DataDistribution::Skewed { skewness } = partitioner.analyze_distribution(&samples)
        else {
            panic!("Expected skewed distribution");
        };
        assert!(skewness > 2.0, "Expected high positive skewness");
    }

    #[test]
    fn test_equal_size_partitioning() {
        let partitioner = DataPartitioner::<i32>::new(PartitionerConfig {
            num_partitions: 4,
            ..Default::default()
        });
        let samples: Vec<i32> = (0..100).collect();

        let parts = partitioner
            .partition_equal_size(&samples)
            .expect("Operation failed");

        assert_eq!(parts.len(), 4);
        assert_eq!(parts[0].len(), 25);
        assert_eq!(parts[3].len(), 25);
    }

    #[test]
    fn test_weighted_partitioning() {
        let partitioner = DataPartitioner::<i32>::new(PartitionerConfig {
            num_partitions: 3,
            ..Default::default()
        });
        let samples: Vec<i32> = (0..90).collect();

        let parts = partitioner
            .partition_weighted(&samples, &[1.0, 2.0, 3.0])
            .expect("Operation failed");

        // 90 elements split in a 1:2:3 ratio.
        let sizes: Vec<usize> = parts.iter().map(Vec::len).collect();
        assert_eq!(sizes, vec![15, 30, 45]);
    }

    #[test]
    fn test_load_balancer() {
        use std::time::Duration;

        let mut balancer = LoadBalancer::new(3, 1.2);
        balancer.recordexecution_time(0, Duration::from_millis(100));
        balancer.recordexecution_time(1, Duration::from_millis(200));
        balancer.recordexecution_time(2, Duration::from_millis(150));

        let new_weights = balancer.rebalance();
        assert_eq!(new_weights.len(), 3);
        // Partition 1 is the slowest, so its weight must drop below the
        // fastest partition's weight.
        assert!(new_weights[1] < new_weights[0]);
    }
}