1use crate::error::{DistributedError, Result};
7use crate::task::PartitionId;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
/// Axis-aligned rectangle described by its minimum and maximum x/y bounds.
///
/// [`SpatialExtent::new`] validates the ordering of the bounds. The fields are
/// public, so a value assembled with a struct literal or produced by
/// deserialization has not gone through that validation.
// NOTE(review): coordinate units / reference system are not stated in this file.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct SpatialExtent {
    /// Lower bound along the x axis.
    pub min_x: f64,
    /// Lower bound along the y axis.
    pub min_y: f64,
    /// Upper bound along the x axis.
    pub max_x: f64,
    /// Upper bound along the y axis.
    pub max_y: f64,
}
23
24impl SpatialExtent {
25 pub fn new(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Result<Self> {
27 if min_x >= max_x || min_y >= max_y {
28 return Err(DistributedError::partitioning(
29 "Invalid extent: min must be less than max",
30 ));
31 }
32 Ok(Self {
33 min_x,
34 min_y,
35 max_x,
36 max_y,
37 })
38 }
39
40 pub fn width(&self) -> f64 {
42 self.max_x - self.min_x
43 }
44
45 pub fn height(&self) -> f64 {
47 self.max_y - self.min_y
48 }
49
50 pub fn area(&self) -> f64 {
52 self.width() * self.height()
53 }
54
55 pub fn contains(&self, x: f64, y: f64) -> bool {
57 x >= self.min_x && x <= self.max_x && y >= self.min_y && y <= self.max_y
58 }
59
60 pub fn intersects(&self, other: &Self) -> bool {
62 self.min_x <= other.max_x
63 && self.max_x >= other.min_x
64 && self.min_y <= other.max_y
65 && self.max_y >= other.min_y
66 }
67}
68
/// One unit of partitioned work: an id, the extent it covers, and optional
/// sizing information and metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Partition {
    /// Identifier of this partition.
    pub id: PartitionId,
    /// Spatial extent associated with this partition.
    pub extent: SpatialExtent,
    /// Estimated size of the partition; `Partition::new` starts it at 0.
    // NOTE(review): the unit (bytes?) is not stated in this file — confirm.
    pub estimated_size: u64,
    /// Number of features in the partition, when known; `None` until set.
    pub feature_count: Option<u64>,
    /// Free-form string key/value pairs. The partitioners in this file record
    /// grid positions here (`"tile_x"`, `"tile_y"`, `"strip_index"`).
    pub metadata: HashMap<String, String>,
}
83
84impl Partition {
85 pub fn new(id: PartitionId, extent: SpatialExtent) -> Self {
87 Self {
88 id,
89 extent,
90 estimated_size: 0,
91 feature_count: None,
92 metadata: HashMap::new(),
93 }
94 }
95
96 pub fn with_estimated_size(mut self, size: u64) -> Self {
98 self.estimated_size = size;
99 self
100 }
101
102 pub fn with_feature_count(mut self, count: u64) -> Self {
104 self.feature_count = Some(count);
105 self
106 }
107
108 pub fn with_metadata(mut self, key: String, value: String) -> Self {
110 self.metadata.insert(key, value);
111 self
112 }
113}
114
/// Names the available ways of partitioning work.
///
/// Each variant shares its name with a partitioner type defined in this file
/// (`TilePartitioner`, `StripPartitioner`, ...); the code visible here does not
/// itself dispatch on this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Regular grid of tiles.
    Tile,
    /// Strips stacked along the y axis.
    Strip,
    /// Assignment by key hash.
    Hash,
    /// Assignment by value range.
    Range,
    /// Partitions sized to spread load across workers.
    LoadBalanced,
}
129
/// Splits a spatial extent into a regular grid of tiles.
///
/// Construct it with `TilePartitioner::new`, then call `partition` to obtain
/// the tiles.
pub struct TilePartitioner {
    /// Extent that is divided into tiles.
    extent: SpatialExtent,
    /// Number of tiles along the x axis (grid columns).
    tiles_x: usize,
    /// Number of tiles along the y axis (grid rows).
    tiles_y: usize,
}
139
140impl TilePartitioner {
141 pub fn new(extent: SpatialExtent, tiles_x: usize, tiles_y: usize) -> Result<Self> {
143 if tiles_x == 0 || tiles_y == 0 {
144 return Err(DistributedError::partitioning(
145 "Number of tiles must be greater than zero",
146 ));
147 }
148 Ok(Self {
149 extent,
150 tiles_x,
151 tiles_y,
152 })
153 }
154
155 pub fn partition(&self) -> Vec<Partition> {
157 let tile_width = self.extent.width() / self.tiles_x as f64;
158 let tile_height = self.extent.height() / self.tiles_y as f64;
159
160 let mut partitions = Vec::with_capacity(self.tiles_x * self.tiles_y);
161 let mut partition_id = 0;
162
163 for y in 0..self.tiles_y {
164 for x in 0..self.tiles_x {
165 let min_x = self.extent.min_x + (x as f64 * tile_width);
166 let min_y = self.extent.min_y + (y as f64 * tile_height);
167 let max_x = min_x + tile_width;
168 let max_y = min_y + tile_height;
169
170 if let Ok(tile_extent) = SpatialExtent::new(min_x, min_y, max_x, max_y) {
171 let partition = Partition::new(PartitionId(partition_id), tile_extent)
172 .with_metadata("tile_x".to_string(), x.to_string())
173 .with_metadata("tile_y".to_string(), y.to_string());
174 partitions.push(partition);
175 partition_id += 1;
176 }
177 }
178 }
179
180 partitions
181 }
182
183 pub fn num_partitions(&self) -> usize {
185 self.tiles_x * self.tiles_y
186 }
187}
188
/// Splits a spatial extent into strips stacked along the y axis.
///
/// Every strip spans the full x range of the extent.
pub struct StripPartitioner {
    /// Extent that is divided into strips.
    extent: SpatialExtent,
    /// Number of strips the extent's height is divided into.
    num_strips: usize,
}
196
197impl StripPartitioner {
198 pub fn new(extent: SpatialExtent, num_strips: usize) -> Result<Self> {
200 if num_strips == 0 {
201 return Err(DistributedError::partitioning(
202 "Number of strips must be greater than zero",
203 ));
204 }
205 Ok(Self { extent, num_strips })
206 }
207
208 pub fn partition(&self) -> Vec<Partition> {
210 let strip_height = self.extent.height() / self.num_strips as f64;
211
212 let mut partitions = Vec::with_capacity(self.num_strips);
213
214 for i in 0..self.num_strips {
215 let min_y = self.extent.min_y + (i as f64 * strip_height);
216 let max_y = min_y + strip_height;
217
218 if let Ok(strip_extent) =
219 SpatialExtent::new(self.extent.min_x, min_y, self.extent.max_x, max_y)
220 {
221 let partition = Partition::new(PartitionId(i as u64), strip_extent)
222 .with_metadata("strip_index".to_string(), i.to_string());
223 partitions.push(partition);
224 }
225 }
226
227 partitions
228 }
229
230 pub fn num_partitions(&self) -> usize {
232 self.num_strips
233 }
234}
235
/// Assigns byte-string keys to partitions by hashing them.
pub struct HashPartitioner {
    /// Number of partitions keys are distributed over; `HashPartitioner::new`
    /// rejects zero.
    num_partitions: usize,
}
241
242impl HashPartitioner {
243 pub fn new(num_partitions: usize) -> Result<Self> {
245 if num_partitions == 0 {
246 return Err(DistributedError::partitioning(
247 "Number of partitions must be greater than zero",
248 ));
249 }
250 Ok(Self { num_partitions })
251 }
252
253 pub fn partition_for_key(&self, key: &[u8]) -> PartitionId {
255 let hash = self.hash_key(key);
256 PartitionId(hash % self.num_partitions as u64)
257 }
258
259 fn hash_key(&self, key: &[u8]) -> u64 {
261 const FNV_OFFSET: u64 = 14695981039346656037;
262 const FNV_PRIME: u64 = 1099511628211;
263
264 let mut hash = FNV_OFFSET;
265 for &byte in key {
266 hash ^= u64::from(byte);
267 hash = hash.wrapping_mul(FNV_PRIME);
268 }
269 hash
270 }
271
272 pub fn num_partitions(&self) -> usize {
274 self.num_partitions
275 }
276}
277
/// Assigns numeric values to partitions delimited by a list of boundaries.
pub struct RangePartitioner {
    /// Split points between consecutive partitions. `RangePartitioner::new`
    /// checks that they are in ascending order; `n` boundaries define `n + 1`
    /// partitions.
    boundaries: Vec<f64>,
}
283
284impl RangePartitioner {
285 pub fn new(boundaries: Vec<f64>) -> Result<Self> {
287 if boundaries.is_empty() {
288 return Err(DistributedError::partitioning("Boundaries cannot be empty"));
289 }
290
291 for i in 1..boundaries.len() {
293 if boundaries[i] <= boundaries[i - 1] {
294 return Err(DistributedError::partitioning(
295 "Boundaries must be sorted in ascending order",
296 ));
297 }
298 }
299
300 Ok(Self { boundaries })
301 }
302
303 pub fn partition_for_value(&self, value: f64) -> PartitionId {
305 let mut low = 0;
307 let mut high = self.boundaries.len();
308
309 while low < high {
310 let mid = (low + high) / 2;
311 if value < self.boundaries[mid] {
312 high = mid;
313 } else {
314 low = mid + 1;
315 }
316 }
317
318 PartitionId(low as u64)
319 }
320
321 pub fn num_partitions(&self) -> usize {
323 self.boundaries.len() + 1
324 }
325}
326
/// Derives a per-worker target size from a total size and a worker count.
// NOTE(review): the unit of the sizes (bytes?) is not stated in this file — confirm.
pub struct LoadBalancedPartitioner {
    /// Size each partition should aim for: the total size divided by the
    /// number of workers, rounded up.
    target_size: u64,
    /// Total size to distribute, as passed to `LoadBalancedPartitioner::new`.
    total_size: u64,
}
334
335impl LoadBalancedPartitioner {
336 pub fn new(total_size: u64, num_workers: usize) -> Result<Self> {
338 if num_workers == 0 {
339 return Err(DistributedError::partitioning(
340 "Number of workers must be greater than zero",
341 ));
342 }
343
344 let target_size = total_size.div_ceil(num_workers as u64);
345
346 Ok(Self {
347 target_size,
348 total_size,
349 })
350 }
351
352 pub fn target_size(&self) -> u64 {
354 self.target_size
355 }
356
357 pub fn estimated_partitions(&self) -> usize {
359 self.total_size.div_ceil(self.target_size) as usize
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// Return type shared by the tests so `?` can propagate any error.
    type TestResult = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Geometry helpers and predicates of `SpatialExtent`.
    #[test]
    fn test_spatial_extent() -> TestResult {
        let square = SpatialExtent::new(0.0, 0.0, 100.0, 100.0)?;
        assert_eq!(square.width(), 100.0);
        assert_eq!(square.height(), 100.0);
        assert_eq!(square.area(), 10000.0);

        assert!(square.contains(50.0, 50.0));
        assert!(!square.contains(150.0, 50.0));

        let overlapping = SpatialExtent::new(50.0, 50.0, 150.0, 150.0)?;
        assert!(square.intersects(&overlapping));
        Ok(())
    }

    /// A 2x2 grid yields four tiles, the first one in the lower-left corner.
    #[test]
    fn test_tile_partitioner() -> TestResult {
        let partitioner =
            TilePartitioner::new(SpatialExtent::new(0.0, 0.0, 100.0, 100.0)?, 2, 2)?;

        let tiles = partitioner.partition();
        assert_eq!(tiles.len(), 4);
        assert_eq!(partitioner.num_partitions(), 4);

        let lower_left = &tiles[0].extent;
        assert_eq!(lower_left.min_x, 0.0);
        assert_eq!(lower_left.min_y, 0.0);
        assert_eq!(lower_left.max_x, 50.0);
        assert_eq!(lower_left.max_y, 50.0);
        Ok(())
    }

    /// Four strips span the full width and a quarter of the height each.
    #[test]
    fn test_strip_partitioner() -> TestResult {
        let partitioner = StripPartitioner::new(SpatialExtent::new(0.0, 0.0, 100.0, 100.0)?, 4)?;

        let strips = partitioner.partition();
        assert_eq!(strips.len(), 4);
        assert_eq!(partitioner.num_partitions(), 4);

        let bottom = &strips[0].extent;
        assert_eq!(bottom.min_x, 0.0);
        assert_eq!(bottom.max_x, 100.0);
        assert_eq!(bottom.height(), 25.0);
        Ok(())
    }

    /// Hash assignment is deterministic and stays within the partition count.
    #[test]
    fn test_hash_partitioner() -> TestResult {
        let partitioner = HashPartitioner::new(4)?;
        assert_eq!(partitioner.num_partitions(), 4);

        let first_key = b"test_key_1";
        let second_key = b"test_key_2";

        let first = partitioner.partition_for_key(first_key);
        let second = partitioner.partition_for_key(second_key);

        // Hashing the same key again must give the same partition.
        assert_eq!(first, partitioner.partition_for_key(first_key));
        assert_eq!(second, partitioner.partition_for_key(second_key));

        // Ids must be valid indices into the four partitions.
        assert!(first.0 < 4);
        assert!(second.0 < 4);
        Ok(())
    }

    /// Three boundaries give four partitions; values land between them.
    #[test]
    fn test_range_partitioner() -> TestResult {
        let partitioner = RangePartitioner::new(vec![10.0, 20.0, 30.0])?;

        assert_eq!(partitioner.num_partitions(), 4);

        assert_eq!(partitioner.partition_for_value(5.0), PartitionId(0));
        assert_eq!(partitioner.partition_for_value(15.0), PartitionId(1));
        assert_eq!(partitioner.partition_for_value(25.0), PartitionId(2));
        assert_eq!(partitioner.partition_for_value(35.0), PartitionId(3));
        Ok(())
    }

    /// An evenly divisible total gives one equal share per worker.
    #[test]
    fn test_load_balanced_partitioner() -> TestResult {
        let total_size: u64 = 1000 * 1024 * 1024;
        let num_workers: usize = 4;
        let partitioner = LoadBalancedPartitioner::new(total_size, num_workers)?;

        assert_eq!(partitioner.target_size(), 250 * 1024 * 1024);
        assert_eq!(partitioner.estimated_partitions(), 4);
        Ok(())
    }
}