// lash-core 0.1.0-alpha.37
//
// Sans-IO turn machine and runtime kernel for the lash agent runtime.
// Documentation
use std::collections::BTreeMap;
use std::time::SystemTime;

use serde::{Deserialize, Serialize};

use super::model::{ProcessId, ProcessScope, ProcessScopeId};
use super::validation::process_event_payload_hash;

/// Declaration of a process event type: a name, a payload schema, and a
/// semantics specification.
///
/// The built-in declarations in this file use names such as
/// `"process.wake"` or `"process.completed"`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventType {
    /// Event type name (e.g. `"process.yield"`).
    pub name: String,
    /// Schema associated with the event payload. Every built-in type in this
    /// file uses `LashSchema::any()`.
    pub payload_schema: crate::LashSchema,
    /// Terminal / wake semantics declared for events of this type.
    pub semantics: ProcessEventSemanticsSpec,
}

/// Declarative semantics attached to a [`ProcessEventType`].
///
/// Both parts are optional; the derived `Default` leaves both as `None`.
/// A `None` field is omitted when serializing and a missing field
/// deserializes to `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventSemanticsSpec {
    /// Terminal specification, if events of this type are terminal.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub terminal: Option<ProcessTerminalSpec>,
    /// Wake specification, if events of this type can produce a wake.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake: Option<ProcessWakeSpec>,
}

/// Specification of a terminal event: the terminal state it represents and,
/// optionally, a selector for the await output.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessTerminalSpec {
    /// Terminal state declared for the event type.
    pub state: ProcessTerminalState,
    /// Optional selector for the await output. The built-in terminal types in
    /// this file use the pointer `"/await_output"`. Omitted from serialized
    /// output when `None`.
    // NOTE(review): how a `None` selector is handled is decided outside this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub await_output: Option<ProcessValueSelector>,
}

/// Specification of how an event produces a wake: an optional condition, an
/// input selector, and a dedupe-key strategy.
// NOTE(review): the evaluation of these selectors is not in this file;
// the field descriptions below are based on names and types only.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWakeSpec {
    /// Optional selector, presumably a condition gating the wake. Omitted
    /// from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub when: Option<ProcessValueSelector>,
    /// Selector for the wake input (required).
    pub input: ProcessValueSelector,
    /// Dedupe-key strategy; falls back to
    /// [`ProcessWakeDedupeKey::EventIdentity`] when missing on deserialize.
    #[serde(default)]
    pub dedupe_key: ProcessWakeDedupeKey,
}

/// Strategy for choosing a wake's dedupe key.
///
/// Variant names serialize in `snake_case` (`event_identity`, `selector`,
/// `const`).
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessWakeDedupeKey {
    /// Default strategy. Presumably derives the key from the event's own
    /// identity — confirm against the code that resolves wakes.
    #[default]
    EventIdentity,
    /// Key described by a value selector.
    Selector(ProcessValueSelector),
    /// Fixed key string.
    Const(String),
}

/// Describes a value to be selected for an event.
///
/// Variant names serialize in `snake_case`.
// NOTE(review): the selector evaluator is not in this file; the variant
// descriptions below are inferred from names and types — confirm.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessValueSelector {
    /// Presumably selects the whole event payload.
    Payload,
    /// A pointer string; the built-ins in this file use values such as
    /// `"/text"` and `"/await_output"` (looks like JSON Pointer syntax).
    Pointer(String),
    /// A constant JSON value.
    Const(serde_json::Value),
    /// A template string plus named sub-selectors.
    Template {
        /// Template text.
        template: String,
        /// Named selectors for the template; defaults to an empty map when
        /// missing on deserialize.
        #[serde(default)]
        fields: BTreeMap<String, ProcessValueSelector>,
    },
    /// Carries a string; presumably a presence check for the named location.
    Present(String),
}

