1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::{Deserialize, Serialize};
4use thiserror::Error;
5
6use crate::artifact::{ModuleArtifact, ModuleRef, ProcessRef, RequiredSurfaceRef};
7use crate::ast::{AstString, Expr, TypeExpr, TypeField, format_type_expr};
8use crate::linker::{NamedDataType, ResourceCatalog};
9use crate::runtime::{
10 LASH_HOST_VALUE_KEY, LASH_HOST_VALUE_TYPE_KEY, LASH_MODULE_REF_KEY, LASH_PROCESS_NAME_KEY,
11 LASH_PROCESS_REF_KEY, LASH_PROCESS_VALUE_KEY, LASH_REQUIRED_SURFACE_REF_KEY,
12};
13
14const TRIGGERS_RESOURCE_TYPE: &str = "Triggers";
15const TRIGGERS_ALIAS: &str = "triggers";
16const TRIGGER_REGISTRATION_TYPE: &str = "TriggerRegistration";
17pub const LASH_TRIGGER_EVENT_KEY: &str = "$lash.trigger.event";
18
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub enum TriggerHostOperation {
21 Register,
22 List,
23 Cancel,
24}
25
26impl TriggerHostOperation {
27 pub const fn host_operation(self) -> &'static str {
28 match self {
29 Self::Register => "triggers.register",
30 Self::List => "triggers.list",
31 Self::Cancel => "triggers.cancel",
32 }
33 }
34
35 pub const fn receiver_method(self) -> &'static str {
36 match self {
37 Self::Register => "register",
38 Self::List => "list",
39 Self::Cancel => "cancel",
40 }
41 }
42
43 pub fn from_host_operation(operation: &str) -> Option<Self> {
44 [Self::Register, Self::List, Self::Cancel]
45 .into_iter()
46 .find(|candidate| candidate.host_operation() == operation)
47 }
48
49 pub fn from_receiver_method(operation: &str) -> Option<Self> {
50 [Self::Register, Self::List, Self::Cancel]
51 .into_iter()
52 .find(|candidate| candidate.receiver_method() == operation)
53 }
54
55 pub fn input_ty(self) -> TypeExpr {
56 match self {
57 Self::Register => TypeExpr::Object(vec![
58 required_field("source", TypeExpr::Dict),
59 required_field(
60 "target",
61 TypeExpr::Process {
62 input: Box::new(TypeExpr::Any),
63 output: Box::new(TypeExpr::Any),
64 input_count: 1,
65 },
66 ),
67 required_field("inputs", TypeExpr::Dict),
68 optional_field("name", TypeExpr::Str),
69 ]),
70 Self::List => TypeExpr::Object(vec![
71 optional_field(
72 "target",
73 TypeExpr::Process {
74 input: Box::new(TypeExpr::Any),
75 output: Box::new(TypeExpr::Any),
76 input_count: 1,
77 },
78 ),
79 optional_field("name", TypeExpr::Str),
80 optional_field("source_type", TypeExpr::Str),
81 optional_field("enabled", TypeExpr::Bool),
82 ]),
83 Self::Cancel => TypeExpr::Object(vec![required_field(
84 "handle",
85 TypeExpr::TriggerHandle(Box::new(TypeExpr::Any)),
86 )]),
87 }
88 }
89
90 pub fn output_ty(self) -> TypeExpr {
91 match self {
92 Self::Register => TypeExpr::TriggerHandle(Box::new(TypeExpr::Any)),
93 Self::List => TypeExpr::List(Box::new(TypeExpr::Ref(TRIGGER_REGISTRATION_TYPE.into()))),
94 Self::Cancel => TypeExpr::Bool,
95 }
96 }
97}
98
99pub fn is_trigger_resource_type(resource_type: &str) -> bool {
100 resource_type == TRIGGERS_RESOURCE_TYPE
101}
102
103pub fn add_trigger_resource_operations(catalog: &mut ResourceCatalog) {
104 for operation in [
105 TriggerHostOperation::Register,
106 TriggerHostOperation::List,
107 TriggerHostOperation::Cancel,
108 ] {
109 catalog.add_module_operation(
110 [TRIGGERS_ALIAS],
111 TRIGGERS_RESOURCE_TYPE,
112 operation.receiver_method(),
113 operation.host_operation(),
114 operation.input_ty(),
115 operation.output_ty(),
116 );
117 }
118}
119
120fn required_field(name: &'static str, ty: TypeExpr) -> TypeField {
121 TypeField {
122 name: name.into(),
123 ty,
124 optional: false,
125 }
126}
127
128fn optional_field(name: &'static str, ty: TypeExpr) -> TypeField {
129 TypeField {
130 name: name.into(),
131 ty,
132 optional: true,
133 }
134}
135
136pub struct TriggerRegistrationCall<'expr> {
137 pub source: &'expr Expr,
138 pub target: &'expr Expr,
139 pub inputs: &'expr Expr,
140 pub name: Option<&'expr Expr>,
141}
142
143pub struct TriggerListCall<'expr> {
144 pub entries: &'expr [(AstString, Expr)],
145}
146
147pub struct TriggerCancelCall<'expr> {
148 pub handle: &'expr Expr,
149}
150
151pub fn register_call_args(
152 args: &[Expr],
153) -> Result<TriggerRegistrationCall<'_>, TriggerCallShapeError> {
154 let entries = record_entries(args).ok_or(TriggerCallShapeError::Registration)?;
155 Ok(TriggerRegistrationCall {
156 source: required_entry(entries, "source").ok_or(TriggerCallShapeError::Registration)?,
157 target: required_entry(entries, "target").ok_or(TriggerCallShapeError::Registration)?,
158 inputs: required_entry(entries, "inputs").ok_or(TriggerCallShapeError::Registration)?,
159 name: required_entry(entries, "name"),
160 })
161}
162
163pub fn list_call_args(args: &[Expr]) -> Result<TriggerListCall<'_>, TriggerCallShapeError> {
164 let entries = record_entries(args).ok_or(TriggerCallShapeError::List)?;
165 for (name, _) in entries {
166 match name.as_str() {
167 "target" | "name" | "source_type" | "enabled" => {}
168 _ => return Err(TriggerCallShapeError::List),
169 }
170 }
171 Ok(TriggerListCall { entries })
172}
173
174pub fn cancel_call_args(args: &[Expr]) -> Result<TriggerCancelCall<'_>, TriggerCallShapeError> {
175 let entries = record_entries(args).ok_or(TriggerCallShapeError::Cancel)?;
176 Ok(TriggerCancelCall {
177 handle: required_entry(entries, "handle").ok_or(TriggerCallShapeError::Cancel)?,
178 })
179}
180
181fn record_entries(args: &[Expr]) -> Option<&[(AstString, Expr)]> {
182 let [Expr::Record(entries)] = args else {
183 return None;
184 };
185 Some(entries)
186}
187
188fn required_entry<'expr>(entries: &'expr [(AstString, Expr)], name: &str) -> Option<&'expr Expr> {
189 entries
190 .iter()
191 .find_map(|(entry_name, expr)| (entry_name.as_str() == name).then_some(expr))
192}
193
194#[derive(Clone, Copy, Debug, PartialEq, Eq)]
195pub enum TriggerCallShapeError {
196 Registration,
197 List,
198 Cancel,
199}
200
201#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
202pub struct TriggerRegistrationRequest {
203 pub source: HostValue,
204 pub target: TriggerTargetIdentity,
205 pub inputs: TriggerInputTemplate,
206 #[serde(default, skip_serializing_if = "Option::is_none")]
207 pub name: Option<String>,
208}
209
210impl TriggerRegistrationRequest {
211 pub fn decode(request: &serde_json::Value) -> Result<Self, TriggerRequestDecodeError> {
212 let operation = TriggerHostOperation::Register;
213 Ok(Self {
214 source: HostValue::decode(required_json_field(request, "source", operation)?)
215 .map_err(TriggerRequestDecodeError::from)?,
216 target: TriggerTargetIdentity::decode(
217 required_json_field(request, "target", operation)?,
218 "trigger target",
219 )?,
220 inputs: TriggerInputTemplate::decode(required_json_field(
221 request, "inputs", operation,
222 )?)?,
223 name: request
224 .get("name")
225 .and_then(serde_json::Value::as_str)
226 .map(ToOwned::to_owned),
227 })
228 }
229}
230
231#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
232#[serde(transparent)]
233pub struct TriggerInputTemplate {
234 entries: BTreeMap<String, TriggerInputBinding>,
235}
236
237impl TriggerInputTemplate {
238 pub fn new(entries: BTreeMap<String, TriggerInputBinding>) -> Self {
239 Self { entries }
240 }
241
242 pub fn decode(value: &serde_json::Value) -> Result<Self, TriggerRequestDecodeError> {
243 let map = value
244 .as_object()
245 .ok_or_else(|| TriggerRequestDecodeError::InvalidField {
246 operation: TriggerHostOperation::Register.host_operation(),
247 field: "inputs",
248 message: "expected an object mapping process params to values".to_string(),
249 })?;
250 let mut entries = BTreeMap::new();
251 for (name, value) in map {
252 let binding = if is_trigger_event_placeholder_value(value) {
253 TriggerInputBinding::Event
254 } else {
255 TriggerInputBinding::Fixed {
256 value: value.clone(),
257 }
258 };
259 entries.insert(name.clone(), binding);
260 }
261 Ok(Self { entries })
262 }
263
264 pub fn entries(&self) -> impl Iterator<Item = (&str, &TriggerInputBinding)> {
265 self.entries
266 .iter()
267 .map(|(name, binding)| (name.as_str(), binding))
268 }
269
270 pub fn get(&self, name: &str) -> Option<&TriggerInputBinding> {
271 self.entries.get(name)
272 }
273
274 pub fn contains_event(&self) -> bool {
275 self.entries
276 .values()
277 .any(|binding| matches!(binding, TriggerInputBinding::Event))
278 }
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282#[serde(tag = "kind", rename_all = "snake_case")]
283pub enum TriggerInputBinding {
284 Event,
285 Fixed { value: serde_json::Value },
286}
287
288impl TriggerInputBinding {
289 pub fn as_fixed(&self) -> Option<&serde_json::Value> {
290 match self {
291 Self::Fixed { value } => Some(value),
292 Self::Event => None,
293 }
294 }
295}
296
297pub fn trigger_event_placeholder_expr() -> Expr {
298 Expr::Record(vec![(LASH_TRIGGER_EVENT_KEY.into(), Expr::Bool(true))])
299}
300
301fn is_trigger_event_placeholder_value(value: &serde_json::Value) -> bool {
302 let Some(map) = value.as_object() else {
303 return false;
304 };
305 map.len() == 1
306 && map
307 .get(LASH_TRIGGER_EVENT_KEY)
308 .and_then(serde_json::Value::as_bool)
309 == Some(true)
310}
311
312#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
313pub struct TriggerListRequest {
314 #[serde(default, skip_serializing_if = "Option::is_none")]
315 pub target: Option<TriggerTargetIdentity>,
316 #[serde(default, skip_serializing_if = "Option::is_none")]
317 pub name: Option<String>,
318 #[serde(default, skip_serializing_if = "Option::is_none")]
319 pub source_type: Option<String>,
320 #[serde(default, skip_serializing_if = "Option::is_none")]
321 pub enabled: Option<bool>,
322}
323
324impl TriggerListRequest {
325 pub fn decode(request: &serde_json::Value) -> Result<Self, TriggerRequestDecodeError> {
326 let map = request
327 .as_object()
328 .ok_or_else(|| TriggerRequestDecodeError::InvalidField {
329 operation: TriggerHostOperation::List.host_operation(),
330 field: "filters",
331 message: "expected a record of trigger filters".to_string(),
332 })?;
333 for key in map.keys() {
334 match key.as_str() {
335 "target" | "name" | "source_type" | "enabled" => {}
336 _ => {
337 return Err(TriggerRequestDecodeError::InvalidField {
338 operation: TriggerHostOperation::List.host_operation(),
339 field: "filters",
340 message: format!("unknown filter `{key}`"),
341 });
342 }
343 }
344 }
345 Ok(Self {
346 target: request
347 .get("target")
348 .map(|value| TriggerTargetIdentity::decode(value, "triggers.list target"))
349 .transpose()?,
350 name: optional_string_filter(request, "name", TriggerHostOperation::List)?,
351 source_type: optional_string_filter(
352 request,
353 "source_type",
354 TriggerHostOperation::List,
355 )?,
356 enabled: optional_bool_filter(request, "enabled", TriggerHostOperation::List)?,
357 })
358 }
359}
360
361fn optional_string_filter(
362 request: &serde_json::Value,
363 field: &'static str,
364 operation: TriggerHostOperation,
365) -> Result<Option<String>, TriggerRequestDecodeError> {
366 request
367 .get(field)
368 .map(|value| {
369 value.as_str().map(ToOwned::to_owned).ok_or_else(|| {
370 TriggerRequestDecodeError::InvalidField {
371 operation: operation.host_operation(),
372 field,
373 message: "expected a string".to_string(),
374 }
375 })
376 })
377 .transpose()
378}
379
380fn optional_bool_filter(
381 request: &serde_json::Value,
382 field: &'static str,
383 operation: TriggerHostOperation,
384) -> Result<Option<bool>, TriggerRequestDecodeError> {
385 request
386 .get(field)
387 .map(|value| {
388 value
389 .as_bool()
390 .ok_or_else(|| TriggerRequestDecodeError::InvalidField {
391 operation: operation.host_operation(),
392 field,
393 message: "expected a boolean".to_string(),
394 })
395 })
396 .transpose()
397}
398
399#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
400pub struct TriggerCancelRequest {
401 pub handle: String,
402}
403
404impl TriggerCancelRequest {
405 pub fn decode(request: &serde_json::Value) -> Result<Self, TriggerRequestDecodeError> {
406 let value = required_json_field(request, "handle", TriggerHostOperation::Cancel)?;
407 let handle = value
408 .as_str()
409 .map(ToOwned::to_owned)
410 .or_else(|| {
411 value
412 .get("id")
413 .and_then(serde_json::Value::as_str)
414 .map(ToOwned::to_owned)
415 })
416 .ok_or_else(|| TriggerRequestDecodeError::InvalidField {
417 operation: TriggerHostOperation::Cancel.host_operation(),
418 field: "handle",
419 message: "expected trigger handle string or object with `id`".to_string(),
420 })?;
421 Ok(Self { handle })
422 }
423}
424
425#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
426pub struct HostValue {
427 pub source_type: String,
428 pub value: serde_json::Value,
429}
430
431impl HostValue {
432 pub fn new(source_type: impl Into<String>, value: serde_json::Value) -> Self {
433 Self {
434 source_type: source_type.into(),
435 value,
436 }
437 }
438
439 pub fn decode(source: &serde_json::Value) -> Result<Self, HostValueError> {
440 let source_type = source
441 .get(LASH_HOST_VALUE_TYPE_KEY)
442 .and_then(serde_json::Value::as_str)
443 .map(ToOwned::to_owned)
444 .ok_or(HostValueError::InvalidHostValue)?;
445 let value = source
446 .get(LASH_HOST_VALUE_KEY)
447 .cloned()
448 .ok_or(HostValueError::InvalidHostValue)?;
449 Ok(Self { source_type, value })
450 }
451
452 pub fn encode(
453 source_type: impl Into<String>,
454 value: impl Serialize,
455 ) -> Result<serde_json::Value, HostValueError> {
456 let source_type = source_type.into();
457 let value =
458 serde_json::to_value(value).map_err(|err| HostValueError::MalformedPayload {
459 source_type: source_type.clone(),
460 message: err.to_string(),
461 })?;
462 Ok(Self::new(source_type, value).to_json())
463 }
464
465 pub fn decode_as<T: serde::de::DeserializeOwned>(
466 &self,
467 resources: &ResourceCatalog,
468 ) -> Result<T, HostValueError> {
469 resources.decode_host_value_as(&self.source_type, self.value.clone())
470 }
471
472 pub fn to_json(&self) -> serde_json::Value {
473 serde_json::json!({
474 LASH_HOST_VALUE_TYPE_KEY: self.source_type,
475 LASH_HOST_VALUE_KEY: self.value,
476 })
477 }
478}
479
480#[derive(Clone, Debug, PartialEq, Eq, Error)]
481pub enum HostValueError {
482 #[error("host value must be a host value constructor result")]
483 InvalidHostValue,
484 #[error("host value `{source_type}` is not declared in the resource catalog")]
485 UnknownSourceType { source_type: String },
486 #[error("host value `{source_type}` payload is invalid: {message}")]
487 MalformedPayload {
488 source_type: String,
489 message: String,
490 },
491}
492
493#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
494pub struct TriggerTargetIdentity {
495 pub module_ref: ModuleRef,
496 pub required_surface_ref: RequiredSurfaceRef,
497 pub process_ref: ProcessRef,
498 pub process_name: String,
499}
500
501impl TriggerTargetIdentity {
502 pub fn decode(
503 value: &serde_json::Value,
504 label: &'static str,
505 ) -> Result<Self, TriggerRequestDecodeError> {
506 if value
507 .get(LASH_PROCESS_VALUE_KEY)
508 .and_then(serde_json::Value::as_bool)
509 != Some(true)
510 {
511 return Err(TriggerRequestDecodeError::InvalidTarget {
512 label,
513 message: "must be a process value".to_string(),
514 });
515 }
516 Ok(Self {
517 module_ref: decode_json_field(value, LASH_MODULE_REF_KEY, label)?,
518 required_surface_ref: decode_json_field(value, LASH_REQUIRED_SURFACE_REF_KEY, label)?,
519 process_ref: decode_json_field(value, LASH_PROCESS_REF_KEY, label)?,
520 process_name: value
521 .get(LASH_PROCESS_NAME_KEY)
522 .and_then(serde_json::Value::as_str)
523 .ok_or_else(|| TriggerRequestDecodeError::InvalidTarget {
524 label,
525 message: format!("missing {LASH_PROCESS_NAME_KEY}"),
526 })?
527 .to_string(),
528 })
529 }
530
531 pub fn matches(
532 &self,
533 module_ref: &ModuleRef,
534 required_surface_ref: &RequiredSurfaceRef,
535 process_ref: &ProcessRef,
536 process_name: &str,
537 ) -> bool {
538 self.module_ref == *module_ref
539 && self.required_surface_ref == *required_surface_ref
540 && self.process_ref == *process_ref
541 && self.process_name == process_name
542 }
543}
544
545pub fn event_type_for_source(
546 resources: &ResourceCatalog,
547 source_type: &str,
548) -> Result<NamedDataType, TriggerRequestDecodeError> {
549 resources
550 .resolve_trigger_source(source_type)
551 .map(|binding| binding.event_type().clone())
552 .ok_or_else(|| TriggerRequestDecodeError::UnknownSourceType {
553 source_type: source_type.to_string(),
554 })
555}
556
557#[derive(Clone, Debug, PartialEq, Eq, Error)]
558pub enum TriggerRequestDecodeError {
559 #[error("{operation} requires `{field}`")]
560 MissingField {
561 operation: &'static str,
562 field: &'static str,
563 },
564 #[error("{operation} field `{field}` is invalid: {message}")]
565 InvalidField {
566 operation: &'static str,
567 field: &'static str,
568 message: String,
569 },
570 #[error("trigger source must be a host value constructor result")]
571 InvalidSource,
572 #[error("{label} {message}")]
573 InvalidTarget {
574 label: &'static str,
575 message: String,
576 },
577 #[error("host value `{source_type}` is not registered as a trigger source")]
578 UnknownSourceType { source_type: String },
579}
580
581impl From<HostValueError> for TriggerRequestDecodeError {
582 fn from(err: HostValueError) -> Self {
583 match err {
584 HostValueError::InvalidHostValue => Self::InvalidSource,
585 HostValueError::UnknownSourceType { source_type } => {
586 Self::UnknownSourceType { source_type }
587 }
588 HostValueError::MalformedPayload { message, .. } => Self::InvalidField {
589 operation: TriggerHostOperation::Register.host_operation(),
590 field: "source",
591 message,
592 },
593 }
594 }
595}
596
597fn required_json_field<'json>(
598 request: &'json serde_json::Value,
599 field: &'static str,
600 operation: TriggerHostOperation,
601) -> Result<&'json serde_json::Value, TriggerRequestDecodeError> {
602 request
603 .get(field)
604 .ok_or_else(|| TriggerRequestDecodeError::MissingField {
605 operation: operation.host_operation(),
606 field,
607 })
608}
609
610fn decode_json_field<T: serde::de::DeserializeOwned>(
611 value: &serde_json::Value,
612 field: &'static str,
613 label: &'static str,
614) -> Result<T, TriggerRequestDecodeError> {
615 serde_json::from_value(value.get(field).cloned().ok_or_else(|| {
616 TriggerRequestDecodeError::InvalidTarget {
617 label,
618 message: format!("missing {field}"),
619 }
620 })?)
621 .map_err(|err| TriggerRequestDecodeError::InvalidTarget {
622 label,
623 message: format!("invalid {field}: {err}"),
624 })
625}
626
627#[derive(Clone, Debug, PartialEq)]
628pub struct TriggerTargetValidation {
629 pub inputs: TriggerInputTemplate,
630 pub event_ty: TypeExpr,
631}
632
633pub fn validate_trigger_target(
634 target: &TriggerTargetIdentity,
635 event_ty: &NamedDataType,
636 inputs: &TriggerInputTemplate,
637 artifact: &ModuleArtifact,
638) -> Result<TriggerTargetValidation, TriggerTargetValidationError> {
639 if artifact.required_surface_ref != target.required_surface_ref {
640 return Err(TriggerTargetValidationError::RequiredSurfaceMismatch {
641 process_name: target.process_name.clone(),
642 target_surface: target.required_surface_ref.to_string(),
643 artifact_surface: artifact.required_surface_ref.to_string(),
644 });
645 }
646 let Some(exported_process_name) = artifact.process_name_for_ref(&target.process_ref) else {
647 return Err(TriggerTargetValidationError::ProcessRefMismatch {
648 module_ref: target.module_ref.to_string(),
649 process_name: target.process_name.clone(),
650 process_ref: format!("{:?}", target.process_ref),
651 });
652 };
653 if exported_process_name != target.process_name {
654 return Err(TriggerTargetValidationError::ProcessRefMismatch {
655 module_ref: target.module_ref.to_string(),
656 process_name: target.process_name.clone(),
657 process_ref: format!("{:?}", target.process_ref),
658 });
659 }
660 let process = artifact
661 .canonical_ir
662 .process(exported_process_name)
663 .ok_or_else(|| TriggerTargetValidationError::MissingProcess {
664 module_ref: target.module_ref.to_string(),
665 process_name: target.process_name.clone(),
666 })?;
667 for (input_name, _) in inputs.entries() {
668 if !process
669 .params
670 .iter()
671 .any(|param| param.name.as_str() == input_name)
672 {
673 return Err(TriggerTargetValidationError::UnknownInput {
674 process_name: target.process_name.clone(),
675 input: input_name.to_string(),
676 });
677 }
678 }
679 if !inputs.contains_event() {
680 return Err(TriggerTargetValidationError::MissingEventInput {
681 process_name: target.process_name.clone(),
682 });
683 }
684 let aliases = type_aliases(artifact);
685 let event_ty = resolve_type_refs(
686 &event_ty.to_ref_ty(),
687 &aliases,
688 &artifact.required_surface.resources,
689 );
690 for param in &process.params {
691 let Some(input) = inputs.get(param.name.as_str()) else {
692 return Err(TriggerTargetValidationError::MissingInput {
693 process_name: target.process_name.clone(),
694 input: param.name.to_string(),
695 });
696 };
697 let input_ty = resolve_type_refs(¶m.ty, &aliases, &artifact.required_surface.resources);
698 match input {
699 TriggerInputBinding::Event => {
700 if !is_resolved_type_assignable(&event_ty, &input_ty) {
701 return Err(TriggerTargetValidationError::EventMismatch {
702 event: format_type_expr(&event_ty),
703 process_name: target.process_name.clone(),
704 input_name: param.name.to_string(),
705 input: format_type_expr(&input_ty),
706 });
707 }
708 }
709 TriggerInputBinding::Fixed { value } => {
710 validate_fixed_input_value(
711 value,
712 &input_ty,
713 &artifact.required_surface.resources,
714 target.process_name.as_str(),
715 param.name.as_str(),
716 )?;
717 }
718 }
719 }
720 Ok(TriggerTargetValidation {
721 inputs: inputs.clone(),
722 event_ty,
723 })
724}
725
726fn validate_fixed_input_value(
727 value: &serde_json::Value,
728 input_ty: &TypeExpr,
729 resources: &ResourceCatalog,
730 process_name: &str,
731 input_name: &str,
732) -> Result<(), TriggerTargetValidationError> {
733 let TypeExpr::Ref(resource_type) = input_ty else {
734 return Ok(());
735 };
736 if !resources.has_resource_type(resource_type.as_str()) {
737 return Ok(());
738 }
739 match crate::runtime::from_json(value.clone()) {
740 crate::Value::Resource(handle) if handle.resource_type == *resource_type => Ok(()),
741 crate::Value::Resource(handle) => Err(TriggerTargetValidationError::FixedInputMismatch {
742 process_name: process_name.to_string(),
743 input: input_name.to_string(),
744 expected: resource_type.to_string(),
745 actual: handle.resource_type,
746 }),
747 _ => Err(TriggerTargetValidationError::FixedInputMismatch {
748 process_name: process_name.to_string(),
749 input: input_name.to_string(),
750 expected: resource_type.to_string(),
751 actual: "value".to_string(),
752 }),
753 }
754}
755
756#[derive(Clone, Debug, PartialEq, Eq, Error)]
757pub enum TriggerTargetValidationError {
758 #[error(
759 "trigger target `{process_name}` required surface mismatch: target has {target_surface}, artifact has {artifact_surface}"
760 )]
761 RequiredSurfaceMismatch {
762 process_name: String,
763 target_surface: String,
764 artifact_surface: String,
765 },
766 #[error(
767 "trigger target artifact `{module_ref}` does not export process `{process_name}` as requested ref {process_ref}"
768 )]
769 ProcessRefMismatch {
770 module_ref: String,
771 process_name: String,
772 process_ref: String,
773 },
774 #[error("trigger target artifact `{module_ref}` is missing process `{process_name}`")]
775 MissingProcess {
776 module_ref: String,
777 process_name: String,
778 },
779 #[error("trigger target `{process_name}` input `{input}` is not mapped")]
780 MissingInput { process_name: String, input: String },
781 #[error("trigger target `{process_name}` has no input `{input}`")]
782 UnknownInput { process_name: String, input: String },
783 #[error("trigger target `{process_name}` inputs must map at least one param to trigger.event")]
784 MissingEventInput { process_name: String },
785 #[error(
786 "trigger source emits {event}, but target `{process_name}` input `{input_name}` expects {input}"
787 )]
788 EventMismatch {
789 event: String,
790 process_name: String,
791 input_name: String,
792 input: String,
793 },
794 #[error(
795 "trigger target `{process_name}` input `{input}` has incompatible fixed authority type: expected {expected}, got {actual}"
796 )]
797 FixedInputMismatch {
798 process_name: String,
799 input: String,
800 expected: String,
801 actual: String,
802 },
803}
804
805fn resolve_type_refs(
806 ty: &TypeExpr,
807 aliases: &BTreeMap<String, TypeExpr>,
808 resources: &ResourceCatalog,
809) -> TypeExpr {
810 resolve_type_refs_inner(ty, aliases, Some(resources), &mut BTreeSet::new())
811}
812
813fn resolve_type_refs_inner(
814 ty: &TypeExpr,
815 aliases: &BTreeMap<String, TypeExpr>,
816 resources: Option<&ResourceCatalog>,
817 seen: &mut BTreeSet<String>,
818) -> TypeExpr {
819 match ty {
820 TypeExpr::Ref(name) if seen.insert(name.to_string()) => {
821 let resolved = if let Some(ty) = aliases.get(name.as_str()) {
822 resolve_type_refs_inner(ty, aliases, resources, seen)
823 } else if let Some(data_type) =
824 resources.and_then(|resources| resources.resolve_named_data_type(name.as_str()))
825 {
826 data_type.ty().clone()
827 } else {
828 ty.clone()
829 };
830 seen.remove(name.as_str());
831 resolved
832 }
833 TypeExpr::List(item) => TypeExpr::List(Box::new(resolve_type_refs_inner(
834 item, aliases, resources, seen,
835 ))),
836 TypeExpr::Object(fields) => TypeExpr::Object(
837 fields
838 .iter()
839 .map(|field| TypeField {
840 name: field.name.clone(),
841 ty: resolve_type_refs_inner(&field.ty, aliases, resources, seen),
842 optional: field.optional,
843 })
844 .collect(),
845 ),
846 TypeExpr::Union(items) => TypeExpr::Union(
847 items
848 .iter()
849 .map(|item| resolve_type_refs_inner(item, aliases, resources, seen))
850 .collect(),
851 ),
852 TypeExpr::Process {
853 input,
854 output,
855 input_count,
856 } => TypeExpr::Process {
857 input: Box::new(resolve_type_refs_inner(input, aliases, resources, seen)),
858 output: Box::new(resolve_type_refs_inner(output, aliases, resources, seen)),
859 input_count: *input_count,
860 },
861 TypeExpr::TriggerHandle(event) => TypeExpr::TriggerHandle(Box::new(
862 resolve_type_refs_inner(event, aliases, resources, seen),
863 )),
864 _ => ty.clone(),
865 }
866}
867
868pub fn is_resolved_type_assignable(source: &TypeExpr, target: &TypeExpr) -> bool {
869 if matches!(target, TypeExpr::Any) {
870 return true;
871 }
872 if source == target {
873 return true;
874 }
875
876 match (source, target) {
877 (TypeExpr::Any, _) => false,
878 (TypeExpr::Union(sources), _) => sources
879 .iter()
880 .all(|source| is_resolved_type_assignable(source, target)),
881 (_, TypeExpr::Union(targets)) => targets
882 .iter()
883 .any(|target| is_resolved_type_assignable(source, target)),
884 (TypeExpr::Int, TypeExpr::Float) => true,
885 (TypeExpr::Enum(_), TypeExpr::Str) => true,
886 (TypeExpr::Enum(sources), TypeExpr::Enum(targets)) => {
887 sources.iter().all(|source| targets.contains(source))
888 }
889 (TypeExpr::List(source), TypeExpr::List(target)) => {
890 is_resolved_type_assignable(source, target)
891 }
892 (TypeExpr::Object(_), TypeExpr::Dict) => true,
893 (TypeExpr::Object(source), TypeExpr::Object(target)) => {
894 object_type_assignable(source, target)
895 }
896 (TypeExpr::Ref(source), TypeExpr::Ref(target)) => source == target,
897 (
898 TypeExpr::Process {
899 input: source_input,
900 output: source_output,
901 input_count: source_count,
902 },
903 TypeExpr::Process {
904 input: target_input,
905 output: target_output,
906 input_count: target_count,
907 },
908 ) => {
909 source_count == target_count
910 && is_resolved_type_assignable(source_input, target_input)
911 && is_resolved_type_assignable(source_output, target_output)
912 }
913 (TypeExpr::TriggerHandle(source), TypeExpr::TriggerHandle(target)) => {
914 is_resolved_type_assignable(source, target)
915 }
916 _ => false,
917 }
918}
919
920fn object_type_assignable(source: &[TypeField], target: &[TypeField]) -> bool {
921 target.iter().all(|target_field| {
922 let Some(source_field) = source
923 .iter()
924 .find(|source_field| source_field.name == target_field.name)
925 else {
926 return target_field.optional;
927 };
928 if !target_field.optional && source_field.optional {
929 return false;
930 }
931 is_resolved_type_assignable(&source_field.ty, &target_field.ty)
932 })
933}
934
935fn type_aliases(artifact: &ModuleArtifact) -> BTreeMap<String, TypeExpr> {
936 artifact
937 .canonical_ir
938 .declarations
939 .iter()
940 .filter_map(|declaration| match declaration {
941 crate::Declaration::Type(decl) => Some((decl.name.to_string(), decl.ty.clone())),
942 _ => None,
943 })
944 .collect()
945}
946
947#[cfg(test)]
948mod tests {
949 use super::*;
950
951 #[derive(Debug, Deserialize, PartialEq)]
952 struct ScheduleSource {
953 expr: String,
954 #[serde(default)]
955 tz: Option<String>,
956 }
957
958 fn resources() -> ResourceCatalog {
959 let mut resources = ResourceCatalog::new();
960 resources
961 .add_trigger_source_constructor(
962 ["cron", "Schedule"],
963 TypeExpr::Object(vec![
964 TypeField {
965 name: "expr".into(),
966 ty: TypeExpr::Str,
967 optional: false,
968 },
969 TypeField {
970 name: "tz".into(),
971 ty: TypeExpr::Str,
972 optional: true,
973 },
974 ]),
975 NamedDataType::object(
976 "cron.Tick",
977 vec![TypeField {
978 name: "fired_at".into(),
979 ty: TypeExpr::Str,
980 optional: false,
981 }],
982 )
983 .expect("valid cron tick type"),
984 )
985 .expect("valid cron schedule source");
986 resources
987 }
988
989 #[test]
990 fn host_value_encode_decode_and_typed_decode_round_trip() {
991 let value = serde_json::json!({
992 "expr": "*/10 * * * * *",
993 "tz": "UTC",
994 });
995 let encoded = HostValue::encode("cron.Schedule", value).expect("host value encode");
996 let decoded = HostValue::decode(&encoded).expect("host value decode");
997 let payload: ScheduleSource = decoded
998 .decode_as(&resources())
999 .expect("typed host value payload");
1000
1001 assert_eq!(
1002 payload,
1003 ScheduleSource {
1004 expr: "*/10 * * * * *".to_string(),
1005 tz: Some("UTC".to_string()),
1006 }
1007 );
1008 }
1009
1010 #[test]
1011 fn host_value_typed_decode_rejects_unknown_source_type() {
1012 let decoded = HostValue::new("missing.Source", serde_json::json!({ "expr": "*" }));
1013 let err = decoded
1014 .decode_as::<ScheduleSource>(&resources())
1015 .expect_err("unknown source type should fail");
1016
1017 assert!(
1018 matches!(err, HostValueError::UnknownSourceType { source_type } if source_type == "missing.Source")
1019 );
1020 }
1021
1022 #[test]
1023 fn host_value_typed_decode_reports_malformed_payload() {
1024 let decoded = HostValue::new("cron.Schedule", serde_json::json!({ "expr": 1 }));
1025 let err = decoded
1026 .decode_as::<ScheduleSource>(&resources())
1027 .expect_err("malformed source payload should fail");
1028
1029 assert!(
1030 matches!(err, HostValueError::MalformedPayload { source_type, .. } if source_type == "cron.Schedule")
1031 );
1032 }
1033}