// oxigdal_distributed/lib.rs
1//! OxiGDAL Distributed Processing
2//!
3//! This crate provides distributed processing capabilities for large-scale geospatial
4//! workflows using Apache Arrow Flight for zero-copy data transfer.
5//!
6//! # Features
7//!
8//! - **Arrow Flight RPC**: Zero-copy data transfer between nodes
9//! - **Worker Nodes**: Execute processing tasks with resource management
10//! - **Coordinator**: Schedule and manage distributed execution
11//! - **Data Partitioning**: Spatial, hash, range, and load-balanced partitioning
12//! - **Shuffle Operations**: Efficient data redistribution for group-by and joins
13//! - **Fault Tolerance**: Automatic retry and failure recovery
14//! - **Progress Monitoring**: Real-time tracking of distributed execution
15//!
16//! # Architecture
17//!
18//! ```text
19//! ┌─────────────┐
20//! │ Coordinator │ ──── Schedules tasks
21//! └──────┬──────┘
22//! │
23//! ┌────┴────┐
24//! │ Flight │
25//! │ Server │
26//! └────┬────┘
27//! │
28//! ┌────┴────────────────┐
29//! │ │
30//! ┌─▼──────┐ ┌───▼─────┐
31//! │ Worker │ │ Worker │
32//! │ Node 1 │ │ Node 2 │
33//! └────────┘ └─────────┘
34//! ```
35//!
36//! # Example: Distributed NDVI Calculation
37//!
38//! ```rust,no_run
39//! use oxigdal_distributed::*;
40//! # async fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
41//!
42//! // Create coordinator
43//! let config = CoordinatorConfig::new("localhost:50051".to_string());
44//! let coordinator = Coordinator::new(config);
45//!
46//! // Add workers
47//! coordinator.add_worker("worker-1".to_string(), "localhost:50052".to_string())?;
48//! coordinator.add_worker("worker-2".to_string(), "localhost:50053".to_string())?;
49//!
50//! // Partition data spatially
51//! let extent = SpatialExtent::new(0.0, 0.0, 1000.0, 1000.0)?;
52//! let partitioner = TilePartitioner::new(extent, 4, 4)?;
53//! let partitions = partitioner.partition();
54//!
55//! // Submit tasks for each partition
56//! for partition in partitions {
57//! coordinator.submit_task(
58//! partition.id,
59//! TaskOperation::CalculateIndex {
60//! index_type: "NDVI".to_string(),
61//! bands: vec![3, 4], // Red and NIR
62//! },
63//! )?;
64//! }
65//!
66//! // Monitor progress
67//! while !coordinator.is_complete() {
68//! let progress = coordinator.get_progress()?;
69//! println!(
70//! "Progress: {}/{} completed",
71//! progress.completed_tasks,
72//! progress.total_tasks()
73//! );
74//! tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
75//! }
76//!
77//! // Collect results
78//! let results = coordinator.collect_results()?;
79//! println!("Processing complete: {} results", results.len());
80//! # Ok(())
81//! # }
82//! ```
83//!
84//! # Example: Custom Processing with Workers
85//!
86//! ```rust,no_run
87//! use oxigdal_distributed::*;
88//! # async fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
89//!
90//! // Create worker
91//! let config = WorkerConfig::new("worker-1".to_string())
92//! .with_max_concurrent_tasks(4)
93//! .with_memory_limit(8 * 1024 * 1024 * 1024); // 8 GB
94//!
95//! let worker = Worker::new(config);
96//!
97//! // Execute tasks
98//! // (Tasks would be received from coordinator in real implementation)
99//! # Ok(())
100//! # }
101//! ```
102//!
103//! # Example: Data Shuffle
104//!
105//! ```rust,no_run
106//! use oxigdal_distributed::*;
107//! use arrow::array::{Int32Array, StringArray};
108//! use arrow::datatypes::{DataType, Field, Schema};
109//! use arrow::record_batch::RecordBatch;
110//! use std::sync::Arc;
111//!
112//! # fn example() -> std::result::Result<(), Box<dyn std::error::Error>> {
113//! // Create test data
114//! let schema = Arc::new(Schema::new(vec![
115//! Field::new("id", DataType::Int32, false),
116//! Field::new("name", DataType::Utf8, false),
117//! ]));
118//!
119//! let batch = RecordBatch::try_new(
120//! schema,
121//! vec![
122//! Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
123//! Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
124//! ],
125//! )?;
126//!
127//! // Hash shuffle by ID column
128//! let shuffle = HashShuffle::new("id".to_string(), 2)?;
129//! let partitions = shuffle.shuffle(&batch)?;
130//!
131//! println!("Data shuffled into {} partitions", partitions.len());
132//! # Ok(())
133//! # }
134//! ```
135
136#![deny(clippy::unwrap_used)]
137#![deny(clippy::panic)]
138#![warn(missing_docs)]
139#![warn(clippy::expect_used)]
140
141pub mod coordinator;
142pub mod error;
143pub mod flight;
144pub mod partition;
145pub mod shuffle;
146pub mod task;
147pub mod worker;
148
149// Re-export main types
150pub use coordinator::{Coordinator, CoordinatorConfig, CoordinatorProgress, WorkerInfo};
151pub use error::{DistributedError, Result};
152pub use flight::{FlightClient, FlightServer};
153pub use partition::{
154 HashPartitioner, LoadBalancedPartitioner, Partition, PartitionStrategy, RangePartitioner,
155 SpatialExtent, StripPartitioner, TilePartitioner,
156};
157pub use shuffle::{
158 BroadcastShuffle, HashShuffle, RangeShuffle, ShuffleConfig, ShuffleKey, ShuffleResult,
159 ShuffleStats, ShuffleType,
160};
161pub use task::{
162 PartitionId, Task, TaskContext, TaskId, TaskOperation, TaskResult, TaskScheduler, TaskStatus,
163};
164pub use worker::{Worker, WorkerConfig, WorkerHealthCheck, WorkerMetrics, WorkerStatus};
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time smoke test: confirms that the crate root re-exports
    /// the primary configuration and identifier types. The test performs
    /// no runtime work — it only fails to build if a re-export is removed.
    #[test]
    fn test_exports() {
        // Monomorphizing this helper for a type proves the name resolves
        // at the crate root.
        fn assert_exported<T>() {}

        assert_exported::<CoordinatorConfig>();
        assert_exported::<WorkerConfig>();
        assert_exported::<TaskId>();
        assert_exported::<PartitionId>();
    }
}
178}