Skip to main content

lash_core/session/
triggers.rs

1use std::sync::Arc;
2
3use crate::plugin::{PluginError, SessionTriggerRegistry};
4
5pub(crate) fn validate_host_event(
6    plugins: &crate::PluginSession,
7    resource_type: &str,
8    alias: &str,
9    event: &str,
10    payload: &serde_json::Value,
11) -> Result<(), PluginError> {
12    let declared = plugins
13        .host_events()
14        .get(resource_type, alias, event)
15        .ok_or_else(|| {
16            PluginError::Session(format!(
17                "unknown host event `{resource_type}.{alias}.{event}`"
18            ))
19        })?;
20    validate_payload(payload, declared.payload_type().ty()).map_err(|message| {
21        PluginError::Session(format!(
22            "invalid payload for host event `{resource_type}.{alias}.{event}`: {message}"
23        ))
24    })
25}
26
27pub struct TriggerActivationService<'a> {
28    session_id: String,
29    registry: Arc<SessionTriggerRegistry>,
30    processes: Arc<dyn crate::ProcessService>,
31    scoped_effect_controller: crate::ScopedEffectController<'a>,
32}
33
34impl<'a> TriggerActivationService<'a> {
35    pub(crate) fn new(
36        session_id: String,
37        registry: Arc<SessionTriggerRegistry>,
38        processes: Arc<dyn crate::ProcessService>,
39        scoped_effect_controller: crate::ScopedEffectController<'a>,
40    ) -> Self {
41        Self {
42            session_id,
43            registry,
44            processes,
45            scoped_effect_controller,
46        }
47    }
48
49    pub async fn activate(
50        &self,
51        handle: impl AsRef<str>,
52        event_payload: serde_json::Value,
53        parent_invocation: Option<crate::RuntimeInvocation>,
54    ) -> Result<Option<String>, PluginError> {
55        let Some(route) = self.registry.route(handle.as_ref())? else {
56            return Ok(None);
57        };
58        if !route.enabled {
59            return Ok(None);
60        }
61        self.start_route(route, event_payload, parent_invocation)
62            .await
63    }
64
65    pub async fn activate_source_type(
66        &self,
67        source_type: impl AsRef<str>,
68        event_payload: serde_json::Value,
69        parent_invocation: Option<crate::RuntimeInvocation>,
70    ) -> Result<Vec<String>, PluginError> {
71        let routes = self
72            .registry
73            .activation_routes_by_source_type(source_type.as_ref())?;
74        let mut started_process_ids = Vec::new();
75        for route in routes {
76            if !route.enabled {
77                continue;
78            }
79            if let Some(process_id) = self
80                .start_route(route, event_payload.clone(), parent_invocation.clone())
81                .await?
82            {
83                started_process_ids.push(process_id);
84            }
85        }
86        Ok(started_process_ids)
87    }
88
89    async fn start_route(
90        &self,
91        route: crate::plugin::SessionTriggerRoute,
92        event_payload: serde_json::Value,
93        parent_invocation: Option<crate::RuntimeInvocation>,
94    ) -> Result<Option<String>, PluginError> {
95        validate_payload(&event_payload, &route.event_ty).map_err(|message| {
96            PluginError::Session(format!(
97                "invalid payload for trigger `{}`: {message}",
98                route.handle
99            ))
100        })?;
101        let mut args = lashlang::Record::default();
102        for (input_name, input) in route.input_template.entries() {
103            let value = match input {
104                lashlang::TriggerInputBinding::Event => event_payload.clone(),
105                lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
106            };
107            args.insert(input_name.to_string(), lashlang::from_json(value));
108        }
109        let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
110            .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
111        {
112            serde_json::Value::Object(map) => map,
113            _ => {
114                return Err(PluginError::Session(
115                    "trigger process args must serialize as an object".to_string(),
116                ));
117            }
118        };
119        let process_id = format!("process:{}", uuid::Uuid::new_v4());
120        let registration = crate::ProcessRegistration::new(
121            process_id.clone(),
122            crate::ProcessInput::LashlangProcess {
123                module_ref: route.module_ref.clone(),
124                process_ref: route.process_ref.clone(),
125                required_surface_ref: route.required_surface_ref.clone(),
126                process_name: route.process_name.clone(),
127                args,
128            },
129        )
130        .with_extra_event_types(crate::lashlang_process_event_types());
131        let scoped_effect_controller = crate::ScopedEffectController::borrowed(
132            self.scoped_effect_controller.controller(),
133            crate::EffectScope::host_event(
134                &self.session_id,
135                format!(
136                    "{}:{}",
137                    self.scoped_effect_controller.scope_id(),
138                    route.handle
139                ),
140            ),
141        )
142        .map_err(|err| PluginError::Session(err.to_string()))?;
143        self.processes
144            .start(
145                &self.session_id,
146                registration,
147                crate::ProcessStartOptions::new().with_descriptor(
148                    crate::ProcessHandleDescriptor::new(
149                        Some("lashlang"),
150                        Some(route.process_name.as_str()),
151                    ),
152                ),
153                crate::ProcessOpScope::new(scoped_effect_controller)
154                    .with_parent_invocation(parent_invocation),
155            )
156            .await?;
157        Ok(Some(process_id))
158    }
159}
160
161fn validate_payload(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> Result<(), String> {
162    if json_matches_type(value, ty) {
163        Ok(())
164    } else {
165        Err(format!("expected {}", lashlang::format_type_expr(ty)))
166    }
167}
168
169fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
170    match ty {
171        lashlang::TypeExpr::Any => true,
172        lashlang::TypeExpr::Ref(_) => false,
173        lashlang::TypeExpr::Str => value.is_string(),
174        lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
175        lashlang::TypeExpr::Float => value.is_number(),
176        lashlang::TypeExpr::Bool => value.is_boolean(),
177        lashlang::TypeExpr::Dict => value.is_object(),
178        lashlang::TypeExpr::Null => value.is_null(),
179        lashlang::TypeExpr::Enum(values) => value
180            .as_str()
181            .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
182        lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
183            items
184                .iter()
185                .all(|item_value| json_matches_type(item_value, item))
186        }),
187        lashlang::TypeExpr::Object(fields) => {
188            let Some(map) = value.as_object() else {
189                return false;
190            };
191            fields
192                .iter()
193                .all(|field| match map.get(field.name.as_str()) {
194                    Some(field_value) => json_matches_type(field_value, &field.ty),
195                    None => field.optional,
196                })
197        }
198        lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
199        lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
200            value.is_object()
201        }
202    }
203}