scirs2_io/distributed.rs

//! Distributed I/O processing capabilities
//!
//! This module provides infrastructure for distributed processing of large datasets
//! across multiple nodes or processes, enabling scalable I/O operations for
//! terabyte-scale data processing.
//!
//! ## Features
//!
//! - **Distributed file reading**: Split large files across multiple workers
//! - **Parallel writing**: Coordinate writes from multiple processes
//! - **Data partitioning**: Automatic partitioning strategies for various formats
//! - **Load balancing**: Dynamic work distribution based on node capabilities
//! - **Fault tolerance**: Handle node failures and data recovery
//! - **Progress tracking**: Monitor distributed operations
//!
//! ## Examples
//!
//! ```rust,no_run
//! use scirs2_io::distributed::{DistributedReader, PartitionStrategy};
//!
//! // Create a distributed reader for a large CSV file
//! let reader = DistributedReader::new("large_dataset.csv")
//!     .partition_strategy(PartitionStrategy::RowBased { chunk_size: 1_000_000 })
//!     .num_workers(4);
//!
//! // Process chunks in parallel
//! let results: Vec<i32> = reader.process_parallel(|chunk| {
//!     // Compute a simple statistic from the raw bytes; a real implementation
//!     // would parse the CSV data. Summing into u64 avoids overflow on large chunks.
//!     let sum: u64 = chunk.iter().map(|&b| b as u64).sum();
//!     Ok((sum / chunk.len() as u64) as i32) // Return the average byte value
//! })?;
//! # Ok::<(), scirs2_io::error::IoError>(())
//! ```

#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(clippy::too_many_arguments)]

use crate::error::{IoError, Result};
use crate::thread_pool::ThreadPool;
use scirs2_core::ndarray::{s, Array2};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;

/// Partition strategy for distributed processing
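///
/// # Example
///
/// A minimal sketch of `Custom` partitioning, assuming fixed 1 MiB stripes
/// (the stripe size is illustrative). The closure receives the total file
/// size in bytes and returns `(offset, length)` pairs:
///
/// ```rust,no_run
/// use scirs2_io::distributed::PartitionStrategy;
/// use std::sync::Arc;
///
/// const STRIPE: usize = 1024 * 1024;
/// let strategy = PartitionStrategy::Custom(Arc::new(|file_size: usize| {
///     (0..file_size)
///         .step_by(STRIPE)
///         .map(|offset| (offset, STRIPE.min(file_size - offset)))
///         .collect::<Vec<_>>()
/// }));
/// ```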
#[derive(Clone)]
pub enum PartitionStrategy {
    /// Partition by rows (for tabular data)
    RowBased { chunk_size: usize },
    /// Partition by file size
    SizeBased { chunk_size_bytes: usize },
    /// Partition by blocks (for structured formats)
    BlockBased { blocks_per_partition: usize },
    /// Custom partitioning function
    Custom(Arc<dyn Fn(usize) -> Vec<(usize, usize)> + Send + Sync>),
}

impl std::fmt::Debug for PartitionStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RowBased { chunk_size } => f
                .debug_struct("RowBased")
                .field("chunk_size", chunk_size)
                .finish(),
            Self::SizeBased { chunk_size_bytes } => f
                .debug_struct("SizeBased")
                .field("chunk_size_bytes", chunk_size_bytes)
                .finish(),
            Self::BlockBased {
                blocks_per_partition,
            } => f
                .debug_struct("BlockBased")
                .field("blocks_per_partition", blocks_per_partition)
                .finish(),
            Self::Custom(_) => f
                .debug_struct("Custom")
                .field("function", &"<function>")
                .finish(),
        }
    }
}

/// Worker status
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// Waiting for work
    Idle,
    /// Currently processing a partition
    Processing,
    /// Finished successfully
    Completed,
    /// Terminated with an error
    Failed,
}

/// Worker information
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Worker ID
    pub id: usize,
    /// Current status
    pub status: WorkerStatus,
    /// Progress (0.0 to 1.0)
    pub progress: f64,
    /// Items processed
    pub items_processed: usize,
    /// Error message if failed
    pub error: Option<String>,
}

/// Distributed reader for parallel file processing
pub struct DistributedReader {
    file_path: PathBuf,
    partition_strategy: PartitionStrategy,
    num_workers: usize,
    #[allow(dead_code)]
    worker_pool: Option<ThreadPool>,
    progress_callback: Option<Arc<dyn Fn(&[WorkerInfo]) + Send + Sync>>,
}

impl DistributedReader {
    /// Create a new distributed reader
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        Self {
            file_path: path.as_ref().to_path_buf(),
            partition_strategy: PartitionStrategy::SizeBased {
                chunk_size_bytes: 64 * 1024 * 1024,
            }, // 64MB default
            num_workers: num_cpus::get(),
            worker_pool: None,
            progress_callback: None,
        }
    }

    /// Set partition strategy
    pub fn partition_strategy(mut self, strategy: PartitionStrategy) -> Self {
        self.partition_strategy = strategy;
        self
    }

    /// Set number of workers
    pub fn num_workers(mut self, num_workers: usize) -> Self {
        self.num_workers = num_workers;
        self
    }

    /// Set progress callback
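    ///
    /// The callback receives a snapshot of all `WorkerInfo` entries each time
    /// a worker finishes a partition. A minimal sketch:
    ///
    /// ```rust,no_run
    /// use scirs2_io::distributed::{DistributedReader, WorkerStatus};
    ///
    /// let reader = DistributedReader::new("large_dataset.csv")
    ///     .progress_callback(|workers| {
    ///         let done = workers
    ///             .iter()
    ///             .filter(|w| w.status == WorkerStatus::Completed)
    ///             .count();
    ///         eprintln!("{done}/{} partitions complete", workers.len());
    ///     });
    /// ```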
    pub fn progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(&[WorkerInfo]) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Arc::new(callback));
        self
    }

    /// Get file size
    fn get_file_size(&self) -> Result<usize> {
        let metadata = std::fs::metadata(&self.file_path)
            .map_err(|_| IoError::FileNotFound(self.file_path.to_string_lossy().to_string()))?;
        Ok(metadata.len() as usize)
    }

    /// Create partitions based on strategy
    fn create_partitions(&self) -> Result<Vec<(usize, usize)>> {
        let file_size = self.get_file_size()?;

        match &self.partition_strategy {
            PartitionStrategy::SizeBased { chunk_size_bytes } => {
                let mut partitions = Vec::new();
                let mut offset = 0;

                while offset < file_size {
                    let end = (offset + chunk_size_bytes).min(file_size);
                    partitions.push((offset, end - offset));
                    offset = end;
                }

                Ok(partitions)
            }
            PartitionStrategy::RowBased { chunk_size } => {
                // For row-based partitioning we would need to scan the file for
                // row boundaries; this simplified implementation only estimates
                // the row count. Note that the returned pairs are
                // (row_offset, row_count), not byte ranges.
                let total_rows = self.estimate_row_count()?;
                let mut partitions = Vec::new();
                let mut row_offset = 0;

                while row_offset < total_rows {
                    let rows = (*chunk_size).min(total_rows - row_offset);
                    partitions.push((row_offset, rows));
                    row_offset += rows;
                }

                Ok(partitions)
            }
            PartitionStrategy::BlockBased {
                blocks_per_partition,
            } => {
                // For block-based formats
                let block_size = 4096; // Example block size
                let total_blocks = (file_size + block_size - 1) / block_size;
                let mut partitions = Vec::new();
                let mut block_offset = 0;

                while block_offset < total_blocks {
                    let blocks = (*blocks_per_partition).min(total_blocks - block_offset);
                    partitions.push((block_offset * block_size, blocks * block_size));
                    block_offset += blocks;
                }

                Ok(partitions)
            }
            PartitionStrategy::Custom(f) => Ok(f(file_size)),
        }
    }

    /// Estimate row count for row-based partitioning
    fn estimate_row_count(&self) -> Result<usize> {
        // Simplified: sample first few KB and estimate
        let mut file = File::open(&self.file_path)
            .map_err(|_| IoError::FileNotFound(self.file_path.to_string_lossy().to_string()))?;

        let mut buffer = vec![0u8; 8192];
        let bytes_read = file
            .read(&mut buffer)
            .map_err(|e| IoError::ParseError(format!("Failed to read sample: {e}")))?;

        let newlines = buffer[..bytes_read].iter().filter(|&&b| b == b'\n').count();
        if newlines == 0 {
            return Ok(1);
        }

        let file_size = self.get_file_size()?;
        let estimated_rows = (file_size as f64 / bytes_read as f64 * newlines as f64) as usize;

        Ok(estimated_rows)
    }

    /// Process the file's partitions in parallel, returning results in partition order
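    ///
    /// A minimal sketch that counts the bytes in each partition:
    ///
    /// ```rust,no_run
    /// use scirs2_io::distributed::DistributedReader;
    ///
    /// let reader = DistributedReader::new("large_dataset.csv").num_workers(4);
    /// let lengths: Vec<usize> = reader.process_parallel(|chunk| Ok(chunk.len()))?;
    /// # Ok::<(), scirs2_io::error::IoError>(())
    /// ```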
    pub fn process_parallel<T, F>(&self, processor: F) -> Result<Vec<T>>
    where
        T: Send + 'static,
        F: Fn(Vec<u8>) -> Result<T> + Send + Sync + 'static,
    {
        let partitions = self.create_partitions()?;
        let num_partitions = partitions.len();

        // Advisory worker count derived from system resources. Note: the
        // current implementation spawns one thread per partition, so this
        // count is informational and does not cap concurrency.
        let available_workers = std::cmp::min(self.num_workers, num_partitions);
        let cpu_count = num_cpus::get();
        let optimal_workers = std::cmp::min(available_workers, cpu_count * 2); // Don't over-subscribe

        println!(
            "Processing {num_partitions} partitions with up to {optimal_workers} workers (CPU cores: {cpu_count})"
        );

        // Create worker info tracking
        let worker_infos = Arc::new(Mutex::new(
            (0..num_partitions)
                .map(|i| WorkerInfo {
                    id: i,
                    status: WorkerStatus::Idle,
                    progress: 0.0,
                    items_processed: 0,
                    error: None,
                })
                .collect::<Vec<_>>(),
        ));

        // Process partitions in parallel
        let results = Arc::new(Mutex::new(Vec::with_capacity(num_partitions)));
        let processor = Arc::new(processor);
        let file_path = self.file_path.clone();
        let progress_callback = self.progress_callback.clone();

        // Spawn one thread per partition
        let handles: Vec<_> = partitions
            .into_iter()
            .enumerate()
            .map(|(idx, (offset, size))| {
                let file_path = file_path.clone();
                let processor = processor.clone();
                let results = results.clone();
                let worker_infos = worker_infos.clone();
                let progress_callback = progress_callback.clone();

                thread::spawn(move || {
                    // Update status
                    {
                        let mut infos = worker_infos.lock().unwrap();
                        infos[idx].status = WorkerStatus::Processing;
                    }

                    // Read partition
                    let partition_result = (|| -> Result<T> {
                        let mut file = File::open(&file_path).map_err(|_| {
                            IoError::FileNotFound(file_path.to_string_lossy().to_string())
                        })?;

                        file.seek(SeekFrom::Start(offset as u64))
                            .map_err(|e| IoError::ParseError(format!("Failed to seek: {e}")))?;

                        let mut buffer = vec![0u8; size];
                        file.read_exact(&mut buffer).map_err(|e| {
                            IoError::ParseError(format!("Failed to read partition: {e}"))
                        })?;

                        processor(buffer)
                    })();

                    // Update status and store result
                    match partition_result {
                        Ok(result) => {
                            let mut infos = worker_infos.lock().unwrap();
                            infos[idx].status = WorkerStatus::Completed;
                            infos[idx].progress = 1.0;
                            infos[idx].items_processed = 1;
                            drop(infos);

                            let mut results_guard = results.lock().unwrap();
                            results_guard.push((idx, Ok(result)));
                        }
                        Err(e) => {
                            let mut infos = worker_infos.lock().unwrap();
                            infos[idx].status = WorkerStatus::Failed;
                            infos[idx].error = Some(e.to_string());
                            drop(infos);

                            let mut results_guard = results.lock().unwrap();
                            results_guard.push((idx, Err(e)));
                        }
                    }

                    // Call progress callback
                    if let Some(callback) = &progress_callback {
                        let infos = worker_infos.lock().unwrap();
                        callback(&infos);
                    }
                })
            })
            .collect();

        // Wait for all workers
        for handle in handles {
            handle
                .join()
                .map_err(|_| IoError::ParseError("Worker thread panicked".to_string()))?;
        }

        // Sort results by partition index and extract values
        let mut results_guard = results.lock().unwrap();
        results_guard.sort_by_key(|(idx, _)| *idx);

        // Drain the results to own them, avoiding cloning issues
        let sorted_results: Vec<_> = results_guard.drain(..).collect();
        drop(results_guard);

        // Extract the actual results
        sorted_results
            .into_iter()
            .map(|(_, result)| result)
            .collect()
    }
}

/// Distributed writer for parallel file writing
pub struct DistributedWriter {
    output_dir: PathBuf,
    num_partitions: usize,
    partition_naming: Arc<dyn Fn(usize) -> String + Send + Sync>,
    merge_strategy: MergeStrategy,
}

/// Strategy for merging distributed write outputs
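///
/// # Example
///
/// A minimal sketch that concatenates all partition files, in partition
/// order, into a single output file (the path here is illustrative):
///
/// ```rust,no_run
/// use scirs2_io::distributed::MergeStrategy;
///
/// let strategy = MergeStrategy::Concatenate {
///     output_file: "merged_output.dat".into(),
/// };
/// ```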
#[derive(Clone)]
pub enum MergeStrategy {
    /// No merging - keep separate files
    None,
    /// Concatenate files in order
    Concatenate { output_file: PathBuf },
    /// Custom merge function
    Custom(Arc<dyn Fn(&[PathBuf], &Path) -> Result<()> + Send + Sync>),
}

impl std::fmt::Debug for MergeStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MergeStrategy::None => write!(f, "MergeStrategy::None"),
            MergeStrategy::Concatenate { output_file } => f
                .debug_struct("MergeStrategy::Concatenate")
                .field("output_file", output_file)
                .finish(),
            MergeStrategy::Custom(_) => write!(f, "MergeStrategy::Custom(<function>)"),
        }
    }
}

impl DistributedWriter {
    /// Create a new distributed writer
    pub fn new<P: AsRef<Path>>(output_dir: P) -> Self {
        Self {
            output_dir: output_dir.as_ref().to_path_buf(),
            num_partitions: num_cpus::get(),
            partition_naming: Arc::new(|idx| format!("partition_{idx:04}.dat")),
            merge_strategy: MergeStrategy::None,
        }
    }

    /// Set number of partitions
    pub fn num_partitions(mut self, num: usize) -> Self {
        self.num_partitions = num;
        self
    }

    /// Set partition naming function
    pub fn partition_naming<F>(mut self, naming: F) -> Self
    where
        F: Fn(usize) -> String + Send + Sync + 'static,
    {
        self.partition_naming = Arc::new(naming);
        self
    }

    /// Set merge strategy
    pub fn merge_strategy(mut self, strategy: MergeStrategy) -> Self {
        self.merge_strategy = strategy;
        self
    }

    /// Write data in parallel
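    ///
    /// Items are split into roughly equal chunks, one per partition, each
    /// written by its own thread. A minimal sketch that writes one number
    /// per line (the output directory is illustrative):
    ///
    /// ```rust,no_run
    /// use scirs2_io::distributed::DistributedWriter;
    /// use std::io::Write;
    ///
    /// let writer = DistributedWriter::new("output_dir").num_partitions(4);
    /// let data: Vec<i32> = (0..1000).collect();
    /// let files = writer.write_parallel(data, |value, file| {
    ///     writeln!(file, "{value}")
    ///         .map_err(|e| scirs2_io::error::IoError::FileError(e.to_string()))
    /// })?;
    /// # Ok::<(), scirs2_io::error::IoError>(())
    /// ```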
    pub fn write_parallel<T, F>(&self, data: Vec<T>, writer: F) -> Result<Vec<PathBuf>>
    where
        T: Send + 'static + Clone,
        F: Fn(&T, &mut File) -> Result<()> + Send + Sync + 'static,
    {
        // Create output directory
        std::fs::create_dir_all(&self.output_dir)
            .map_err(|e| IoError::FileError(format!("Failed to create output directory: {e}")))?;

        // Partition data into roughly equal chunks; clamp to at least one
        // item per chunk so `chunks` never receives zero on empty input.
        let chunk_size = ((data.len() + self.num_partitions - 1) / self.num_partitions).max(1);
        let chunks: Vec<Vec<T>> = data
            .chunks(chunk_size)
            .map(|chunk| chunk.to_vec())
            .collect();

        let writer = Arc::new(writer);
        let output_dir = self.output_dir.clone();
        let partition_naming = self.partition_naming.clone();

        // Write partitions in parallel
        let handles: Vec<_> = chunks
            .into_iter()
            .enumerate()
            .map(|(idx, chunk)| {
                let writer = writer.clone();
                let output_dir = output_dir.clone();
                let partition_naming = partition_naming.clone();

                thread::spawn(move || -> Result<PathBuf> {
                    let filename = partition_naming(idx);
                    let filepath = output_dir.join(&filename);

                    let mut file = File::create(&filepath).map_err(|e| {
                        IoError::FileError(format!("Failed to create partition file: {e}"))
                    })?;

                    for item in chunk {
                        writer(&item, &mut file)?;
                    }

                    file.sync_all()
                        .map_err(|e| IoError::FileError(format!("Failed to sync file: {e}")))?;

                    Ok(filepath)
                })
            })
            .collect();

        // Collect results
        let mut partition_files = Vec::new();
        for handle in handles {
            let filepath = handle
                .join()
                .map_err(|_| IoError::FileError("Writer thread panicked".to_string()))??;
            partition_files.push(filepath);
        }

        // Apply merge strategy
        match &self.merge_strategy {
            MergeStrategy::None => Ok(partition_files),
            MergeStrategy::Concatenate { output_file } => {
                self.merge_files(&partition_files, output_file)?;
                Ok(vec![output_file.clone()])
            }
            MergeStrategy::Custom(merger) => {
                let merged_file = self.output_dir.join("merged.dat");
                merger(&partition_files, &merged_file)?;
                Ok(vec![merged_file])
            }
        }
    }

    /// Merge partition files
    fn merge_files(&self, partitions: &[PathBuf], output: &Path) -> Result<()> {
        let mut output_file = File::create(output)
            .map_err(|e| IoError::FileError(format!("Failed to create merge output: {e}")))?;

        for partition in partitions {
            let mut input = File::open(partition)
                .map_err(|_| IoError::FileNotFound(partition.to_string_lossy().to_string()))?;

            std::io::copy(&mut input, &mut output_file)
                .map_err(|e| IoError::FileError(format!("Failed to copy partition: {e}")))?;
        }

        output_file
            .sync_all()
            .map_err(|e| IoError::FileError(format!("Failed to sync merged file: {e}")))?;

        // Remove the partition files after a successful merge; deletion
        // errors are ignored.
        for partition in partitions {
            let _ = std::fs::remove_file(partition);
        }

        Ok(())
    }
}

/// Distributed array operations
pub struct DistributedArray {
    partitions: Vec<ArrayPartition>,
    shape: Vec<usize>,
    #[allow(dead_code)]
    distribution: Distribution,
}

/// Array partition
struct ArrayPartition {
    data: Array2<f64>,
    global_offset: Vec<usize>,
    node_id: usize,
}

/// Distribution strategy for arrays
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Block distribution
    Block { block_size: Vec<usize> },
    /// Cyclic distribution
    Cyclic { cycle_size: usize },
    /// Block-cyclic distribution
    BlockCyclic {
        block_size: usize,
        cycle_size: usize,
    },
}

impl DistributedArray {
    /// Create a new distributed array
    pub fn new(shape: Vec<usize>, distribution: Distribution) -> Self {
        Self {
            partitions: Vec::new(),
            shape,
            distribution,
        }
    }

    /// Add a partition
    pub fn add_partition(&mut self, data: Array2<f64>, offset: Vec<usize>, nodeid: usize) {
        self.partitions.push(ArrayPartition {
            data,
            global_offset: offset,
            node_id: nodeid,
        });
    }

    /// Get total shape
    pub fn shape(&self) -> &[usize] {
        &self.shape
    }

    /// Get local partition for a node
    pub fn get_local_partition(&self, nodeid: usize) -> Option<&Array2<f64>> {
        self.partitions
            .iter()
            .find(|p| p.node_id == nodeid)
            .map(|p| &p.data)
    }

    /// Gather all partitions into a single array
    pub fn gather(&self) -> Result<Array2<f64>> {
        if self.shape.len() != 2 {
            return Err(IoError::ParseError(
                "Only 2D arrays supported for gather".to_string(),
            ));
        }

        let mut result = Array2::zeros((self.shape[0], self.shape[1]));

        for partition in &self.partitions {
            let (rows, cols) = partition.data.dim();
            let row_start = partition.global_offset[0];
            let col_start = partition.global_offset[1];

            for i in 0..rows {
                for j in 0..cols {
                    result[[row_start + i, col_start + j]] = partition.data[[i, j]];
                }
            }
        }

        Ok(result)
    }

    /// Scatter a single array into distributed partitions
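    ///
    /// A minimal sketch of a scatter/gather round trip using a block
    /// distribution over rows:
    ///
    /// ```rust,no_run
    /// use scirs2_io::distributed::{DistributedArray, Distribution};
    /// use scirs2_core::ndarray::Array2;
    ///
    /// let array = Array2::from_shape_fn((8, 4), |(i, j)| (i * 4 + j) as f64);
    /// let distributed = DistributedArray::scatter(
    ///     &array,
    ///     Distribution::Block { block_size: vec![2, 4] },
    ///     4,
    /// )?;
    /// assert_eq!(distributed.gather()?, array);
    /// # Ok::<(), scirs2_io::error::IoError>(())
    /// ```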
    pub fn scatter(
        array: &Array2<f64>,
        distribution: Distribution,
        num_nodes: usize,
    ) -> Result<Self> {
        let shape = vec![array.nrows(), array.ncols()];
        let mut distributed = Self::new(shape.clone(), distribution.clone());

        match distribution {
            Distribution::Block { block_size: _ } => {
                let rows_per_node = (array.nrows() + num_nodes - 1) / num_nodes;

                for node_id in 0..num_nodes {
                    let row_start = node_id * rows_per_node;
                    let row_end = ((node_id + 1) * rows_per_node).min(array.nrows());

                    if row_start < array.nrows() {
                        let partition = array.slice(s![row_start..row_end, ..]).to_owned();
                        distributed.add_partition(partition, vec![row_start, 0], node_id);
                    }
                }
            }
            _ => {
                return Err(IoError::ParseError(
                    "Unsupported distribution for scatter".to_string(),
                ));
            }
        }

        Ok(distributed)
    }
}

/// Distributed file system abstraction
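///
/// A minimal sketch of a backend-agnostic helper written against the trait
/// (the file name is illustrative):
///
/// ```rust,no_run
/// use scirs2_io::distributed::{DistributedFileSystem, LocalFileSystem};
/// use std::io::Read;
/// use std::path::Path;
///
/// fn read_all(
///     fs: &dyn DistributedFileSystem,
///     path: &Path,
/// ) -> Result<Vec<u8>, scirs2_io::error::IoError> {
///     let mut reader = fs.open_read(path)?;
///     let mut buffer = Vec::new();
///     reader
///         .read_to_end(&mut buffer)
///         .map_err(|e| scirs2_io::error::IoError::FileError(e.to_string()))?;
///     Ok(buffer)
/// }
///
/// let bytes = read_all(&LocalFileSystem, Path::new("data.bin"))?;
/// # Ok::<(), scirs2_io::error::IoError>(())
/// ```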
pub trait DistributedFileSystem: Send + Sync {
    /// Open a file for reading
    fn open_read(&self, path: &Path) -> Result<Box<dyn Read + Send>>;

    /// Create a file for writing
    fn create_write(&self, path: &Path) -> Result<Box<dyn Write + Send>>;

    /// List files in a directory
    fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>>;

    /// Get file metadata
    fn metadata(&self, path: &Path) -> Result<FileMetadata>;

    /// Check if path exists
    fn exists(&self, path: &Path) -> bool;
}

/// File metadata
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// Size in bytes
    pub size: u64,
    /// Last modification time
    pub modified: std::time::SystemTime,
    /// Whether the path is a directory
    pub is_dir: bool,
}

/// Local file system implementation
pub struct LocalFileSystem;

impl DistributedFileSystem for LocalFileSystem {
    fn open_read(&self, path: &Path) -> Result<Box<dyn Read + Send>> {
        let file = File::open(path)
            .map_err(|_| IoError::FileNotFound(path.to_string_lossy().to_string()))?;
        Ok(Box::new(file))
    }

    fn create_write(&self, path: &Path) -> Result<Box<dyn Write + Send>> {
        let file = File::create(path)
            .map_err(|e| IoError::FileError(format!("Failed to create file: {e}")))?;
        Ok(Box::new(file))
    }

    fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>> {
        let entries = std::fs::read_dir(path)
            .map_err(|e| IoError::ParseError(format!("Failed to read directory: {e}")))?;

        let mut paths = Vec::new();
        for entry in entries {
            let entry =
                entry.map_err(|e| IoError::ParseError(format!("Failed to read entry: {e}")))?;
            paths.push(entry.path());
        }

        Ok(paths)
    }

    fn metadata(&self, path: &Path) -> Result<FileMetadata> {
        let meta = std::fs::metadata(path)
            .map_err(|_| IoError::FileNotFound(path.to_string_lossy().to_string()))?;

        Ok(FileMetadata {
            size: meta.len(),
            modified: meta
                .modified()
                .map_err(|e| IoError::ParseError(format!("Failed to get modified time: {e}")))?,
            is_dir: meta.is_dir(),
        })
    }

    fn exists(&self, path: &Path) -> bool {
        path.exists()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_partition_strategies() {
        let temp_dir = TempDir::new().unwrap();
        let temp_file = temp_dir.path().join("test.dat");
        std::fs::write(&temp_file, vec![0u8; 10000]).unwrap();

        let reader =
            DistributedReader::new(&temp_file).partition_strategy(PartitionStrategy::SizeBased {
                chunk_size_bytes: 1000,
            });

        let partitions = reader.create_partitions().unwrap();
        assert_eq!(partitions.len(), 10);

        for (_offset, size) in &partitions {
            assert_eq!(*size, 1000);
        }
    }
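
    // A minimal end-to-end check of process_parallel (an added sketch): with
    // 4000 bytes of 0x01 split into 1000-byte partitions, each partition's
    // byte sum must be exactly 1000, returned in partition order.
    #[test]
    fn test_process_parallel_sums() {
        let temp_dir = TempDir::new().unwrap();
        let temp_file = temp_dir.path().join("sum.dat");
        std::fs::write(&temp_file, vec![1u8; 4000]).unwrap();

        let reader = DistributedReader::new(&temp_file).partition_strategy(
            PartitionStrategy::SizeBased {
                chunk_size_bytes: 1000,
            },
        );

        let sums: Vec<u64> = reader
            .process_parallel(|chunk| Ok(chunk.iter().map(|&b| b as u64).sum::<u64>()))
            .unwrap();

        assert_eq!(sums, vec![1000, 1000, 1000, 1000]);
    }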

    #[test]
    fn test_distributed_array() {
        let array = Array2::from_shape_fn((100, 50), |(i, j)| (i * 50 + j) as f64);

        let distributed = DistributedArray::scatter(
            &array,
            Distribution::Block {
                block_size: vec![25, 50],
            },
            4,
        )
        .unwrap();

        assert_eq!(distributed.partitions.len(), 4);

        let gathered = distributed.gather().unwrap();
        assert_eq!(array, gathered);
    }

    #[test]
    fn test_distributed_writer() {
        let temp_dir = TempDir::new().unwrap();

        let data: Vec<i32> = (0..100).collect();
        let writer = DistributedWriter::new(temp_dir.path()).num_partitions(4);

        let files = writer
            .write_parallel(data, |&value, file| {
                writeln!(file, "{value}")
                    .map_err(|e| IoError::FileError(format!("Failed to write: {e}")))
            })
            .unwrap();

        assert_eq!(files.len(), 4);

        // Verify all files exist
        for file in &files {
            assert!(file.exists());
        }
    }
}