Skip to main content

lash_core/
triggers.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_ty: lashlang::NamedDataType,
14}
15
16impl TriggerEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_ty: lashlang::NamedDataType,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_ty,
28        }
29    }
30
31    pub fn payload_type(&self) -> &lashlang::NamedDataType {
32        &self.payload_ty
33    }
34
35    pub fn key(&self) -> TriggerEventKey {
36        TriggerEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        trigger_event_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl TriggerEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        trigger_event_type(&self.alias, &self.event)
70    }
71}
72
/// Joins an alias and an event name into the `alias.event` source type string.
///
/// No validation or escaping is applied: the two parts are concatenated
/// verbatim around a single `.` separator.
pub fn trigger_event_type(alias: &str, event: &str) -> String {
    let mut source_type = String::with_capacity(alias.len() + 1 + event.len());
    source_type.push_str(alias);
    source_type.push('.');
    source_type.push_str(event);
    source_type
}
76
77#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79    events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate trigger occurrence `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124        self.events
125            .get(&TriggerEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
/// Stored form of a trigger occurrence.
///
/// Carries the same fields as [`TriggerOccurrenceRequest`] plus the
/// identifier and timestamp assigned when the occurrence is recorded.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerOccurrenceRecord {
    /// Identifier assigned by the store when the occurrence is recorded.
    pub occurrence_id: String,
    /// Source type string copied from the request.
    pub source_type: String,
    /// Source key copied from the request.
    pub source_key: String,
    /// Event payload; defaults to JSON `null` when absent from the input.
    #[serde(default)]
    pub payload: serde_json::Value,
    /// Idempotency key copied from the request.
    pub idempotency_key: String,
    /// Optional source description; skipped in serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<serde_json::Value>,
    /// Recording time in epoch milliseconds.
    pub occurred_at_ms: u64,
}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerEventType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerEventType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerEventType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerEventType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
/// Summary view of a trigger subscription.
///
/// Can be built from a [`TriggerSubscriptionRecord`] through the `From`
/// implementation in this file, which copies the fields below and folds the
/// target process name and input template into [`TriggerTargetSummary`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerRegistration {
    /// Handle of the subscription.
    pub handle: String,
    /// Source key the subscription listens on.
    pub source_key: String,
    /// Optional name; skipped in serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type the subscription listens on.
    pub source_type: TriggerEventType,
    /// Source description as JSON.
    pub source: serde_json::Value,
    /// Process that is started for the trigger and its input template.
    pub target: TriggerTargetSummary,
    /// Whether the subscription is active; treated as `true` when missing on deserialization.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}
