#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DurableStoreFacet {
AttachmentStore,
ArtifactStore,
SessionStore,
ProcessRegistry,
}
impl DurableStoreFacet {
fn as_code(self) -> &'static str {
match self {
Self::AttachmentStore => "durable_store_required:attachment_store",
Self::ArtifactStore => "durable_store_required:artifact_store",
Self::SessionStore => "durable_store_required:session_store",
Self::ProcessRegistry => "durable_store_required:process_registry",
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum RuntimeErrorCode {
MissingEffectScopeId,
EffectScopeTurnIdMismatch,
MissingProcessExecutionId,
DurableStoreRequired {
facet: DurableStoreFacet,
},
StoreCommitFailed,
PluginSessionManager,
PluginFinalizeTurn,
PluginCheckpoint,
PluginPrepareTurn,
ContextPrepareTurn,
ProtocolTurnExtension,
ProtocolBeforeLlmCall,
TurnStreamJoin,
EmptyAgentFrameRun,
DurableEffectLiveProtocolExtension,
DurableEffectLivePluginInput,
Other(String),
}
impl RuntimeErrorCode {
pub fn as_str(&self) -> &str {
match self {
Self::MissingEffectScopeId => "missing_effect_scope_id",
Self::EffectScopeTurnIdMismatch => "effect_scope_turn_id_mismatch",
Self::MissingProcessExecutionId => "missing_process_execution_id",
Self::DurableStoreRequired { facet } => facet.as_code(),
Self::StoreCommitFailed => "store_commit_failed",
Self::PluginSessionManager => "plugin_session_manager",
Self::PluginFinalizeTurn => "plugin_finalize_turn",
Self::PluginCheckpoint => "plugin_checkpoint",
Self::PluginPrepareTurn => "plugin_prepare_turn",
Self::ContextPrepareTurn => "context_prepare_turn",
Self::ProtocolTurnExtension => "protocol_turn_extension",
Self::ProtocolBeforeLlmCall => "protocol_before_llm_call",
Self::TurnStreamJoin => "turn_stream_join",
Self::EmptyAgentFrameRun => "empty_agent_frame_run",
Self::DurableEffectLiveProtocolExtension => "durable_effect_live_protocol_extension",
Self::DurableEffectLivePluginInput => "durable_effect_live_plugin_input",
Self::Other(code) => code.as_str(),
}
}
}
impl std::fmt::Display for RuntimeErrorCode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl From<&str> for RuntimeErrorCode {
fn from(code: &str) -> Self {
match code {
"missing_effect_scope_id" => Self::MissingEffectScopeId,
"effect_scope_turn_id_mismatch" => Self::EffectScopeTurnIdMismatch,
"missing_process_execution_id" => Self::MissingProcessExecutionId,
"durable_store_required:attachment_store" => Self::DurableStoreRequired {
facet: DurableStoreFacet::AttachmentStore,
},
"durable_store_required:artifact_store" => Self::DurableStoreRequired {
facet: DurableStoreFacet::ArtifactStore,
},
"durable_store_required:session_store" => Self::DurableStoreRequired {
facet: DurableStoreFacet::SessionStore,
},
"durable_store_required:process_registry" => Self::DurableStoreRequired {
facet: DurableStoreFacet::ProcessRegistry,
},
"store_commit_failed" => Self::StoreCommitFailed,
"plugin_session_manager" => Self::PluginSessionManager,
"plugin_finalize_turn" => Self::PluginFinalizeTurn,
"plugin_checkpoint" => Self::PluginCheckpoint,
"plugin_prepare_turn" => Self::PluginPrepareTurn,
"context_prepare_turn" => Self::ContextPrepareTurn,
"protocol_turn_extension" => Self::ProtocolTurnExtension,
"protocol_before_llm_call" => Self::ProtocolBeforeLlmCall,
"turn_stream_join" => Self::TurnStreamJoin,
"empty_agent_frame_run" => Self::EmptyAgentFrameRun,
"durable_effect_live_protocol_extension" => Self::DurableEffectLiveProtocolExtension,
"durable_effect_live_plugin_input" => Self::DurableEffectLivePluginInput,
other => Self::Other(other.to_string()),
}
}
}
impl From<String> for RuntimeErrorCode {
fn from(code: String) -> Self {
Self::from(code.as_str())
}
}
impl serde::Serialize for RuntimeErrorCode {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(self.as_str())
}
}
impl<'de> serde::Deserialize<'de> for RuntimeErrorCode {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let code = <String as serde::Deserialize>::deserialize(deserializer)?;
Ok(Self::from(code))
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeError {
pub code: RuntimeErrorCode,
pub message: String,
}
impl RuntimeError {
pub fn new(code: impl Into<RuntimeErrorCode>, message: impl Into<String>) -> Self {
Self {
code: code.into(),
message: message.into(),
}
}
pub fn is_code(&self, code: RuntimeErrorCode) -> bool {
self.code == code
}
pub fn durable_store_required(facet: DurableStoreFacet) -> Self {
let facet_label = match facet {
DurableStoreFacet::AttachmentStore => "attachment store",
DurableStoreFacet::ArtifactStore => "lashlang artifact store",
DurableStoreFacet::SessionStore => "session store",
DurableStoreFacet::ProcessRegistry => "process registry",
};
Self::new(
RuntimeErrorCode::DurableStoreRequired { facet },
format!("durable effect hosts require a durable {facet_label}"),
)
}
pub fn missing_process_execution_id() -> Self {
Self::new(
RuntimeErrorCode::MissingProcessExecutionId,
"process execution requires a non-empty persisted process id",
)
}
}
impl std::fmt::Display for RuntimeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.code, self.message)
}
}
impl std::error::Error for RuntimeError {}
#[cfg(test)]
mod tests {
use super::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
#[test]
fn durable_store_required_round_trips_per_facet() {
for facet in [
DurableStoreFacet::AttachmentStore,
DurableStoreFacet::ArtifactStore,
DurableStoreFacet::SessionStore,
DurableStoreFacet::ProcessRegistry,
] {
let err = RuntimeError::durable_store_required(facet);
let json = serde_json::to_value(&err).expect("serialize runtime error");
let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
assert_eq!(
decoded.code,
RuntimeErrorCode::DurableStoreRequired { facet }
);
}
}
#[test]
fn missing_process_execution_id_round_trips() {
let err = RuntimeError::missing_process_execution_id();
assert_eq!(err.code, RuntimeErrorCode::MissingProcessExecutionId);
let json = serde_json::to_value(&err).expect("serialize runtime error");
assert_eq!(json["code"], "missing_process_execution_id");
let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
assert_eq!(decoded.code, RuntimeErrorCode::MissingProcessExecutionId);
}
#[test]
fn runtime_error_code_serializes_as_stable_string() {
let err = RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, "commit failed");
let json = serde_json::to_value(&err).expect("serialize runtime error");
assert_eq!(json["code"], "store_commit_failed");
let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
assert_eq!(decoded.code, RuntimeErrorCode::StoreCommitFailed);
}
#[test]
fn unknown_runtime_error_code_round_trips() {
let decoded: RuntimeError = serde_json::from_value(serde_json::json!({
"code": "plugin_defined_abort",
"message": "stopped by plugin"
}))
.expect("decode plugin runtime error");
assert_eq!(
decoded.code,
RuntimeErrorCode::Other("plugin_defined_abort".to_string())
);
assert_eq!(decoded.code.as_str(), "plugin_defined_abort");
}
}