#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(clippy::too_many_arguments)]
pub mod sharding;
pub use sharding::{
ElementType, HashSharding, RangeSharding, RoundRobinSharding, ShardConfig, ShardReader,
ShardWriter, ShardedArray, ShardedArrayMeta, ShardingStrategy, VirtualConcatenation,
};
use crate::error::{IoError, Result};
use crate::thread_pool::ThreadPool;
use scirs2_core::ndarray::Array2;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
/// How an input file is split into independent partitions for parallel reads.
#[derive(Clone)]
pub enum PartitionStrategy {
    /// A fixed number of rows (newline-delimited lines) per partition.
    RowBased { chunk_size: usize },
    /// A fixed number of bytes per partition.
    SizeBased { chunk_size_bytes: usize },
    /// Groups of fixed-size I/O blocks per partition (4096-byte blocks;
    /// see `DistributedReader::create_partitions`).
    BlockBased { blocks_per_partition: usize },
    /// User-supplied function mapping the total file size to a list of
    /// `(offset, length)` pairs.
    Custom(Arc<dyn Fn(usize) -> Vec<(usize, usize)> + Send + Sync>),
}
// Manual `Debug` impl: the `Custom` variant holds a function object, which
// cannot derive `Debug`, so it is rendered with a `"<function>"` placeholder.
impl std::fmt::Debug for PartitionStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RowBased { chunk_size } => f
                .debug_struct("RowBased")
                .field("chunk_size", chunk_size)
                .finish(),
            Self::SizeBased { chunk_size_bytes } => f
                .debug_struct("SizeBased")
                .field("chunk_size_bytes", chunk_size_bytes)
                .finish(),
            Self::BlockBased {
                blocks_per_partition,
            } => f
                .debug_struct("BlockBased")
                .field("blocks_per_partition", blocks_per_partition)
                .finish(),
            // The closure itself is opaque; print a fixed placeholder.
            Self::Custom(_) => f
                .debug_struct("Custom")
                .field("function", &"<function>")
                .finish(),
        }
    }
}
/// Lifecycle state of a single partition worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerStatus {
    /// Not yet started.
    Idle,
    /// Currently reading/processing its partition.
    Processing,
    /// Finished successfully.
    Completed,
    /// Terminated with an error (details in `WorkerInfo::error`).
    Failed,
}
/// Snapshot of one worker's state, handed to progress callbacks.
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Partition/worker index.
    pub id: usize,
    /// Current lifecycle state.
    pub status: WorkerStatus,
    /// Completion fraction in `[0.0, 1.0]`.
    pub progress: f64,
    /// Number of items processed so far.
    pub items_processed: usize,
    /// Error message when `status == WorkerStatus::Failed`.
    pub error: Option<String>,
}
/// Parallel file reader: splits a file into partitions and processes each
/// partition on its own worker thread.
pub struct DistributedReader {
    /// File to read.
    file_path: PathBuf,
    /// How the file is split into partitions.
    partition_strategy: PartitionStrategy,
    /// Requested worker count (defaults to the number of logical CPUs).
    num_workers: usize,
    /// Reserved for a reusable thread pool; currently unused — threads are
    /// spawned ad hoc in `process_parallel`.
    #[allow(dead_code)]
    worker_pool: Option<ThreadPool>,
    /// Optional observer invoked with worker-state snapshots.
    progress_callback: Option<Arc<dyn Fn(&[WorkerInfo]) + Send + Sync>>,
}
impl DistributedReader {
    /// Create a reader for `path` with defaults: 64 MiB size-based
    /// partitions and one worker slot per logical CPU.
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        Self {
            file_path: path.as_ref().to_path_buf(),
            partition_strategy: PartitionStrategy::SizeBased {
                chunk_size_bytes: 64 * 1024 * 1024,
            },
            num_workers: num_cpus::get(),
            worker_pool: None,
            progress_callback: None,
        }
    }
    /// Builder: replace the partitioning strategy.
    pub fn partition_strategy(mut self, strategy: PartitionStrategy) -> Self {
        self.partition_strategy = strategy;
        self
    }
    /// Builder: set the requested worker count.
    pub fn num_workers(mut self, num_workers: usize) -> Self {
        self.num_workers = num_workers;
        self
    }
    /// Builder: register a callback that receives a snapshot of every
    /// worker's state after each partition finishes.
    pub fn progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(&[WorkerInfo]) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Arc::new(callback));
        self
    }
    /// Size of the input file in bytes.
    fn get_file_size(&self) -> Result<usize> {
        let metadata = std::fs::metadata(&self.file_path)
            .map_err(|_| IoError::FileNotFound(self.file_path.to_string_lossy().to_string()))?;
        Ok(metadata.len() as usize)
    }
    /// Compute the `(offset, length)` partition list for the file.
    ///
    /// `SizeBased`, `BlockBased`, and `Custom` produce byte ranges.
    /// NOTE(review): `RowBased` produces `(row_offset, row_count)` pairs,
    /// yet `process_parallel` interprets every partition as a byte range —
    /// confirm `RowBased` is only used with row-aware consumers.
    fn create_partitions(&self) -> Result<Vec<(usize, usize)>> {
        let file_size = self.get_file_size()?;
        match &self.partition_strategy {
            PartitionStrategy::SizeBased { chunk_size_bytes } => {
                let mut partitions = Vec::new();
                let mut offset = 0;
                while offset < file_size {
                    // The final chunk is clamped to the end of the file.
                    let end = (offset + chunk_size_bytes).min(file_size);
                    partitions.push((offset, end - offset));
                    offset = end;
                }
                Ok(partitions)
            }
            PartitionStrategy::RowBased { chunk_size } => {
                let total_rows = self.estimate_row_count()?;
                let mut partitions = Vec::new();
                let mut row_offset = 0;
                while row_offset < total_rows {
                    let rows = (*chunk_size).min(total_rows - row_offset);
                    partitions.push((row_offset, rows));
                    row_offset += rows;
                }
                Ok(partitions)
            }
            PartitionStrategy::BlockBased {
                blocks_per_partition,
            } => {
                // Fixed 4 KiB I/O block size.
                let block_size = 4096;
                let total_blocks = (file_size + block_size - 1) / block_size;
                let mut partitions = Vec::new();
                let mut block_offset = 0;
                while block_offset < total_blocks {
                    let blocks = (*blocks_per_partition).min(total_blocks - block_offset);
                    let byte_offset = block_offset * block_size;
                    // BUGFIX: clamp the final partition to the real file
                    // size. Previously the length was `blocks * block_size`
                    // even past EOF, which made the byte-exact read
                    // (`read_exact`) in `process_parallel` fail on the last
                    // partition of any file whose size is not a multiple of
                    // the block size.
                    let byte_len = (blocks * block_size).min(file_size - byte_offset);
                    partitions.push((byte_offset, byte_len));
                    block_offset += blocks;
                }
                Ok(partitions)
            }
            PartitionStrategy::Custom(f) => Ok(f(file_size)),
        }
    }
    /// Estimate the number of newline-terminated rows by sampling the first
    /// 8 KiB and extrapolating linearly to the full file size.
    fn estimate_row_count(&self) -> Result<usize> {
        let mut file = File::open(&self.file_path)
            .map_err(|_| IoError::FileNotFound(self.file_path.to_string_lossy().to_string()))?;
        let mut buffer = vec![0u8; 8192];
        let bytes_read = file
            .read(&mut buffer)
            .map_err(|e| IoError::ParseError(format!("Failed to read sample: {e}")))?;
        let newlines = buffer[..bytes_read].iter().filter(|&&b| b == b'\n').count();
        // No newline in the sample (including an empty file): treat as one
        // row; this also avoids the zero division below.
        if newlines == 0 {
            return Ok(1);
        }
        let file_size = self.get_file_size()?;
        let estimated_rows = (file_size as f64 / bytes_read as f64 * newlines as f64) as usize;
        Ok(estimated_rows)
    }
    /// Read every partition on its own thread, apply `processor` to the raw
    /// bytes, and return the results in partition order.
    ///
    /// # Errors
    /// Fails if partitioning fails, a worker thread panics, or any
    /// partition's `processor` returns an error.
    pub fn process_parallel<T, F>(&self, processor: F) -> Result<Vec<T>>
    where
        // The former `T: std::cmp::Ord` bound was unused (results are
        // ordered by partition index, never compared by value) and has
        // been dropped; loosening a bound is backward compatible.
        T: Send + 'static,
        F: Fn(Vec<u8>) -> Result<T> + Send + Sync + 'static,
    {
        let partitions = self.create_partitions()?;
        let num_partitions = partitions.len();
        // NOTE(review): these counts are informational only — one thread is
        // spawned per partition below regardless of `num_workers`.
        let available_workers = std::cmp::min(self.num_workers, num_partitions);
        let cpu_count = num_cpus::get();
        let optimal_workers = std::cmp::min(available_workers, cpu_count * 2);
        println!(
            "Processing {num_partitions} partitions with {optimal_workers} workers (CPU cores: {cpu_count})"
        );
        let worker_infos = Arc::new(Mutex::new(
            (0..num_partitions)
                .map(|i| WorkerInfo {
                    id: i,
                    status: WorkerStatus::Idle,
                    progress: 0.0,
                    items_processed: 0,
                    error: None,
                })
                .collect::<Vec<_>>(),
        ));
        let results = Arc::new(Mutex::new(Vec::with_capacity(num_partitions)));
        let processor = Arc::new(processor);
        let file_path = self.file_path.clone();
        let progress_callback = self.progress_callback.clone();
        let handles: Vec<_> = partitions
            .into_iter()
            .enumerate()
            .map(|(idx, (offset, size))| {
                let file_path = file_path.clone();
                let processor = processor.clone();
                let results = results.clone();
                let worker_infos = worker_infos.clone();
                let progress_callback = progress_callback.clone();
                thread::spawn(move || {
                    {
                        let mut infos = worker_infos.lock().expect("Operation failed");
                        infos[idx].status = WorkerStatus::Processing;
                    }
                    // Each worker opens its own handle so per-thread seeks
                    // do not interfere with one another.
                    let partition_result = (|| -> Result<T> {
                        let mut file = File::open(&file_path).map_err(|_| {
                            IoError::FileNotFound(file_path.to_string_lossy().to_string())
                        })?;
                        file.seek(SeekFrom::Start(offset as u64))
                            .map_err(|e| IoError::ParseError(format!("Failed to seek: {e}")))?;
                        let mut buffer = vec![0u8; size];
                        // Partitions are exact byte ranges, so a short read
                        // is treated as an error.
                        file.read_exact(&mut buffer).map_err(|e| {
                            IoError::ParseError(format!("Failed to read partition: {e}"))
                        })?;
                        processor(buffer)
                    })();
                    match partition_result {
                        Ok(result) => {
                            let mut infos = worker_infos.lock().expect("Operation failed");
                            infos[idx].status = WorkerStatus::Completed;
                            infos[idx].progress = 1.0;
                            infos[idx].items_processed = 1;
                            drop(infos);
                            let mut results_guard = results.lock().expect("Operation failed");
                            results_guard.push((idx, Ok(result)));
                        }
                        Err(e) => {
                            let mut infos = worker_infos.lock().expect("Operation failed");
                            infos[idx].status = WorkerStatus::Failed;
                            infos[idx].error = Some(e.to_string());
                            drop(infos);
                            let mut results_guard = results.lock().expect("Operation failed");
                            results_guard.push((idx, Err(e)));
                        }
                    }
                    // Report progress after the worker-state update so the
                    // callback always sees a consistent snapshot.
                    if let Some(callback) = &progress_callback {
                        let infos = worker_infos.lock().expect("Operation failed");
                        callback(&infos);
                    }
                })
            })
            .collect();
        for handle in handles {
            handle
                .join()
                .map_err(|_| IoError::ParseError("Worker thread panicked".to_string()))?;
        }
        // Restore partition order (workers complete in arbitrary order),
        // then propagate the first per-partition error, if any.
        let mut results_guard = results.lock().expect("Operation failed");
        results_guard.sort_by_key(|(idx_, _)| *idx_);
        let sorted_results: Vec<_> = results_guard.drain(..).collect();
        drop(results_guard);
        sorted_results
            .into_iter()
            .map(|(_, result)| result)
            .collect()
    }
}
/// Parallel writer: spreads a data set across multiple partition files,
/// optionally merging them afterwards.
pub struct DistributedWriter {
    /// Directory that receives the partition files.
    output_dir: PathBuf,
    /// Number of partition files to produce (defaults to logical CPUs).
    num_partitions: usize,
    /// Maps a partition index to its file name.
    partition_naming: Arc<dyn Fn(usize) -> String + Send + Sync>,
    /// How partition files are combined after writing.
    merge_strategy: MergeStrategy,
}
/// Post-write handling of the partition files produced by `DistributedWriter`.
#[derive(Clone)]
pub enum MergeStrategy {
    /// Leave the partition files as-is.
    None,
    /// Concatenate all partitions into `output_file` (partitions are
    /// deleted afterwards; see `DistributedWriter::merge_files`).
    Concatenate { output_file: PathBuf },
    /// User-supplied merge function `(partition_paths, output_path)`.
    Custom(Arc<dyn Fn(&[PathBuf], &Path) -> Result<()> + Send + Sync>),
}
impl std::fmt::Debug for MergeStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MergeStrategy::None => write!(f, "MergeStrategy::None"),
MergeStrategy::Concatenate { output_file } => f
.debug_struct("MergeStrategy::Concatenate")
.field("output_file", output_file)
.finish(),
MergeStrategy::Custom(_) => write!(f, "MergeStrategy::Custom(<function>)"),
}
}
}
impl DistributedWriter {
    /// Create a writer emitting partition files into `output_dir`.
    ///
    /// Defaults: one partition per logical CPU, file names of the form
    /// `partition_NNNN.dat`, and no merge step.
    pub fn new<P: AsRef<Path>>(output_dir: P) -> Self {
        Self {
            output_dir: output_dir.as_ref().to_path_buf(),
            num_partitions: num_cpus::get(),
            partition_naming: Arc::new(|idx| format!("partition_{idx:04}.dat")),
            merge_strategy: MergeStrategy::None,
        }
    }
    /// Builder: set the number of partition files to produce.
    pub fn num_partitions(mut self, num: usize) -> Self {
        self.num_partitions = num;
        self
    }
    /// Builder: set the function that names the partition file for index `idx`.
    pub fn partition_naming<F>(mut self, naming: F) -> Self
    where
        F: Fn(usize) -> String + Send + Sync + 'static,
    {
        self.partition_naming = Arc::new(naming);
        self
    }
    /// Builder: set how partition files are combined after writing.
    pub fn merge_strategy(mut self, strategy: MergeStrategy) -> Self {
        self.merge_strategy = strategy;
        self
    }
    /// Write `data` across partition files in parallel (one thread per
    /// partition), then apply the configured merge strategy.
    ///
    /// Returns the partition file paths, or the single merged file when a
    /// merge strategy is set. Empty input produces no partition files.
    ///
    /// # Errors
    /// Fails if the output directory or a partition file cannot be created,
    /// a worker thread panics, or `writer` reports an error.
    pub fn write_parallel<T, F>(&self, data: Vec<T>, writer: F) -> Result<Vec<PathBuf>>
    where
        T: Send + 'static + Clone,
        F: Fn(&T, &mut File) -> Result<()> + Send + Sync + 'static,
    {
        std::fs::create_dir_all(&self.output_dir)
            .map_err(|e| IoError::FileError(format!("Failed to create output directory: {e}")))?;
        // Guard against division by zero when `num_partitions == 0`.
        let num_partitions = self.num_partitions.max(1);
        // Ceiling division, clamped to at least 1: `slice::chunks` panics
        // on a chunk size of zero, which the old code hit for empty input.
        let chunk_size = ((data.len() + num_partitions - 1) / num_partitions).max(1);
        // `data` is already a Vec; chunk it directly (the old
        // `into_iter().collect::<Vec<_>>()` round trip was redundant).
        let chunks: Vec<Vec<T>> = data.chunks(chunk_size).map(|chunk| chunk.to_vec()).collect();
        let writer = Arc::new(writer);
        let output_dir = self.output_dir.clone();
        let partition_naming = self.partition_naming.clone();
        let handles: Vec<_> = chunks
            .into_iter()
            .enumerate()
            .map(|(idx, chunk)| {
                let writer = writer.clone();
                let output_dir = output_dir.clone();
                let partition_naming = partition_naming.clone();
                thread::spawn(move || -> Result<PathBuf> {
                    let filename = partition_naming(idx);
                    let filepath = output_dir.join(&filename);
                    let mut file = File::create(&filepath).map_err(|e| {
                        IoError::FileError(format!("Failed to create partition file: {e}"))
                    })?;
                    for item in chunk {
                        writer(&item, &mut file)?;
                    }
                    // Flush to durable storage before any merge step reads
                    // the partition back.
                    file.sync_all()
                        .map_err(|e| IoError::FileError(format!("Failed to sync file: {e}")))?;
                    Ok(filepath)
                })
            })
            .collect();
        let mut partition_files = Vec::new();
        for handle in handles {
            let filepath = handle
                .join()
                .map_err(|_| IoError::FileError("Writer thread panicked".to_string()))??;
            partition_files.push(filepath);
        }
        match &self.merge_strategy {
            MergeStrategy::None => Ok(partition_files),
            MergeStrategy::Concatenate { output_file } => {
                self.merge_files(&partition_files, output_file)?;
                Ok(vec![output_file.clone()])
            }
            MergeStrategy::Custom(merger) => {
                let merged_file = self.output_dir.join("merged.dat");
                merger(&partition_files, &merged_file)?;
                Ok(vec![merged_file])
            }
        }
    }
    /// Concatenate `partitions` into `output` in order, then delete the
    /// partition files (deletion is best-effort; failures are ignored).
    fn merge_files(&self, partitions: &[PathBuf], output: &Path) -> Result<()> {
        let mut output_file = File::create(output)
            .map_err(|e| IoError::FileError(format!("Failed to create merge output: {e}")))?;
        for partition in partitions {
            let mut input = File::open(partition)
                .map_err(|_| IoError::FileNotFound(partition.to_string_lossy().to_string()))?;
            std::io::copy(&mut input, &mut output_file)
                .map_err(|e| IoError::FileError(format!("Failed to copy partition: {e}")))?;
        }
        output_file
            .sync_all()
            .map_err(|e| IoError::FileError(format!("Failed to sync merged file: {e}")))?;
        // Best-effort cleanup of the now-merged partition files.
        for partition in partitions {
            let _ = std::fs::remove_file(partition);
        }
        Ok(())
    }
}
/// 2-D array split into partitions, each nominally owned by a node.
pub struct DistributedArray {
    /// The local pieces of the global array.
    partitions: Vec<ArrayPartition>,
    /// Global shape, `[rows, cols]` for the supported 2-D case.
    shape: Vec<usize>,
    /// Distribution scheme (recorded but not currently consumed).
    #[allow(dead_code)]
    distribution: Distribution,
}
/// One local piece of a `DistributedArray`.
struct ArrayPartition {
    /// The partition's values.
    data: Array2<f64>,
    /// Position of this partition's top-left element in global coordinates.
    global_offset: Vec<usize>,
    /// Identifier of the node that owns the partition.
    node_id: usize,
}
/// How a `DistributedArray` is laid out across nodes.
///
/// NOTE: only `Block` is currently implemented by `DistributedArray::scatter`.
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Contiguous blocks per node.
    Block { block_size: Vec<usize> },
    /// Round-robin assignment with the given cycle length.
    Cyclic { cycle_size: usize },
    /// Blocks assigned cyclically across nodes.
    BlockCyclic {
        block_size: usize,
        cycle_size: usize,
    },
}
impl DistributedArray {
    /// Create an empty distributed array with the given global shape.
    pub fn new(shape: Vec<usize>, distribution: Distribution) -> Self {
        Self {
            partitions: Vec::new(),
            shape,
            distribution,
        }
    }
    /// Register a local partition located at `offset` (global coordinates)
    /// and owned by node `nodeid`.
    pub fn add_partition(&mut self, data: Array2<f64>, offset: Vec<usize>, nodeid: usize) {
        self.partitions.push(ArrayPartition {
            data,
            global_offset: offset,
            node_id: nodeid,
        });
    }
    /// Global shape of the distributed array.
    pub fn shape(&self) -> &[usize] {
        &self.shape
    }
    /// Partition owned by `nodeid`, if one has been registered.
    pub fn get_local_partition(&self, nodeid: usize) -> Option<&Array2<f64>> {
        self.partitions
            .iter()
            .find(|p| p.node_id == nodeid)
            .map(|p| &p.data)
    }
    /// Assemble all partitions into a single dense array.
    ///
    /// # Errors
    /// Returns `ParseError` if the array is not 2-D, a partition offset has
    /// fewer than two coordinates, or a partition does not fit inside the
    /// global shape (previously these last two cases panicked with an
    /// out-of-bounds index instead of returning an error).
    pub fn gather(&self) -> Result<Array2<f64>> {
        if self.shape.len() != 2 {
            return Err(IoError::ParseError(
                "Only 2D arrays supported for gather".to_string(),
            ));
        }
        let mut result = Array2::zeros((self.shape[0], self.shape[1]));
        for partition in &self.partitions {
            let (rows, cols) = partition.data.dim();
            // Validate up front so malformed partitions produce an error
            // rather than an index panic mid-copy.
            if partition.global_offset.len() < 2 {
                return Err(IoError::ParseError(
                    "Partition offset must have two coordinates".to_string(),
                ));
            }
            let row_start = partition.global_offset[0];
            let col_start = partition.global_offset[1];
            if row_start + rows > self.shape[0] || col_start + cols > self.shape[1] {
                return Err(IoError::ParseError(
                    "Partition does not fit inside the global shape".to_string(),
                ));
            }
            for i in 0..rows {
                for j in 0..cols {
                    result[[row_start + i, col_start + j]] = partition.data[[i, j]];
                }
            }
        }
        Ok(result)
    }
    /// Split `array` row-wise across `num_nodes` nodes.
    ///
    /// Only `Distribution::Block` is currently supported; other variants
    /// return an error.
    ///
    /// # Errors
    /// Returns `ParseError` when `num_nodes == 0` (previously a
    /// divide-by-zero panic) or for an unsupported distribution.
    pub fn scatter(
        array: &Array2<f64>,
        distribution: Distribution,
        num_nodes: usize,
    ) -> Result<Self> {
        if num_nodes == 0 {
            // Guard: the per-node row computation below divides by `num_nodes`.
            return Err(IoError::ParseError(
                "scatter requires at least one node".to_string(),
            ));
        }
        let shape = vec![array.nrows(), array.ncols()];
        let mut distributed = Self::new(shape.clone(), distribution.clone());
        match distribution {
            Distribution::Block { block_size: _ } => {
                // Ceiling division: spread rows as evenly as possible.
                let rows_per_node = (array.nrows() + num_nodes - 1) / num_nodes;
                for node_id in 0..num_nodes {
                    let row_start = node_id * rows_per_node;
                    let row_end = ((node_id + 1) * rows_per_node).min(array.nrows());
                    // Skip trailing nodes that receive no rows.
                    if row_start < array.nrows() {
                        let partition = array.slice(s![row_start..row_end, ..]).to_owned();
                        distributed.add_partition(partition, vec![row_start, 0], node_id);
                    }
                }
            }
            _ => {
                return Err(IoError::ParseError(
                    "Unsupported distribution for scatter".to_string(),
                ));
            }
        }
        Ok(distributed)
    }
}
/// Minimal filesystem abstraction so distributed I/O can target different
/// storage backends behind one interface.
pub trait DistributedFileSystem: Send + Sync {
    /// Open `path` for reading.
    fn open_read(&self, path: &Path) -> Result<Box<dyn Read + Send>>;
    /// Create (or truncate) `path` for writing.
    fn create_write(&self, path: &Path) -> Result<Box<dyn Write + Send>>;
    /// List the entries directly inside directory `path`.
    fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>>;
    /// Fetch size/mtime/kind metadata for `path`.
    fn metadata(&self, path: &Path) -> Result<FileMetadata>;
    /// Whether `path` exists.
    fn exists(&self, path: &Path) -> bool;
}
/// Basic metadata returned by [`DistributedFileSystem::metadata`].
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// Size in bytes.
    pub size: u64,
    /// Last modification time.
    pub modified: std::time::SystemTime,
    /// True when the path is a directory.
    pub is_dir: bool,
}
pub struct LocalFileSystem;
// Local-disk backend: thin adapters over `std::fs`, translating I/O errors
// into the crate's `IoError` variants.
impl DistributedFileSystem for LocalFileSystem {
    /// Open a local file for reading.
    fn open_read(&self, path: &Path) -> Result<Box<dyn Read + Send>> {
        match File::open(path) {
            Ok(file) => Ok(Box::new(file)),
            Err(_) => Err(IoError::FileNotFound(path.to_string_lossy().to_string())),
        }
    }
    /// Create (or truncate) a local file for writing.
    fn create_write(&self, path: &Path) -> Result<Box<dyn Write + Send>> {
        match File::create(path) {
            Ok(file) => Ok(Box::new(file)),
            Err(e) => Err(IoError::FileError(format!("Failed to create file: {e}"))),
        }
    }
    /// List the entries directly inside a local directory.
    fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>> {
        let entries = std::fs::read_dir(path)
            .map_err(|e| IoError::ParseError(format!("Failed to read directory: {e}")))?;
        let mut paths = Vec::new();
        for entry in entries {
            match entry {
                Ok(e) => paths.push(e.path()),
                Err(e) => {
                    return Err(IoError::ParseError(format!("Failed to read entry: {e}")))
                }
            }
        }
        Ok(paths)
    }
    /// Fetch size/mtime/kind metadata for a local path.
    fn metadata(&self, path: &Path) -> Result<FileMetadata> {
        let meta = std::fs::metadata(path)
            .map_err(|_| IoError::FileNotFound(path.to_string_lossy().to_string()))?;
        let modified = meta
            .modified()
            .map_err(|e| IoError::ParseError(format!("Failed to get modified time: {e}")))?;
        Ok(FileMetadata {
            size: meta.len(),
            modified,
            is_dir: meta.is_dir(),
        })
    }
    /// Whether the local path exists.
    fn exists(&self, path: &Path) -> bool {
        path.exists()
    }
}
use scirs2_core::ndarray::s;
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    // A 10 000-byte file with 1000-byte size-based chunks must yield
    // exactly 10 equal partitions.
    #[test]
    fn test_partition_strategies() {
        let temp_dir = TempDir::new().expect("Operation failed");
        let temp_file = temp_dir.path().join("test.dat");
        std::fs::write(&temp_file, vec![0u8; 10000]).expect("Operation failed");
        let reader =
            DistributedReader::new(&temp_file).partition_strategy(PartitionStrategy::SizeBased {
                chunk_size_bytes: 1000,
            });
        let partitions = reader.create_partitions().expect("Operation failed");
        assert_eq!(partitions.len(), 10);
        for (_offset, size) in &partitions {
            assert_eq!(*size, 1000);
        }
    }
    // Scatter a 100x50 array over 4 nodes, then gather; the round trip
    // must reproduce the original array exactly.
    #[test]
    fn test_distributed_array() {
        let array = Array2::from_shape_fn((100, 50), |(i, j)| (i * 50 + j) as f64);
        let distributed = DistributedArray::scatter(
            &array,
            Distribution::Block {
                block_size: vec![25, 50],
            },
            4,
        )
        .expect("Operation failed");
        assert_eq!(distributed.partitions.len(), 4);
        let gathered = distributed.gather().expect("Operation failed");
        assert_eq!(array, gathered);
    }
    // 100 integers over 4 partitions: expect 4 files, all present on disk.
    #[test]
    fn test_distributed_writer() {
        let temp_dir = TempDir::new().expect("Operation failed");
        let data: Vec<i32> = (0..100).collect();
        let writer = DistributedWriter::new(temp_dir.path()).num_partitions(4);
        let files = writer
            .write_parallel(data, |&value, file| {
                writeln!(file, "{value}")
                    .map_err(|e| IoError::FileError(format!("Failed to write: {e}")))
            })
            .expect("Operation failed");
        assert_eq!(files.len(), 4);
        for file in &files {
            assert!(file.exists());
        }
    }
}