253
/// Target of a trigger registration: which process it names and how its inputs are bound.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerTargetSummary {
    /// Name of the target process.
    pub process_name: String,
    /// Template describing the inputs passed to the process.
    pub inputs: lashlang::TriggerInputTemplate,
}
259
/// Input to [`TriggerStore::register_subscription`].
///
/// Holds every subscription field except those the store assigns itself
/// (subscription id, handle, enabled flag and timestamps on
/// [`TriggerSubscriptionRecord`]).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionDraft {
    /// Originator that registers the subscription.
    pub registrant: crate::ProcessOriginator,
    /// Execution environment reference attached to started processes.
    pub env_ref: crate::ProcessExecutionEnvRef,
    /// Optional session scope used as wake target; skipped in serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<crate::SessionScope>,
    /// Optional name; skipped in serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type to subscribe to.
    pub source_type: String,
    /// Source key to subscribe to.
    pub source_key: String,
    /// Source description as JSON.
    pub source: serde_json::Value,
    /// Type expression that occurrence payloads are validated against at delivery.
    pub event_ty: lashlang::TypeExpr,
    /// Reference to the module containing the target process.
    pub module_ref: lashlang::ModuleRef,
    /// Reference to the host requirements of the target.
    pub host_requirements_ref: lashlang::HostRequirementsRef,
    /// Reference to the target process.
    pub process_ref: lashlang::ProcessRef,
    /// Name of the target process.
    pub process_name: String,
    /// Template used to build the target process arguments.
    pub input_template: lashlang::TriggerInputTemplate,
}
278
279#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
280pub struct TriggerSubscriptionRecord {
281    pub subscription_id: String,
282    pub registrant: crate::ProcessOriginator,
283    pub env_ref: crate::ProcessExecutionEnvRef,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    pub wake_target: Option<crate::SessionScope>,
286    pub handle: String,
287    #[serde(default, skip_serializing_if = "Option::is_none")]
288    pub name: Option<String>,
289    pub source_type: String,
290    pub source_key: String,
291    pub source: serde_json::Value,
292    pub event_ty: lashlang::TypeExpr,
293    pub module_ref: lashlang::ModuleRef,
294    pub host_requirements_ref: lashlang::HostRequirementsRef,
295    pub process_ref: lashlang::ProcessRef,
296    pub process_name: String,
297    pub input_template: lashlang::TriggerInputTemplate,
298    #[serde(default = "default_enabled")]
299    pub enabled: bool,
300    pub created_at_ms: u64,
301    pub updated_at_ms: u64,
302}
303
304impl TriggerSubscriptionRecord {
305    pub fn registrant_scope_id(&self) -> String {
306        self.registrant.scope_id()
307    }
308
309    pub fn registrant_session_id(&self) -> Option<&str> {
310        match &self.registrant {
311            crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
312            crate::ProcessOriginator::Host => None,
313        }
314    }
315}
316
317impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
318    fn from(route: &TriggerSubscriptionRecord) -> Self {
319        Self {
320            handle: route.handle.clone(),
321            source_key: route.source_key.clone(),
322            name: route.name.clone(),
323            source_type: TriggerEventType::new(route.source_type.clone()),
324            source: route.source.clone(),
325            target: TriggerTargetSummary {
326                process_name: route.process_name.clone(),
327                inputs: route.input_template.clone(),
328            },
329            enabled: route.enabled,
330        }
331    }
332}
333
334#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
335pub struct TriggerSubscriptionFilter {
336    #[serde(default, skip_serializing_if = "Option::is_none")]
337    pub session_id: Option<String>,
338    #[serde(default, skip_serializing_if = "Option::is_none")]
339    pub handle: Option<String>,
340    #[serde(default, skip_serializing_if = "Option::is_none")]
341    pub name: Option<String>,
342    #[serde(default, skip_serializing_if = "Option::is_none")]
343    pub source_type: Option<String>,
344    #[serde(default, skip_serializing_if = "Option::is_none")]
345    pub source_key: Option<String>,
346    #[serde(default, skip_serializing_if = "Option::is_none")]
347    pub target: Option<lashlang::TriggerTargetIdentity>,
348    #[serde(default, skip_serializing_if = "Option::is_none")]
349    pub enabled: Option<bool>,
350}
351
352impl TriggerSubscriptionFilter {
353    pub fn for_session(session_id: impl Into<String>) -> Self {
354        Self {
355            session_id: Some(session_id.into()),
356            ..Self::default()
357        }
358    }
359
360    pub fn for_source_type(source_type: impl Into<String>) -> Self {
361        Self {
362            source_type: Some(source_type.into()),
363            ..Self::default()
364        }
365    }
366
367    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
368        self.session_id
369            .as_deref()
370            .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
371            && self
372                .handle
373                .as_deref()
374                .is_none_or(|handle| record.handle == handle)
375            && self
376                .name
377                .as_deref()
378                .is_none_or(|name| record.name.as_deref() == Some(name))
379            && self
380                .source_type
381                .as_deref()
382                .is_none_or(|source_type| record.source_type == source_type)
383            && self
384                .source_key
385                .as_deref()
386                .is_none_or(|source_key| record.source_key == source_key)
387            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
388            && self.target.as_ref().is_none_or(|target| {
389                target.matches(
390                    &record.module_ref,
391                    &record.host_requirements_ref,
392                    &record.process_ref,
393                    &record.process_name,
394                )
395            })
396    }
397}
398
/// One reserved delivery: an occurrence paired with a matching subscription.
///
/// Returned by [`TriggerStore::reserve_matching_deliveries`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerDeliveryReservation {
    /// The occurrence being delivered.
    pub occurrence: TriggerOccurrenceRecord,
    /// The subscription that matched the occurrence.
    pub subscription: TriggerSubscriptionRecord,
    /// Identifier to use for the process started for this delivery.
    pub process_id: String,
}
405
/// Persistence interface for trigger subscriptions, occurrences and delivery reservations.
///
/// [`InMemoryTriggerStore`] in this file is one implementation.
#[async_trait::async_trait]
pub trait TriggerStore: Send + Sync {
    /// Durability tier of this store; the default implementation reports `Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Derives the source key for a subscription on `source_type` with the given `source`.
    ///
    /// The default implementation delegates to [`default_trigger_source_key`].
    async fn source_key_for_subscription(
        &self,
        source_type: &str,
        source: &serde_json::Value,
    ) -> Result<String, PluginError> {
        default_trigger_source_key(source_type, source)
    }

