Skip to main content

rustvello_core/
error.rs

1use rustvello_proto::identifiers::{InvocationId, TaskId};
2use rustvello_proto::status::{InvocationStatus, StatusMachineError};
3use std::fmt;
4
/// Coarse category of an infrastructure failure.
///
/// Callers can branch on the variant (or ask
/// [`InfraErrorKind::is_retriable`]) to tell transient failures
/// (`Connection`, `Timeout`) apart from permanent ones (`Query`,
/// `DataCorruption`) without inspecting error message text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum InfraErrorKind {
    /// A network or connection-pool connection failed (retriable).
    Connection,
    /// The operation ran out of time (retriable).
    Timeout,
    /// A query or command failed (not retriable).
    Query,
    /// Stored data is corrupt or cannot be parsed (not retriable).
    DataCorruption,
    /// An infrastructure failure that fits none of the categories above.
    Other,
}
23
24impl fmt::Display for InfraErrorKind {
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        match self {
27            Self::Connection => write!(f, "connection"),
28            Self::Timeout => write!(f, "timeout"),
29            Self::Query => write!(f, "query"),
30            Self::DataCorruption => write!(f, "data_corruption"),
31            Self::Other => write!(f, "other"),
32        }
33    }
34}
35
36impl InfraErrorKind {
37    /// Returns `true` for errors that are typically transient and worth retrying.
38    pub fn is_retriable(self) -> bool {
39        matches!(self, Self::Connection | Self::Timeout)
40    }
41}
42
43/// Root error type — mirrors pynenc's exception hierarchy.
44///
45/// Each variant maps 1:1 to a pynenc exception class (see 302 §4).
46#[derive(Debug, thiserror::Error)]
47#[non_exhaustive]
48pub enum RustvelloError {
49    // --- Retry errors (mirror RetryError) ---
50    /// Task explicitly requested a retry
51    #[error("retry requested: {reason}")]
52    Retry { reason: String },
53
54    /// Concurrency control triggered a retry
55    #[error("concurrency retry: task {task_id} — {reason}")]
56    ConcurrencyRetry { task_id: TaskId, reason: String },
57
58    // --- Serialization (mirror SerializationError) ---
59    /// Serialization/deserialization error
60    #[error("serialization error: {message}")]
61    Serialization { message: String },
62
63    // --- Task errors (mirror TaskError hierarchy) ---
64    /// Task definition not found in any registry
65    #[error("task not found: {task_id}")]
66    TaskNotFound { task_id: TaskId },
67
68    /// Task exists but is not registered in this app instance
69    #[error("task not registered: {task_id}")]
70    TaskNotRegistered { task_id: TaskId },
71
72    /// Dependency cycle detected in task graph
73    #[error("cycle detected: {task_id} — {message}")]
74    CycleDetected { task_id: TaskId, message: String },
75
76    /// Runner cannot execute this task type
77    #[error("runner not executable: {task_id} — {message}")]
78    RunnerNotExecutable { task_id: TaskId, message: String },
79
80    /// Task class/type not found during deserialization
81    #[error("task class not found: {task_id}")]
82    TaskClassNotFound { task_id: TaskId },
83
84    // --- Invocation errors (mirror InvocationError hierarchy) ---
85    /// Invocation not found
86    #[error("invocation not found: {invocation_id}")]
87    InvocationNotFound { invocation_id: InvocationId },
88
89    /// Invalid status transition attempted
90    #[error("invalid status transition: {from_status} -> {to_status}")]
91    InvalidStatusTransition {
92        invocation_id: InvocationId,
93        from_status: InvocationStatus,
94        to_status: InvocationStatus,
95        allowed_statuses: Vec<InvocationStatus>,
96    },
97
98    /// Runner ownership violation during status transition
99    #[error("ownership violation: {from_status} -> {to_status}, owner={current_owner}, requester={attempted_owner}")]
100    OwnershipViolation {
101        invocation_id: InvocationId,
102        from_status: InvocationStatus,
103        to_status: InvocationStatus,
104        current_owner: String,
105        attempted_owner: String,
106        reason: String,
107    },
108
109    /// Status changed between read and write (optimistic locking failure)
110    #[error("status race condition on {invocation_id}")]
111    StatusRaceCondition {
112        invocation_id: InvocationId,
113        previous_status: InvocationStatus,
114        expected_status: InvocationStatus,
115        actual_status: InvocationStatus,
116    },
117
118    // --- Task execution errors ---
119    /// Task function raised an error during execution.
120    /// Carries the original exception type name for retry matching.
121    #[error("task execution error ({error_type}): {message}")]
122    TaskExecution {
123        error_type: String,
124        message: String,
125        traceback: Option<String>,
126    },
127
128    // --- Infrastructure errors ---
129    /// Unified infrastructure error with structured classification.
130    #[error("infrastructure error ({kind}): {message}")]
131    Infrastructure {
132        kind: InfraErrorKind,
133        message: String,
134        #[source]
135        source: Option<Box<dyn std::error::Error + Send + Sync>>,
136    },
137
138    // --- Config errors ---
139    /// Configuration error
140    #[error("configuration error: {message}")]
141    Configuration { message: String },
142
143    // --- Internal (no pynenc equivalent — for Rust-only bugs) ---
144    /// Generic internal error
145    #[error("internal error: {message}")]
146    Internal { message: String },
147
148    /// Backend does not support this operation.
149    #[error("{backend} backend does not support {method}")]
150    NotSupported { backend: String, method: String },
151}
152
153impl RustvelloError {
154    /// State backend query/storage error (not retriable).
155    pub fn state_backend(message: impl Into<String>) -> Self {
156        Self::Infrastructure {
157            kind: InfraErrorKind::Query,
158            message: message.into(),
159            source: None,
160        }
161    }
162
163    /// Broker messaging error (not retriable).
164    pub fn broker_err(message: impl Into<String>) -> Self {
165        Self::Infrastructure {
166            kind: InfraErrorKind::Query,
167            message: message.into(),
168            source: None,
169        }
170    }
171
172    /// Runner infrastructure error.
173    pub fn runner_err(message: impl Into<String>) -> Self {
174        Self::Infrastructure {
175            kind: InfraErrorKind::Other,
176            message: message.into(),
177            source: None,
178        }
179    }
180
181    /// Connection failure (retriable).
182    pub fn connection_err(message: impl Into<String>) -> Self {
183        Self::Infrastructure {
184            kind: InfraErrorKind::Connection,
185            message: message.into(),
186            source: None,
187        }
188    }
189}
190
191/// Convenience type alias for Results in Rustvello.
192pub type RustvelloResult<T> = Result<T, RustvelloError>;
193
194/// Convert a mutex lock-poison error into [`RustvelloError::Internal`].
195///
196/// Use this in backend implementations to avoid duplicating lock-error
197/// conversion logic.
198pub fn lock_err(e: impl std::fmt::Display) -> RustvelloError {
199    RustvelloError::Internal {
200        message: format!("lock poisoned: {}", e),
201    }
202}
203
204/// Error information stored with a failed invocation.
205#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
206pub struct TaskError {
207    pub error_type: String,
208    pub message: String,
209    pub traceback: Option<String>,
210}
211
212impl fmt::Display for TaskError {
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        write!(f, "{}: {}", self.error_type, self.message)
215    }
216}
217
218/// Convert a [`StatusMachineError`] into a [`RustvelloError`].
219///
220/// `fallback_from_status` is used when the transition error's `from` field
221/// is `None` (i.e. the current status was not captured in the error itself).
222pub fn status_machine_error_to_rustvello(
223    e: StatusMachineError,
224    invocation_id: &InvocationId,
225    fallback_from_status: InvocationStatus,
226) -> RustvelloError {
227    match e {
228        StatusMachineError::Transition(t) => RustvelloError::InvalidStatusTransition {
229            invocation_id: invocation_id.clone(),
230            from_status: t.from.unwrap_or(fallback_from_status),
231            to_status: t.to,
232            allowed_statuses: t.allowed,
233        },
234        StatusMachineError::Ownership(o) => RustvelloError::OwnershipViolation {
235            invocation_id: invocation_id.clone(),
236            from_status: o.from_status,
237            to_status: o.to_status,
238            current_owner: o.current_owner.unwrap_or_default(),
239            attempted_owner: o.attempted_owner.unwrap_or_default(),
240            reason: o.reason,
241        },
242        _ => RustvelloError::Internal {
243            message: format!("unexpected status machine error: {e}"),
244        },
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the `Display` rendering of `err` contains `needle`.
    fn assert_display_contains(err: &RustvelloError, needle: &str) {
        assert!(err.to_string().contains(needle));
    }

    /// Rendered messages should carry each variant's identifying payload.
    #[test]
    fn error_display_messages() {
        let transition = RustvelloError::InvalidStatusTransition {
            invocation_id: InvocationId::from_string("inv-1"),
            from_status: InvocationStatus::Registered,
            to_status: InvocationStatus::Running,
            allowed_statuses: vec![InvocationStatus::Pending],
        };
        assert_display_contains(&transition, "REGISTERED");
        assert_display_contains(&transition, "RUNNING");

        let not_found = RustvelloError::InvocationNotFound {
            invocation_id: InvocationId::from_string("inv-1"),
        };
        assert_display_contains(&not_found, "inv-1");

        let not_registered = RustvelloError::TaskNotRegistered {
            task_id: TaskId::new("mod", "func"),
        };
        assert_display_contains(&not_registered, "mod.func");

        let serialization = RustvelloError::Serialization {
            message: "bad json".to_string(),
        };
        assert_display_contains(&serialization, "bad json");

        let state = RustvelloError::state_backend("disk full".to_string());
        assert_display_contains(&state, "disk full");

        let config = RustvelloError::Configuration {
            message: "bad config".to_string(),
        };
        assert_display_contains(&config, "bad config");

        let broker = RustvelloError::broker_err("queue full".to_string());
        assert_display_contains(&broker, "queue full");

        let runner =
            RustvelloError::runner_err("timeout waiting for invocation inv-2".to_string());
        assert_display_contains(&runner, "inv-2");

        let internal = RustvelloError::Internal {
            message: "unexpected".to_string(),
        };
        assert_display_contains(&internal, "unexpected");
    }

    /// `TaskError` renders as `<type>: <message>` and omits the traceback.
    #[test]
    fn task_error_display() {
        let task_error = TaskError {
            error_type: "ValueError".to_string(),
            message: "negative number".to_string(),
            traceback: Some("line 1".to_string()),
        };
        assert_eq!(task_error.to_string(), "ValueError: negative number");
    }

    /// `TaskError` survives a JSON round-trip with all fields intact.
    #[test]
    fn task_error_serde() {
        let original = TaskError {
            error_type: "RuntimeError".to_string(),
            message: "oops".to_string(),
            traceback: None,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: TaskError = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.error_type, "RuntimeError");
        assert_eq!(decoded.message, "oops");
        assert!(decoded.traceback.is_none());
    }

    /// Both the current owner and the requester appear in the message.
    #[test]
    fn ownership_violation_display() {
        let violation = RustvelloError::OwnershipViolation {
            invocation_id: InvocationId::from_string("inv-1"),
            from_status: InvocationStatus::Running,
            to_status: InvocationStatus::Success,
            current_owner: "runner-a".to_string(),
            attempted_owner: "runner-b".to_string(),
            reason: "status requires ownership".to_string(),
        };
        assert_display_contains(&violation, "runner-a");
        assert_display_contains(&violation, "runner-b");
    }

    /// The retry message names the error class and the reason.
    #[test]
    fn retry_display() {
        let retry = RustvelloError::Retry {
            reason: "transient network error".to_string(),
        };
        assert_display_contains(&retry, "retry");
        assert_display_contains(&retry, "transient network error");
    }

    /// The concurrency-retry message names the task and the reason.
    #[test]
    fn concurrency_retry_display() {
        let retry = RustvelloError::ConcurrencyRetry {
            task_id: TaskId::new("mod", "my_task"),
            reason: "task-level lock held".to_string(),
        };
        assert_display_contains(&retry, "mod.my_task");
        assert_display_contains(&retry, "task-level lock held");
    }

    /// `lock_err` always yields the `Internal` variant.
    #[test]
    fn lock_err_converts_to_internal() {
        match lock_err("PoisonError { .. }") {
            RustvelloError::Internal { message } => assert!(message.contains("lock poisoned")),
            other => panic!("expected Internal, got {other:?}"),
        }
    }

    /// The race-condition message identifies the invocation.
    #[test]
    fn status_race_condition_display() {
        let race = RustvelloError::StatusRaceCondition {
            invocation_id: InvocationId::from_string("inv-1"),
            previous_status: InvocationStatus::Pending,
            expected_status: InvocationStatus::Running,
            actual_status: InvocationStatus::Failed,
        };
        assert_display_contains(&race, "inv-1");
    }

    /// Runner errors keep the supplied message.
    #[test]
    fn runner_error_display() {
        let runner = RustvelloError::runner_err("process exited with code 1".to_string());
        assert_display_contains(&runner, "process exited");
    }

    /// Task-related variants render the task id or a descriptive label.
    #[test]
    fn task_error_variants_display() {
        let task_id = TaskId::new("mymod", "myfunc");

        let not_found = RustvelloError::TaskNotFound {
            task_id: task_id.clone(),
        };
        assert_display_contains(&not_found, "mymod.myfunc");

        let cycle = RustvelloError::CycleDetected {
            task_id: task_id.clone(),
            message: "A -> B -> A".to_string(),
        };
        assert_display_contains(&cycle, "cycle");

        let class_missing = RustvelloError::TaskClassNotFound {
            task_id: task_id.clone(),
        };
        assert_display_contains(&class_missing, "class not found");
    }
}
405}