/// Semantics stored on a concrete [`ProcessEvent`].
///
/// Has the same shape as [`ProcessEventSemanticsSpec`] but holds concrete
/// values ([`ProcessTerminalSemantics`], [`ProcessWake`]) instead of selectors.
/// A `None` field is omitted when serializing and a missing field
/// deserializes to `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventSemantics {
    /// Terminal semantics, when present.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub terminal: Option<ProcessTerminalSemantics>,
    /// Wake data, when present.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake: Option<ProcessWake>,
}

/// Terminal state of a process.
///
/// Serialized in `snake_case` as `completed`, `failed`, or `cancelled`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTerminalState {
    /// Matches `ProcessAwaitOutput::Success`.
    Completed,
    /// Matches `ProcessAwaitOutput::Failure`.
    Failed,
    /// Matches `ProcessAwaitOutput::Cancelled`.
    Cancelled,
}

/// Concrete terminal semantics: a terminal state and the await output that
/// goes with it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessTerminalSemantics {
    /// Terminal state.
    pub state: ProcessTerminalState,
    /// Await output carried by the terminal event.
    // NOTE(review): nothing in this type enforces that `state` equals
    // `await_output.terminal_state()`; confirm where that is validated.
    pub await_output: ProcessAwaitOutput,
}

/// Output of awaiting a process, in a JSON-friendly form.
///
/// Serialized as an internally tagged enum: the `type` field holds the
/// `snake_case` variant name (`success`, `failure`, `cancelled`). Optional
/// fields are omitted when `None` and default to `None` when missing.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessAwaitOutput {
    /// Successful output.
    Success {
        /// Result value as JSON.
        value: serde_json::Value,
        /// Optional tool control data.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
    /// Failed output.
    Failure {
        /// Failure class.
        class: crate::ToolFailureClass,
        /// Failure code.
        code: String,
        /// Failure message.
        message: String,
        /// Optional raw failure data as JSON.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw: Option<serde_json::Value>,
        /// Optional tool control data.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
    /// Cancelled output.
    Cancelled {
        /// Cancellation message.
        message: String,
        /// Optional raw cancellation data as JSON.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw: Option<serde_json::Value>,
        /// Optional tool control data.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
}

impl ProcessAwaitOutput {
    pub fn terminal_state(&self) -> ProcessTerminalState {
        match self {
            Self::Success { .. } => ProcessTerminalState::Completed,
            Self::Failure { .. } => ProcessTerminalState::Failed,
            Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
        }
    }

    pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
        let control = output.control;
        match output.outcome {
            crate::ToolCallOutcome::Success(value) => Self::Success {
                value: value.to_json_value(),
                control,
            },
            crate::ToolCallOutcome::Failure(failure) => Self::Failure {
                class: failure.class,
                code: failure.code,
                message: failure.message,
                raw: failure.raw.map(|value| value.to_json_value()),
                control,
            },
            crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
                message: cancellation.message,
                raw: cancellation.raw.map(|value| value.to_json_value()),
                control,
            },
        }
    }

    pub fn into_tool_output(self) -> crate::ToolCallOutput {
        match self {
            Self::Success { value, control } => {
                let mut output = crate::ToolCallOutput::success(value);
                output.control = control;
                output
            }
            Self::Failure {
                class,
                code,
                message,
                raw,
                control,
            } => {
                let mut failure = crate::ToolFailure::tool(class, code, message);
                failure.raw = raw.map(crate::ToolValue::from);
                let mut output = crate::ToolCallOutput::failure(failure);
                output.control = control;
                output
            }
            Self::Cancelled {
                message,
                raw,
                control,
            } => {
                let mut cancellation = crate::ToolCancellation::runtime(message);
                cancellation.raw = raw.map(crate::ToolValue::from);
                let mut output = crate::ToolCallOutput::cancelled(cancellation);
                output.control = control;
                output
            }
        }
    }
}

/// Concrete wake data stored in [`ProcessEventSemantics`]: an input string
/// and a dedupe key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWake {
    /// Wake input text.
    pub input: String,
    /// Key used to deduplicate wakes.
    // NOTE(review): the deduplication itself is done outside this file.
    pub dedupe_key: String,
}

