1use rustvello_proto::identifiers::{InvocationId, TaskId};
2use rustvello_proto::status::{InvocationStatus, StatusMachineError};
3use std::fmt;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10#[non_exhaustive]
11pub enum InfraErrorKind {
12 Connection,
14 Timeout,
16 Query,
18 DataCorruption,
20 Other,
22}
23
24impl fmt::Display for InfraErrorKind {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 match self {
27 Self::Connection => write!(f, "connection"),
28 Self::Timeout => write!(f, "timeout"),
29 Self::Query => write!(f, "query"),
30 Self::DataCorruption => write!(f, "data_corruption"),
31 Self::Other => write!(f, "other"),
32 }
33 }
34}
35
36impl InfraErrorKind {
37 pub fn is_retriable(self) -> bool {
39 matches!(self, Self::Connection | Self::Timeout)
40 }
41}
42
43#[derive(Debug, thiserror::Error)]
47#[non_exhaustive]
48pub enum RustvelloError {
49 #[error("retry requested: {reason}")]
52 Retry { reason: String },
53
54 #[error("concurrency retry: task {task_id} — {reason}")]
56 ConcurrencyRetry { task_id: TaskId, reason: String },
57
58 #[error("serialization error: {message}")]
61 Serialization { message: String },
62
63 #[error("task not found: {task_id}")]
66 TaskNotFound { task_id: TaskId },
67
68 #[error("task not registered: {task_id}")]
70 TaskNotRegistered { task_id: TaskId },
71
72 #[error("cycle detected: {task_id} — {message}")]
74 CycleDetected { task_id: TaskId, message: String },
75
76 #[error("runner not executable: {task_id} — {message}")]
78 RunnerNotExecutable { task_id: TaskId, message: String },
79
80 #[error("task class not found: {task_id}")]
82 TaskClassNotFound { task_id: TaskId },
83
84 #[error("invocation not found: {invocation_id}")]
87 InvocationNotFound { invocation_id: InvocationId },
88
89 #[error("invalid status transition: {from_status} -> {to_status}")]
91 InvalidStatusTransition {
92 invocation_id: InvocationId,
93 from_status: InvocationStatus,
94 to_status: InvocationStatus,
95 allowed_statuses: Vec<InvocationStatus>,
96 },
97
98 #[error("ownership violation: {from_status} -> {to_status}, owner={current_owner}, requester={attempted_owner}")]
100 OwnershipViolation {
101 invocation_id: InvocationId,
102 from_status: InvocationStatus,
103 to_status: InvocationStatus,
104 current_owner: String,
105 attempted_owner: String,
106 reason: String,
107 },
108
109 #[error("status race condition on {invocation_id}")]
111 StatusRaceCondition {
112 invocation_id: InvocationId,
113 previous_status: InvocationStatus,
114 expected_status: InvocationStatus,
115 actual_status: InvocationStatus,
116 },
117
118 #[error("task execution error ({error_type}): {message}")]
122 TaskExecution {
123 error_type: String,
124 message: String,
125 traceback: Option<String>,
126 },
127
128 #[error("infrastructure error ({kind}): {message}")]
131 Infrastructure {
132 kind: InfraErrorKind,
133 message: String,
134 #[source]
135 source: Option<Box<dyn std::error::Error + Send + Sync>>,
136 },
137
138 #[error("configuration error: {message}")]
141 Configuration { message: String },
142
143 #[error("internal error: {message}")]
146 Internal { message: String },
147
148 #[error("{backend} backend does not support {method}")]
150 NotSupported { backend: String, method: String },
151}
152
153impl RustvelloError {
154 pub fn state_backend(message: impl Into<String>) -> Self {
156 Self::Infrastructure {
157 kind: InfraErrorKind::Query,
158 message: message.into(),
159 source: None,
160 }
161 }
162
163 pub fn broker_err(message: impl Into<String>) -> Self {
165 Self::Infrastructure {
166 kind: InfraErrorKind::Query,
167 message: message.into(),
168 source: None,
169 }
170 }
171
172 pub fn runner_err(message: impl Into<String>) -> Self {
174 Self::Infrastructure {
175 kind: InfraErrorKind::Other,
176 message: message.into(),
177 source: None,
178 }
179 }
180
181 pub fn connection_err(message: impl Into<String>) -> Self {
183 Self::Infrastructure {
184 kind: InfraErrorKind::Connection,
185 message: message.into(),
186 source: None,
187 }
188 }
189}
190
191pub type RustvelloResult<T> = Result<T, RustvelloError>;
193
194pub fn lock_err(e: impl std::fmt::Display) -> RustvelloError {
199 RustvelloError::Internal {
200 message: format!("lock poisoned: {}", e),
201 }
202}
203
204#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
206pub struct TaskError {
207 pub error_type: String,
208 pub message: String,
209 pub traceback: Option<String>,
210}
211
212impl fmt::Display for TaskError {
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 write!(f, "{}: {}", self.error_type, self.message)
215 }
216}
217
218pub fn status_machine_error_to_rustvello(
223 e: StatusMachineError,
224 invocation_id: &InvocationId,
225 fallback_from_status: InvocationStatus,
226) -> RustvelloError {
227 match e {
228 StatusMachineError::Transition(t) => RustvelloError::InvalidStatusTransition {
229 invocation_id: invocation_id.clone(),
230 from_status: t.from.unwrap_or(fallback_from_status),
231 to_status: t.to,
232 allowed_statuses: t.allowed,
233 },
234 StatusMachineError::Ownership(o) => RustvelloError::OwnershipViolation {
235 invocation_id: invocation_id.clone(),
236 from_status: o.from_status,
237 to_status: o.to_status,
238 current_owner: o.current_owner.unwrap_or_default(),
239 attempted_owner: o.attempted_owner.unwrap_or_default(),
240 reason: o.reason,
241 },
242 _ => RustvelloError::Internal {
243 message: format!("unexpected status machine error: {e}"),
244 },
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251
252 #[test]
253 fn error_display_messages() {
254 let e = RustvelloError::InvalidStatusTransition {
255 invocation_id: InvocationId::from_string("inv-1"),
256 from_status: InvocationStatus::Registered,
257 to_status: InvocationStatus::Running,
258 allowed_statuses: vec![InvocationStatus::Pending],
259 };
260 assert!(e.to_string().contains("REGISTERED"));
261 assert!(e.to_string().contains("RUNNING"));
262
263 let e = RustvelloError::InvocationNotFound {
264 invocation_id: InvocationId::from_string("inv-1"),
265 };
266 assert!(e.to_string().contains("inv-1"));
267
268 let e = RustvelloError::TaskNotRegistered {
269 task_id: TaskId::new("mod", "func"),
270 };
271 assert!(e.to_string().contains("mod.func"));
272
273 let e = RustvelloError::Serialization {
274 message: "bad json".to_string(),
275 };
276 assert!(e.to_string().contains("bad json"));
277
278 let e = RustvelloError::state_backend("disk full".to_string());
279 assert!(e.to_string().contains("disk full"));
280
281 let e = RustvelloError::Configuration {
282 message: "bad config".to_string(),
283 };
284 assert!(e.to_string().contains("bad config"));
285
286 let e = RustvelloError::broker_err("queue full".to_string());
287 assert!(e.to_string().contains("queue full"));
288
289 let e = RustvelloError::runner_err("timeout waiting for invocation inv-2".to_string());
290 assert!(e.to_string().contains("inv-2"));
291
292 let e = RustvelloError::Internal {
293 message: "unexpected".to_string(),
294 };
295 assert!(e.to_string().contains("unexpected"));
296 }
297
298 #[test]
299 fn task_error_display() {
300 let te = TaskError {
301 error_type: "ValueError".to_string(),
302 message: "negative number".to_string(),
303 traceback: Some("line 1".to_string()),
304 };
305 assert_eq!(te.to_string(), "ValueError: negative number");
306 }
307
308 #[test]
309 fn task_error_serde() {
310 let te = TaskError {
311 error_type: "RuntimeError".to_string(),
312 message: "oops".to_string(),
313 traceback: None,
314 };
315 let json = serde_json::to_string(&te).unwrap();
316 let back: TaskError = serde_json::from_str(&json).unwrap();
317 assert_eq!(back.error_type, "RuntimeError");
318 assert_eq!(back.message, "oops");
319 assert!(back.traceback.is_none());
320 }
321
322 #[test]
323 fn ownership_violation_display() {
324 let e = RustvelloError::OwnershipViolation {
325 invocation_id: InvocationId::from_string("inv-1"),
326 from_status: InvocationStatus::Running,
327 to_status: InvocationStatus::Success,
328 current_owner: "runner-a".to_string(),
329 attempted_owner: "runner-b".to_string(),
330 reason: "status requires ownership".to_string(),
331 };
332 let s = e.to_string();
333 assert!(s.contains("runner-a"));
334 assert!(s.contains("runner-b"));
335 }
336
337 #[test]
338 fn retry_display() {
339 let e = RustvelloError::Retry {
340 reason: "transient network error".to_string(),
341 };
342 let s = e.to_string();
343 assert!(s.contains("retry"));
344 assert!(s.contains("transient network error"));
345 }
346
347 #[test]
348 fn concurrency_retry_display() {
349 let e = RustvelloError::ConcurrencyRetry {
350 task_id: TaskId::new("mod", "my_task"),
351 reason: "task-level lock held".to_string(),
352 };
353 let s = e.to_string();
354 assert!(s.contains("mod.my_task"));
355 assert!(s.contains("task-level lock held"));
356 }
357
358 #[test]
359 fn lock_err_converts_to_internal() {
360 let e = lock_err("PoisonError { .. }");
361 match e {
362 RustvelloError::Internal { message } => assert!(message.contains("lock poisoned")),
363 other => panic!("expected Internal, got {other:?}"),
364 }
365 }
366
367 #[test]
368 fn status_race_condition_display() {
369 let e = RustvelloError::StatusRaceCondition {
370 invocation_id: InvocationId::from_string("inv-1"),
371 previous_status: InvocationStatus::Pending,
372 expected_status: InvocationStatus::Running,
373 actual_status: InvocationStatus::Failed,
374 };
375 let s = e.to_string();
376 assert!(s.contains("inv-1"));
377 }
378
379 #[test]
380 fn runner_error_display() {
381 let e = RustvelloError::runner_err("process exited with code 1".to_string());
382 assert!(e.to_string().contains("process exited"));
383 }
384
385 #[test]
386 fn task_error_variants_display() {
387 let tid = TaskId::new("mymod", "myfunc");
388
389 let e = RustvelloError::TaskNotFound {
390 task_id: tid.clone(),
391 };
392 assert!(e.to_string().contains("mymod.myfunc"));
393
394 let e = RustvelloError::CycleDetected {
395 task_id: tid.clone(),
396 message: "A -> B -> A".to_string(),
397 };
398 assert!(e.to_string().contains("cycle"));
399
400 let e = RustvelloError::TaskClassNotFound {
401 task_id: tid.clone(),
402 };
403 assert!(e.to_string().contains("class not found"));
404 }
405}