OxiGDAL Distributed Processing
This crate provides distributed processing capabilities for large-scale geospatial workflows using Apache Arrow Flight for zero-copy data transfer.
§Features
- Arrow Flight RPC: Zero-copy data transfer between nodes
- Worker Nodes: Execute processing tasks with resource management
- Coordinator: Schedule and manage distributed execution
- Data Partitioning: Spatial, hash, range, and load-balanced partitioning
- Shuffle Operations: Efficient data redistribution for group-by and joins
- Fault Tolerance: Automatic retry and failure recovery
- Progress Monitoring: Real-time tracking of distributed execution
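The spatial tile strategy can be pictured as cutting the dataset's bounding box into a regular grid, one tile per distributable task. A rough illustration in plain std Rust (the `Extent` struct and `tile_grid` function here are hypothetical names for exposition, not this crate's API):

```rust
// Sketch only: splits a bounding box into a cols x rows grid of tiles,
// conceptually what a spatial tile partitioner produces.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Extent {
    min_x: f64,
    min_y: f64,
    max_x: f64,
    max_y: f64,
}

fn tile_grid(extent: Extent, cols: u32, rows: u32) -> Vec<Extent> {
    let tile_w = (extent.max_x - extent.min_x) / cols as f64;
    let tile_h = (extent.max_y - extent.min_y) / rows as f64;
    let mut tiles = Vec::with_capacity((cols * rows) as usize);
    for row in 0..rows {
        for col in 0..cols {
            // Each tile covers one cell of the grid.
            tiles.push(Extent {
                min_x: extent.min_x + col as f64 * tile_w,
                min_y: extent.min_y + row as f64 * tile_h,
                max_x: extent.min_x + (col + 1) as f64 * tile_w,
                max_y: extent.min_y + (row + 1) as f64 * tile_h,
            });
        }
    }
    tiles
}

fn main() {
    let extent = Extent { min_x: 0.0, min_y: 0.0, max_x: 1000.0, max_y: 1000.0 };
    let tiles = tile_grid(extent, 4, 4);
    println!("{} tiles; first = {:?}", tiles.len(), tiles[0]);
}
```

A 4x4 grid over a 1000x1000 extent yields 16 tiles of 250x250 each; each tile becomes an independent task that a worker can process without touching its neighbors.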
§Architecture
       ┌─────────────┐
       │ Coordinator │ ──── Schedules tasks
       └──────┬──────┘
              │
         ┌────┴────┐
         │ Flight  │
         │ Server  │
         └────┬────┘
              │
       ┌──────┴────────────┐
       │                   │
   ┌───▼────┐         ┌────▼────┐
   │ Worker │         │ Worker  │
   │ Node 1 │         │ Node 2  │
   └────────┘         └─────────┘

§Example: Distributed NDVI Calculation
use oxigdal_distributed::*;

// Create coordinator
let config = CoordinatorConfig::new("localhost:50051".to_string());
let coordinator = Coordinator::new(config);

// Add workers
coordinator.add_worker("worker-1".to_string(), "localhost:50052".to_string())?;
coordinator.add_worker("worker-2".to_string(), "localhost:50053".to_string())?;

// Partition data spatially
let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
let partitioner = TilePartitioner::new(extent, 4, 4)?;
let partitions = partitioner.partition();

// Submit a task for each partition
for partition in partitions {
    coordinator.submit_task(
        partition.id,
        TaskOperation::CalculateIndex {
            index_type: "NDVI".to_string(),
            bands: vec![3, 4], // Red and NIR
        },
    )?;
}

// Monitor progress
while !coordinator.is_complete() {
    let progress = coordinator.get_progress()?;
    println!(
        "Progress: {}/{} completed",
        progress.completed_tasks,
        progress.total_tasks()
    );
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

// Collect results
let results = coordinator.collect_results()?;
println!("Processing complete: {} results", results.len());

§Example: Custom Processing with Workers
use oxigdal_distributed::*;

// Create worker
let config = WorkerConfig::new("worker-1".to_string())
    .with_max_concurrent_tasks(4)
    .with_memory_limit(8 * 1024 * 1024 * 1024); // 8 GB
let worker = Worker::new(config);

// Execute tasks
// (Tasks would be received from the coordinator in a real deployment)

§Example: Data Shuffle
use oxigdal_distributed::*;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

// Create test data
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new("name", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
    schema,
    vec![
        Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
        Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
    ],
)?;

// Hash shuffle by the "id" column into 2 partitions
let shuffle = HashShuffle::new("id".to_string(), 2)?;
let partitions = shuffle.shuffle(&batch)?;
println!("Data shuffled into {} partitions", partitions.len());

Re-exports§
pub use coordinator::Coordinator;
pub use coordinator::CoordinatorConfig;
pub use coordinator::CoordinatorProgress;
pub use coordinator::WorkerInfo;
pub use error::DistributedError;
pub use error::Result;
pub use flight::FlightClient;
pub use flight::FlightServer;
pub use partition::HashPartitioner;
pub use partition::LoadBalancedPartitioner;
pub use partition::Partition;
pub use partition::PartitionStrategy;
pub use partition::RangePartitioner;
pub use partition::SpatialExtent;
pub use partition::StripPartitioner;
pub use partition::TilePartitioner;
pub use shuffle::BroadcastShuffle;
pub use shuffle::HashShuffle;
pub use shuffle::RangeShuffle;
pub use shuffle::ShuffleConfig;
pub use shuffle::ShuffleKey;
pub use shuffle::ShuffleResult;
pub use shuffle::ShuffleStats;
pub use shuffle::ShuffleType;
pub use task::PartitionId;
pub use task::Task;
pub use task::TaskContext;
pub use task::TaskId;
pub use task::TaskOperation;
pub use task::TaskResult;
pub use task::TaskScheduler;
pub use task::TaskStatus;
pub use worker::Worker;
pub use worker::WorkerConfig;
pub use worker::WorkerHealthCheck;
pub use worker::WorkerMetrics;
pub use worker::WorkerStatus;
Modules§
- coordinator: Coordinator for managing distributed task execution.
- error: Error types for distributed processing operations.
- flight: Arrow Flight RPC server and client implementations.
- partition: Data partitioning strategies for distributed processing.
- shuffle: Data shuffle operations for distributed processing.
- task: Task definitions and management for distributed processing.
- worker: Worker node implementation for distributed processing.