lash-core 0.1.0-alpha.34

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

use serde::{Deserialize, Serialize};

use super::{
    PluginError, PluginFactory, PluginRegistrar, PluginSessionContext, PluginSnapshotMeta,
    SessionPlugin, SnapshotReader, SnapshotWriter,
};

pub(crate) const SESSION_TRIGGER_PLUGIN_ID: &str = "lash.session_triggers";

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerSourceType(String);

impl TriggerSourceType {
    pub fn new(value: impl Into<String>) -> Self {
        Self(value.into())
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl From<String> for TriggerSourceType {
    fn from(value: String) -> Self {
        Self::new(value)
    }
}

impl From<&str> for TriggerSourceType {
    fn from(value: &str) -> Self {
        Self::new(value)
    }
}

impl AsRef<str> for TriggerSourceType {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}

impl std::fmt::Display for TriggerSourceType {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(self.as_str())
    }
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerRegistration {
    pub handle: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    pub source_type: TriggerSourceType,
    pub source: serde_json::Value,
    pub target: TriggerTargetSummary,
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerTargetSummary {
    pub process_name: String,
    pub inputs: lashlang::TriggerInputTemplate,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct SessionTriggerRoute {
    pub handle: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    pub source_type: String,
    pub source: serde_json::Value,
    pub event_ty: lashlang::TypeExpr,
    pub module_ref: lashlang::ModuleRef,
    pub required_surface_ref: lashlang::RequiredSurfaceRef,
    pub process_ref: lashlang::ProcessRef,
    pub process_name: String,
    pub input_template: lashlang::TriggerInputTemplate,
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}

impl From<&SessionTriggerRoute> for TriggerRegistration {
    fn from(route: &SessionTriggerRoute) -> Self {
        Self {
            handle: route.handle.clone(),
            name: route.name.clone(),
            source_type: TriggerSourceType::new(route.source_type.clone()),
            source: route.source.clone(),
            target: TriggerTargetSummary {
                process_name: route.process_name.clone(),
                inputs: route.input_template.clone(),
            },
            enabled: route.enabled,
        }
    }
}

fn default_enabled() -> bool {
    true
}

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
struct SessionTriggerRegistryState {
    #[serde(default)]
    revision: u64,
    #[serde(default)]
    next_id: u64,
    #[serde(default)]
    routes: BTreeMap<String, SessionTriggerRoute>,
}

#[derive(Default)]
pub(crate) struct SessionTriggerRegistry {
    state: Mutex<SessionTriggerRegistryState>,
}

impl SessionTriggerRegistry {
    pub(crate) async fn register_route(
        &self,
        request: serde_json::Value,
        resources: &lashlang::ResourceCatalog,
        artifact_store: &dyn lashlang::LashlangArtifactStore,
    ) -> Result<SessionTriggerRoute, PluginError> {
        let request = lashlang::TriggerRegistrationRequest::decode(&request)
            .map_err(|err| PluginError::Session(err.to_string()))?;
        let source_type = request.source.source_type.clone();
        let source = request.source.to_json();
        let event_type = lashlang::event_type_for_source(resources, &source_type)
            .map_err(|err| PluginError::Session(err.to_string()))?;
        let target = request.target;
        let validation =
            validate_target_process(&target, &event_type, &request.inputs, artifact_store).await?;

        let mut state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        state.next_id = state.next_id.saturating_add(1);
        let handle = format!("trigger:{}", state.next_id);
        let route = SessionTriggerRoute {
            handle: handle.clone(),
            name: request.name,
            source_type,
            source,
            event_ty: validation.event_ty,
            module_ref: target.module_ref,
            required_surface_ref: target.required_surface_ref,
            process_ref: target.process_ref,
            process_name: target.process_name,
            input_template: validation.inputs,
            enabled: true,
        };
        state.routes.insert(handle, route.clone());
        state.revision = state.revision.saturating_add(1);
        Ok(route)
    }

    pub(crate) fn list(
        &self,
        request: serde_json::Value,
    ) -> Result<Vec<TriggerRegistration>, PluginError> {
        let request = lashlang::TriggerListRequest::decode(&request)
            .map_err(|err| PluginError::Session(err.to_string()))?;
        let state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        Ok(state
            .routes
            .values()
            .filter(|route| {
                request.target.as_ref().is_none_or(|target| {
                    target.matches(
                        &route.module_ref,
                        &route.required_surface_ref,
                        &route.process_ref,
                        &route.process_name,
                    )
                }) && request
                    .name
                    .as_deref()
                    .is_none_or(|name| route.name.as_deref() == Some(name))
                    && request
                        .source_type
                        .as_deref()
                        .is_none_or(|source_type| route.source_type == source_type)
                    && request
                        .enabled
                        .is_none_or(|enabled| route.enabled == enabled)
            })
            .map(TriggerRegistration::from)
            .collect())
    }

    pub(crate) fn list_all(&self) -> Result<Vec<TriggerRegistration>, PluginError> {
        let state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        Ok(state
            .routes
            .values()
            .map(TriggerRegistration::from)
            .collect())
    }

    pub(crate) fn routes_by_source_type(
        &self,
        source_type: &TriggerSourceType,
    ) -> Result<Vec<TriggerRegistration>, PluginError> {
        let state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        Ok(state
            .routes
            .values()
            .filter(|route| route.source_type == source_type.as_str())
            .map(TriggerRegistration::from)
            .collect())
    }

    pub(crate) fn activation_routes_by_source_type(
        &self,
        source_type: &str,
    ) -> Result<Vec<SessionTriggerRoute>, PluginError> {
        let state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        Ok(state
            .routes
            .values()
            .filter(|route| route.source_type == source_type)
            .cloned()
            .collect())
    }

    pub(crate) fn cancel(&self, request: serde_json::Value) -> Result<bool, PluginError> {
        let request = lashlang::TriggerCancelRequest::decode(&request)
            .map_err(|err| PluginError::Session(err.to_string()))?;
        let mut state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        let Some(route) = state.routes.get_mut(&request.handle) else {
            return Ok(false);
        };
        let changed = route.enabled;
        route.enabled = false;
        if changed {
            state.revision = state.revision.saturating_add(1);
        }
        Ok(changed)
    }

    pub(crate) fn route(&self, handle: &str) -> Result<Option<SessionTriggerRoute>, PluginError> {
        let state = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        Ok(state.routes.get(handle).cloned())
    }

    fn snapshot_state(&self) -> Result<SessionTriggerRegistryState, PluginError> {
        self.state
            .lock()
            .map(|state| state.clone())
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))
    }

    fn restore_state(&self, state: SessionTriggerRegistryState) -> Result<(), PluginError> {
        let mut current = self
            .state
            .lock()
            .map_err(|_| PluginError::Session("trigger registry lock poisoned".to_string()))?;
        *current = state;
        Ok(())
    }

    fn revision(&self) -> u64 {
        self.state
            .lock()
            .map(|state| state.revision)
            .unwrap_or_default()
    }
}

pub(crate) struct SessionTriggerPluginFactory;

impl PluginFactory for SessionTriggerPluginFactory {
    fn id(&self) -> &'static str {
        SESSION_TRIGGER_PLUGIN_ID
    }

    fn lashlang_resources(&self) -> lashlang::ResourceCatalog {
        let mut resources = lashlang::ResourceCatalog::new();
        lashlang::add_trigger_resource_operations(&mut resources);
        resources
    }

    fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
        Ok(Arc::new(SessionTriggerPlugin {
            registry: Arc::new(SessionTriggerRegistry::default()),
        }))
    }
}

