lash-core 0.1.0-alpha.36

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
/// Durable store facet that a durable execution path requires but the host
/// wired as ephemeral.
///
/// Names the failing facet so a [`RuntimeErrorCode::DurableStoreRequired`]
/// can be matched and serialized losslessly per facet.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DurableStoreFacet {
    AttachmentStore,
    ArtifactStore,
    SessionStore,
    ProcessRegistry,
    HostEventStore,
}

impl DurableStoreFacet {
    /// Stable per-facet error-code string (the full
    /// `durable_store_required:*` code surfaced in traces and host errors).
    fn as_code(self) -> &'static str {
        match self {
            Self::AttachmentStore => "durable_store_required:attachment_store",
            Self::ArtifactStore => "durable_store_required:artifact_store",
            Self::SessionStore => "durable_store_required:session_store",
            Self::ProcessRegistry => "durable_store_required:process_registry",
            Self::HostEventStore => "durable_store_required:host_event_store",
        }
    }
}

/// Stable runtime error code.
///
/// Codes serialize as the same snake_case strings exposed in traces and host
/// errors, but callers should match this type instead of parsing display text.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum RuntimeErrorCode {
    MissingEffectScopeId,
    EffectScopeTurnIdMismatch,
    /// A process (re-)execution was handed an empty/non-persisted process id.
    /// Process execution identity is the persisted `process_id`; a retry that
    /// cannot present that stable id has lost its idempotency anchor.
    MissingProcessExecutionId,
    /// A durable execution path was wired against an ephemeral store for the
    /// named facet.
    DurableStoreRequired {
        facet: DurableStoreFacet,
    },
    StoreCommitFailed,
    PluginSessionManager,
    PluginFinalizeTurn,
    PluginCheckpoint,
    PluginPrepareTurn,
    ContextPrepareTurn,
    ProtocolTurnExtension,
    ProtocolBeforeLlmCall,
    TurnStreamJoin,
    EmptyAgentFrameRun,
    DurableEffectLiveProtocolExtension,
    DurableEffectLivePluginInput,
    Other(String),
}

impl RuntimeErrorCode {
    pub fn as_str(&self) -> &str {
        match self {
            Self::MissingEffectScopeId => "missing_effect_scope_id",
            Self::EffectScopeTurnIdMismatch => "effect_scope_turn_id_mismatch",
            Self::MissingProcessExecutionId => "missing_process_execution_id",
            Self::DurableStoreRequired { facet } => facet.as_code(),
            Self::StoreCommitFailed => "store_commit_failed",
            Self::PluginSessionManager => "plugin_session_manager",
            Self::PluginFinalizeTurn => "plugin_finalize_turn",
            Self::PluginCheckpoint => "plugin_checkpoint",
            Self::PluginPrepareTurn => "plugin_prepare_turn",
            Self::ContextPrepareTurn => "context_prepare_turn",
            Self::ProtocolTurnExtension => "protocol_turn_extension",
            Self::ProtocolBeforeLlmCall => "protocol_before_llm_call",
            Self::TurnStreamJoin => "turn_stream_join",
            Self::EmptyAgentFrameRun => "empty_agent_frame_run",
            Self::DurableEffectLiveProtocolExtension => "durable_effect_live_protocol_extension",
            Self::DurableEffectLivePluginInput => "durable_effect_live_plugin_input",
            Self::Other(code) => code.as_str(),
        }
    }
}

impl std::fmt::Display for RuntimeErrorCode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl From<&str> for RuntimeErrorCode {
    fn from(code: &str) -> Self {
        match code {
            "missing_effect_scope_id" => Self::MissingEffectScopeId,
            "effect_scope_turn_id_mismatch" => Self::EffectScopeTurnIdMismatch,
            "missing_process_execution_id" => Self::MissingProcessExecutionId,
            "durable_store_required:attachment_store" => Self::DurableStoreRequired {
                facet: DurableStoreFacet::AttachmentStore,
            },
            "durable_store_required:artifact_store" => Self::DurableStoreRequired {
                facet: DurableStoreFacet::ArtifactStore,
            },
            "durable_store_required:session_store" => Self::DurableStoreRequired {
                facet: DurableStoreFacet::SessionStore,
            },
            "durable_store_required:process_registry" => Self::DurableStoreRequired {
                facet: DurableStoreFacet::ProcessRegistry,
            },
            "durable_store_required:host_event_store" => Self::DurableStoreRequired {
                facet: DurableStoreFacet::HostEventStore,
            },
            "store_commit_failed" => Self::StoreCommitFailed,
            "plugin_session_manager" => Self::PluginSessionManager,
            "plugin_finalize_turn" => Self::PluginFinalizeTurn,
            "plugin_checkpoint" => Self::PluginCheckpoint,
            "plugin_prepare_turn" => Self::PluginPrepareTurn,
            "context_prepare_turn" => Self::ContextPrepareTurn,
            "protocol_turn_extension" => Self::ProtocolTurnExtension,
            "protocol_before_llm_call" => Self::ProtocolBeforeLlmCall,
            "turn_stream_join" => Self::TurnStreamJoin,
            "empty_agent_frame_run" => Self::EmptyAgentFrameRun,
            "durable_effect_live_protocol_extension" => Self::DurableEffectLiveProtocolExtension,
            "durable_effect_live_plugin_input" => Self::DurableEffectLivePluginInput,
            other => Self::Other(other.to_string()),
        }
    }
}