    /// Stores a new subscription built from `draft` and returns the stored record.
    async fn register_subscription(
        &self,
        draft: TriggerSubscriptionDraft,
    ) -> Result<TriggerSubscriptionRecord, PluginError>;

    /// Returns the stored subscriptions selected by `filter`.
    async fn list_subscriptions(
        &self,
        filter: TriggerSubscriptionFilter,
    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;

    /// Cancels the subscription identified by `session_id` and `handle`.
    ///
    /// The in-memory implementation disables the record rather than removing
    /// it, and returns `true` only when the record was enabled before the call.
    async fn cancel_subscription(
        &self,
        session_id: &str,
        handle: &str,
    ) -> Result<bool, PluginError>;

    /// Removes the subscriptions registered by `session_id`.
    ///
    /// The in-memory implementation returns the number of removed records.
    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;

    /// Records an occurrence described by `request` and returns the stored record.
    async fn record_occurrence(
        &self,
        request: TriggerOccurrenceRequest,
    ) -> Result<TriggerOccurrenceRecord, PluginError>;

    /// Reserves deliveries of the occurrence `occurrence_id` to matching subscriptions.
    ///
    /// The in-memory implementation returns each (occurrence, subscription)
    /// pair at most once across calls.
    async fn reserve_matching_deliveries(
        &self,
        occurrence_id: &str,
    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
}
448
/// [`TriggerStore`] implementation that keeps all state in process memory behind a mutex.
#[derive(Default)]
pub struct InMemoryTriggerStore {
    // All store data; every trait method locks this for the duration of the call.
    state: Mutex<InMemoryTriggerEventState>,
}
453
/// Mutable data guarded by the mutex in [`InMemoryTriggerStore`].
#[derive(Default)]
struct InMemoryTriggerEventState {
    // Last sequence number handed out; used to build handles and subscription ids.
    next_subscription_seq: u64,
    // Subscriptions keyed by subscription id.
    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
    // Occurrences keyed by occurrence id.
    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
    // Idempotency key -> occurrence id, used to detect repeated requests.
    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
    // Occurrence id -> request hash, compared when an idempotency key is reused.
    occurrence_hashes: BTreeMap<String, String>,
    // (occurrence id, subscription id) pairs that have already been reserved.
    deliveries: BTreeSet<(String, String)>,
}
463
464#[async_trait::async_trait]
465impl TriggerStore for InMemoryTriggerStore {
466    async fn register_subscription(
467        &self,
468        draft: TriggerSubscriptionDraft,
469    ) -> Result<TriggerSubscriptionRecord, PluginError> {
470        let mut state = self
471            .state
472            .lock()
473            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
474        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
475        let handle = format!("trigger:{}", state.next_subscription_seq);
476        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
477        let now = crate::runtime::current_epoch_ms();
478        let record = TriggerSubscriptionRecord {
479            subscription_id: subscription_id.clone(),
480            registrant: draft.registrant,
481            env_ref: draft.env_ref,
482            wake_target: draft.wake_target,
483            handle,
484            name: draft.name,
485            source_type: draft.source_type,
486            source_key: draft.source_key,
487            source: draft.source,
488            event_ty: draft.event_ty,
489            module_ref: draft.module_ref,
490            host_requirements_ref: draft.host_requirements_ref,
491            process_ref: draft.process_ref,
492            process_name: draft.process_name,
493            input_template: draft.input_template,
494            enabled: true,
495            created_at_ms: now,
496            updated_at_ms: now,
497        };
498        state.subscriptions.insert(subscription_id, record.clone());
499        Ok(record)
500    }
501
502    async fn list_subscriptions(
503        &self,
504        filter: TriggerSubscriptionFilter,
505    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
506        let state = self
507            .state
508            .lock()
509            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
510        let mut records = state
511            .subscriptions
512            .values()
513            .filter(|record| filter.matches(record))
514            .cloned()
515            .collect::<Vec<_>>();
516        records.sort_by(|left, right| {
517            left.registrant_scope_id()
518                .cmp(&right.registrant_scope_id())
519                .then_with(|| left.handle.cmp(&right.handle))
520        });
521        Ok(records)
522    }
523
524    async fn cancel_subscription(
525        &self,
526        session_id: &str,
527        handle: &str,
528    ) -> Result<bool, PluginError> {
529        let mut state = self
530            .state
531            .lock()
532            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
533        let now = crate::runtime::current_epoch_ms();
534        let Some(record) = state.subscriptions.values_mut().find(|record| {
535            record.registrant_session_id() == Some(session_id) && record.handle == handle
536        }) else {
537            return Ok(false);
538        };
539        let changed = record.enabled;
540        record.enabled = false;
541        record.updated_at_ms = now;
542        Ok(changed)
543    }
544
545    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
546        let mut state = self
547            .state
548            .lock()
549            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
550        let before = state.subscriptions.len();
551        state
552            .subscriptions
553            .retain(|_, record| record.registrant_session_id() != Some(session_id));
554        Ok(before.saturating_sub(state.subscriptions.len()))
555    }
556
557    async fn record_occurrence(
558        &self,
559        request: TriggerOccurrenceRequest,
560    ) -> Result<TriggerOccurrenceRecord, PluginError> {
561        validate_trigger_occurrence_request(&request)?;
562        let mut state = self
563            .state
564            .lock()
565            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
566        let request_hash = trigger_occurrence_request_hash(&request)?;
567        if let Some(existing_id) = state
568            .occurrence_id_by_idempotency_key
569            .get(&request.idempotency_key)
570            .cloned()
571        {
572            let existing_hash = state
573                .occurrence_hashes
574                .get(&existing_id)
575                .cloned()
576                .unwrap_or_default();
577            if existing_hash != request_hash {
578                return Err(PluginError::Session(format!(
579                    "trigger occurrence idempotency conflict for `{}`",
580                    request.idempotency_key
581                )));
582            }
583            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
584                PluginError::Session(format!(
585                    "missing trigger occurrence `{existing_id}` for idempotency key"
586                ))
587            });
588        }
589        let occurrence_id = deterministic_occurrence_id(&request)?;
590        let record = TriggerOccurrenceRecord {
591            occurrence_id: occurrence_id.clone(),
592            source_type: request.source_type,
593            source_key: request.source_key,
594            payload: request.payload,
595            idempotency_key: request.idempotency_key.clone(),
596            source: request.source,
597            occurred_at_ms: crate::runtime::current_epoch_ms(),
598        };
599        state
600            .occurrence_id_by_idempotency_key
601            .insert(request.idempotency_key, occurrence_id.clone());
602        state
603            .occurrence_hashes
604            .insert(occurrence_id.clone(), request_hash);
605        state.occurrences.insert(occurrence_id, record.clone());
606        Ok(record)
607    }
608
609    async fn reserve_matching_deliveries(
610        &self,
611        occurrence_id: &str,
612    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
613        let mut state = self
614            .state
615            .lock()
616            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
617        let occurrence = state
618            .occurrences
619            .get(occurrence_id)
620            .cloned()
621            .ok_or_else(|| {
622                PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
623            })?;
624        let subscriptions = state
625            .subscriptions
626            .values()
627            .filter(|record| {
628                record.enabled
629                    && record.source_type == occurrence.source_type
630                    && record.source_key == occurrence.source_key
631            })
632            .cloned()
633            .collect::<Vec<_>>();
634        let mut deliveries = Vec::new();
635        for subscription in subscriptions {
636            let key = (
637                occurrence.occurrence_id.clone(),
638                subscription.subscription_id.clone(),
639            );
640            if !state.deliveries.insert(key) {
641                continue;
642            }
643            let process_id = deterministic_delivery_process_id(
644                &occurrence.occurrence_id,
645                &subscription.subscription_id,
646            )?;
647            deliveries.push(TriggerDeliveryReservation {
648                occurrence: occurrence.clone(),
649                subscription,
650                process_id,
651            });
652        }
653        Ok(deliveries)
654    }
655}
656
/// Serde default for `enabled` fields: a missing flag deserializes as `true`.
fn default_enabled() -> bool {
    const ENABLED_WHEN_ABSENT: bool = true;
    ENABLED_WHEN_ABSENT
}
660
661pub fn default_trigger_source_key(
662    source_type: &str,
663    source: &serde_json::Value,
664) -> Result<String, PluginError> {
665    let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
666        .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
667    Ok(format!("source:{source_type}:sha256:{digest}"))
668}
669
670pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
671    default_trigger_source_key(source_type, &serde_json::json!({}))
672}
673
674pub fn deterministic_occurrence_id(
675    request: &TriggerOccurrenceRequest,
676) -> Result<String, PluginError> {
677    let digest = crate::stable_hash::stable_json_sha256_hex(&(
678        request.source_type.as_str(),
679        request.source_key.as_str(),
680        request.idempotency_key.as_str(),
681    ))
682    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
683    Ok(format!("trigger:{digest}"))
684}
685
686pub fn deterministic_delivery_process_id(
687    occurrence_id: &str,
688    subscription_id: &str,
689) -> Result<String, PluginError> {
690    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
691        .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
692    Ok(format!("process:trigger:{digest}"))
693}
694
/// Routes trigger occurrences to subscribed processes.
///
/// `emit` records an occurrence in the store, reserves deliveries for the
/// matching subscriptions and starts one process per reservation.
#[derive(Clone)]
pub struct TriggerRouter {
    // Subscription, occurrence and reservation storage.
    store: Arc<dyn TriggerStore>,
    // Source of the module artifact that contains a subscription's target process.
    artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
    // Registry used to start processes; `emit` fails when it is missing and deliveries exist.
    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
    // Optional hook poked by `emit` after at least one process was started.
    process_work_poke: Option<crate::ProcessWorkPoke>,
    // Host profile id recorded in the provenance of started processes.
    host_profile_id: String,
}
703
704impl TriggerRouter {
705    pub fn new(
706        store: Arc<dyn TriggerStore>,
707        artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
708        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
709        process_work_poke: Option<crate::ProcessWorkPoke>,
710        host_profile_id: impl Into<String>,
711    ) -> Self {
712        Self {
713            store,
714            artifact_store,
715            process_registry,
716            process_work_poke,
717            host_profile_id: host_profile_id.into(),
718        }
719    }
720
721    pub fn store(&self) -> Arc<dyn TriggerStore> {
722        Arc::clone(&self.store)
723    }
724
725    pub async fn emit(
726        &self,
727        request: TriggerOccurrenceRequest,
728        effect_controller: &dyn crate::RuntimeEffectController,
729    ) -> Result<TriggerEmitReport, PluginError> {
730        let occurrence = self.store.record_occurrence(request).await?;
731        let reservations = self
732            .store
733            .reserve_matching_deliveries(&occurrence.occurrence_id)
734            .await?;
735        let Some(process_registry) = self.process_registry.as_ref() else {
736            if reservations.is_empty() {
737                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
738            }
739            return Err(PluginError::Session(
740                "trigger delivery requires a process registry".to_string(),
741            ));
742        };
743        let mut started_process_ids = Vec::new();
744        let mut start_errors = Vec::new();
745        for reservation in reservations {
746            let process_id = reservation.process_id.clone();
747            if let Err(err) = self
748                .start_delivery(
749                    &reservation,
750                    Arc::clone(process_registry),
751                    effect_controller,
752                )
753                .await
754            {
755                start_errors.push(format!(
756                    "{}: {err}",
757                    reservation.subscription.subscription_id
758                ));
759                continue;
760            }
761            started_process_ids.push(process_id);
762        }
763        if !started_process_ids.is_empty()
764            && let Some(poke) = self.process_work_poke.as_ref()
765        {
766            poke.poke();
767        }
768        if started_process_ids.is_empty()
769            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
770        {
771            return Err(PluginError::Session(message));
772        }
773        Ok(TriggerEmitReport::new(
774            occurrence.occurrence_id,
775            started_process_ids,
776        ))
777    }
778
    /// Starts the target process for one reserved delivery.
    ///
    /// Steps, in order: validate the occurrence payload against the
    /// subscription's event type, load the target module artifact, build the
    /// process arguments from the input template, assemble the process
    /// registration and start command, and execute the command through
    /// `effect_controller` with `process_registry` as the local executor.
    ///
    /// # Errors
    ///
    /// Fails when the payload is invalid, the module artifact cannot be loaded
    /// or is missing, the arguments cannot be materialized, the effect
    /// execution fails, or the effect returns anything other than a process
    /// start outcome.
    async fn start_delivery(
        &self,
        reservation: &TriggerDeliveryReservation,
        process_registry: Arc<dyn crate::ProcessRegistry>,
        effect_controller: &dyn crate::RuntimeEffectController,
    ) -> Result<(), PluginError> {
        let subscription = &reservation.subscription;
        let occurrence = &reservation.occurrence;
        // Reject the delivery before loading anything if the payload does not fit the event type.
        validate_payload(&occurrence.payload, &subscription.event_ty).map_err(|message| {
            PluginError::Session(format!(
                "invalid payload for trigger `{}`: {message}",
                subscription.handle
            ))
        })?;
        // A load error and a missing artifact are reported with distinct messages.
        let artifact = self
            .artifact_store
            .get_module_artifact(&subscription.module_ref)
            .await
            .map_err(|err| {
                PluginError::Session(format!(
                    "failed to load trigger target module `{}`: {err}",
                    subscription.module_ref
                ))
            })?
            .ok_or_else(|| {
                PluginError::Session(format!(
                    "missing trigger target module `{}`",
                    subscription.module_ref
                ))
            })?;
        // Falls back to the default (no signal event types) when the process is not in the IR.
        let signal_event_types = artifact
            .canonical_ir
            .process(&subscription.process_name)
            .map(crate::lashlang_process_signal_event_types)
            .unwrap_or_default();
        let args =
            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
        let originator_scope_id = subscription.registrant_scope_id();
        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
            &originator_scope_id,
            &occurrence.occurrence_id,
        );
        // The process id comes from the reservation; provenance names the registrant and host profile.
        let registration = crate::ProcessRegistration::new(
            reservation.process_id.clone(),
            crate::ProcessInput::LashlangProcess {
                module_ref: subscription.module_ref.clone(),
                process_ref: subscription.process_ref.clone(),
                host_requirements_ref: subscription.host_requirements_ref.clone(),
                process_name: subscription.process_name.clone(),
                args,
            },
            crate::ProcessProvenance::new(
                subscription.registrant.clone(),
                self.host_profile_id.clone(),
            )
            .with_caused_by(trigger_occurrence_invocation.causal_ref()),
        )
        .with_extra_event_types(
            crate::lashlang_process_event_types()
                .into_iter()
                .chain(signal_event_types),
        )
        .with_execution_env_ref(Some(subscription.env_ref.clone()))
        .with_wake_target(subscription.wake_target.clone());
        // A start grant is only built when the subscription has a wake target.
        let grant =
            subscription
                .wake_target
                .clone()
                .map(|session_scope| crate::ProcessStartGrant {
                    session_scope,
                    descriptor: crate::ProcessHandleDescriptor::new(
                        Some("lashlang"),
                        Some(subscription.process_name.as_str()),
                    ),
                });
        let execution_context = crate::ProcessExecutionContext::default()
            .with_causal_invocation(Some(trigger_occurrence_invocation));
        let command = crate::ProcessCommand::Start {
            registration,
            grant,
            execution_context: Box::new(execution_context),
        };
        let effect_id = command.effect_id();
        // The invocation is labelled `trigger:<occurrence id>:<subscription id>`.
        let invocation = crate::RuntimeInvocation::effect(
            crate::RuntimeScope::new(originator_scope_id),
            effect_id.clone(),
            crate::RuntimeEffectKind::Process,
            format!(
                "trigger:{}:{}",
                occurrence.occurrence_id, subscription.subscription_id
            ),
        )
        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
            occurrence_id: occurrence.occurrence_id.clone(),
        }));
        let outcome = effect_controller
            .execute_effect(
                crate::RuntimeEffectEnvelope::new(
                    invocation,
                    crate::RuntimeEffectCommand::process(command),
                ),
                crate::RuntimeEffectLocalExecutor::processes(process_registry),
            )
            .await?;
        // Only a process start outcome counts as success.
        match outcome {
            crate::RuntimeEffectOutcome::Process {
                result: crate::ProcessEffectOutcome::Start { .. },
            } => Ok(()),
            other => Err(PluginError::Session(format!(
                "trigger process start returned the wrong outcome: {}",
                other.kind().as_str()
            ))),
        }
    }
