use std::collections::BTreeMap;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use super::model::{ProcessId, SessionScope, SessionScopeId};
use super::validation::process_event_payload_hash;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventType {
pub name: String,
pub payload_schema: crate::LashSchema,
pub semantics: ProcessEventSemanticsSpec,
}
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventSemanticsSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub terminal: Option<ProcessTerminalSpec>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wake: Option<ProcessWakeSpec>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessTerminalSpec {
pub state: ProcessTerminalState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub await_output: Option<ProcessValueSelector>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWakeSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub when: Option<ProcessValueSelector>,
pub input: ProcessValueSelector,
#[serde(default)]
pub dedupe_key: ProcessWakeDedupeKey,
}
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessWakeDedupeKey {
#[default]
EventIdentity,
Selector(ProcessValueSelector),
Const(String),
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessValueSelector {
Payload,
Pointer(String),
Const(serde_json::Value),
Template {
template: String,
#[serde(default)]
fields: BTreeMap<String, ProcessValueSelector>,
},
Present(String),
}
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventSemantics {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub terminal: Option<ProcessTerminalSemantics>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wake: Option<ProcessWake>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTerminalState {
Completed,
Failed,
Cancelled,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessTerminalSemantics {
pub state: ProcessTerminalState,
pub await_output: ProcessAwaitOutput,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessAwaitOutput {
Success {
value: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
control: Option<crate::ToolControl>,
},
Failure {
class: crate::ToolFailureClass,
code: String,
message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
raw: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
control: Option<crate::ToolControl>,
},
Cancelled {
message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
raw: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
control: Option<crate::ToolControl>,
},
}
impl ProcessAwaitOutput {
pub fn terminal_state(&self) -> ProcessTerminalState {
match self {
Self::Success { .. } => ProcessTerminalState::Completed,
Self::Failure { .. } => ProcessTerminalState::Failed,
Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
}
}
pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
let control = output.control;
match output.outcome {
crate::ToolCallOutcome::Success(value) => Self::Success {
value: value.to_json_value(),
control,
},
crate::ToolCallOutcome::Failure(failure) => Self::Failure {
class: failure.class,
code: failure.code,
message: failure.message,
raw: failure.raw.map(|value| value.to_json_value()),
control,
},
crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
message: cancellation.message,
raw: cancellation.raw.map(|value| value.to_json_value()),
control,
},
}
}
pub fn into_tool_output(self) -> crate::ToolCallOutput {
match self {
Self::Success { value, control } => {
let mut output = crate::ToolCallOutput::success(value);
output.control = control;
output
}
Self::Failure {
class,
code,
message,
raw,
control,
} => {
let mut failure = crate::ToolFailure::tool(class, code, message);
failure.raw = raw.map(crate::ToolValue::from);
let mut output = crate::ToolCallOutput::failure(failure);
output.control = control;
output
}
Self::Cancelled {
message,
raw,
control,
} => {
let mut cancellation = crate::ToolCancellation::runtime(message);
cancellation.raw = raw.map(crate::ToolValue::from);
let mut output = crate::ToolCallOutput::cancelled(cancellation);
output.control = control;
output
}
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWake {
pub input: String,
pub dedupe_key: String,
}
pub fn process_signal_event_type(signal_name: &str) -> Result<String, crate::PluginError> {
validate_process_signal_name(signal_name)?;
Ok(format!("signal.{signal_name}"))
}
pub fn process_signal_name_from_event_type(event_type: &str) -> Option<&str> {
event_type.strip_prefix("signal.")
}
pub fn process_signal_wait_key(process_id: &str, signal_name: &str, ordinal: u64) -> String {
format!("process:{process_id}:signal.{signal_name}:{ordinal}")
}
pub fn validate_process_signal_name(signal_name: &str) -> Result<(), crate::PluginError> {
let valid = !signal_name.is_empty()
&& signal_name
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-');
if valid {
Ok(())
} else {
Err(crate::PluginError::Session(format!(
"process signal name must be non-empty and contain only ASCII letters, digits, `_`, or `-`, got `{signal_name}`"
)))
}
}
pub fn lashlang_process_event_types() -> Vec<ProcessEventType> {
vec![
ProcessEventType {
name: "process.yield".to_string(),
payload_schema: crate::LashSchema::any(),
semantics: ProcessEventSemanticsSpec::default(),
},
ProcessEventType {
name: "process.wake".to_string(),
payload_schema: crate::LashSchema::any(),
semantics: ProcessEventSemanticsSpec {
wake: Some(ProcessWakeSpec {
when: None,
input: ProcessValueSelector::Pointer("/text".to_string()),
dedupe_key: ProcessWakeDedupeKey::EventIdentity,
}),
..ProcessEventSemanticsSpec::default()
},
},
]
}
pub fn lashlang_process_signal_event_types(
process: &lashlang::ProcessDecl,
) -> Vec<ProcessEventType> {
process
.signals
.iter()
.map(|signal| ProcessEventType {
name: process_signal_event_type(signal.name.as_str())
.expect("lashlang process signal declarations use parser-validated names"),
payload_schema: crate::LashSchema::new(lashlang_type_expr_schema(&signal.ty)),
semantics: ProcessEventSemanticsSpec::default(),
})
.collect()
}
fn lashlang_type_expr_schema(ty: &lashlang::TypeExpr) -> serde_json::Value {
match ty {
lashlang::TypeExpr::Any
| lashlang::TypeExpr::Dict
| lashlang::TypeExpr::Ref(_)
| lashlang::TypeExpr::Process { .. }
| lashlang::TypeExpr::TriggerHandle(_) => serde_json::json!({}),
lashlang::TypeExpr::Str => serde_json::json!({ "type": "string" }),
lashlang::TypeExpr::Int => serde_json::json!({ "type": "integer" }),
lashlang::TypeExpr::Float => serde_json::json!({ "type": "number" }),
lashlang::TypeExpr::Bool => serde_json::json!({ "type": "boolean" }),
lashlang::TypeExpr::Null => serde_json::json!({ "type": "null" }),
lashlang::TypeExpr::Enum(values) => serde_json::json!({
"enum": values.iter().map(|value| value.as_str()).collect::<Vec<_>>()
}),
lashlang::TypeExpr::List(item) => serde_json::json!({
"type": "array",
"items": lashlang_type_expr_schema(item),
}),
lashlang::TypeExpr::Object(fields) => {
let mut properties = serde_json::Map::new();
let mut required = Vec::new();
for field in fields {
properties.insert(field.name.to_string(), lashlang_type_expr_schema(&field.ty));
if !field.optional {
required.push(serde_json::Value::String(field.name.to_string()));
}
}
let mut schema = serde_json::Map::new();
schema.insert(
"type".to_string(),
serde_json::Value::String("object".to_string()),
);
schema.insert(
"properties".to_string(),
serde_json::Value::Object(properties),
);
if !required.is_empty() {
schema.insert("required".to_string(), serde_json::Value::Array(required));
}
schema.insert(
"additionalProperties".to_string(),
serde_json::Value::Bool(true),
);
serde_json::Value::Object(schema)
}
lashlang::TypeExpr::Union(variants) => serde_json::json!({
"anyOf": variants.iter().map(lashlang_type_expr_schema).collect::<Vec<_>>()
}),
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessEvent {
pub process_id: ProcessId,
pub sequence: u64,
pub event_type: String,
pub payload: serde_json::Value,
pub invocation: crate::RuntimeInvocation,
pub semantics: ProcessEventSemantics,
pub occurred_at: SystemTime,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessEventAppendResult {
pub event: ProcessEvent,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wake_delivery: Option<ProcessWakeDelivery>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventAppendRequest {
pub event_type: String,
pub payload: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replay: Option<crate::RuntimeReplay>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wake_target_scope: Option<SessionScope>,
}
impl ProcessEventAppendRequest {
pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
Self {
event_type: event_type.into(),
payload,
replay: None,
wake_target_scope: None,
}
}
pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
self.replay = Some(crate::RuntimeReplay {
key: replay_key.into(),
});
self
}
pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
self.replay = replay;
self
}
pub fn with_wake_target_scope(mut self, scope: SessionScope) -> Self {
self.wake_target_scope = Some(scope);
self
}
pub fn with_optional_wake_target_scope(mut self, scope: Option<SessionScope>) -> Self {
self.wake_target_scope = scope;
self
}
pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
let payload = serde_json::json!({
"reason": reason,
});
let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
.unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
Self::new("process.cancel_requested", payload).with_replay_key(format!(
"process:{process_id}:cancel_requested:{replay_key}"
))
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWakeDelivery {
pub wake_id: String,
pub target_session_id: String,
pub target_scope_id: SessionScopeId,
pub process_id: ProcessId,
pub sequence: u64,
#[serde(default = "default_process_wake_event_type")]
pub event_type: String,
#[serde(default = "default_process_wake_event_invocation")]
pub event_invocation: crate::RuntimeInvocation,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub process_caused_by: Option<crate::CausalRef>,
pub dedupe_key: String,
pub input: String,
pub created_at_ms: u64,
}
fn default_process_wake_event_type() -> String {
"process.wake".to_string()
}
fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
crate::RuntimeInvocation {
scope: crate::RuntimeScope::new(""),
subject: crate::RuntimeSubject::ProcessEvent {
process_id: String::new(),
sequence: 0,
event_type: default_process_wake_event_type(),
},
caused_by: None,
replay: None,
}
}
pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
vec![
ProcessEventType {
name: "process.cancel_requested".to_string(),
payload_schema: crate::LashSchema::any(),
semantics: ProcessEventSemanticsSpec::default(),
},
ProcessEventType {
name: "process.waiting".to_string(),
payload_schema: crate::LashSchema::any(),
semantics: ProcessEventSemanticsSpec::default(),
},
ProcessEventType {
name: "process.resumed".to_string(),
payload_schema: crate::LashSchema::any(),
semantics: ProcessEventSemanticsSpec::default(),
},
terminal_event_type("process.completed", ProcessTerminalState::Completed),
terminal_event_type("process.failed", ProcessTerminalState::Failed),
terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
]
}
fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
ProcessEventType {
name: name.to_string(),
payload_schema: crate::LashSchema::any(),
semantics: ProcessEventSemanticsSpec {
terminal: Some(ProcessTerminalSpec {
state,
await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
}),
..ProcessEventSemanticsSpec::default()
},
}
}