/// Names the durable store that a `RuntimeErrorCode::DurableStoreRequired`
/// error refers to.
///
/// Each facet has a fixed string code of the form
/// `durable_store_required:<facet_name>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DurableStoreFacet {
    /// Code `durable_store_required:attachment_store`.
    AttachmentStore,
    /// Code `durable_store_required:process_env_store`.
    ProcessEnvStore,
    /// Code `durable_store_required:session_store`.
    SessionStore,
    /// Code `durable_store_required:process_registry`.
    ProcessRegistry,
    /// Code `durable_store_required:trigger_store`.
    TriggerStore,
}

impl DurableStoreFacet {
    /// Returns the complete string code for this facet, including the
    /// `durable_store_required:` prefix.
    ///
    /// The returned literals are the same strings that `From<&str>` for
    /// `RuntimeErrorCode` recognizes, so the two tables must be edited together.
    fn as_code(self) -> &'static str {
        match self {
            Self::AttachmentStore => "durable_store_required:attachment_store",
            Self::ProcessEnvStore => "durable_store_required:process_env_store",
            Self::SessionStore => "durable_store_required:session_store",
            Self::ProcessRegistry => "durable_store_required:process_registry",
            Self::TriggerStore => "durable_store_required:trigger_store",
        }
    }
}
28
29#[derive(Clone, Debug, PartialEq, Eq, Hash)]
34pub enum RuntimeErrorCode {
35 MissingExecutionScopeId,
36 ExecutionScopeTurnIdMismatch,
37 SessionExecutionBusy,
38 SessionExecutionLeaseLost,
39 MissingProcessExecutionId,
43 DurableStoreRequired {
46 facet: DurableStoreFacet,
47 },
48 StoreCommitFailed,
49 PluginSessionManager,
50 PluginFinalizeTurn,
51 PluginCheckpoint,
52 PluginPrepareTurn,
53 ContextPrepareTurn,
54 ProtocolTurnExtension,
55 ProtocolBeforeLlmCall,
56 TurnStreamJoin,
57 EmptyAgentFrameRun,
58 DurableEffectLiveProtocolExtension,
59 DurableEffectLivePluginInput,
60 Other(String),
61}
62
63impl RuntimeErrorCode {
64 pub fn as_str(&self) -> &str {
65 match self {
66 Self::MissingExecutionScopeId => "missing_execution_scope_id",
67 Self::ExecutionScopeTurnIdMismatch => "execution_scope_turn_id_mismatch",
68 Self::SessionExecutionBusy => "session_execution_busy",
69 Self::SessionExecutionLeaseLost => "session_execution_lease_lost",
70 Self::MissingProcessExecutionId => "missing_process_execution_id",
71 Self::DurableStoreRequired { facet } => facet.as_code(),
72 Self::StoreCommitFailed => "store_commit_failed",
73 Self::PluginSessionManager => "plugin_session_manager",
74 Self::PluginFinalizeTurn => "plugin_finalize_turn",
75 Self::PluginCheckpoint => "plugin_checkpoint",
76 Self::PluginPrepareTurn => "plugin_prepare_turn",
77 Self::ContextPrepareTurn => "context_prepare_turn",
78 Self::ProtocolTurnExtension => "protocol_turn_extension",
79 Self::ProtocolBeforeLlmCall => "protocol_before_llm_call",
80 Self::TurnStreamJoin => "turn_stream_join",
81 Self::EmptyAgentFrameRun => "empty_agent_frame_run",
82 Self::DurableEffectLiveProtocolExtension => "durable_effect_live_protocol_extension",
83 Self::DurableEffectLivePluginInput => "durable_effect_live_plugin_input",
84 Self::Other(code) => code.as_str(),
85 }
86 }
87}
88
89impl std::fmt::Display for RuntimeErrorCode {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.write_str(self.as_str())
92 }
93}
94
95impl From<&str> for RuntimeErrorCode {
96 fn from(code: &str) -> Self {
97 match code {
98 "missing_execution_scope_id" => Self::MissingExecutionScopeId,
99 "execution_scope_turn_id_mismatch" => Self::ExecutionScopeTurnIdMismatch,
100 "session_execution_busy" => Self::SessionExecutionBusy,
101 "session_execution_lease_lost" => Self::SessionExecutionLeaseLost,
102 "missing_process_execution_id" => Self::MissingProcessExecutionId,
103 "durable_store_required:attachment_store" => Self::DurableStoreRequired {
104 facet: DurableStoreFacet::AttachmentStore,
105 },
106 "durable_store_required:process_env_store" => Self::DurableStoreRequired {
107 facet: DurableStoreFacet::ProcessEnvStore,
108 },
109 "durable_store_required:session_store" => Self::DurableStoreRequired {
110 facet: DurableStoreFacet::SessionStore,
111 },
112 "durable_store_required:process_registry" => Self::DurableStoreRequired {
113 facet: DurableStoreFacet::ProcessRegistry,
114 },
115 "durable_store_required:trigger_store" => Self::DurableStoreRequired {
116 facet: DurableStoreFacet::TriggerStore,
117 },
118 "store_commit_failed" => Self::StoreCommitFailed,
119 "plugin_session_manager" => Self::PluginSessionManager,
120 "plugin_finalize_turn" => Self::PluginFinalizeTurn,
121 "plugin_checkpoint" => Self::PluginCheckpoint,
122 "plugin_prepare_turn" => Self::PluginPrepareTurn,
123 "context_prepare_turn" => Self::ContextPrepareTurn,
124 "protocol_turn_extension" => Self::ProtocolTurnExtension,
125 "protocol_before_llm_call" => Self::ProtocolBeforeLlmCall,
126 "turn_stream_join" => Self::TurnStreamJoin,
127 "empty_agent_frame_run" => Self::EmptyAgentFrameRun,
128 "durable_effect_live_protocol_extension" => Self::DurableEffectLiveProtocolExtension,
129 "durable_effect_live_plugin_input" => Self::DurableEffectLivePluginInput,
130 other => Self::Other(other.to_string()),
131 }
132 }
133}
134
135impl From<String> for RuntimeErrorCode {
136 fn from(code: String) -> Self {
137 Self::from(code.as_str())
138 }
139}
140
141impl serde::Serialize for RuntimeErrorCode {
142 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
143 where
144 S: serde::Serializer,
145 {
146 serializer.serialize_str(self.as_str())
147 }
148}
149
150impl<'de> serde::Deserialize<'de> for RuntimeErrorCode {
151 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
152 where
153 D: serde::Deserializer<'de>,
154 {
155 let code = <String as serde::Deserialize>::deserialize(deserializer)?;
156 Ok(Self::from(code))
157 }
158}
159
160#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
162pub struct RuntimeError {
163 pub code: RuntimeErrorCode,
164 pub message: String,
165}
166
167impl RuntimeError {
168 pub fn new(code: impl Into<RuntimeErrorCode>, message: impl Into<String>) -> Self {
169 Self {
170 code: code.into(),
171 message: message.into(),
172 }
173 }
174
175 pub fn is_code(&self, code: RuntimeErrorCode) -> bool {
176 self.code == code
177 }
178
179 pub fn durable_store_required(facet: DurableStoreFacet) -> Self {
182 let facet_label = match facet {
183 DurableStoreFacet::AttachmentStore => "attachment store",
184 DurableStoreFacet::ProcessEnvStore => "process env store",
185 DurableStoreFacet::SessionStore => "session store",
186 DurableStoreFacet::ProcessRegistry => "process registry",
187 DurableStoreFacet::TriggerStore => "trigger store",
188 };
189 Self::new(
190 RuntimeErrorCode::DurableStoreRequired { facet },
191 format!("durable effect hosts require a durable {facet_label}"),
192 )
193 }
194
195 pub fn missing_process_execution_id() -> Self {
202 Self::new(
203 RuntimeErrorCode::MissingProcessExecutionId,
204 "process execution requires a non-empty persisted process id",
205 )
206 }
207}
208
209impl std::fmt::Display for RuntimeError {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 write!(f, "{}: {}", self.code, self.message)
212 }
213}
214
215impl std::error::Error for RuntimeError {}
216
#[cfg(test)]
mod tests {
    use super::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};

    /// Every facet survives a JSON round trip as the same typed code.
    #[test]
    fn durable_store_required_round_trips_per_facet() {
        for facet in [
            DurableStoreFacet::AttachmentStore,
            DurableStoreFacet::ProcessEnvStore,
            DurableStoreFacet::SessionStore,
            DurableStoreFacet::ProcessRegistry,
            DurableStoreFacet::TriggerStore,
        ] {
            let err = RuntimeError::durable_store_required(facet);
            let json = serde_json::to_value(&err).expect("serialize runtime error");
            let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
            assert_eq!(
                decoded.code,
                RuntimeErrorCode::DurableStoreRequired { facet }
            );
        }
    }

    #[test]
    fn missing_process_execution_id_round_trips() {
        let err = RuntimeError::missing_process_execution_id();
        assert_eq!(err.code, RuntimeErrorCode::MissingProcessExecutionId);
        let json = serde_json::to_value(&err).expect("serialize runtime error");
        assert_eq!(json["code"], "missing_process_execution_id");
        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
        assert_eq!(decoded.code, RuntimeErrorCode::MissingProcessExecutionId);
    }

    #[test]
    fn session_execution_lease_lost_round_trips() {
        let err = RuntimeError::new(RuntimeErrorCode::SessionExecutionLeaseLost, "lease lost");
        let json = serde_json::to_value(&err).expect("serialize runtime error");
        assert_eq!(json["code"], "session_execution_lease_lost");
        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
        assert_eq!(decoded.code, RuntimeErrorCode::SessionExecutionLeaseLost);
    }

    #[test]
    fn runtime_error_code_serializes_as_stable_string() {
        let err = RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, "commit failed");

        let json = serde_json::to_value(&err).expect("serialize runtime error");
        assert_eq!(json["code"], "store_commit_failed");

        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
        assert_eq!(decoded.code, RuntimeErrorCode::StoreCommitFailed);
    }

    /// A code that matches no known variant is kept verbatim in `Other`.
    #[test]
    fn unknown_runtime_error_code_round_trips() {
        let decoded: RuntimeError = serde_json::from_value(serde_json::json!({
            "code": "plugin_defined_abort",
            "message": "stopped by plugin"
        }))
        .expect("decode plugin runtime error");

        assert_eq!(
            decoded.code,
            RuntimeErrorCode::Other("plugin_defined_abort".to_string())
        );
        assert_eq!(decoded.code.as_str(), "plugin_defined_abort");
    }

    /// Guards against drift between the two hand-written tables: every known
    /// variant's `as_str` output must parse back to that same variant and must
    /// never fall through to `Other`.
    #[test]
    fn known_codes_round_trip_through_their_string_form() {
        let known = [
            RuntimeErrorCode::MissingExecutionScopeId,
            RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
            RuntimeErrorCode::SessionExecutionBusy,
            RuntimeErrorCode::SessionExecutionLeaseLost,
            RuntimeErrorCode::MissingProcessExecutionId,
            RuntimeErrorCode::DurableStoreRequired {
                facet: DurableStoreFacet::AttachmentStore,
            },
            RuntimeErrorCode::DurableStoreRequired {
                facet: DurableStoreFacet::ProcessEnvStore,
            },
            RuntimeErrorCode::DurableStoreRequired {
                facet: DurableStoreFacet::SessionStore,
            },
            RuntimeErrorCode::DurableStoreRequired {
                facet: DurableStoreFacet::ProcessRegistry,
            },
            RuntimeErrorCode::DurableStoreRequired {
                facet: DurableStoreFacet::TriggerStore,
            },
            RuntimeErrorCode::StoreCommitFailed,
            RuntimeErrorCode::PluginSessionManager,
            RuntimeErrorCode::PluginFinalizeTurn,
            RuntimeErrorCode::PluginCheckpoint,
            RuntimeErrorCode::PluginPrepareTurn,
            RuntimeErrorCode::ContextPrepareTurn,
            RuntimeErrorCode::ProtocolTurnExtension,
            RuntimeErrorCode::ProtocolBeforeLlmCall,
            RuntimeErrorCode::TurnStreamJoin,
            RuntimeErrorCode::EmptyAgentFrameRun,
            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
            RuntimeErrorCode::DurableEffectLivePluginInput,
        ];

        for code in known {
            let parsed = RuntimeErrorCode::from(code.as_str());
            assert!(
                !matches!(parsed, RuntimeErrorCode::Other(_)),
                "{code} fell through to Other"
            );
            assert_eq!(parsed, code);
        }
    }
}