1#![allow(dead_code)]
38#![allow(missing_docs)]
39#![allow(clippy::too_many_arguments)]
40
41use crate::error::{IoError, Result};
42use crate::thread_pool::ThreadPool;
43use scirs2_core::ndarray::Array2;
44use std::fs::File;
45use std::io::{Read, Seek, SeekFrom, Write};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::thread;
49
/// Strategy used by [`DistributedReader`] to split a file into partitions.
#[derive(Clone)]
pub enum PartitionStrategy {
    /// `chunk_size` rows per partition.
    RowBased { chunk_size: usize },
    /// `chunk_size_bytes` bytes per partition.
    SizeBased { chunk_size_bytes: usize },
    /// `blocks_per_partition` blocks of 4096 bytes per partition.
    BlockBased { blocks_per_partition: usize },
    /// User-supplied partitioner: called with the file size in bytes and
    /// expected to return `(byte_offset, byte_len)` pairs.
    Custom(Arc<dyn Fn(usize) -> Vec<(usize, usize)> + Send + Sync>),
}

impl std::fmt::Debug for PartitionStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant prints as a one-field struct; the `Custom` closure
        // cannot be printed, so a placeholder string stands in for it.
        let (variant, field, value): (&str, &str, &dyn std::fmt::Debug) = match self {
            Self::RowBased { chunk_size } => ("RowBased", "chunk_size", chunk_size),
            Self::SizeBased { chunk_size_bytes } => {
                ("SizeBased", "chunk_size_bytes", chunk_size_bytes)
            }
            Self::BlockBased {
                blocks_per_partition,
            } => ("BlockBased", "blocks_per_partition", blocks_per_partition),
            Self::Custom(_) => ("Custom", "function", &"<function>"),
        };
        f.debug_struct(variant).field(field, value).finish()
    }
}
87
/// Lifecycle state of one partition job in `DistributedReader::process_parallel`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerStatus {
    /// Not picked up yet.
    Idle,
    /// Currently being read and processed.
    Processing,
    /// Finished successfully.
    Completed,
    /// Finished with an error; the message is stored in `WorkerInfo::error`.
    Failed,
}

