Skip to main content

oxigdal_distributed/
partition.rs

1//! Data partitioning strategies for distributed processing.
2//!
3//! This module provides various partitioning strategies for dividing geospatial
4//! data across multiple worker nodes for parallel processing.
5
6use crate::error::{DistributedError, Result};
7use crate::task::PartitionId;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
/// Spatial extent for a partition.
///
/// An axis-aligned bounding box described by its minimum and maximum X/Y
/// coordinates. [`SpatialExtent::new`] rejects boxes with `min_x >= max_x` or
/// `min_y >= max_y`. However, the fields are public and the derived
/// `Deserialize` does not go through `new`. Values built by struct literal or
/// deserialization are therefore not validated.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct SpatialExtent {
    /// Minimum X coordinate.
    pub min_x: f64,
    /// Minimum Y coordinate.
    pub min_y: f64,
    /// Maximum X coordinate.
    pub max_x: f64,
    /// Maximum Y coordinate.
    pub max_y: f64,
}
23
24impl SpatialExtent {
25    /// Create a new spatial extent.
26    pub fn new(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Result<Self> {
27        if min_x >= max_x || min_y >= max_y {
28            return Err(DistributedError::partitioning(
29                "Invalid extent: min must be less than max",
30            ));
31        }
32        Ok(Self {
33            min_x,
34            min_y,
35            max_x,
36            max_y,
37        })
38    }
39
40    /// Get the width of the extent.
41    pub fn width(&self) -> f64 {
42        self.max_x - self.min_x
43    }
44
45    /// Get the height of the extent.
46    pub fn height(&self) -> f64 {
47        self.max_y - self.min_y
48    }
49
50    /// Get the area of the extent.
51    pub fn area(&self) -> f64 {
52        self.width() * self.height()
53    }
54
55    /// Check if this extent contains a point.
56    pub fn contains(&self, x: f64, y: f64) -> bool {
57        x >= self.min_x && x <= self.max_x && y >= self.min_y && y <= self.max_y
58    }
59
60    /// Check if this extent intersects another extent.
61    pub fn intersects(&self, other: &Self) -> bool {
62        self.min_x <= other.max_x
63            && self.max_x >= other.min_x
64            && self.min_y <= other.max_y
65            && self.max_y >= other.min_y
66    }
67}
68
/// A partition of data with associated metadata.
///
/// Created with [`Partition::new`] and refined through the `with_*` builder
/// methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Partition {
    /// Unique partition identifier.
    pub id: PartitionId,
    /// Spatial extent of this partition.
    pub extent: SpatialExtent,
    /// Estimated size in bytes (`Partition::new` initialises this to `0`).
    pub estimated_size: u64,
    /// Number of features/pixels in this partition (`None` until set via
    /// `with_feature_count`).
    pub feature_count: Option<u64>,
    /// Additional free-form metadata. The tile partitioner stores `tile_x` /
    /// `tile_y`, and the strip partitioner stores `strip_index`.
    pub metadata: HashMap<String, String>,
}
83
84impl Partition {
85    /// Create a new partition.
86    pub fn new(id: PartitionId, extent: SpatialExtent) -> Self {
87        Self {
88            id,
89            extent,
90            estimated_size: 0,
91            feature_count: None,
92            metadata: HashMap::new(),
93        }
94    }
95
96    /// Set the estimated size.
97    pub fn with_estimated_size(mut self, size: u64) -> Self {
98        self.estimated_size = size;
99        self
100    }
101
102    /// Set the feature count.
103    pub fn with_feature_count(mut self, count: u64) -> Self {
104        self.feature_count = Some(count);
105        self
106    }
107
108    /// Add metadata.
109    pub fn with_metadata(mut self, key: String, value: String) -> Self {
110        self.metadata.insert(key, value);
111        self
112    }
113}
114
/// Strategy for partitioning data.
///
/// NOTE(review): the variants appear to mirror the partitioner types in this
/// module (`TilePartitioner`, `StripPartitioner`, `HashPartitioner`,
/// `RangePartitioner`, `LoadBalancedPartitioner`). No mapping between them is
/// visible here, so confirm at the call site.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Partition data into regular tiles.
    Tile,
    /// Partition data into horizontal strips.
    Strip,
    /// Partition data based on hash of a key column.
    Hash,
    /// Partition data based on value ranges.
    Range,
    /// Partition data to balance load across workers.
    LoadBalanced,
}
129
130/// Tile-based partitioner for regular spatial grids.
131pub struct TilePartitioner {
132    /// Total spatial extent.
133    extent: SpatialExtent,
134    /// Number of tiles in X direction.
135    tiles_x: usize,
136    /// Number of tiles in Y direction.
137    tiles_y: usize,
138}
139
140impl TilePartitioner {
141    /// Create a new tile partitioner.
142    pub fn new(extent: SpatialExtent, tiles_x: usize, tiles_y: usize) -> Result<Self> {
143        if tiles_x == 0 || tiles_y == 0 {
144            return Err(DistributedError::partitioning(
145                "Number of tiles must be greater than zero",
146            ));
147        }
148        Ok(Self {
149            extent,
150            tiles_x,
151            tiles_y,
152        })
153    }
154
155    /// Generate partitions.
156    pub fn partition(&self) -> Vec<Partition> {
157        let tile_width = self.extent.width() / self.tiles_x as f64;
158        let tile_height = self.extent.height() / self.tiles_y as f64;
159
160        let mut partitions = Vec::with_capacity(self.tiles_x * self.tiles_y);
161        let mut partition_id = 0;
162
163        for y in 0..self.tiles_y {
164            for x in 0..self.tiles_x {
165                let min_x = self.extent.min_x + (x as f64 * tile_width);
166                let min_y = self.extent.min_y + (y as f64 * tile_height);
167                let max_x = min_x + tile_width;
168                let max_y = min_y + tile_height;
169
170                if let Ok(tile_extent) = SpatialExtent::new(min_x, min_y, max_x, max_y) {
171                    let partition = Partition::new(PartitionId(partition_id), tile_extent)
172                        .with_metadata("tile_x".to_string(), x.to_string())
173                        .with_metadata("tile_y".to_string(), y.to_string());
174                    partitions.push(partition);
175                    partition_id += 1;
176                }
177            }
178        }
179
180        partitions
181    }
182
183    /// Get the total number of partitions.
184    pub fn num_partitions(&self) -> usize {
185        self.tiles_x * self.tiles_y
186    }
187}
188
189/// Strip-based partitioner for horizontal bands.
190pub struct StripPartitioner {
191    /// Total spatial extent.
192    extent: SpatialExtent,
193    /// Number of strips.
194    num_strips: usize,
195}
196
197impl StripPartitioner {
198    /// Create a new strip partitioner.
199    pub fn new(extent: SpatialExtent, num_strips: usize) -> Result<Self> {
200        if num_strips == 0 {
201            return Err(DistributedError::partitioning(
202                "Number of strips must be greater than zero",
203            ));
204        }
205        Ok(Self { extent, num_strips })
206    }
207
208    /// Generate partitions.
209    pub fn partition(&self) -> Vec<Partition> {
210        let strip_height = self.extent.height() / self.num_strips as f64;
211
212        let mut partitions = Vec::with_capacity(self.num_strips);
213
214        for i in 0..self.num_strips {
215            let min_y = self.extent.min_y + (i as f64 * strip_height);
216            let max_y = min_y + strip_height;
217
218            if let Ok(strip_extent) =
219                SpatialExtent::new(self.extent.min_x, min_y, self.extent.max_x, max_y)
220            {
221                let partition = Partition::new(PartitionId(i as u64), strip_extent)
222                    .with_metadata("strip_index".to_string(), i.to_string());
223                partitions.push(partition);
224            }
225        }
226
227        partitions
228    }
229
230    /// Get the total number of partitions.
231    pub fn num_partitions(&self) -> usize {
232        self.num_strips
233    }
234}
235
236/// Hash-based partitioner for key-based distribution.
237pub struct HashPartitioner {
238    /// Number of partitions.
239    num_partitions: usize,
240}
241
242impl HashPartitioner {
243    /// Create a new hash partitioner.
244    pub fn new(num_partitions: usize) -> Result<Self> {
245        if num_partitions == 0 {
246            return Err(DistributedError::partitioning(
247                "Number of partitions must be greater than zero",
248            ));
249        }
250        Ok(Self { num_partitions })
251    }
252
253    /// Compute partition ID for a key.
254    pub fn partition_for_key(&self, key: &[u8]) -> PartitionId {
255        let hash = self.hash_key(key);
256        PartitionId(hash % self.num_partitions as u64)
257    }
258
259    /// Hash a key using a simple FNV-1a hash.
260    fn hash_key(&self, key: &[u8]) -> u64 {
261        const FNV_OFFSET: u64 = 14695981039346656037;
262        const FNV_PRIME: u64 = 1099511628211;
263
264        let mut hash = FNV_OFFSET;
265        for &byte in key {
266            hash ^= u64::from(byte);
267            hash = hash.wrapping_mul(FNV_PRIME);
268        }
269        hash
270    }
271
272    /// Get the total number of partitions.
273    pub fn num_partitions(&self) -> usize {
274        self.num_partitions
275    }
276}
277
278/// Range-based partitioner for value-based distribution.
279pub struct RangePartitioner {
280    /// Partition boundaries.
281    boundaries: Vec<f64>,
282}
283
284impl RangePartitioner {
285    /// Create a new range partitioner with specified boundaries.
286    pub fn new(boundaries: Vec<f64>) -> Result<Self> {
287        if boundaries.is_empty() {
288            return Err(DistributedError::partitioning("Boundaries cannot be empty"));
289        }
290
291        // Verify boundaries are sorted
292        for i in 1..boundaries.len() {
293            if boundaries[i] <= boundaries[i - 1] {
294                return Err(DistributedError::partitioning(
295                    "Boundaries must be sorted in ascending order",
296                ));
297            }
298        }
299
300        Ok(Self { boundaries })
301    }
302
303    /// Compute partition ID for a value.
304    pub fn partition_for_value(&self, value: f64) -> PartitionId {
305        // Binary search to find the partition
306        let mut low = 0;
307        let mut high = self.boundaries.len();
308
309        while low < high {
310            let mid = (low + high) / 2;
311            if value < self.boundaries[mid] {
312                high = mid;
313            } else {
314                low = mid + 1;
315            }
316        }
317
318        PartitionId(low as u64)
319    }
320
321    /// Get the total number of partitions.
322    pub fn num_partitions(&self) -> usize {
323        self.boundaries.len() + 1
324    }
325}
326
327/// Load-balanced partitioner that considers data size.
328pub struct LoadBalancedPartitioner {
329    /// Target size per partition in bytes.
330    target_size: u64,
331    /// Total data size.
332    total_size: u64,
333}
334
335impl LoadBalancedPartitioner {
336    /// Create a new load-balanced partitioner.
337    pub fn new(total_size: u64, num_workers: usize) -> Result<Self> {
338        if num_workers == 0 {
339            return Err(DistributedError::partitioning(
340                "Number of workers must be greater than zero",
341            ));
342        }
343
344        let target_size = total_size.div_ceil(num_workers as u64);
345
346        Ok(Self {
347            target_size,
348            total_size,
349        })
350    }
351
352    /// Get the target size per partition.
353    pub fn target_size(&self) -> u64 {
354        self.target_size
355    }
356
357    /// Estimate the number of partitions needed.
358    pub fn estimated_partitions(&self) -> usize {
359        self.total_size.div_ceil(self.target_size) as usize
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// Fallible test signature so constructor errors propagate with `?`.
    type TestResult = std::result::Result<(), Box<dyn std::error::Error>>;

    #[test]
    fn test_spatial_extent() -> TestResult {
        let square = SpatialExtent::new(0.0, 0.0, 100.0, 100.0)?;
        assert_eq!(
            (square.width(), square.height(), square.area()),
            (100.0, 100.0, 10000.0)
        );

        // One point inside the square, one to its right.
        assert!(square.contains(50.0, 50.0));
        assert!(!square.contains(150.0, 50.0));

        // Same-sized square shifted diagonally by half its side.
        let shifted = SpatialExtent::new(50.0, 50.0, 150.0, 150.0)?;
        assert!(square.intersects(&shifted));
        Ok(())
    }

    #[test]
    fn test_tile_partitioner() -> TestResult {
        let square = SpatialExtent::new(0.0, 0.0, 100.0, 100.0)?;
        let tiler = TilePartitioner::new(square, 2, 2)?;

        let tiles = tiler.partition();
        assert_eq!(tiles.len(), 4);
        assert_eq!(tiler.num_partitions(), 4);

        // The first tile is the lower-left quadrant.
        let first = &tiles[0];
        assert_eq!(first.extent, SpatialExtent::new(0.0, 0.0, 50.0, 50.0)?);
        Ok(())
    }

    #[test]
    fn test_strip_partitioner() -> TestResult {
        let square = SpatialExtent::new(0.0, 0.0, 100.0, 100.0)?;
        let striper = StripPartitioner::new(square, 4)?;

        let strips = striper.partition();
        assert_eq!(strips.len(), 4);
        assert_eq!(striper.num_partitions(), 4);

        // Strips span the full width and split the height evenly.
        let first = &strips[0];
        assert_eq!((first.extent.min_x, first.extent.max_x), (0.0, 100.0));
        assert_eq!(first.extent.height(), 25.0);
        Ok(())
    }

    #[test]
    fn test_hash_partitioner() -> TestResult {
        let hasher = HashPartitioner::new(4)?;
        assert_eq!(hasher.num_partitions(), 4);

        for key in [b"test_key_1", b"test_key_2"] {
            let assigned = hasher.partition_for_key(key);
            // Deterministic: the same key always lands in the same partition.
            assert_eq!(assigned, hasher.partition_for_key(key));
            // In range for four partitions.
            assert!(assigned.0 < 4);
        }
        Ok(())
    }

    #[test]
    fn test_range_partitioner() -> TestResult {
        let ranges = RangePartitioner::new(vec![10.0, 20.0, 30.0])?;
        assert_eq!(ranges.num_partitions(), 4);

        let cases = [(5.0, 0), (15.0, 1), (25.0, 2), (35.0, 3)];
        for (value, expected) in cases {
            assert_eq!(ranges.partition_for_value(value), PartitionId(expected));
        }
        Ok(())
    }

    #[test]
    fn test_load_balanced_partitioner() -> TestResult {
        const MIB: u64 = 1024 * 1024;
        let balancer = LoadBalancedPartitioner::new(1000 * MIB, 4)?;

        assert_eq!(balancer.target_size(), 250 * MIB);
        assert_eq!(balancer.estimated_partitions(), 4);
        Ok(())
    }
}