pub fn lashlang_process_event_types() -> Vec<ProcessEventType> {
    vec![
        ProcessEventType {
            name: "process.yield".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: ProcessEventSemanticsSpec::default(),
        },
        ProcessEventType {
            name: "process.wake".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: ProcessEventSemanticsSpec {
                wake: Some(ProcessWakeSpec {
                    when: None,
                    input: ProcessValueSelector::Pointer("/text".to_string()),
                    dedupe_key: ProcessWakeDedupeKey::EventIdentity,
                }),
                ..ProcessEventSemanticsSpec::default()
            },
        },
        ProcessEventType {
            name: "process.signal".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: ProcessEventSemanticsSpec::default(),
        },
    ]
}

/// A recorded process event.
///
/// Unlike most types in this file, this struct does not derive `PartialEq`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessEvent {
    /// Process the event belongs to.
    pub process_id: ProcessId,
    /// Sequence number of the event.
    // NOTE(review): presumably per-process and monotonically increasing — confirm.
    pub sequence: u64,
    /// Event type name (e.g. `"process.wake"`).
    pub event_type: String,
    /// Event payload as JSON.
    pub payload: serde_json::Value,
    /// Runtime invocation associated with the event.
    pub invocation: crate::RuntimeInvocation,
    /// Terminal / wake semantics stored with the event.
    pub semantics: ProcessEventSemantics,
    /// Timestamp of the event.
    pub occurred_at: SystemTime,
}

/// Result of appending an event: the stored event plus an optional wake
/// delivery.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessEventAppendResult {
    /// The appended event.
    pub event: ProcessEvent,
    /// Wake delivery, if there is one. Omitted from serialized output when
    /// `None`; deserializes to `None` when missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_delivery: Option<ProcessWakeDelivery>,
}

/// Request to append an event to a process.
///
/// Optional fields are omitted from serialized output when `None` and
/// deserialize to `None` when missing.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventAppendRequest {
    /// Event type name (e.g. `"process.cancel_requested"`).
    pub event_type: String,
    /// Event payload as JSON.
    pub payload: serde_json::Value,
    /// Optional replay information (wraps a replay key).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<crate::RuntimeReplay>,
    /// Optional scope to target with a wake.
    // NOTE(review): how a `None` scope is resolved is decided by the consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target_scope: Option<ProcessScope>,
}

impl ProcessEventAppendRequest {
    /// Creates a request for `event_type` with `payload`, with no replay
    /// information and no wake target scope.
    pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
        let event_type = event_type.into();
        Self {
            event_type,
            payload,
            replay: None,
            wake_target_scope: None,
        }
    }

    /// Sets the replay information to a `RuntimeReplay` holding `replay_key`.
    pub fn with_replay_key(self, replay_key: impl Into<String>) -> Self {
        let key = replay_key.into();
        self.with_optional_replay(Some(crate::RuntimeReplay { key }))
    }

    /// Replaces the replay information; `None` clears it.
    pub fn with_optional_replay(self, replay: Option<crate::RuntimeReplay>) -> Self {
        Self { replay, ..self }
    }

    /// Sets the wake target scope to `scope`.
    pub fn with_wake_target_scope(self, scope: ProcessScope) -> Self {
        self.with_optional_wake_target_scope(Some(scope))
    }

    /// Replaces the wake target scope; `None` clears it.
    pub fn with_optional_wake_target_scope(self, scope: Option<ProcessScope>) -> Self {
        Self {
            wake_target_scope: scope,
            ..self
        }
    }

    /// Builds a `process.cancel_requested` request for `process_id`.
    ///
    /// The payload is `{"reason": <reason>}` (`null` when `reason` is `None`).
    /// The replay key is `process:{process_id}:cancel_requested:` followed by
    /// the payload hash from `process_event_payload_hash`.
    // NOTE(review): when hashing fails, the fallback text already contains the
    // `process:{process_id}:cancel_requested` prefix, so the final key repeats
    // it. Preserved as-is because changing it would alter existing replay keys.
    pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
        const EVENT_TYPE: &str = "process.cancel_requested";

        let payload = serde_json::json!({
            "reason": reason,
        });
        let replay_key = match process_event_payload_hash(EVENT_TYPE, &payload) {
            Ok(hash) => hash,
            Err(_) => format!("process:{process_id}:cancel_requested"),
        };
        let full_key = format!("process:{process_id}:cancel_requested:{replay_key}");

        Self::new(EVENT_TYPE, payload).with_replay_key(full_key)
    }
}