/// Progress snapshot for a single partition, handed to the progress callback.
#[derive(Clone, Debug)]
pub struct WorkerInfo {
    /// Partition index this entry describes.
    pub id: usize,
    /// Current lifecycle state.
    pub status: WorkerStatus,
    /// 0.0 until the partition succeeds, then 1.0.
    pub progress: f64,
    /// Number of items processed (set to 1 when the partition succeeds).
    pub items_processed: usize,
    /// Error message when `status` is `Failed`.
    pub error: Option<String>,
}
111
112pub struct DistributedReader {
114 file_path: PathBuf,
115 partition_strategy: PartitionStrategy,
116 num_workers: usize,
117 #[allow(dead_code)]
118 worker_pool: Option<ThreadPool>,
119 progress_callback: Option<Arc<dyn Fn(&[WorkerInfo]) + Send + Sync>>,
120}
121
122impl DistributedReader {
123 pub fn new<P: AsRef<Path>>(path: P) -> Self {
125 Self {
126 file_path: path.as_ref().to_path_buf(),
127 partition_strategy: PartitionStrategy::SizeBased {
128 chunk_size_bytes: 64 * 1024 * 1024,
129 }, num_workers: num_cpus::get(),
131 worker_pool: None,
132 progress_callback: None,
133 }
134 }
135
136 pub fn partition_strategy(mut self, strategy: PartitionStrategy) -> Self {
138 self.partition_strategy = strategy;
139 self
140 }
141
142 pub fn num_workers(mut self, num_workers: usize) -> Self {
144 self.num_workers = num_workers;
145 self
146 }
147
148 pub fn progress_callback<F>(mut self, callback: F) -> Self
150 where
151 F: Fn(&[WorkerInfo]) + Send + Sync + 'static,
152 {
153 self.progress_callback = Some(Arc::new(callback));
154 self
155 }
156
157 fn get_file_size(&self) -> Result<usize> {
159 let metadata = std::fs::metadata(&self.file_path)
160 .map_err(|_| IoError::FileNotFound(self.file_path.to_string_lossy().to_string()))?;
161 Ok(metadata.len() as usize)
162 }
163
164 fn create_partitions(&self) -> Result<Vec<(usize, usize)>> {
166 let file_size = self.get_file_size()?;
167
168 match &self.partition_strategy {
169 PartitionStrategy::SizeBased { chunk_size_bytes } => {
170 let mut partitions = Vec::new();
171 let mut offset = 0;
172
173 while offset < file_size {
174 let end = (offset + chunk_size_bytes).min(file_size);
175 partitions.push((offset, end - offset));
176 offset = end;
177 }
178
179 Ok(partitions)
180 }
181 PartitionStrategy::RowBased { chunk_size } => {
182 let total_rows = self.estimate_row_count()?;
185 let mut partitions = Vec::new();
186 let mut row_offset = 0;
187
188 while row_offset < total_rows {
189 let rows = (*chunk_size).min(total_rows - row_offset);
190 partitions.push((row_offset, rows));
191 row_offset += rows;
192 }
193
194 Ok(partitions)
195 }
196 PartitionStrategy::BlockBased {
197 blocks_per_partition,
198 } => {
199 let block_size = 4096; let total_blocks = (file_size + block_size - 1) / block_size;
202 let mut partitions = Vec::new();
203 let mut block_offset = 0;
204
205 while block_offset < total_blocks {
206 let blocks = (*blocks_per_partition).min(total_blocks - block_offset);
207 partitions.push((block_offset * block_size, blocks * block_size));
208 block_offset += blocks;
209 }
210
211 Ok(partitions)
212 }
213 PartitionStrategy::Custom(f) => Ok(f(file_size)),
214 }
215 }
216
217 fn estimate_row_count(&self) -> Result<usize> {
219 let mut file = File::open(&self.file_path)
221 .map_err(|_| IoError::FileNotFound(self.file_path.to_string_lossy().to_string()))?;
222
223 let mut buffer = vec![0u8; 8192];
224 let bytes_read = file
225 .read(&mut buffer)
226 .map_err(|e| IoError::ParseError(format!("Failed to read sample: {e}")))?;
227
228 let newlines = buffer[..bytes_read].iter().filter(|&&b| b == b'\n').count();
229 if newlines == 0 {
230 return Ok(1);
231 }
232
233 let file_size = self.get_file_size()?;
234 let estimated_rows = (file_size as f64 / bytes_read as f64 * newlines as f64) as usize;
235
236 Ok(estimated_rows)
237 }
238
239 pub fn process_parallel<T, F>(&self, processor: F) -> Result<Vec<T>>
241 where
242 T: Send + 'static + std::cmp::Ord,
243 F: Fn(Vec<u8>) -> Result<T> + Send + Sync + 'static,
244 {
245 let partitions = self.create_partitions()?;
246 let num_partitions = partitions.len();
247
248 let available_workers = std::cmp::min(self.num_workers, num_partitions);
250 let cpu_count = num_cpus::get();
251 let optimal_workers = std::cmp::min(available_workers, cpu_count * 2); println!(
254 "Processing {num_partitions} partitions with {optimal_workers} workers (CPU cores: {cpu_count})"
255 );
256
257 let worker_infos = Arc::new(Mutex::new(
259 (0..num_partitions)
260 .map(|i| WorkerInfo {
261 id: i,
262 status: WorkerStatus::Idle,
263 progress: 0.0,
264 items_processed: 0,
265 error: None,
266 })
267 .collect::<Vec<_>>(),
268 ));
269
270 let results = Arc::new(Mutex::new(Vec::with_capacity(num_partitions)));
272 let processor = Arc::new(processor);
273 let file_path = self.file_path.clone();
274 let progress_callback = self.progress_callback.clone();
275
276 let handles: Vec<_> = partitions
278 .into_iter()
279 .enumerate()
280 .map(|(idx, (offset, size))| {
281 let file_path = file_path.clone();
282 let processor = processor.clone();
283 let results = results.clone();
284 let worker_infos = worker_infos.clone();
285 let progress_callback = progress_callback.clone();
286
287 thread::spawn(move || {
288 {
290 let mut infos = worker_infos.lock().unwrap();
291 infos[idx].status = WorkerStatus::Processing;
292 }
293
294 let partition_result = (|| -> Result<T> {
296 let mut file = File::open(&file_path).map_err(|_| {
297 IoError::FileNotFound(file_path.to_string_lossy().to_string())
298 })?;
299
300 file.seek(SeekFrom::Start(offset as u64))
301 .map_err(|e| IoError::ParseError(format!("Failed to seek: {e}")))?;
302
303 let mut buffer = vec![0u8; size];
304 file.read_exact(&mut buffer).map_err(|e| {
305 IoError::ParseError(format!("Failed to read partition: {e}"))
306 })?;
307
308 processor(buffer)
309 })();
310
311 match partition_result {
313 Ok(result) => {
314 let mut infos = worker_infos.lock().unwrap();
315 infos[idx].status = WorkerStatus::Completed;
316 infos[idx].progress = 1.0;
317 infos[idx].items_processed = 1;
318 drop(infos);
319
320 let mut results_guard = results.lock().unwrap();
321 results_guard.push((idx, Ok(result)));
322 }
323 Err(e) => {
324 let mut infos = worker_infos.lock().unwrap();
325 infos[idx].status = WorkerStatus::Failed;
326 infos[idx].error = Some(e.to_string());
327 drop(infos);
328
329 let mut results_guard = results.lock().unwrap();
330 results_guard.push((idx, Err(e)));
331 }
332 }
333
334 if let Some(callback) = &progress_callback {
336 let infos = worker_infos.lock().unwrap();
337 callback(&infos);
338 }
339 })
340 })
341 .collect();
342
343 for handle in handles {
345 handle
346 .join()
347 .map_err(|_| IoError::ParseError("Worker thread panicked".to_string()))?;
348 }
349
350 let mut results_guard = results.lock().unwrap();
352 results_guard.sort_by_key(|(idx_, _)| *idx_);
353
354 let sorted_results: Vec<_> = results_guard.drain(..).collect();
356 drop(results_guard);
357
358 sorted_results
360 .into_iter()
361 .map(|(_, result)| result)
362 .collect()
363 }
364}
365
366pub struct DistributedWriter {
368 output_dir: PathBuf,
369 num_partitions: usize,
370 partition_naming: Arc<dyn Fn(usize) -> String + Send + Sync>,
371 merge_strategy: MergeStrategy,
372}
373
374#[derive(Clone)]
376pub enum MergeStrategy {
377 None,
379 Concatenate { output_file: PathBuf },
381 Custom(Arc<dyn Fn(&[PathBuf], &Path) -> Result<()> + Send + Sync>),
383}
384
385impl std::fmt::Debug for MergeStrategy {
386 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387 match self {
388 MergeStrategy::None => write!(f, "MergeStrategy::None"),
389 MergeStrategy::Concatenate { output_file } => f
390 .debug_struct("MergeStrategy::Concatenate")
391 .field("output_file", output_file)
392 .finish(),
393 MergeStrategy::Custom(_) => write!(f, "MergeStrategy::Custom(<function>)"),
394 }
395 }
396}
397
398impl DistributedWriter {
399 pub fn new<P: AsRef<Path>>(output_dir: P) -> Self {
401 Self {
402 output_dir: output_dir.as_ref().to_path_buf(),
403 num_partitions: num_cpus::get(),
404 partition_naming: Arc::new(|idx| format!("partition_{idx:04}.dat")),
405 merge_strategy: MergeStrategy::None,
406 }
407 }
408
409 pub fn num_partitions(mut self, num: usize) -> Self {
411 self.num_partitions = num;
412 self
413 }
414
415 pub fn partition_naming<F>(mut self, naming: F) -> Self
417 where
418 F: Fn(usize) -> String + Send + Sync + 'static,
419 {
420 self.partition_naming = Arc::new(naming);
421 self
422 }
423
424 pub fn merge_strategy(mut self, strategy: MergeStrategy) -> Self {
426 self.merge_strategy = strategy;
427 self
428 }
429
430 pub fn write_parallel<T, F>(&self, data: Vec<T>, writer: F) -> Result<Vec<PathBuf>>
432 where
433 T: Send + 'static + Clone,
434 F: Fn(&T, &mut File) -> Result<()> + Send + Sync + 'static,
435 {
436 std::fs::create_dir_all(&self.output_dir)
438 .map_err(|e| IoError::FileError(format!("Failed to create output directory: {e}")))?;
439
440 let chunk_size = (data.len() + self.num_partitions - 1) / self.num_partitions;
442 let chunks: Vec<_> = data
443 .into_iter()
444 .collect::<Vec<_>>()
445 .chunks(chunk_size)
446 .map(|chunk| chunk.to_vec())
447 .collect();
448
449 let writer = Arc::new(writer);
450 let output_dir = self.output_dir.clone();
451 let partition_naming = self.partition_naming.clone();
452
453 let handles: Vec<_> = chunks
455 .into_iter()
456 .enumerate()
457 .map(|(idx, chunk)| {
458 let writer = writer.clone();
459 let output_dir = output_dir.clone();
460 let partition_naming = partition_naming.clone();
461
462 thread::spawn(move || -> Result<PathBuf> {
463 let filename = partition_naming(idx);
464 let filepath = output_dir.join(&filename);
465
466 let mut file = File::create(&filepath).map_err(|e| {
467 IoError::FileError(format!("Failed to create partition file: {e}"))
468 })?;
469
470 for item in chunk {
471 writer(&item, &mut file)?;
472 }
473
474 file.sync_all()
475 .map_err(|e| IoError::FileError(format!("Failed to sync file: {e}")))?;
476
477 Ok(filepath)
478 })
479 })
480 .collect();
481
482 let mut partition_files = Vec::new();
484 for handle in handles {
485 let filepath = handle
486 .join()
487 .map_err(|_| IoError::FileError("Writer thread panicked".to_string()))??;
488 partition_files.push(filepath);
489 }
490
491 match &self.merge_strategy {
493 MergeStrategy::None => Ok(partition_files),
494 MergeStrategy::Concatenate { output_file } => {
495 self.merge_files(&partition_files, output_file)?;
496 Ok(vec![output_file.clone()])
497 }
498 MergeStrategy::Custom(merger) => {
499 let merged_file = self.output_dir.join("merged.dat");
500 merger(&partition_files, &merged_file)?;
501 Ok(vec![merged_file])
502 }
503 }
504 }
505
506 fn merge_files(&self, partitions: &[PathBuf], output: &Path) -> Result<()> {
508 let mut output_file = File::create(output)
509 .map_err(|e| IoError::FileError(format!("Failed to create merge output: {e}")))?;
510
511 for partition in partitions {
512 let mut input = File::open(partition)
513 .map_err(|_| IoError::FileNotFound(partition.to_string_lossy().to_string()))?;
514
515 std::io::copy(&mut input, &mut output_file)
516 .map_err(|e| IoError::FileError(format!("Failed to copy partition: {e}")))?;
517 }
518
519 output_file
520 .sync_all()
521 .map_err(|e| IoError::FileError(format!("Failed to sync merged file: {e}")))?;
522
523 for partition in partitions {
525 let _ = std::fs::remove_file(partition);
526 }
527
528 Ok(())
529 }
530}
531
532pub struct DistributedArray {
534 partitions: Vec<ArrayPartition>,
535 shape: Vec<usize>,
536 #[allow(dead_code)]
537 distribution: Distribution,
538}
539
540struct ArrayPartition {
542 data: Array2<f64>,
543 global_offset: Vec<usize>,
544 node_id: usize,
545}
546
/// How an array's elements are assigned to nodes.
///
/// NOTE(review): only `Block` is supported by `DistributedArray::scatter`,
/// and its `block_size` is not consulted there.
#[derive(Clone, Debug)]
pub enum Distribution {
    /// Contiguous blocks; `block_size` presumably gives the extent per dimension.
    Block { block_size: Vec<usize> },
    /// Presumably round-robin assignment with period `cycle_size`.
    Cyclic { cycle_size: usize },
    /// Presumably blocks of `block_size` assigned round-robin with period `cycle_size`.
    BlockCyclic { block_size: usize, cycle_size: usize },
}
560
561impl DistributedArray {
562 pub fn new(shape: Vec<usize>, distribution: Distribution) -> Self {
564 Self {
565 partitions: Vec::new(),
566 shape,
567 distribution,
568 }
569 }
570
571 pub fn add_partition(&mut self, data: Array2<f64>, offset: Vec<usize>, nodeid: usize) {
573 self.partitions.push(ArrayPartition {
574 data,
575 global_offset: offset,
576 node_id: nodeid,
577 });
578 }
579
580 pub fn shape(&self) -> &[usize] {
582 &self.shape
583 }
584
585 pub fn get_local_partition(&self, nodeid: usize) -> Option<&Array2<f64>> {
587 self.partitions
588 .iter()
589 .find(|p| p.node_id == nodeid)
590 .map(|p| &p.data)
591 }
592
593 pub fn gather(&self) -> Result<Array2<f64>> {
595 if self.shape.len() != 2 {
596 return Err(IoError::ParseError(
597 "Only 2D arrays supported for gather".to_string(),
598 ));
599 }
600
601 let mut result = Array2::zeros((self.shape[0], self.shape[1]));
602
603 for partition in &self.partitions {
604 let (rows, cols) = partition.data.dim();
605 let row_start = partition.global_offset[0];
606 let col_start = partition.global_offset[1];
607
608 for i in 0..rows {
609 for j in 0..cols {
610 result[[row_start + i, col_start + j]] = partition.data[[i, j]];
611 }
612 }
613 }
614
615 Ok(result)
616 }
617
618 pub fn scatter(
620 array: &Array2<f64>,
621 distribution: Distribution,
622 num_nodes: usize,
623 ) -> Result<Self> {
624 let shape = vec![array.nrows(), array.ncols()];
625 let mut distributed = Self::new(shape.clone(), distribution.clone());
626
627 match distribution {
628 Distribution::Block { block_size: _ } => {
629 let rows_per_node = (array.nrows() + num_nodes - 1) / num_nodes;
630
631 for node_id in 0..num_nodes {
632 let row_start = node_id * rows_per_node;
633 let row_end = ((node_id + 1) * rows_per_node).min(array.nrows());
634
635 if row_start < array.nrows() {
636 let partition = array.slice(s![row_start..row_end, ..]).to_owned();
637 distributed.add_partition(partition, vec![row_start, 0], node_id);
638 }
639 }
640 }
641 _ => {
642 return Err(IoError::ParseError(
643 "Unsupported distribution for scatter".to_string(),
644 ));
645 }
646 }
647
648 Ok(distributed)
649 }
650}
651
652pub trait DistributedFileSystem: Send + Sync {
654 fn open_read(&self, path: &Path) -> Result<Box<dyn Read + Send>>;
656
657 fn create_write(&self, path: &Path) -> Result<Box<dyn Write + Send>>;
659
660 fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>>;
662
663 fn metadata(&self, path: &Path) -> Result<FileMetadata>;
665
666 fn exists(&self, path: &Path) -> bool;
668}
669
/// Basic metadata for a path, as returned by `DistributedFileSystem::metadata`.
#[derive(Clone, Debug)]
pub struct FileMetadata {
    /// Length in bytes.
    pub size: u64,
    /// Last modification time.
    pub modified: std::time::SystemTime,
    /// Whether the path is a directory.
    pub is_dir: bool,
}
677
678pub struct LocalFileSystem;
680
681impl DistributedFileSystem for LocalFileSystem {
682 fn open_read(&self, path: &Path) -> Result<Box<dyn Read + Send>> {
683 let file = File::open(path)
684 .map_err(|_| IoError::FileNotFound(path.to_string_lossy().to_string()))?;
685 Ok(Box::new(file))
686 }
687
688 fn create_write(&self, path: &Path) -> Result<Box<dyn Write + Send>> {
689 let file = File::create(path)
690 .map_err(|e| IoError::FileError(format!("Failed to create file: {e}")))?;
691 Ok(Box::new(file))
692 }
693
694 fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>> {
695 let entries = std::fs::read_dir(path)
696 .map_err(|e| IoError::ParseError(format!("Failed to read directory: {e}")))?;
697
698 let mut paths = Vec::new();
699 for entry in entries {
700 let entry =
701 entry.map_err(|e| IoError::ParseError(format!("Failed to read entry: {e}")))?;
702 paths.push(entry.path());
703 }
704
705 Ok(paths)
706 }
707
708 fn metadata(&self, path: &Path) -> Result<FileMetadata> {
709 let meta = std::fs::metadata(path)
710 .map_err(|_| IoError::FileNotFound(path.to_string_lossy().to_string()))?;
711
712 Ok(FileMetadata {
713 size: meta.len(),
714 modified: meta
715 .modified()
716 .map_err(|e| IoError::ParseError(format!("Failed to get modified time: {e}")))?,
717 is_dir: meta.is_dir(),
718 })
719 }
720
721 fn exists(&self, path: &Path) -> bool {
722 path.exists()
723 }
724}
725
726use scirs2_core::ndarray::s;
728
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A 10 000-byte file split into 1 000-byte chunks gives ten full partitions.
    #[test]
    fn test_partition_strategies() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.dat");
        std::fs::write(&path, vec![0u8; 10000]).unwrap();

        let strategy = PartitionStrategy::SizeBased {
            chunk_size_bytes: 1000,
        };
        let partitions = DistributedReader::new(&path)
            .partition_strategy(strategy)
            .create_partitions()
            .unwrap();

        assert_eq!(partitions.len(), 10);
        assert!(partitions.iter().all(|&(_, len)| len == 1000));
    }

    /// Scattering by rows over four nodes and gathering back is lossless.
    #[test]
    fn test_distributed_array() {
        let original = Array2::from_shape_fn((100, 50), |(row, col)| (row * 50 + col) as f64);
        let layout = Distribution::Block {
            block_size: vec![25, 50],
        };

        let scattered = DistributedArray::scatter(&original, layout, 4).unwrap();
        assert_eq!(scattered.partitions.len(), 4);

        let reassembled = scattered.gather().unwrap();
        assert_eq!(original, reassembled);
    }

    /// Writing 100 integers over four partitions yields four existing files.
    #[test]
    fn test_distributed_writer() {
        let dir = TempDir::new().unwrap();
        let values: Vec<i32> = (0..100).collect();

        let paths = DistributedWriter::new(dir.path())
            .num_partitions(4)
            .write_parallel(values, |&value, file| {
                writeln!(file, "{value}")
                    .map_err(|e| IoError::FileError(format!("Failed to write: {e}")))
            })
            .unwrap();

        assert_eq!(paths.len(), 4);
        assert!(paths.iter().all(|path| path.exists()));
    }
}