893}
894
/// Condenses the errors collected while delivering a trigger into one message.
///
/// Returns `None` when `errors` is empty. A single error is reported on its
/// own; with several errors only the first one is quoted, together with the
/// total number of failures and how many further failures were left out.
fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
    let (first, remaining) = errors.split_first()?;
    if remaining.is_empty() {
        return Some(format!("trigger delivery failed: {first}"));
    }
    let total = errors.len();
    let omitted = remaining.len();
    Some(format!(
        "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
        total, omitted
    ))
}
906
907fn materialize_trigger_process_args(
908    input_template: &lashlang::TriggerInputTemplate,
909    event_payload: &serde_json::Value,
910) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
911    let mut args = lashlang::Record::default();
912    for (input_name, input) in input_template.entries() {
913        let value = match input {
914            lashlang::TriggerInputBinding::Event => event_payload.clone(),
915            lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
916        };
917        args.insert(input_name.to_string(), lashlang::from_json(value));
918    }
919    match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
920        .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
921    {
922        serde_json::Value::Object(map) => Ok(map),
923        _ => Err(PluginError::Session(
924            "trigger process args must serialize as an object".to_string(),
925        )),
926    }
927}
928
929pub fn validate_payload(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> Result<(), String> {
930    if json_matches_type(value, ty) {
931        Ok(())
932    } else {
933        Err(format!("expected {}", lashlang::format_type_expr(ty)))
934    }
935}
936
937fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
938    match ty {
939        lashlang::TypeExpr::Any => true,
940        lashlang::TypeExpr::Ref(_) => false,
941        lashlang::TypeExpr::Str => value.is_string(),
942        lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
943        lashlang::TypeExpr::Float => value.is_number(),
944        lashlang::TypeExpr::Bool => value.is_boolean(),
945        lashlang::TypeExpr::Dict => value.is_object(),
946        lashlang::TypeExpr::Null => value.is_null(),
947        lashlang::TypeExpr::Enum(values) => value
948            .as_str()
949            .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
950        lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
951            items
952                .iter()
953                .all(|item_value| json_matches_type(item_value, item))
954        }),
955        lashlang::TypeExpr::Object(fields) => {
956            let Some(map) = value.as_object() else {
957                return false;
958            };
959            fields
960                .iter()
961                .all(|field| match map.get(field.name.as_str()) {
962                    Some(field_value) => json_matches_type(field_value, &field.ty),
963                    None => field.optional,
964                })
965        }
966        lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
967        lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
968            value.is_object()
969        }
970    }
971}
972
973pub fn validate_trigger_occurrence_request(
974    request: &TriggerOccurrenceRequest,
975) -> Result<(), PluginError> {
976    if request.source_type.trim().is_empty() {
977        return Err(PluginError::Session(
978            "trigger occurrence requires source_type".to_string(),
979        ));
980    }
981    if request.source_key.trim().is_empty() {
982        return Err(PluginError::Session(
983            "trigger occurrence requires source_key".to_string(),
984        ));
985    }
986    if request.idempotency_key.trim().is_empty() {
987        return Err(PluginError::Session(
988            "trigger occurrence requires idempotency_key".to_string(),
989        ));
990    }
991    Ok(())
992}
993
994pub fn trigger_occurrence_request_hash(
995    request: &TriggerOccurrenceRequest,
996) -> Result<String, PluginError> {
997    crate::stable_hash::stable_json_sha256_hex(&(
998        request.source_type.as_str(),
999        request.source_key.as_str(),
1000        &request.payload,
1001        &request.source,
1002    ))
1003    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
1004}
1005
#[cfg(test)]
mod tests {
    use super::*;

