// oxigdal_distributed — crate root (lib.rs)

//! OxiGDAL Distributed Processing
//!
//! This crate provides distributed processing capabilities for large-scale geospatial
//! workflows using Apache Arrow Flight for zero-copy data transfer.
//!
//! # Features
//!
//! - **Arrow Flight RPC**: Zero-copy data transfer between nodes
//! - **Worker Nodes**: Execute processing tasks with resource management
//! - **Coordinator**: Schedule and manage distributed execution
//! - **Data Partitioning**: Spatial, hash, range, and load-balanced partitioning
//! - **Shuffle Operations**: Efficient data redistribution for group-by and joins
//! - **Fault Tolerance**: Automatic retry and failure recovery
//! - **Progress Monitoring**: Real-time tracking of distributed execution
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────┐
//! │ Coordinator │ ──── Schedules tasks
//! └──────┬──────┘
//!        │
//!   ┌────┴────┐
//!   │  Flight │
//!   │  Server │
//!   └────┬────┘
//!        │
//!   ┌────┴────────────────┐
//!   │                     │
//! ┌─▼──────┐         ┌───▼─────┐
//! │ Worker │         │ Worker  │
//! │ Node 1 │         │ Node 2  │
//! └────────┘         └─────────┘
//! ```
//!
//! # Example: Distributed NDVI Calculation
//!
//! ```rust,no_run
//! use oxigdal_distributed::*;
//! # async fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
//!
//! // Create coordinator
//! let config = CoordinatorConfig::new("localhost:50051".to_string());
//! let coordinator = Coordinator::new(config);
//!
//! // Add workers
//! coordinator.add_worker("worker-1".to_string(), "localhost:50052".to_string())?;
//! coordinator.add_worker("worker-2".to_string(), "localhost:50053".to_string())?;
//!
//! // Partition data spatially
//! let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
//! let partitioner = TilePartitioner::new(extent, 4, 4)?;
//! let partitions = partitioner.partition();
//!
//! // Submit tasks for each partition
//! for partition in partitions {
//!     coordinator.submit_task(
//!         partition.id,
//!         TaskOperation::CalculateIndex {
//!             index_type: "NDVI".to_string(),
//!             bands: vec![3, 4], // Red and NIR
//!         },
//!     )?;
//! }
//!
//! // Monitor progress
//! while !coordinator.is_complete() {
//!     let progress = coordinator.get_progress()?;
//!     println!(
//!         "Progress: {}/{} completed",
//!         progress.completed_tasks,
//!         progress.total_tasks()
//!     );
//!     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
//! }
//!
//! // Collect results
//! let results = coordinator.collect_results()?;
//! println!("Processing complete: {} results", results.len());
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Custom Processing with Workers
//!
//! ```rust,no_run
//! use oxigdal_distributed::*;
//! # async fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
//!
//! // Create worker
//! let config = WorkerConfig::new("worker-1".to_string())
//!     .with_max_concurrent_tasks(4)
//!     .with_memory_limit(8 * 1024 * 1024 * 1024); // 8 GB
//!
//! let worker = Worker::new(config);
//!
//! // Execute tasks
//! // (Tasks would be received from coordinator in real implementation)
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Data Shuffle
//!
//! ```rust,no_run
//! use oxigdal_distributed::*;
//! use arrow::array::{Int32Array, StringArray};
//! use arrow::datatypes::{DataType, Field, Schema};
//! use arrow::record_batch::RecordBatch;
//! use std::sync::Arc;
//!
//! # fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
//! // Create test data
//! let schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int32, false),
//!     Field::new("name", DataType::Utf8, false),
//! ]));
//!
//! let batch = RecordBatch::try_new(
//!     schema,
//!     vec![
//!         Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
//!         Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
//!     ],
//! )?;
//!
//! // Hash shuffle by ID column
//! let shuffle = HashShuffle::new("id".to_string(), 2)?;
//! let partitions = shuffle.shuffle(&batch)?;
//!
//! println!("Data shuffled into {} partitions", partitions.len());
//! # Ok(())
//! # }
//! ```

136#![deny(clippy::unwrap_used)]
137#![deny(clippy::panic)]
138#![warn(missing_docs)]
139#![warn(clippy::expect_used)]
140
141pub mod coordinator;
142pub mod error;
143pub mod flight;
144pub mod partition;
145pub mod shuffle;
146pub mod task;
147pub mod worker;
148
149// Re-export main types
150pub use coordinator::{Coordinator, CoordinatorConfig, CoordinatorProgress, WorkerInfo};
151pub use error::{DistributedError, Result};
152pub use flight::{FlightClient, FlightServer};
153pub use partition::{
154    HashPartitioner, LoadBalancedPartitioner, Partition, PartitionStrategy, RangePartitioner,
155    SpatialExtent, StripPartitioner, TilePartitioner,
156};
157pub use shuffle::{
158    BroadcastShuffle, HashShuffle, RangeShuffle, ShuffleConfig, ShuffleKey, ShuffleResult,
159    ShuffleStats, ShuffleType,
160};
161pub use task::{
162    PartitionId, Task, TaskContext, TaskId, TaskOperation, TaskResult, TaskScheduler, TaskStatus,
163};
164pub use worker::{Worker, WorkerConfig, WorkerHealthCheck, WorkerMetrics, WorkerStatus};
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time smoke test: merely naming these types verifies that the
    /// crate-root re-exports (`pub use` above) resolve. The bindings are never
    /// initialized or read, so no runtime behavior is exercised.
    #[test]
    fn test_exports() {
        // Verify that main types are exported
        let _config: CoordinatorConfig;
        let _worker_config: WorkerConfig;
        let _task_id: TaskId;
        let _partition_id: PartitionId;
    }
}