struct SessionTriggerPlugin {
    registry: Arc<SessionTriggerRegistry>,
}

impl SessionPlugin for SessionTriggerPlugin {
    fn id(&self) -> &'static str {
        SESSION_TRIGGER_PLUGIN_ID
    }

    fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
        reg.triggers().registry(Arc::clone(&self.registry))
    }

    fn snapshot(
        &self,
        _writer: &mut dyn SnapshotWriter,
    ) -> Result<PluginSnapshotMeta, PluginError> {
        Ok(PluginSnapshotMeta {
            plugin_id: self.id().to_string(),
            plugin_version: self.version().to_string(),
            revision: self.snapshot_revision(),
            state: Some(
                serde_json::to_value(self.registry.snapshot_state()?).map_err(|err| {
                    PluginError::Session(format!(
                        "failed to encode trigger registry snapshot: {err}"
                    ))
                })?,
            ),
        })
    }

    fn snapshot_revision(&self) -> u64 {
        self.registry.revision()
    }

    fn restore(
        &self,
        meta: &PluginSnapshotMeta,
        _reader: &dyn SnapshotReader,
    ) -> Result<(), PluginError> {
        let Some(value) = meta.state.clone() else {
            return self
                .registry
                .restore_state(SessionTriggerRegistryState::default());
        };
        let state: SessionTriggerRegistryState = serde_json::from_value(value).map_err(|err| {
            PluginError::Session(format!("failed to decode trigger registry snapshot: {err}"))
        })?;
        self.registry.restore_state(state)
    }
}

pub(super) fn trigger_handle_json(handle: &str) -> serde_json::Value {
    serde_json::json!({
        "type": "trigger_handle",
        "id": handle,
    })
}

async fn validate_target_process(
    target: &lashlang::TriggerTargetIdentity,
    event_ty: &lashlang::NamedDataType,
    inputs: &lashlang::TriggerInputTemplate,
    artifact_store: &dyn lashlang::LashlangArtifactStore,
) -> Result<lashlang::TriggerTargetValidation, PluginError> {
    let artifact = artifact_store
        .get_module_artifact(&target.module_ref)
        .await
        .map_err(|err| PluginError::Session(format!("load trigger target artifact: {err}")))?
        .ok_or_else(|| {
            PluginError::Session(format!(
                "missing trigger target artifact `{}`",
                target.module_ref
            ))
        })?;
    let validation = lashlang::validate_trigger_target(target, event_ty, inputs, &artifact)
        .map_err(|err| PluginError::Session(err.to_string()))?;
    Ok(validation)
}