lash-core 0.1.0-alpha.34

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use std::sync::Arc;

use crate::plugin::{PluginError, SessionTriggerRegistry};

pub(crate) fn validate_host_event(
    plugins: &crate::PluginSession,
    resource_type: &str,
    alias: &str,
    event: &str,
    payload: &serde_json::Value,
) -> Result<(), PluginError> {
    let declared = plugins
        .host_events()
        .get(resource_type, alias, event)
        .ok_or_else(|| {
            PluginError::Session(format!(
                "unknown host event `{resource_type}.{alias}.{event}`"
            ))
        })?;
    validate_payload(payload, declared.payload_type().ty()).map_err(|message| {
        PluginError::Session(format!(
            "invalid payload for host event `{resource_type}.{alias}.{event}`: {message}"
        ))
    })
}

pub struct TriggerActivationService<'a> {
    session_id: String,
    registry: Arc<SessionTriggerRegistry>,
    processes: Arc<dyn crate::ProcessService>,
    scoped_effect_controller: crate::ScopedEffectController<'a>,
}

impl<'a> TriggerActivationService<'a> {
    pub(crate) fn new(
        session_id: String,
        registry: Arc<SessionTriggerRegistry>,
        processes: Arc<dyn crate::ProcessService>,
        scoped_effect_controller: crate::ScopedEffectController<'a>,
    ) -> Self {
        Self {
            session_id,
            registry,
            processes,
            scoped_effect_controller,
        }
    }

    pub async fn activate(
        &self,
        handle: impl AsRef<str>,
        event_payload: serde_json::Value,
        parent_invocation: Option<crate::RuntimeInvocation>,
    ) -> Result<Option<String>, PluginError> {
        let Some(route) = self.registry.route(handle.as_ref())? else {
            return Ok(None);
        };
        if !route.enabled {
            return Ok(None);
        }
        self.start_route(route, event_payload, parent_invocation)
            .await
    }

    pub async fn activate_source_type(
        &self,
        source_type: impl AsRef<str>,
        event_payload: serde_json::Value,
        parent_invocation: Option<crate::RuntimeInvocation>,
    ) -> Result<Vec<String>, PluginError> {
        let routes = self
            .registry
            .activation_routes_by_source_type(source_type.as_ref())?;
        let mut started_process_ids = Vec::new();
        for route in routes {
            if !route.enabled {
                continue;
            }
            if let Some(process_id) = self
                .start_route(route, event_payload.clone(), parent_invocation.clone())
                .await?
            {
                started_process_ids.push(process_id);
            }
        }
        Ok(started_process_ids)
    }

    async fn start_route(
        &self,
        route: crate::plugin::SessionTriggerRoute,
        event_payload: serde_json::Value,
        parent_invocation: Option<crate::RuntimeInvocation>,
    ) -> Result<Option<String>, PluginError> {
        validate_payload(&event_payload, &route.event_ty).map_err(|message| {
            PluginError::Session(format!(
                "invalid payload for trigger `{}`: {message}",
                route.handle
            ))
        })?;
        let mut args = lashlang::Record::default();
        for (input_name, input) in route.input_template.entries() {
            let value = match input {
                lashlang::TriggerInputBinding::Event => event_payload.clone(),
                lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
            };
            args.insert(input_name.to_string(), lashlang::from_json(value));
        }
        let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
            .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
        {
            serde_json::Value::Object(map) => map,
            _ => {
                return Err(PluginError::Session(
                    "trigger process args must serialize as an object".to_string(),
                ));
            }
        };
        let process_id = format!("process:{}", uuid::Uuid::new_v4());
        let registration = crate::ProcessRegistration::new(
            process_id.clone(),
            crate::ProcessInput::LashlangProcess {
                module_ref: route.module_ref.clone(),
                process_ref: route.process_ref.clone(),
                required_surface_ref: route.required_surface_ref.clone(),
                process_name: route.process_name.clone(),
                args,
            },
        )
        .with_extra_event_types(crate::lashlang_process_event_types());
        let scoped_effect_controller = crate::ScopedEffectController::borrowed(
            self.scoped_effect_controller.controller(),
            crate::EffectScope::host_event(
                &self.session_id,
                format!(
                    "{}:{}",
                    self.scoped_effect_controller.scope_id(),
                    route.handle
                ),
            ),
        )
        .map_err(|err| PluginError::Session(err.to_string()))?;
        self.processes
            .start(
                &self.session_id,
                registration,
                crate::ProcessStartOptions::new().with_descriptor(
                    crate::ProcessHandleDescriptor::new(
                        Some("lashlang"),
                        Some(route.process_name.as_str()),
                    ),
                ),
                crate::ProcessOpScope::new(scoped_effect_controller)
                    .with_parent_invocation(parent_invocation),
            )
            .await?;
        Ok(Some(process_id))
    }
}

fn validate_payload(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> Result<(), String> {
    if json_matches_type(value, ty) {
        Ok(())
    } else {
        Err(format!("expected {}", lashlang::format_type_expr(ty)))
    }
}

fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
    match ty {
        lashlang::TypeExpr::Any => true,
        lashlang::TypeExpr::Ref(_) => false,
        lashlang::TypeExpr::Str => value.is_string(),
        lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
        lashlang::TypeExpr::Float => value.is_number(),
        lashlang::TypeExpr::Bool => value.is_boolean(),
        lashlang::TypeExpr::Dict => value.is_object(),
        lashlang::TypeExpr::Null => value.is_null(),
        lashlang::TypeExpr::Enum(values) => value
            .as_str()
            .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
        lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
            items
                .iter()
                .all(|item_value| json_matches_type(item_value, item))
        }),
        lashlang::TypeExpr::Object(fields) => {
            let Some(map) = value.as_object() else {
                return false;
            };
            fields
                .iter()
                .all(|field| match map.get(field.name.as_str()) {
                    Some(field_value) => json_matches_type(field_value, &field.ty),
                    None => field.optional,
                })
        }
        lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
        lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
            value.is_object()
        }
    }
}