oxigdal_distributed/
error.rs1pub type Result<T> = std::result::Result<T, DistributedError>;
8
9#[derive(Debug, thiserror::Error)]
11pub enum DistributedError {
12 #[error("Flight RPC error: {0}")]
14 FlightRpc(String),
15
16 #[error("Worker connection error: {0}")]
18 WorkerConnection(String),
19
20 #[error("Worker task failure: {0}")]
22 WorkerTaskFailure(String),
23
24 #[error("Coordinator error: {0}")]
26 Coordinator(String),
27
28 #[error("Partitioning error: {0}")]
30 Partitioning(String),
31
32 #[error("Shuffle error: {0}")]
34 Shuffle(String),
35
36 #[error("Task serialization error: {0}")]
38 TaskSerialization(String),
39
40 #[error("Network timeout: {0}")]
42 Timeout(String),
43
44 #[error("Authentication failed: {0}")]
46 Authentication(String),
47
48 #[error("Resource allocation error: {0}")]
50 ResourceAllocation(String),
51
52 #[error("Result aggregation error: {0}")]
54 Aggregation(String),
55
56 #[error("Arrow error: {0}")]
58 Arrow(String),
59
60 #[error("OxiGDAL core error: {0}")]
62 Core(#[from] oxigdal_core::error::OxiGdalError),
63
64 #[error("I/O error: {0}")]
66 Io(#[from] std::io::Error),
67
68 #[error("JSON error: {0}")]
70 Json(#[from] serde_json::Error),
71
72 #[error("Transport error: {0}")]
74 Transport(#[from] tonic::transport::Error),
75
76 #[error("RPC status error: {0}")]
78 Status(#[from] tonic::Status),
79
80 #[error("{0}")]
82 Custom(String),
83}
84
85impl DistributedError {
86 pub fn flight_rpc<S: Into<String>>(msg: S) -> Self {
88 Self::FlightRpc(msg.into())
89 }
90
91 pub fn worker_connection<S: Into<String>>(msg: S) -> Self {
93 Self::WorkerConnection(msg.into())
94 }
95
96 pub fn worker_task_failure<S: Into<String>>(msg: S) -> Self {
98 Self::WorkerTaskFailure(msg.into())
99 }
100
101 pub fn coordinator<S: Into<String>>(msg: S) -> Self {
103 Self::Coordinator(msg.into())
104 }
105
106 pub fn partitioning<S: Into<String>>(msg: S) -> Self {
108 Self::Partitioning(msg.into())
109 }
110
111 pub fn shuffle<S: Into<String>>(msg: S) -> Self {
113 Self::Shuffle(msg.into())
114 }
115
116 pub fn task_serialization<S: Into<String>>(msg: S) -> Self {
118 Self::TaskSerialization(msg.into())
119 }
120
121 pub fn timeout<S: Into<String>>(msg: S) -> Self {
123 Self::Timeout(msg.into())
124 }
125
126 pub fn authentication<S: Into<String>>(msg: S) -> Self {
128 Self::Authentication(msg.into())
129 }
130
131 pub fn resource_allocation<S: Into<String>>(msg: S) -> Self {
133 Self::ResourceAllocation(msg.into())
134 }
135
136 pub fn aggregation<S: Into<String>>(msg: S) -> Self {
138 Self::Aggregation(msg.into())
139 }
140
141 pub fn arrow<S: Into<String>>(msg: S) -> Self {
143 Self::Arrow(msg.into())
144 }
145
146 pub fn custom<S: Into<String>>(msg: S) -> Self {
148 Self::Custom(msg.into())
149 }
150}
151
152impl From<arrow::error::ArrowError> for DistributedError {
153 fn from(err: arrow::error::ArrowError) -> Self {
154 Self::Arrow(err.to_string())
155 }
156}
157
158impl From<DistributedError> for tonic::Status {
159 fn from(err: DistributedError) -> Self {
160 match err {
161 DistributedError::FlightRpc(msg) => {
162 tonic::Status::internal(format!("Flight RPC error: {}", msg))
163 }
164 DistributedError::WorkerConnection(msg) => {
165 tonic::Status::unavailable(format!("Worker connection error: {}", msg))
166 }
167 DistributedError::WorkerTaskFailure(msg) => {
168 tonic::Status::internal(format!("Worker task failure: {}", msg))
169 }
170 DistributedError::Coordinator(msg) => {
171 tonic::Status::internal(format!("Coordinator error: {}", msg))
172 }
173 DistributedError::Partitioning(msg) => {
174 tonic::Status::invalid_argument(format!("Partitioning error: {}", msg))
175 }
176 DistributedError::Shuffle(msg) => {
177 tonic::Status::internal(format!("Shuffle error: {}", msg))
178 }
179 DistributedError::TaskSerialization(msg) => {
180 tonic::Status::invalid_argument(format!("Task serialization error: {}", msg))
181 }
182 DistributedError::Timeout(msg) => {
183 tonic::Status::deadline_exceeded(format!("Timeout: {}", msg))
184 }
185 DistributedError::Authentication(msg) => {
186 tonic::Status::unauthenticated(format!("Authentication failed: {}", msg))
187 }
188 DistributedError::ResourceAllocation(msg) => {
189 tonic::Status::resource_exhausted(format!("Resource allocation error: {}", msg))
190 }
191 DistributedError::Aggregation(msg) => {
192 tonic::Status::internal(format!("Aggregation error: {}", msg))
193 }
194 DistributedError::Arrow(msg) => {
195 tonic::Status::internal(format!("Arrow error: {}", msg))
196 }
197 DistributedError::Core(err) => tonic::Status::internal(format!("Core error: {}", err)),
198 DistributedError::Io(err) => tonic::Status::internal(format!("I/O error: {}", err)),
199 DistributedError::Json(err) => {
200 tonic::Status::invalid_argument(format!("JSON error: {}", err))
201 }
202 DistributedError::Transport(err) => {
203 tonic::Status::unavailable(format!("Transport error: {}", err))
204 }
205 DistributedError::Status(status) => status,
206 DistributedError::Custom(msg) => tonic::Status::internal(msg),
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// The convenience constructors produce the variant they are named after.
    #[test]
    fn test_error_creation() {
        assert!(matches!(
            DistributedError::flight_rpc("test error"),
            DistributedError::FlightRpc(_)
        ));
        assert!(matches!(
            DistributedError::worker_connection("connection failed"),
            DistributedError::WorkerConnection(_)
        ));
        assert!(matches!(
            DistributedError::timeout("operation timed out"),
            DistributedError::Timeout(_)
        ));
    }

    /// The displayed text contains both the fixed prefix and the message.
    #[test]
    fn test_error_display() {
        let rendered = DistributedError::flight_rpc("test error").to_string();
        for fragment in ["Flight RPC error", "test error"] {
            assert!(rendered.contains(fragment));
        }
    }

    /// Converting to `tonic::Status` yields the expected gRPC code.
    #[test]
    fn test_to_tonic_status() {
        let cases = [
            (DistributedError::flight_rpc("test"), tonic::Code::Internal),
            (
                DistributedError::authentication("invalid token"),
                tonic::Code::Unauthenticated,
            ),
            (
                DistributedError::timeout("too slow"),
                tonic::Code::DeadlineExceeded,
            ),
        ];

        for (err, expected) in cases {
            let status: tonic::Status = err.into();
            assert_eq!(status.code(), expected);
        }
    }
}