oxigdal-distributed 0.1.3

# OxiGDAL Distributed

Distributed processing capabilities for large-scale geospatial workflows using Apache Arrow Flight.

## Features

- **Apache Arrow Flight RPC**: Zero-copy data transfer between nodes using gRPC
- **Multi-node Processing**: Distribute workloads across multiple worker nodes
- **Fault Tolerance**: Automatic task retry and failure recovery
- **Dynamic Scaling**: Add or remove workers at runtime
- **Progress Monitoring**: Real-time tracking of distributed execution
- **Resource Management**: Memory and CPU limits per worker
- **Data Partitioning**: Multiple strategies (spatial tiles, strips, hash, range, load-balanced)
- **Shuffle Operations**: Efficient data redistribution for group-by, sort, and joins
- **Pure Rust**: No C/C++ dependencies

## Architecture

```text
┌─────────────┐
│ Coordinator │ ──── Schedules tasks and manages workers
└──────┬──────┘
  ┌────┴────┐
  │  Flight │
  │  Server │ ──── Zero-copy data transfer
  └────┬────┘
  ┌────┴────────────────┐
  │                     │
┌─▼──────┐         ┌───▼─────┐
│ Worker │         │ Worker  │ ──── Execute tasks
│ Node 1 │         │ Node 2  │
└────────┘         └─────────┘
```

## Example: Distributed NDVI Calculation

```rust
use oxigdal_distributed::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create coordinator
    let config = CoordinatorConfig::new("localhost:50051".to_string());
    let coordinator = Coordinator::new(config);

    // Add workers
    coordinator.add_worker("worker-1".to_string(), "localhost:50052".to_string())?;
    coordinator.add_worker("worker-2".to_string(), "localhost:50053".to_string())?;

    // Partition data spatially into 4x4 tiles
    let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
    let partitioner = TilePartitioner::new(extent, 4, 4)?;
    let partitions = partitioner.partition();

    // Submit tasks for each partition
    for partition in partitions {
        coordinator.submit_task(
            partition.id,
            TaskOperation::CalculateIndex {
                index_type: "NDVI".to_string(),
                bands: vec![3, 4], // Red and NIR bands
            },
        )?;
    }

    // Monitor progress
    while !coordinator.is_complete() {
        let progress = coordinator.get_progress()?;
        println!(
            "Progress: {}/{} completed ({:.1}%)",
            progress.completed_tasks,
            progress.total_tasks(),
            progress.completion_percentage()
        );
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }

    // Collect results
    let results = coordinator.collect_results()?;
    println!("Processing complete: {} results", results.len());

    Ok(())
}
```

## Example: Worker Node

```rust
use oxigdal_distributed::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure worker
    let config = WorkerConfig::new("worker-1".to_string())
        .with_max_concurrent_tasks(4)
        .with_memory_limit(8 * 1024 * 1024 * 1024) // 8 GB
        .with_num_cores(4);

    let worker = Worker::new(config);

    // Start heartbeat
    // Start heartbeat reporting; the receive side would normally be
    // consumed by the coordinator, so it is unused in this sketch.
    let (tx, _rx) = tokio::sync::mpsc::channel(100);
    worker.start_heartbeat(tx).await?;

    // The worker would then connect to the coordinator's Flight server
    // to receive and execute tasks.

    Ok(())
}
```

## Example: Data Shuffle

```rust
use oxigdal_distributed::*;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create test data
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
        ],
    )?;

    // Hash shuffle by ID column into 2 partitions
    let shuffle = HashShuffle::new("id".to_string(), 2)?;
    let partitions = shuffle.shuffle(&batch)?;

    println!("Data shuffled into {} partitions", partitions.len());
    for (partition_id, partition_batch) in partitions {
        println!(
            "Partition {:?}: {} rows",
            partition_id,
            partition_batch.num_rows()
        );
    }

    Ok(())
}
```

## Partitioning Strategies

### Tile Partitioning

Divide spatial data into regular tiles:

```rust
let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
let partitioner = TilePartitioner::new(extent, 4, 4)?; // 4x4 grid
let partitions = partitioner.partition();
```
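The tile arithmetic can be illustrated in standalone Rust. The `tile_extents` helper below is a hypothetical sketch, not part of the crate's API, assuming tiles are laid out row-major with equal sizes:

```rust
/// Split a bounding box (min_x, min_y, max_x, max_y) into a cols x rows
/// grid of equally sized tile extents, in row-major order.
fn tile_extents(
    extent: (f64, f64, f64, f64),
    cols: usize,
    rows: usize,
) -> Vec<(f64, f64, f64, f64)> {
    let (min_x, min_y, max_x, max_y) = extent;
    let w = (max_x - min_x) / cols as f64;
    let h = (max_y - min_y) / rows as f64;
    let mut tiles = Vec::with_capacity(cols * rows);
    for r in 0..rows {
        for c in 0..cols {
            tiles.push((
                min_x + c as f64 * w,
                min_y + r as f64 * h,
                min_x + (c + 1) as f64 * w,
                min_y + (r + 1) as f64 * h,
            ));
        }
    }
    tiles
}

fn main() {
    // A 4x4 grid over a 1000x1000 extent yields 16 tiles of 250x250 units.
    let tiles = tile_extents((0.0, 0.0, 1000.0, 1000.0), 4, 4);
    println!("{} tiles, first = {:?}", tiles.len(), tiles[0]);
}
```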

### Strip Partitioning

Divide data into horizontal strips:

```rust
let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
let partitioner = StripPartitioner::new(extent, 8)?; // 8 strips
let partitions = partitioner.partition();
```
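Strips are the one-dimensional case of the tile grid. A hypothetical standalone sketch, assuming strips divide the y-axis evenly:

```rust
/// Split a bounding box into `n` horizontal strips along the y axis.
fn strip_extents(extent: (f64, f64, f64, f64), n: usize) -> Vec<(f64, f64, f64, f64)> {
    let (min_x, min_y, max_x, max_y) = extent;
    let h = (max_y - min_y) / n as f64;
    (0..n)
        .map(|i| (min_x, min_y + i as f64 * h, max_x, min_y + (i + 1) as f64 * h))
        .collect()
}

fn main() {
    // 8 strips over a 1000-unit-tall extent: each strip is 125 units tall.
    let strips = strip_extents((0.0, 0.0, 1000.0, 1000.0), 8);
    println!("{} strips, each {} units tall", strips.len(), strips[0].3 - strips[0].1);
}
```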

### Hash Partitioning

Distribute data based on the hash of a key:

```rust
let partitioner = HashPartitioner::new(16)?; // 16 partitions
let partition_id = partitioner.partition_for_key(b"my_key");
```
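The underlying hash-modulo routing can be sketched with the standard library. This `partition_for_key` is hypothetical and illustrative only; the crate may use a different (and cross-run stable) hash function:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a byte key to one of `num_partitions` buckets by hashing.
/// Note: DefaultHasher is randomly seeded per process, so assignments
/// are stable within a run but not across runs.
fn partition_for_key(key: &[u8], num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    let id = partition_for_key(b"my_key", 16);
    println!("key routed to partition {}", id);
}
```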

### Range Partitioning

Partition based on value ranges:

```rust
let boundaries = vec![100.0, 200.0, 300.0, 400.0];
let partitioner = RangePartitioner::new(boundaries)?;
let partition_id = partitioner.partition_for_value(250.0);
```
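The range lookup reduces to a binary search over the sorted boundaries: n boundaries define n + 1 partitions. A minimal standalone sketch (the helper name is hypothetical, assuming upper-exclusive boundaries):

```rust
/// Locate the partition for a value given sorted boundaries.
/// `partition_point` binary-searches for the count of boundaries <= value,
/// which is exactly the partition index.
fn partition_for_value(boundaries: &[f64], value: f64) -> usize {
    boundaries.partition_point(|b| *b <= value)
}

fn main() {
    // Boundaries [100, 200, 300, 400] define 5 partitions;
    // 250.0 falls in partition 2, i.e. the [200, 300) range.
    let boundaries = [100.0, 200.0, 300.0, 400.0];
    println!("250.0 -> partition {}", partition_for_value(&boundaries, 250.0));
}
```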

### Load Balanced Partitioning

Balance load based on data size:

```rust
let total_size = 1000 * 1024 * 1024; // 1 GB
let num_workers = 8;
let partitioner = LoadBalancedPartitioner::new(total_size, num_workers)?;
```
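The size-splitting logic can be sketched in a few lines. This hypothetical helper splits a byte total into near-equal chunks, spreading the remainder over the first workers so no two chunks differ by more than one byte:

```rust
/// Split `total_size` bytes into `num_workers` near-equal chunk sizes.
fn balanced_chunks(total_size: u64, num_workers: u64) -> Vec<u64> {
    let base = total_size / num_workers;
    let rem = total_size % num_workers;
    // The first `rem` workers each take one extra byte.
    (0..num_workers).map(|i| base + u64::from(i < rem)).collect()
}

fn main() {
    // 1 GB over 8 workers: 131072000 bytes each, no remainder.
    let chunks = balanced_chunks(1000 * 1024 * 1024, 8);
    println!("chunk sizes: {:?}", chunks);
}
```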

## Shuffle Operations

### Hash Shuffle

Group data by key:

```rust
let shuffle = HashShuffle::new("column_name".to_string(), 4)?;
let partitions = shuffle.shuffle(&batch)?;
```

### Range Shuffle

Route rows into ordered value ranges, e.g. as the redistribution step of a distributed sort:

```rust
let boundaries = vec![10.0, 20.0, 30.0];
let shuffle = RangeShuffle::new("value_column".to_string(), boundaries)?;
let partitions = shuffle.shuffle(&batch)?;
```
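Stripped of the Arrow machinery, the per-row routing behind a range shuffle can be sketched over a plain slice of values. This `range_shuffle` helper is hypothetical, assuming each row is bucketed by a binary search over the sorted boundaries:

```rust
/// Route each value into range buckets defined by sorted boundaries:
/// n boundaries produce n + 1 buckets.
fn range_shuffle(values: &[f64], boundaries: &[f64]) -> Vec<Vec<f64>> {
    let mut buckets = vec![Vec::new(); boundaries.len() + 1];
    for &v in values {
        // Index = number of boundaries <= v, found by binary search.
        let idx = boundaries.partition_point(|b| *b <= v);
        buckets[idx].push(v);
    }
    buckets
}

fn main() {
    let buckets = range_shuffle(&[5.0, 15.0, 25.0, 35.0, 12.0], &[10.0, 20.0, 30.0]);
    for (i, b) in buckets.iter().enumerate() {
        println!("bucket {}: {:?}", i, b);
    }
}
```

Buckets preserve arrival order within each range; a distributed sort would sort each bucket locally after the shuffle.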

### Broadcast Shuffle

Replicate data to all partitions:

```rust
let shuffle = BroadcastShuffle::new(num_workers)?;
let partitions = shuffle.shuffle(&batch);
```

## Performance

Benchmarks show strong performance for large-scale operations (figures are indicative and hardware-dependent):

- **Tile Partitioning**: 16x16 grid in <1ms
- **Hash Shuffle**: 100K rows in ~50ms
- **Range Shuffle**: 100K rows in ~60ms
- **Task Scheduling**: 10K tasks in ~100ms

Run benchmarks:

```bash
cargo bench --package oxigdal-distributed
```

## Safety

This crate follows OxiGDAL's strict safety policies:

- No `unwrap()` usage
- No `panic!()` calls
- Comprehensive error handling
- Pure Rust implementation

## License

Apache-2.0

## Authors

COOLJAPAN OU (Team Kitasan)