1use std::sync::Arc;
2
3use crate::plugin::{PluginError, SessionTriggerRegistry};
4
5pub(crate) fn validate_host_event(
6 plugins: &crate::PluginSession,
7 resource_type: &str,
8 alias: &str,
9 event: &str,
10 payload: &serde_json::Value,
11) -> Result<(), PluginError> {
12 let declared = plugins
13 .host_events()
14 .get(resource_type, alias, event)
15 .ok_or_else(|| {
16 PluginError::Session(format!(
17 "unknown host event `{resource_type}.{alias}.{event}`"
18 ))
19 })?;
20 validate_payload(payload, declared.payload_type().ty()).map_err(|message| {
21 PluginError::Session(format!(
22 "invalid payload for host event `{resource_type}.{alias}.{event}`: {message}"
23 ))
24 })
25}
26
27pub struct TriggerActivationService<'a> {
28 session_id: String,
29 registry: Arc<SessionTriggerRegistry>,
30 processes: Arc<dyn crate::ProcessService>,
31 scoped_effect_controller: crate::ScopedEffectController<'a>,
32}
33
34impl<'a> TriggerActivationService<'a> {
35 pub(crate) fn new(
36 session_id: String,
37 registry: Arc<SessionTriggerRegistry>,
38 processes: Arc<dyn crate::ProcessService>,
39 scoped_effect_controller: crate::ScopedEffectController<'a>,
40 ) -> Self {
41 Self {
42 session_id,
43 registry,
44 processes,
45 scoped_effect_controller,
46 }
47 }
48
49 pub async fn activate(
50 &self,
51 handle: impl AsRef<str>,
52 event_payload: serde_json::Value,
53 parent_invocation: Option<crate::RuntimeInvocation>,
54 ) -> Result<Option<String>, PluginError> {
55 let Some(route) = self.registry.route(handle.as_ref())? else {
56 return Ok(None);
57 };
58 if !route.enabled {
59 return Ok(None);
60 }
61 self.start_route(route, event_payload, parent_invocation)
62 .await
63 }
64
65 pub async fn activate_source_type(
66 &self,
67 source_type: impl AsRef<str>,
68 event_payload: serde_json::Value,
69 parent_invocation: Option<crate::RuntimeInvocation>,
70 ) -> Result<Vec<String>, PluginError> {
71 let routes = self
72 .registry
73 .activation_routes_by_source_type(source_type.as_ref())?;
74 let mut started_process_ids = Vec::new();
75 for route in routes {
76 if !route.enabled {
77 continue;
78 }
79 if let Some(process_id) = self
80 .start_route(route, event_payload.clone(), parent_invocation.clone())
81 .await?
82 {
83 started_process_ids.push(process_id);
84 }
85 }
86 Ok(started_process_ids)
87 }
88
89 async fn start_route(
90 &self,
91 route: crate::plugin::SessionTriggerRoute,
92 event_payload: serde_json::Value,
93 parent_invocation: Option<crate::RuntimeInvocation>,
94 ) -> Result<Option<String>, PluginError> {
95 validate_payload(&event_payload, &route.event_ty).map_err(|message| {
96 PluginError::Session(format!(
97 "invalid payload for trigger `{}`: {message}",
98 route.handle
99 ))
100 })?;
101 let mut args = lashlang::Record::default();
102 for (input_name, input) in route.input_template.entries() {
103 let value = match input {
104 lashlang::TriggerInputBinding::Event => event_payload.clone(),
105 lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
106 };
107 args.insert(input_name.to_string(), lashlang::from_json(value));
108 }
109 let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
110 .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
111 {
112 serde_json::Value::Object(map) => map,
113 _ => {
114 return Err(PluginError::Session(
115 "trigger process args must serialize as an object".to_string(),
116 ));
117 }
118 };
119 let process_id = format!("process:{}", uuid::Uuid::new_v4());
120 let registration = crate::ProcessRegistration::new(
121 process_id.clone(),
122 crate::ProcessInput::LashlangProcess {
123 module_ref: route.module_ref.clone(),
124 process_ref: route.process_ref.clone(),
125 required_surface_ref: route.required_surface_ref.clone(),
126 process_name: route.process_name.clone(),
127 args,
128 },
129 )
130 .with_extra_event_types(crate::lashlang_process_event_types());
131 let scoped_effect_controller = crate::ScopedEffectController::borrowed(
132 self.scoped_effect_controller.controller(),
133 crate::EffectScope::host_event(
134 &self.session_id,
135 format!(
136 "{}:{}",
137 self.scoped_effect_controller.scope_id(),
138 route.handle
139 ),
140 ),
141 )
142 .map_err(|err| PluginError::Session(err.to_string()))?;
143 self.processes
144 .start(
145 &self.session_id,
146 registration,
147 crate::ProcessStartOptions::new().with_descriptor(
148 crate::ProcessHandleDescriptor::new(
149 Some("lashlang"),
150 Some(route.process_name.as_str()),
151 ),
152 ),
153 crate::ProcessOpScope::new(scoped_effect_controller)
154 .with_parent_invocation(parent_invocation),
155 )
156 .await?;
157 Ok(Some(process_id))
158 }
159}
160
161fn validate_payload(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> Result<(), String> {
162 if json_matches_type(value, ty) {
163 Ok(())
164 } else {
165 Err(format!("expected {}", lashlang::format_type_expr(ty)))
166 }
167}
168
169fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
170 match ty {
171 lashlang::TypeExpr::Any => true,
172 lashlang::TypeExpr::Ref(_) => false,
173 lashlang::TypeExpr::Str => value.is_string(),
174 lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
175 lashlang::TypeExpr::Float => value.is_number(),
176 lashlang::TypeExpr::Bool => value.is_boolean(),
177 lashlang::TypeExpr::Dict => value.is_object(),
178 lashlang::TypeExpr::Null => value.is_null(),
179 lashlang::TypeExpr::Enum(values) => value
180 .as_str()
181 .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
182 lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
183 items
184 .iter()
185 .all(|item_value| json_matches_type(item_value, item))
186 }),
187 lashlang::TypeExpr::Object(fields) => {
188 let Some(map) = value.as_object() else {
189 return false;
190 };
191 fields
192 .iter()
193 .all(|field| match map.get(field.name.as_str()) {
194 Some(field_value) => json_matches_type(field_value, &field.ty),
195 None => field.optional,
196 })
197 }
198 lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
199 lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
200 value.is_object()
201 }
202 }
203}