/// A wake to be delivered to a target session and scope on behalf of a
/// process event.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWakeDelivery {
    /// Identifier of this wake.
    pub wake_id: String,
    /// Session the wake is addressed to.
    pub target_session_id: String,
    /// Scope the wake is addressed to.
    pub target_scope_id: ProcessScopeId,
    /// Process the wake belongs to.
    pub process_id: ProcessId,
    /// Event sequence number associated with the wake.
    pub sequence: u64,
    /// Event type name; deserializes to `"process.wake"` when the field is
    /// missing.
    #[serde(default = "default_process_wake_event_type")]
    pub event_type: String,
    /// Invocation of the event; when the field is missing, deserializes to a
    /// placeholder invocation (empty scope and process id, sequence 0).
    // NOTE(review): these defaults look like backward compatibility for
    // records written before the fields existed — confirm.
    #[serde(default = "default_process_wake_event_invocation")]
    pub event_invocation: crate::RuntimeInvocation,
    /// Optional causal reference for the process. Omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process_caused_by: Option<crate::CausalRef>,
    /// Key used to deduplicate wakes.
    pub dedupe_key: String,
    /// Wake input text.
    pub input: String,
    /// Creation time in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub created_at_ms: u64,
}

/// Serde default for a missing wake `event_type`: always `"process.wake"`.
fn default_process_wake_event_type() -> String {
    String::from("process.wake")
}

/// Serde default for a missing wake `event_invocation`.
///
/// Builds a placeholder invocation: empty scope, a `ProcessEvent` subject with
/// an empty process id, sequence `0` and the default wake event type, and no
/// `caused_by` or `replay`.
fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
    let placeholder_subject = crate::RuntimeSubject::ProcessEvent {
        process_id: String::new(),
        sequence: 0,
        event_type: default_process_wake_event_type(),
    };

    crate::RuntimeInvocation {
        scope: crate::RuntimeScope::new(""),
        subject: placeholder_subject,
        caused_by: None,
        replay: None,
    }
}

/// Returns the default process event types, in order:
/// `process.cancel_requested` (any payload, default semantics) followed by
/// the terminal types `process.completed`, `process.failed`, and
/// `process.cancelled`.
pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
    let cancel_requested = ProcessEventType {
        name: String::from("process.cancel_requested"),
        payload_schema: crate::LashSchema::any(),
        semantics: ProcessEventSemanticsSpec::default(),
    };

    // Terminal event names paired with the state each one declares.
    let terminals = [
        ("process.completed", ProcessTerminalState::Completed),
        ("process.failed", ProcessTerminalState::Failed),
        ("process.cancelled", ProcessTerminalState::Cancelled),
    ];

    let mut event_types = vec![cancel_requested];
    event_types.extend(terminals.map(|(name, state)| terminal_event_type(name, state)));
    event_types
}

/// Builds a terminal event type called `name` that declares `state`.
///
/// The payload schema is `LashSchema::any()`, the await output is selected
/// with the pointer `/await_output`, and no wake semantics are set.
fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
    let terminal = ProcessTerminalSpec {
        state,
        await_output: Some(ProcessValueSelector::Pointer(String::from("/await_output"))),
    };

    ProcessEventType {
        name: name.to_owned(),
        payload_schema: crate::LashSchema::any(),
        semantics: ProcessEventSemanticsSpec {
            terminal: Some(terminal),
            wake: None,
        },
    }
}