    /// The required `button: str` field shared by the fixtures in this module.
    fn button_field() -> lashlang::TypeField {
        lashlang::TypeField {
            name: "button".into(),
            ty: lashlang::TypeExpr::Str,
            optional: false,
        }
    }

    /// Named payload type used for the button-press event in these tests.
    fn button_payload_type() -> lashlang::NamedDataType {
        lashlang::NamedDataType::object("ui.button.Pressed", vec![button_field()])
            .expect("valid trigger occurrence payload")
    }

    /// A `ui.button` / `pressed` event declared under the given resource type.
    fn pressed_event(resource_type: &str) -> TriggerEvent {
        TriggerEvent::new(resource_type, "ui.button", "pressed", button_payload_type())
    }

    #[test]
    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
        let mut events = TriggerEventCatalog::new();
        events
            .declare(pressed_event("Button"))
            .expect("first trigger occurrence");

        // Same alias and event, different resource type: the declaration must be refused.
        let rejection = events
            .declare(pressed_event("AlternateButton"))
            .expect_err("duplicate public source identity should be rejected");

        assert!(rejection.contains("duplicate trigger source `ui.button.pressed`"));
    }

    #[test]
    fn trigger_subscription_record_rejects_legacy_required_surface_ref() {
        let source_type = "ui.button.pressed";
        let input_template = lashlang::TriggerInputTemplate::new(BTreeMap::from([(
            "event".to_string(),
            lashlang::TriggerInputBinding::Event,
        )]));
        let host_requirements_ref =
            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));

        let record = TriggerSubscriptionRecord {
            subscription_id: "subscription:1".to_string(),
            registrant: crate::ProcessOriginator::session(crate::SessionScope::new("session-a")),
            env_ref: crate::ProcessExecutionEnvRef::new("process-env:session-a"),
            wake_target: Some(crate::SessionScope::new("session-a")),
            handle: "trigger:1".to_string(),
            name: Some("button watcher".to_string()),
            source_type: source_type.to_string(),
            source_key: empty_trigger_source_key(source_type).expect("source key"),
            source: serde_json::json!({}),
            event_ty: lashlang::TypeExpr::Object(vec![button_field()]),
            module_ref: lashlang::ModuleRef::new(&lashlang::ContentHash::new("module")),
            host_requirements_ref,
            process_ref: lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 0),
            process_name: "on_button".to_string(),
            input_template,
            enabled: true,
            created_at_ms: 1,
            updated_at_ms: 1,
        };

        // Move the reference from its current key to the legacy key name.
        let mut json = serde_json::to_value(record).expect("record json");
        let fields = json.as_object_mut().expect("record object");
        let legacy_ref = fields
            .remove("host_requirements_ref")
            .expect("host requirements ref");
        fields.insert("required_surface_ref".to_string(), legacy_ref);

        let parsed: Result<TriggerSubscriptionRecord, _> = serde_json::from_value(json);
        let failure = parsed.expect_err("legacy record must be malformed");

        assert!(failure.to_string().contains("host_requirements_ref"));
    }
}