Skip to main content

Crate oxigdal_distributed

Crate oxigdal_distributed 

Source
Expand description

OxiGDAL Distributed Processing

This crate provides distributed processing capabilities for large-scale geospatial workflows using Apache Arrow Flight for zero-copy data transfer.

§Features

  • Arrow Flight RPC: Zero-copy data transfer between nodes
  • Worker Nodes: Execute processing tasks with resource management
  • Coordinator: Schedule and manage distributed execution
  • Data Partitioning: Spatial, hash, range, and load-balanced partitioning
  • Shuffle Operations: Efficient data redistribution for group-by and joins
  • Fault Tolerance: Automatic retry and failure recovery
  • Progress Monitoring: Real-time tracking of distributed execution

§Architecture

┌─────────────┐
│ Coordinator │ ──── Schedules tasks
└──────┬──────┘
       │
  ┌────┴────┐
  │  Flight │
  │  Server │
  └────┬────┘
       │
  ┌────┴────────────────┐
  │                     │
┌─▼──────┐         ┌───▼─────┐
│ Worker │         │ Worker  │
│ Node 1 │         │ Node 2  │
└────────┘         └─────────┘

§Example: Distributed NDVI Calculation

use oxigdal_distributed::*;

// Create coordinator
let config = CoordinatorConfig::new("localhost:50051".to_string());
let coordinator = Coordinator::new(config);

// Add workers
coordinator.add_worker("worker-1".to_string(), "localhost:50052".to_string())?;
coordinator.add_worker("worker-2".to_string(), "localhost:50053".to_string())?;

// Partition data spatially
let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
let partitioner = TilePartitioner::new(extent, 4, 4)?;
let partitions = partitioner.partition();

// Submit tasks for each partition
for partition in partitions {
    coordinator.submit_task(
        partition.id,
        TaskOperation::CalculateIndex {
            index_type: "NDVI".to_string(),
            bands: vec![3, 4], // Red and NIR
        },
    )?;
}

// Monitor progress
while !coordinator.is_complete() {
    let progress = coordinator.get_progress()?;
    println!(
        "Progress: {}/{} completed",
        progress.completed_tasks,
        progress.total_tasks()
    );
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

// Collect results
let results = coordinator.collect_results()?;
println!("Processing complete: {} results", results.len());

§Example: Custom Processing with Workers

use oxigdal_distributed::*;

// Create worker
let config = WorkerConfig::new("worker-1".to_string())
    .with_max_concurrent_tasks(4)
    .with_memory_limit(8 * 1024 * 1024 * 1024); // 8 GB

let worker = Worker::new(config);

// Execute tasks
// (In a real deployment, the worker would receive tasks from the coordinator.)

§Example: Data Shuffle

use oxigdal_distributed::*;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

// Create test data
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new("name", DataType::Utf8, false),
]));

let batch = RecordBatch::try_new(
    schema,
    vec![
        Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
        Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
    ],
)?;

// Hash shuffle by ID column
let shuffle = HashShuffle::new("id".to_string(), 2)?;
let partitions = shuffle.shuffle(&batch)?;

println!("Data shuffled into {} partitions", partitions.len());

Re-exports§

pub use coordinator::Coordinator;
pub use coordinator::CoordinatorConfig;
pub use coordinator::CoordinatorProgress;
pub use coordinator::WorkerInfo;
pub use error::DistributedError;
pub use error::Result;
pub use flight::FlightClient;
pub use flight::FlightServer;
pub use partition::HashPartitioner;
pub use partition::LoadBalancedPartitioner;
pub use partition::Partition;
pub use partition::PartitionStrategy;
pub use partition::RangePartitioner;
pub use partition::SpatialExtent;
pub use partition::StripPartitioner;
pub use partition::TilePartitioner;
pub use shuffle::BroadcastShuffle;
pub use shuffle::HashShuffle;
pub use shuffle::RangeShuffle;
pub use shuffle::ShuffleConfig;
pub use shuffle::ShuffleKey;
pub use shuffle::ShuffleResult;
pub use shuffle::ShuffleStats;
pub use shuffle::ShuffleType;
pub use task::PartitionId;
pub use task::Task;
pub use task::TaskContext;
pub use task::TaskId;
pub use task::TaskOperation;
pub use task::TaskResult;
pub use task::TaskScheduler;
pub use task::TaskStatus;
pub use worker::Worker;
pub use worker::WorkerConfig;
pub use worker::WorkerHealthCheck;
pub use worker::WorkerMetrics;
pub use worker::WorkerStatus;

Modules§

coordinator
Coordinator for managing distributed task execution.
error
Error types for distributed processing operations.
flight
Arrow Flight RPC server and client implementations.
partition
Data partitioning strategies for distributed processing.
shuffle
Data shuffle operations for distributed processing.
task
Task definitions and management for distributed processing.
worker
Worker node implementation for distributed processing.