Skip to main content

lash_core/
triggers.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_ty: lashlang::NamedDataType,
14}
15
16impl TriggerEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_ty: lashlang::NamedDataType,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_ty,
28        }
29    }
30
31    pub fn payload_type(&self) -> &lashlang::NamedDataType {
32        &self.payload_ty
33    }
34
35    pub fn key(&self) -> TriggerEventKey {
36        TriggerEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        trigger_event_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl TriggerEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        trigger_event_type(&self.alias, &self.event)
70    }
71}
72
73pub fn trigger_event_type(alias: &str, event: &str) -> String {
74    format!("{alias}.{event}")
75}
76
77#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79    events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate trigger occurrence `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124        self.events
125            .get(&TriggerEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct TriggerOccurrenceRecord {
192    pub occurrence_id: String,
193    pub source_type: String,
194    pub source_key: String,
195    #[serde(default)]
196    pub payload: serde_json::Value,
197    pub idempotency_key: String,
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub source: Option<serde_json::Value>,
200    pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerEventType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerEventType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerEventType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerEventType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243    pub handle: String,
244    pub source_key: String,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub name: Option<String>,
247    pub source_type: TriggerEventType,
248    pub source: serde_json::Value,
249    pub target: TriggerTargetSummary,
250    #[serde(default = "default_enabled")]
251    pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256    pub process_name: String,
257    pub inputs: lashlang::TriggerInputTemplate,
258}
259
260#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
261pub struct TriggerSubscriptionDraft {
262    pub registrant: crate::ProcessOriginator,
263    pub env_ref: crate::ProcessExecutionEnvRef,
264    #[serde(default, skip_serializing_if = "Option::is_none")]
265    pub wake_target: Option<crate::SessionScope>,
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub name: Option<String>,
268    pub source_type: String,
269    pub source_key: String,
270    pub source: serde_json::Value,
271    pub event_ty: lashlang::TypeExpr,
272    pub module_ref: lashlang::ModuleRef,
273    pub host_requirements_ref: lashlang::HostRequirementsRef,
274    pub process_ref: lashlang::ProcessRef,
275    pub process_name: String,
276    pub input_template: lashlang::TriggerInputTemplate,
277}
278
279#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
280pub struct TriggerSubscriptionRecord {
281    pub subscription_id: String,
282    pub registrant: crate::ProcessOriginator,
283    pub env_ref: crate::ProcessExecutionEnvRef,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    pub wake_target: Option<crate::SessionScope>,
286    pub handle: String,
287    #[serde(default, skip_serializing_if = "Option::is_none")]
288    pub name: Option<String>,
289    pub source_type: String,
290    pub source_key: String,
291    pub source: serde_json::Value,
292    pub event_ty: lashlang::TypeExpr,
293    pub module_ref: lashlang::ModuleRef,
294    pub host_requirements_ref: lashlang::HostRequirementsRef,
295    pub process_ref: lashlang::ProcessRef,
296    pub process_name: String,
297    pub input_template: lashlang::TriggerInputTemplate,
298    #[serde(default = "default_enabled")]
299    pub enabled: bool,
300    pub created_at_ms: u64,
301    pub updated_at_ms: u64,
302}
303
304impl TriggerSubscriptionRecord {
305    pub fn registrant_scope_id(&self) -> String {
306        self.registrant.scope_id()
307    }
308
309    pub fn registrant_session_id(&self) -> Option<&str> {
310        match &self.registrant {
311            crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
312            crate::ProcessOriginator::Host => None,
313        }
314    }
315}
316
317impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
318    fn from(route: &TriggerSubscriptionRecord) -> Self {
319        Self {
320            handle: route.handle.clone(),
321            source_key: route.source_key.clone(),
322            name: route.name.clone(),
323            source_type: TriggerEventType::new(route.source_type.clone()),
324            source: route.source.clone(),
325            target: TriggerTargetSummary {
326                process_name: route.process_name.clone(),
327                inputs: route.input_template.clone(),
328            },
329            enabled: route.enabled,
330        }
331    }
332}
333
334#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
335pub struct TriggerSubscriptionFilter {
336    #[serde(default, skip_serializing_if = "Option::is_none")]
337    pub session_id: Option<String>,
338    #[serde(default, skip_serializing_if = "Option::is_none")]
339    pub handle: Option<String>,
340    #[serde(default, skip_serializing_if = "Option::is_none")]
341    pub name: Option<String>,
342    #[serde(default, skip_serializing_if = "Option::is_none")]
343    pub source_type: Option<String>,
344    #[serde(default, skip_serializing_if = "Option::is_none")]
345    pub source_key: Option<String>,
346    #[serde(default, skip_serializing_if = "Option::is_none")]
347    pub target: Option<lashlang::TriggerTargetIdentity>,
348    #[serde(default, skip_serializing_if = "Option::is_none")]
349    pub enabled: Option<bool>,
350}
351
352impl TriggerSubscriptionFilter {
353    pub fn for_session(session_id: impl Into<String>) -> Self {
354        Self {
355            session_id: Some(session_id.into()),
356            ..Self::default()
357        }
358    }
359
360    pub fn for_source_type(source_type: impl Into<String>) -> Self {
361        Self {
362            source_type: Some(source_type.into()),
363            ..Self::default()
364        }
365    }
366
367    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
368        self.session_id
369            .as_deref()
370            .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
371            && self
372                .handle
373                .as_deref()
374                .is_none_or(|handle| record.handle == handle)
375            && self
376                .name
377                .as_deref()
378                .is_none_or(|name| record.name.as_deref() == Some(name))
379            && self
380                .source_type
381                .as_deref()
382                .is_none_or(|source_type| record.source_type == source_type)
383            && self
384                .source_key
385                .as_deref()
386                .is_none_or(|source_key| record.source_key == source_key)
387            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
388            && self.target.as_ref().is_none_or(|target| {
389                target.matches(
390                    &record.module_ref,
391                    &record.host_requirements_ref,
392                    &record.process_ref,
393                    &record.process_name,
394                )
395            })
396    }
397}
398
399#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
400pub struct TriggerDeliveryReservation {
401    pub occurrence: TriggerOccurrenceRecord,
402    pub subscription: TriggerSubscriptionRecord,
403    pub process_id: String,
404}
405
406#[async_trait::async_trait]
407pub trait TriggerStore: Send + Sync {
408    fn durability_tier(&self) -> crate::DurabilityTier {
409        crate::DurabilityTier::Inline
410    }
411
412    async fn source_key_for_subscription(
413        &self,
414        source_type: &str,
415        source: &serde_json::Value,
416    ) -> Result<String, PluginError> {
417        default_trigger_source_key(source_type, source)
418    }
419
420    async fn register_subscription(
421        &self,
422        draft: TriggerSubscriptionDraft,
423    ) -> Result<TriggerSubscriptionRecord, PluginError>;
424
425    async fn list_subscriptions(
426        &self,
427        filter: TriggerSubscriptionFilter,
428    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
429
430    async fn cancel_subscription(
431        &self,
432        session_id: &str,
433        handle: &str,
434    ) -> Result<bool, PluginError>;
435
436    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;
437
438    async fn record_occurrence(
439        &self,
440        request: TriggerOccurrenceRequest,
441    ) -> Result<TriggerOccurrenceRecord, PluginError>;
442
443    async fn reserve_matching_deliveries(
444        &self,
445        occurrence_id: &str,
446    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
447}
448
449#[derive(Default)]
450pub struct InMemoryTriggerStore {
451    state: Mutex<InMemoryTriggerEventState>,
452}
453
454#[derive(Default)]
455struct InMemoryTriggerEventState {
456    next_subscription_seq: u64,
457    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
458    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
459    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
460    occurrence_hashes: BTreeMap<String, String>,
461    deliveries: BTreeSet<(String, String)>,
462}
463
464#[async_trait::async_trait]
465impl TriggerStore for InMemoryTriggerStore {
466    async fn register_subscription(
467        &self,
468        draft: TriggerSubscriptionDraft,
469    ) -> Result<TriggerSubscriptionRecord, PluginError> {
470        let mut state = self
471            .state
472            .lock()
473            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
474        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
475        let handle = format!("trigger:{}", state.next_subscription_seq);
476        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
477        let now = crate::runtime::current_epoch_ms();
478        let record = TriggerSubscriptionRecord {
479            subscription_id: subscription_id.clone(),
480            registrant: draft.registrant,
481            env_ref: draft.env_ref,
482            wake_target: draft.wake_target,
483            handle,
484            name: draft.name,
485            source_type: draft.source_type,
486            source_key: draft.source_key,
487            source: draft.source,
488            event_ty: draft.event_ty,
489            module_ref: draft.module_ref,
490            host_requirements_ref: draft.host_requirements_ref,
491            process_ref: draft.process_ref,
492            process_name: draft.process_name,
493            input_template: draft.input_template,
494            enabled: true,
495            created_at_ms: now,
496            updated_at_ms: now,
497        };
498        state.subscriptions.insert(subscription_id, record.clone());
499        Ok(record)
500    }
501
502    async fn list_subscriptions(
503        &self,
504        filter: TriggerSubscriptionFilter,
505    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
506        let state = self
507            .state
508            .lock()
509            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
510        let mut records = state
511            .subscriptions
512            .values()
513            .filter(|record| filter.matches(record))
514            .cloned()
515            .collect::<Vec<_>>();
516        records.sort_by(|left, right| {
517            left.registrant_scope_id()
518                .cmp(&right.registrant_scope_id())
519                .then_with(|| left.handle.cmp(&right.handle))
520        });
521        Ok(records)
522    }
523
524    async fn cancel_subscription(
525        &self,
526        session_id: &str,
527        handle: &str,
528    ) -> Result<bool, PluginError> {
529        let mut state = self
530            .state
531            .lock()
532            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
533        let now = crate::runtime::current_epoch_ms();
534        let Some(record) = state.subscriptions.values_mut().find(|record| {
535            record.registrant_session_id() == Some(session_id) && record.handle == handle
536        }) else {
537            return Ok(false);
538        };
539        let changed = record.enabled;
540        record.enabled = false;
541        record.updated_at_ms = now;
542        Ok(changed)
543    }
544
545    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
546        let mut state = self
547            .state
548            .lock()
549            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
550        let before = state.subscriptions.len();
551        state
552            .subscriptions
553            .retain(|_, record| record.registrant_session_id() != Some(session_id));
554        Ok(before.saturating_sub(state.subscriptions.len()))
555    }
556
557    async fn record_occurrence(
558        &self,
559        request: TriggerOccurrenceRequest,
560    ) -> Result<TriggerOccurrenceRecord, PluginError> {
561        validate_trigger_occurrence_request(&request)?;
562        let mut state = self
563            .state
564            .lock()
565            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
566        let request_hash = trigger_occurrence_request_hash(&request)?;
567        if let Some(existing_id) = state
568            .occurrence_id_by_idempotency_key
569            .get(&request.idempotency_key)
570            .cloned()
571        {
572            let existing_hash = state
573                .occurrence_hashes
574                .get(&existing_id)
575                .cloned()
576                .unwrap_or_default();
577            if existing_hash != request_hash {
578                return Err(PluginError::Session(format!(
579                    "trigger occurrence idempotency conflict for `{}`",
580                    request.idempotency_key
581                )));
582            }
583            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
584                PluginError::Session(format!(
585                    "missing trigger occurrence `{existing_id}` for idempotency key"
586                ))
587            });
588        }
589        let occurrence_id = deterministic_occurrence_id(&request)?;
590        let record = TriggerOccurrenceRecord {
591            occurrence_id: occurrence_id.clone(),
592            source_type: request.source_type,
593            source_key: request.source_key,
594            payload: request.payload,
595            idempotency_key: request.idempotency_key.clone(),
596            source: request.source,
597            occurred_at_ms: crate::runtime::current_epoch_ms(),
598        };
599        state
600            .occurrence_id_by_idempotency_key
601            .insert(request.idempotency_key, occurrence_id.clone());
602        state
603            .occurrence_hashes
604            .insert(occurrence_id.clone(), request_hash);
605        state.occurrences.insert(occurrence_id, record.clone());
606        Ok(record)
607    }
608
609    async fn reserve_matching_deliveries(
610        &self,
611        occurrence_id: &str,
612    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
613        let mut state = self
614            .state
615            .lock()
616            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
617        let occurrence = state
618            .occurrences
619            .get(occurrence_id)
620            .cloned()
621            .ok_or_else(|| {
622                PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
623            })?;
624        let subscriptions = state
625            .subscriptions
626            .values()
627            .filter(|record| {
628                record.enabled
629                    && record.source_type == occurrence.source_type
630                    && record.source_key == occurrence.source_key
631            })
632            .cloned()
633            .collect::<Vec<_>>();
634        let mut deliveries = Vec::new();
635        for subscription in subscriptions {
636            let key = (
637                occurrence.occurrence_id.clone(),
638                subscription.subscription_id.clone(),
639            );
640            if !state.deliveries.insert(key) {
641                continue;
642            }
643            let process_id = deterministic_delivery_process_id(
644                &occurrence.occurrence_id,
645                &subscription.subscription_id,
646            )?;
647            deliveries.push(TriggerDeliveryReservation {
648                occurrence: occurrence.clone(),
649                subscription,
650                process_id,
651            });
652        }
653        Ok(deliveries)
654    }
655}
656
657fn default_enabled() -> bool {
658    true
659}
660
661pub fn default_trigger_source_key(
662    source_type: &str,
663    source: &serde_json::Value,
664) -> Result<String, PluginError> {
665    let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
666        .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
667    Ok(format!("source:{source_type}:sha256:{digest}"))
668}
669
670pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
671    default_trigger_source_key(source_type, &serde_json::json!({}))
672}
673
674pub fn deterministic_occurrence_id(
675    request: &TriggerOccurrenceRequest,
676) -> Result<String, PluginError> {
677    let digest = crate::stable_hash::stable_json_sha256_hex(&(
678        request.source_type.as_str(),
679        request.source_key.as_str(),
680        request.idempotency_key.as_str(),
681    ))
682    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
683    Ok(format!("trigger:{digest}"))
684}
685
686pub fn deterministic_delivery_process_id(
687    occurrence_id: &str,
688    subscription_id: &str,
689) -> Result<String, PluginError> {
690    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
691        .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
692    Ok(format!("process:trigger:{digest}"))
693}
694
695#[derive(Clone)]
696pub struct TriggerRouter {
697    store: Arc<dyn TriggerStore>,
698    artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
699    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
700    process_work_poke: Option<crate::ProcessWorkPoke>,
701}
702
703impl TriggerRouter {
704    pub fn new(
705        store: Arc<dyn TriggerStore>,
706        artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
707        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
708        process_work_poke: Option<crate::ProcessWorkPoke>,
709    ) -> Self {
710        Self {
711            store,
712            artifact_store,
713            process_registry,
714            process_work_poke,
715        }
716    }
717
718    pub fn store(&self) -> Arc<dyn TriggerStore> {
719        Arc::clone(&self.store)
720    }
721
722    pub async fn emit(
723        &self,
724        request: TriggerOccurrenceRequest,
725        effect_controller: &dyn crate::RuntimeEffectController,
726    ) -> Result<TriggerEmitReport, PluginError> {
727        let occurrence = self.store.record_occurrence(request).await?;
728        let reservations = self
729            .store
730            .reserve_matching_deliveries(&occurrence.occurrence_id)
731            .await?;
732        let Some(process_registry) = self.process_registry.as_ref() else {
733            if reservations.is_empty() {
734                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
735            }
736            return Err(PluginError::Session(
737                "trigger delivery requires a process registry".to_string(),
738            ));
739        };
740        let mut started_process_ids = Vec::new();
741        let mut start_errors = Vec::new();
742        for reservation in reservations {
743            let process_id = reservation.process_id.clone();
744            if let Err(err) = self
745                .start_delivery(
746                    &reservation,
747                    Arc::clone(process_registry),
748                    effect_controller,
749                )
750                .await
751            {
752                start_errors.push(format!(
753                    "{}: {err}",
754                    reservation.subscription.subscription_id
755                ));
756                continue;
757            }
758            started_process_ids.push(process_id);
759        }
760        if !started_process_ids.is_empty()
761            && let Some(poke) = self.process_work_poke.as_ref()
762        {
763            poke.poke();
764        }
765        if started_process_ids.is_empty()
766            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
767        {
768            return Err(PluginError::Session(message));
769        }
770        Ok(TriggerEmitReport::new(
771            occurrence.occurrence_id,
772            started_process_ids,
773        ))
774    }
775
776    async fn start_delivery(
777        &self,
778        reservation: &TriggerDeliveryReservation,
779        process_registry: Arc<dyn crate::ProcessRegistry>,
780        effect_controller: &dyn crate::RuntimeEffectController,
781    ) -> Result<(), PluginError> {
782        let subscription = &reservation.subscription;
783        let occurrence = &reservation.occurrence;
784        validate_payload(&occurrence.payload, &subscription.event_ty).map_err(|message| {
785            PluginError::Session(format!(
786                "invalid payload for trigger `{}`: {message}",
787                subscription.handle
788            ))
789        })?;
790        let artifact = self
791            .artifact_store
792            .get_module_artifact(&subscription.module_ref)
793            .await
794            .map_err(|err| {
795                PluginError::Session(format!(
796                    "failed to load trigger target module `{}`: {err}",
797                    subscription.module_ref
798                ))
799            })?
800            .ok_or_else(|| {
801                PluginError::Session(format!(
802                    "missing trigger target module `{}`",
803                    subscription.module_ref
804                ))
805            })?;
806        let signal_event_types = artifact
807            .canonical_ir
808            .process(&subscription.process_name)
809            .map(crate::lashlang_process_signal_event_types)
810            .unwrap_or_default();
811        let args =
812            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
813        let originator_scope_id = subscription.registrant_scope_id();
814        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
815            &originator_scope_id,
816            &occurrence.occurrence_id,
817        );
818        let registration = crate::ProcessRegistration::new(
819            reservation.process_id.clone(),
820            crate::ProcessInput::LashlangProcess {
821                module_ref: subscription.module_ref.clone(),
822                process_ref: subscription.process_ref.clone(),
823                host_requirements_ref: subscription.host_requirements_ref.clone(),
824                process_name: subscription.process_name.clone(),
825                args,
826            },
827            crate::ProcessProvenance::new(subscription.registrant.clone())
828                .with_caused_by(trigger_occurrence_invocation.causal_ref()),
829        )
830        .with_extra_event_types(
831            crate::lashlang_process_event_types()
832                .into_iter()
833                .chain(signal_event_types),
834        )
835        .with_execution_env_ref(Some(subscription.env_ref.clone()))
836        .with_wake_target(subscription.wake_target.clone());
837        let grant =
838            subscription
839                .wake_target
840                .clone()
841                .map(|session_scope| crate::ProcessStartGrant {
842                    session_scope,
843                    descriptor: crate::ProcessHandleDescriptor::new(
844                        Some("lashlang"),
845                        Some(subscription.process_name.as_str()),
846                    ),
847                });
848        let execution_context = crate::ProcessExecutionContext::default()
849            .with_causal_invocation(Some(trigger_occurrence_invocation));
850        let command = crate::ProcessCommand::Start {
851            registration,
852            grant,
853            execution_context: Box::new(execution_context),
854        };
855        let effect_id = command.effect_id();
856        let invocation = crate::RuntimeInvocation::effect(
857            crate::RuntimeScope::new(originator_scope_id),
858            effect_id.clone(),
859            crate::RuntimeEffectKind::Process,
860            format!(
861                "trigger:{}:{}",
862                occurrence.occurrence_id, subscription.subscription_id
863            ),
864        )
865        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
866            occurrence_id: occurrence.occurrence_id.clone(),
867        }));
868        let outcome = effect_controller
869            .execute_effect(
870                crate::RuntimeEffectEnvelope::new(
871                    invocation,
872                    crate::RuntimeEffectCommand::process(command),
873                ),
874                crate::RuntimeEffectLocalExecutor::processes(process_registry),
875            )
876            .await?;
877        match outcome {
878            crate::RuntimeEffectOutcome::Process {
879                result: crate::ProcessEffectOutcome::Start { .. },
880            } => Ok(()),
881            other => Err(PluginError::Session(format!(
882                "trigger process start returned the wrong outcome: {}",
883                other.kind().as_str()
884            ))),
885        }
886    }
887}
888
889fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
890    match errors {
891        [] => None,
892        [only] => Some(format!("trigger delivery failed: {only}")),
893        [first, rest @ ..] => Some(format!(
894            "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
895            errors.len(),
896            rest.len()
897        )),
898    }
899}
900
901fn materialize_trigger_process_args(
902    input_template: &lashlang::TriggerInputTemplate,
903    event_payload: &serde_json::Value,
904) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
905    let mut args = lashlang::Record::default();
906    for (input_name, input) in input_template.entries() {
907        let value = match input {
908            lashlang::TriggerInputBinding::Event => event_payload.clone(),
909            lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
910        };
911        args.insert(input_name.to_string(), lashlang::from_json(value));
912    }
913    match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
914        .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
915    {
916        serde_json::Value::Object(map) => Ok(map),
917        _ => Err(PluginError::Session(
918            "trigger process args must serialize as an object".to_string(),
919        )),
920    }
921}
922
923pub fn validate_payload(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> Result<(), String> {
924    if json_matches_type(value, ty) {
925        Ok(())
926    } else {
927        Err(format!("expected {}", lashlang::format_type_expr(ty)))
928    }
929}
930
931fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
932    match ty {
933        lashlang::TypeExpr::Any => true,
934        lashlang::TypeExpr::Ref(_) => false,
935        lashlang::TypeExpr::Str => value.is_string(),
936        lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
937        lashlang::TypeExpr::Float => value.is_number(),
938        lashlang::TypeExpr::Bool => value.is_boolean(),
939        lashlang::TypeExpr::Dict => value.is_object(),
940        lashlang::TypeExpr::Null => value.is_null(),
941        lashlang::TypeExpr::Enum(values) => value
942            .as_str()
943            .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
944        lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
945            items
946                .iter()
947                .all(|item_value| json_matches_type(item_value, item))
948        }),
949        lashlang::TypeExpr::Object(fields) => {
950            let Some(map) = value.as_object() else {
951                return false;
952            };
953            fields
954                .iter()
955                .all(|field| match map.get(field.name.as_str()) {
956                    Some(field_value) => json_matches_type(field_value, &field.ty),
957                    None => field.optional,
958                })
959        }
960        lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
961        lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
962            value.is_object()
963        }
964    }
965}
966
967pub fn validate_trigger_occurrence_request(
968    request: &TriggerOccurrenceRequest,
969) -> Result<(), PluginError> {
970    if request.source_type.trim().is_empty() {
971        return Err(PluginError::Session(
972            "trigger occurrence requires source_type".to_string(),
973        ));
974    }
975    if request.source_key.trim().is_empty() {
976        return Err(PluginError::Session(
977            "trigger occurrence requires source_key".to_string(),
978        ));
979    }
980    if request.idempotency_key.trim().is_empty() {
981        return Err(PluginError::Session(
982            "trigger occurrence requires idempotency_key".to_string(),
983        ));
984    }
985    Ok(())
986}
987
988pub fn trigger_occurrence_request_hash(
989    request: &TriggerOccurrenceRequest,
990) -> Result<String, PluginError> {
991    crate::stable_hash::stable_json_sha256_hex(&(
992        request.source_type.as_str(),
993        request.source_key.as_str(),
994        &request.payload,
995        &request.source,
996    ))
997    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
998}
999
1000#[cfg(test)]
1001mod tests {
1002    use super::*;
1003
1004    fn button_payload_type() -> lashlang::NamedDataType {
1005        lashlang::NamedDataType::object(
1006            "ui.button.Pressed",
1007            vec![lashlang::TypeField {
1008                name: "button".into(),
1009                ty: lashlang::TypeExpr::Str,
1010                optional: false,
1011            }],
1012        )
1013        .expect("valid trigger occurrence payload")
1014    }
1015
1016    #[test]
1017    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
1018        let mut catalog = TriggerEventCatalog::new();
1019        catalog
1020            .declare(TriggerEvent::new(
1021                "Button",
1022                "ui.button",
1023                "pressed",
1024                button_payload_type(),
1025            ))
1026            .expect("first trigger occurrence");
1027
1028        let err = catalog
1029            .declare(TriggerEvent::new(
1030                "AlternateButton",
1031                "ui.button",
1032                "pressed",
1033                button_payload_type(),
1034            ))
1035            .expect_err("duplicate public source identity should be rejected");
1036
1037        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
1038    }
1039
1040    #[test]
1041    fn trigger_subscription_record_rejects_legacy_required_surface_ref() {
1042        let mut inputs = BTreeMap::new();
1043        inputs.insert("event".to_string(), lashlang::TriggerInputBinding::Event);
1044        let record = TriggerSubscriptionRecord {
1045            subscription_id: "subscription:1".to_string(),
1046            registrant: crate::ProcessOriginator::session(crate::SessionScope::new("session-a")),
1047            env_ref: crate::ProcessExecutionEnvRef::new("process-env:session-a"),
1048            wake_target: Some(crate::SessionScope::new("session-a")),
1049            handle: "trigger:1".to_string(),
1050            name: Some("button watcher".to_string()),
1051            source_type: "ui.button.pressed".to_string(),
1052            source_key: empty_trigger_source_key("ui.button.pressed").expect("source key"),
1053            source: serde_json::json!({}),
1054            event_ty: lashlang::TypeExpr::Object(vec![lashlang::TypeField {
1055                name: "button".into(),
1056                ty: lashlang::TypeExpr::Str,
1057                optional: false,
1058            }]),
1059            module_ref: lashlang::ModuleRef::new(&lashlang::ContentHash::new("module")),
1060            host_requirements_ref: lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new(
1061                "surface",
1062            )),
1063            process_ref: lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 0),
1064            process_name: "on_button".to_string(),
1065            input_template: lashlang::TriggerInputTemplate::new(inputs),
1066            enabled: true,
1067            created_at_ms: 1,
1068            updated_at_ms: 1,
1069        };
1070        let mut value = serde_json::to_value(record).expect("record json");
1071        let object = value.as_object_mut().expect("record object");
1072        let legacy_ref = object
1073            .remove("host_requirements_ref")
1074            .expect("host requirements ref");
1075        object.insert("required_surface_ref".to_string(), legacy_ref);
1076
1077        let err = serde_json::from_value::<TriggerSubscriptionRecord>(value)
1078            .expect_err("legacy record must be malformed");
1079
1080        assert!(err.to_string().contains("host_requirements_ref"));
1081    }
1082}