Skip to main content

lash_core/runtime/
error.rs

1/// Durable store facet that a durable execution path requires but the host
2/// wired as ephemeral.
3///
4/// Names the failing facet so a [`RuntimeErrorCode::DurableStoreRequired`]
5/// can be matched and serialized losslessly per facet.
6#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
7pub enum DurableStoreFacet {
8    AttachmentStore,
9    ProcessEnvStore,
10    SessionStore,
11    ProcessRegistry,
12    TriggerStore,
13}
14
15impl DurableStoreFacet {
16    /// Stable per-facet error-code string (the full
17    /// `durable_store_required:*` code surfaced in traces and host errors).
18    fn as_code(self) -> &'static str {
19        match self {
20            Self::AttachmentStore => "durable_store_required:attachment_store",
21            Self::ProcessEnvStore => "durable_store_required:process_env_store",
22            Self::SessionStore => "durable_store_required:session_store",
23            Self::ProcessRegistry => "durable_store_required:process_registry",
24            Self::TriggerStore => "durable_store_required:trigger_store",
25        }
26    }
27}
28
29/// Stable runtime error code.
30///
31/// Codes serialize as the same snake_case strings exposed in traces and host
32/// errors, but callers should match this type instead of parsing display text.
33#[derive(Clone, Debug, PartialEq, Eq, Hash)]
34pub enum RuntimeErrorCode {
35    MissingExecutionScopeId,
36    ExecutionScopeTurnIdMismatch,
37    SessionExecutionBusy,
38    SessionExecutionLeaseLost,
39    /// A process (re-)execution was handed an empty/non-persisted process id.
40    /// Process execution identity is the persisted `process_id`; a retry that
41    /// cannot present that stable id has lost its idempotency anchor.
42    MissingProcessExecutionId,
43    /// A durable execution path was wired against an ephemeral store for the
44    /// named facet.
45    DurableStoreRequired {
46        facet: DurableStoreFacet,
47    },
48    StoreCommitFailed,
49    PluginSessionManager,
50    PluginFinalizeTurn,
51    PluginCheckpoint,
52    PluginPrepareTurn,
53    ContextPrepareTurn,
54    ProtocolTurnExtension,
55    ProtocolBeforeLlmCall,
56    TurnStreamJoin,
57    EmptyAgentFrameRun,
58    DurableEffectLiveProtocolExtension,
59    DurableEffectLivePluginInput,
60    Other(String),
61}
62
63impl RuntimeErrorCode {
64    pub fn as_str(&self) -> &str {
65        match self {
66            Self::MissingExecutionScopeId => "missing_execution_scope_id",
67            Self::ExecutionScopeTurnIdMismatch => "execution_scope_turn_id_mismatch",
68            Self::SessionExecutionBusy => "session_execution_busy",
69            Self::SessionExecutionLeaseLost => "session_execution_lease_lost",
70            Self::MissingProcessExecutionId => "missing_process_execution_id",
71            Self::DurableStoreRequired { facet } => facet.as_code(),
72            Self::StoreCommitFailed => "store_commit_failed",
73            Self::PluginSessionManager => "plugin_session_manager",
74            Self::PluginFinalizeTurn => "plugin_finalize_turn",
75            Self::PluginCheckpoint => "plugin_checkpoint",
76            Self::PluginPrepareTurn => "plugin_prepare_turn",
77            Self::ContextPrepareTurn => "context_prepare_turn",
78            Self::ProtocolTurnExtension => "protocol_turn_extension",
79            Self::ProtocolBeforeLlmCall => "protocol_before_llm_call",
80            Self::TurnStreamJoin => "turn_stream_join",
81            Self::EmptyAgentFrameRun => "empty_agent_frame_run",
82            Self::DurableEffectLiveProtocolExtension => "durable_effect_live_protocol_extension",
83            Self::DurableEffectLivePluginInput => "durable_effect_live_plugin_input",
84            Self::Other(code) => code.as_str(),
85        }
86    }
87}
88
89impl std::fmt::Display for RuntimeErrorCode {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.write_str(self.as_str())
92    }
93}
94
95impl From<&str> for RuntimeErrorCode {
96    fn from(code: &str) -> Self {
97        match code {
98            "missing_execution_scope_id" => Self::MissingExecutionScopeId,
99            "execution_scope_turn_id_mismatch" => Self::ExecutionScopeTurnIdMismatch,
100            "session_execution_busy" => Self::SessionExecutionBusy,
101            "session_execution_lease_lost" => Self::SessionExecutionLeaseLost,
102            "missing_process_execution_id" => Self::MissingProcessExecutionId,
103            "durable_store_required:attachment_store" => Self::DurableStoreRequired {
104                facet: DurableStoreFacet::AttachmentStore,
105            },
106            "durable_store_required:process_env_store" => Self::DurableStoreRequired {
107                facet: DurableStoreFacet::ProcessEnvStore,
108            },
109            "durable_store_required:session_store" => Self::DurableStoreRequired {
110                facet: DurableStoreFacet::SessionStore,
111            },
112            "durable_store_required:process_registry" => Self::DurableStoreRequired {
113                facet: DurableStoreFacet::ProcessRegistry,
114            },
115            "durable_store_required:trigger_store" => Self::DurableStoreRequired {
116                facet: DurableStoreFacet::TriggerStore,
117            },
118            "store_commit_failed" => Self::StoreCommitFailed,
119            "plugin_session_manager" => Self::PluginSessionManager,
120            "plugin_finalize_turn" => Self::PluginFinalizeTurn,
121            "plugin_checkpoint" => Self::PluginCheckpoint,
122            "plugin_prepare_turn" => Self::PluginPrepareTurn,
123            "context_prepare_turn" => Self::ContextPrepareTurn,
124            "protocol_turn_extension" => Self::ProtocolTurnExtension,
125            "protocol_before_llm_call" => Self::ProtocolBeforeLlmCall,
126            "turn_stream_join" => Self::TurnStreamJoin,
127            "empty_agent_frame_run" => Self::EmptyAgentFrameRun,
128            "durable_effect_live_protocol_extension" => Self::DurableEffectLiveProtocolExtension,
129            "durable_effect_live_plugin_input" => Self::DurableEffectLivePluginInput,
130            other => Self::Other(other.to_string()),
131        }
132    }
133}
134
135impl From<String> for RuntimeErrorCode {
136    fn from(code: String) -> Self {
137        Self::from(code.as_str())
138    }
139}
140
141impl serde::Serialize for RuntimeErrorCode {
142    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
143    where
144        S: serde::Serializer,
145    {
146        serializer.serialize_str(self.as_str())
147    }
148}
149
150impl<'de> serde::Deserialize<'de> for RuntimeErrorCode {
151    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
152    where
153        D: serde::Deserializer<'de>,
154    {
155        let code = <String as serde::Deserialize>::deserialize(deserializer)?;
156        Ok(Self::from(code))
157    }
158}
159
160/// Runtime error for unexpected failures.
161#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
162pub struct RuntimeError {
163    pub code: RuntimeErrorCode,
164    pub message: String,
165}
166
167impl RuntimeError {
168    pub fn new(code: impl Into<RuntimeErrorCode>, message: impl Into<String>) -> Self {
169        Self {
170            code: code.into(),
171            message: message.into(),
172        }
173    }
174
175    pub fn is_code(&self, code: RuntimeErrorCode) -> bool {
176        self.code == code
177    }
178
179    /// Build the loud error raised when a durable execution path was wired
180    /// against an ephemeral store for `facet`.
181    pub fn durable_store_required(facet: DurableStoreFacet) -> Self {
182        let facet_label = match facet {
183            DurableStoreFacet::AttachmentStore => "attachment store",
184            DurableStoreFacet::ProcessEnvStore => "process env store",
185            DurableStoreFacet::SessionStore => "session store",
186            DurableStoreFacet::ProcessRegistry => "process registry",
187            DurableStoreFacet::TriggerStore => "trigger store",
188        };
189        Self::new(
190            RuntimeErrorCode::DurableStoreRequired { facet },
191            format!("durable effect hosts require a durable {facet_label}"),
192        )
193    }
194
195    /// Build the loud error raised when a process (re-)execution is handed an
196    /// empty/non-persisted id.
197    ///
198    /// Process execution identity is the persisted `process_id`, so a retry
199    /// must present that stable id — mirroring how
200    /// [`ExecutionScope`](crate::ExecutionScope) rejects an empty stable id.
201    pub fn missing_process_execution_id() -> Self {
202        Self::new(
203            RuntimeErrorCode::MissingProcessExecutionId,
204            "process execution requires a non-empty persisted process id",
205        )
206    }
207}
208
209impl std::fmt::Display for RuntimeError {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        write!(f, "{}: {}", self.code, self.message)
212    }
213}
214
215impl std::error::Error for RuntimeError {}
216
217#[cfg(test)]
218mod tests {
219    use super::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
220
221    #[test]
222    fn durable_store_required_round_trips_per_facet() {
223        for facet in [
224            DurableStoreFacet::AttachmentStore,
225            DurableStoreFacet::ProcessEnvStore,
226            DurableStoreFacet::SessionStore,
227            DurableStoreFacet::ProcessRegistry,
228            DurableStoreFacet::TriggerStore,
229        ] {
230            let err = RuntimeError::durable_store_required(facet);
231            let json = serde_json::to_value(&err).expect("serialize runtime error");
232            let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
233            assert_eq!(
234                decoded.code,
235                RuntimeErrorCode::DurableStoreRequired { facet }
236            );
237        }
238    }
239
240    #[test]
241    fn missing_process_execution_id_round_trips() {
242        let err = RuntimeError::missing_process_execution_id();
243        assert_eq!(err.code, RuntimeErrorCode::MissingProcessExecutionId);
244        let json = serde_json::to_value(&err).expect("serialize runtime error");
245        assert_eq!(json["code"], "missing_process_execution_id");
246        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
247        assert_eq!(decoded.code, RuntimeErrorCode::MissingProcessExecutionId);
248    }
249
250    #[test]
251    fn session_execution_lease_lost_round_trips() {
252        let err = RuntimeError::new(RuntimeErrorCode::SessionExecutionLeaseLost, "lease lost");
253        let json = serde_json::to_value(&err).expect("serialize runtime error");
254        assert_eq!(json["code"], "session_execution_lease_lost");
255        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
256        assert_eq!(decoded.code, RuntimeErrorCode::SessionExecutionLeaseLost);
257    }
258
259    #[test]
260    fn runtime_error_code_serializes_as_stable_string() {
261        let err = RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, "commit failed");
262
263        let json = serde_json::to_value(&err).expect("serialize runtime error");
264        assert_eq!(json["code"], "store_commit_failed");
265
266        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
267        assert_eq!(decoded.code, RuntimeErrorCode::StoreCommitFailed);
268    }
269
270    #[test]
271    fn unknown_runtime_error_code_round_trips() {
272        let decoded: RuntimeError = serde_json::from_value(serde_json::json!({
273            "code": "plugin_defined_abort",
274            "message": "stopped by plugin"
275        }))
276        .expect("decode plugin runtime error");
277
278        assert_eq!(
279            decoded.code,
280            RuntimeErrorCode::Other("plugin_defined_abort".to_string())
281        );
282        assert_eq!(decoded.code.as_str(), "plugin_defined_abort");
283    }
284}