impl From<String> for RuntimeErrorCode {
    fn from(code: String) -> Self {
        Self::from(code.as_str())
    }
}

impl serde::Serialize for RuntimeErrorCode {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.serialize_str(self.as_str())
    }
}

impl<'de> serde::Deserialize<'de> for RuntimeErrorCode {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let code = <String as serde::Deserialize>::deserialize(deserializer)?;
        Ok(Self::from(code))
    }
}

/// Runtime error for unexpected failures.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeError {
    pub code: RuntimeErrorCode,
    pub message: String,
}

impl RuntimeError {
    pub fn new(code: impl Into<RuntimeErrorCode>, message: impl Into<String>) -> Self {
        Self {
            code: code.into(),
            message: message.into(),
        }
    }

    pub fn is_code(&self, code: RuntimeErrorCode) -> bool {
        self.code == code
    }

    /// Build the loud error raised when a durable execution path was wired
    /// against an ephemeral store for `facet`.
    pub fn durable_store_required(facet: DurableStoreFacet) -> Self {
        let facet_label = match facet {
            DurableStoreFacet::AttachmentStore => "attachment store",
            DurableStoreFacet::ArtifactStore => "lashlang artifact store",
            DurableStoreFacet::SessionStore => "session store",
            DurableStoreFacet::ProcessRegistry => "process registry",
            DurableStoreFacet::HostEventStore => "host event store",
        };
        Self::new(
            RuntimeErrorCode::DurableStoreRequired { facet },
            format!("durable effect hosts require a durable {facet_label}"),
        )
    }

    /// Build the loud error raised when a process (re-)execution is handed an
    /// empty/non-persisted id.
    ///
    /// Process execution identity is the persisted `process_id`, so a retry
    /// must present that stable id — mirroring how
    /// [`EffectScope`](crate::EffectScope) rejects an empty stable id.
    pub fn missing_process_execution_id() -> Self {
        Self::new(
            RuntimeErrorCode::MissingProcessExecutionId,
            "process execution requires a non-empty persisted process id",
        )
    }
}

impl std::fmt::Display for RuntimeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.code, self.message)
    }
}

impl std::error::Error for RuntimeError {}

#[cfg(test)]
mod tests {
    use super::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};

    #[test]
    fn durable_store_required_round_trips_per_facet() {
        for facet in [
            DurableStoreFacet::AttachmentStore,
            DurableStoreFacet::ArtifactStore,
            DurableStoreFacet::SessionStore,
            DurableStoreFacet::ProcessRegistry,
            DurableStoreFacet::HostEventStore,
        ] {
            let err = RuntimeError::durable_store_required(facet);
            let json = serde_json::to_value(&err).expect("serialize runtime error");
            let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
            assert_eq!(
                decoded.code,
                RuntimeErrorCode::DurableStoreRequired { facet }
            );
        }
    }

    #[test]
    fn missing_process_execution_id_round_trips() {
        let err = RuntimeError::missing_process_execution_id();
        assert_eq!(err.code, RuntimeErrorCode::MissingProcessExecutionId);
        let json = serde_json::to_value(&err).expect("serialize runtime error");
        assert_eq!(json["code"], "missing_process_execution_id");
        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
        assert_eq!(decoded.code, RuntimeErrorCode::MissingProcessExecutionId);
    }

    #[test]
    fn runtime_error_code_serializes_as_stable_string() {
        let err = RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, "commit failed");

        let json = serde_json::to_value(&err).expect("serialize runtime error");
        assert_eq!(json["code"], "store_commit_failed");

        let decoded: RuntimeError = serde_json::from_value(json).expect("decode runtime error");
        assert_eq!(decoded.code, RuntimeErrorCode::StoreCommitFailed);
    }

    #[test]
    fn unknown_runtime_error_code_round_trips() {
        let decoded: RuntimeError = serde_json::from_value(serde_json::json!({
            "code": "plugin_defined_abort",
            "message": "stopped by plugin"
        }))
        .expect("decode plugin runtime error");

        assert_eq!(
            decoded.code,
            RuntimeErrorCode::Other("plugin_defined_abort".to_string())
        );
        assert_eq!(decoded.code.as_str(), "plugin_defined_abort");
    }
}