Skip to main content

oxigdal_distributed/
error.rs

1//! Error types for distributed processing operations.
2//!
3//! This module provides comprehensive error handling for distributed geospatial
4//! processing, including Flight RPC errors, worker failures, and coordination issues.
5
6/// Result type for distributed operations.
7pub type Result<T> = std::result::Result<T, DistributedError>;
8
9/// Errors that can occur during distributed processing.
10#[derive(Debug, thiserror::Error)]
11pub enum DistributedError {
12    /// Error during Flight RPC communication.
13    #[error("Flight RPC error: {0}")]
14    FlightRpc(String),
15
16    /// Error connecting to a worker node.
17    #[error("Worker connection error: {0}")]
18    WorkerConnection(String),
19
20    /// Worker failed to complete a task.
21    #[error("Worker task failure: {0}")]
22    WorkerTaskFailure(String),
23
24    /// Coordinator error.
25    #[error("Coordinator error: {0}")]
26    Coordinator(String),
27
28    /// Data partitioning error.
29    #[error("Partitioning error: {0}")]
30    Partitioning(String),
31
32    /// Data shuffle error.
33    #[error("Shuffle error: {0}")]
34    Shuffle(String),
35
36    /// Task serialization/deserialization error.
37    #[error("Task serialization error: {0}")]
38    TaskSerialization(String),
39
40    /// Network timeout error.
41    #[error("Network timeout: {0}")]
42    Timeout(String),
43
44    /// Authentication error.
45    #[error("Authentication failed: {0}")]
46    Authentication(String),
47
48    /// Resource allocation error.
49    #[error("Resource allocation error: {0}")]
50    ResourceAllocation(String),
51
52    /// Result aggregation error.
53    #[error("Result aggregation error: {0}")]
54    Aggregation(String),
55
56    /// Arrow error during data transfer.
57    #[error("Arrow error: {0}")]
58    Arrow(String),
59
60    /// Core OxiGDAL error.
61    #[error("OxiGDAL core error: {0}")]
62    Core(#[from] oxigdal_core::error::OxiGdalError),
63
64    /// I/O error.
65    #[error("I/O error: {0}")]
66    Io(#[from] std::io::Error),
67
68    /// JSON serialization error.
69    #[error("JSON error: {0}")]
70    Json(#[from] serde_json::Error),
71
72    /// Tonic transport error.
73    #[error("Transport error: {0}")]
74    Transport(#[from] tonic::transport::Error),
75
76    /// Tonic status error.
77    #[error("RPC status error: {0}")]
78    Status(#[from] tonic::Status),
79
80    /// Generic error with custom message.
81    #[error("{0}")]
82    Custom(String),
83}
84
85impl DistributedError {
86    /// Create a Flight RPC error.
87    pub fn flight_rpc<S: Into<String>>(msg: S) -> Self {
88        Self::FlightRpc(msg.into())
89    }
90
91    /// Create a worker connection error.
92    pub fn worker_connection<S: Into<String>>(msg: S) -> Self {
93        Self::WorkerConnection(msg.into())
94    }
95
96    /// Create a worker task failure error.
97    pub fn worker_task_failure<S: Into<String>>(msg: S) -> Self {
98        Self::WorkerTaskFailure(msg.into())
99    }
100
101    /// Create a coordinator error.
102    pub fn coordinator<S: Into<String>>(msg: S) -> Self {
103        Self::Coordinator(msg.into())
104    }
105
106    /// Create a partitioning error.
107    pub fn partitioning<S: Into<String>>(msg: S) -> Self {
108        Self::Partitioning(msg.into())
109    }
110
111    /// Create a shuffle error.
112    pub fn shuffle<S: Into<String>>(msg: S) -> Self {
113        Self::Shuffle(msg.into())
114    }
115
116    /// Create a task serialization error.
117    pub fn task_serialization<S: Into<String>>(msg: S) -> Self {
118        Self::TaskSerialization(msg.into())
119    }
120
121    /// Create a timeout error.
122    pub fn timeout<S: Into<String>>(msg: S) -> Self {
123        Self::Timeout(msg.into())
124    }
125
126    /// Create an authentication error.
127    pub fn authentication<S: Into<String>>(msg: S) -> Self {
128        Self::Authentication(msg.into())
129    }
130
131    /// Create a resource allocation error.
132    pub fn resource_allocation<S: Into<String>>(msg: S) -> Self {
133        Self::ResourceAllocation(msg.into())
134    }
135
136    /// Create an aggregation error.
137    pub fn aggregation<S: Into<String>>(msg: S) -> Self {
138        Self::Aggregation(msg.into())
139    }
140
141    /// Create an Arrow error.
142    pub fn arrow<S: Into<String>>(msg: S) -> Self {
143        Self::Arrow(msg.into())
144    }
145
146    /// Create a custom error.
147    pub fn custom<S: Into<String>>(msg: S) -> Self {
148        Self::Custom(msg.into())
149    }
150}
151
152impl From<arrow::error::ArrowError> for DistributedError {
153    fn from(err: arrow::error::ArrowError) -> Self {
154        Self::Arrow(err.to_string())
155    }
156}
157
158impl From<DistributedError> for tonic::Status {
159    fn from(err: DistributedError) -> Self {
160        match err {
161            DistributedError::FlightRpc(msg) => {
162                tonic::Status::internal(format!("Flight RPC error: {}", msg))
163            }
164            DistributedError::WorkerConnection(msg) => {
165                tonic::Status::unavailable(format!("Worker connection error: {}", msg))
166            }
167            DistributedError::WorkerTaskFailure(msg) => {
168                tonic::Status::internal(format!("Worker task failure: {}", msg))
169            }
170            DistributedError::Coordinator(msg) => {
171                tonic::Status::internal(format!("Coordinator error: {}", msg))
172            }
173            DistributedError::Partitioning(msg) => {
174                tonic::Status::invalid_argument(format!("Partitioning error: {}", msg))
175            }
176            DistributedError::Shuffle(msg) => {
177                tonic::Status::internal(format!("Shuffle error: {}", msg))
178            }
179            DistributedError::TaskSerialization(msg) => {
180                tonic::Status::invalid_argument(format!("Task serialization error: {}", msg))
181            }
182            DistributedError::Timeout(msg) => {
183                tonic::Status::deadline_exceeded(format!("Timeout: {}", msg))
184            }
185            DistributedError::Authentication(msg) => {
186                tonic::Status::unauthenticated(format!("Authentication failed: {}", msg))
187            }
188            DistributedError::ResourceAllocation(msg) => {
189                tonic::Status::resource_exhausted(format!("Resource allocation error: {}", msg))
190            }
191            DistributedError::Aggregation(msg) => {
192                tonic::Status::internal(format!("Aggregation error: {}", msg))
193            }
194            DistributedError::Arrow(msg) => {
195                tonic::Status::internal(format!("Arrow error: {}", msg))
196            }
197            DistributedError::Core(err) => tonic::Status::internal(format!("Core error: {}", err)),
198            DistributedError::Io(err) => tonic::Status::internal(format!("I/O error: {}", err)),
199            DistributedError::Json(err) => {
200                tonic::Status::invalid_argument(format!("JSON error: {}", err))
201            }
202            DistributedError::Transport(err) => {
203                tonic::Status::unavailable(format!("Transport error: {}", err))
204            }
205            DistributedError::Status(status) => status,
206            DistributedError::Custom(msg) => tonic::Status::internal(msg),
207        }
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214
215    #[test]
216    fn test_error_creation() {
217        let err = DistributedError::flight_rpc("test error");
218        assert!(matches!(err, DistributedError::FlightRpc(_)));
219
220        let err = DistributedError::worker_connection("connection failed");
221        assert!(matches!(err, DistributedError::WorkerConnection(_)));
222
223        let err = DistributedError::timeout("operation timed out");
224        assert!(matches!(err, DistributedError::Timeout(_)));
225    }
226
227    #[test]
228    fn test_error_display() {
229        let err = DistributedError::flight_rpc("test error");
230        let msg = format!("{}", err);
231        assert!(msg.contains("Flight RPC error"));
232        assert!(msg.contains("test error"));
233    }
234
235    #[test]
236    fn test_to_tonic_status() {
237        let err = DistributedError::flight_rpc("test");
238        let status: tonic::Status = err.into();
239        assert_eq!(status.code(), tonic::Code::Internal);
240
241        let err = DistributedError::authentication("invalid token");
242        let status: tonic::Status = err.into();
243        assert_eq!(status.code(), tonic::Code::Unauthenticated);
244
245        let err = DistributedError::timeout("too slow");
246        let status: tonic::Status = err.into();
247        assert_eq!(status.code(), tonic::Code::DeadlineExceeded);
248    }
249}