Skip to main content

lash_core/
triggers.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_schema: crate::LashSchema,
14}
15
16impl TriggerEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_schema: crate::LashSchema,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_schema,
28        }
29    }
30
31    pub fn payload_schema(&self) -> &crate::LashSchema {
32        &self.payload_schema
33    }
34
35    pub fn key(&self) -> TriggerEventKey {
36        TriggerEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        trigger_event_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl TriggerEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        trigger_event_type(&self.alias, &self.event)
70    }
71}
72
73pub fn trigger_event_type(alias: &str, event: &str) -> String {
74    format!("{alias}.{event}")
75}
76
77#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79    events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate trigger occurrence `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124        self.events
125            .get(&TriggerEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct TriggerOccurrenceRecord {
192    pub occurrence_id: String,
193    pub source_type: String,
194    pub source_key: String,
195    #[serde(default)]
196    pub payload: serde_json::Value,
197    pub idempotency_key: String,
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub source: Option<serde_json::Value>,
200    pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerEventType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerEventType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerEventType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerEventType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243    pub handle: String,
244    pub source_key: String,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub name: Option<String>,
247    pub source_type: TriggerEventType,
248    pub source: serde_json::Value,
249    pub target: TriggerTargetSummary,
250    #[serde(default = "default_enabled")]
251    pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256    pub label: Option<String>,
257    pub identity: crate::ProcessIdentity,
258    pub input: crate::ProcessInput,
259    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
260    pub inputs: BTreeMap<String, TriggerInputBinding>,
261}
262
263#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
264#[serde(tag = "type", rename_all = "snake_case")]
265pub enum TriggerInputBinding {
266    Event,
267    Fixed { value: serde_json::Value },
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271pub struct TriggerSubscriptionDraft {
272    pub registrant: crate::ProcessOriginator,
273    pub env_ref: crate::ProcessExecutionEnvRef,
274    #[serde(default, skip_serializing_if = "Option::is_none")]
275    pub wake_target: Option<crate::SessionScope>,
276    #[serde(default, skip_serializing_if = "Option::is_none")]
277    pub name: Option<String>,
278    pub source_type: String,
279    pub source_key: String,
280    pub source: serde_json::Value,
281    pub payload_schema: crate::LashSchema,
282    pub target: crate::ProcessInput,
283    pub target_identity: crate::ProcessIdentity,
284    #[serde(default)]
285    pub event_types: Vec<crate::ProcessEventType>,
286    #[serde(default)]
287    pub input_template: BTreeMap<String, TriggerInputBinding>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    pub target_label: Option<String>,
290}
291
292#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
293pub struct TriggerSubscriptionRecord {
294    pub subscription_id: String,
295    pub registrant: crate::ProcessOriginator,
296    pub env_ref: crate::ProcessExecutionEnvRef,
297    #[serde(default, skip_serializing_if = "Option::is_none")]
298    pub wake_target: Option<crate::SessionScope>,
299    pub handle: String,
300    #[serde(default, skip_serializing_if = "Option::is_none")]
301    pub name: Option<String>,
302    pub source_type: String,
303    pub source_key: String,
304    pub source: serde_json::Value,
305    pub payload_schema: crate::LashSchema,
306    pub target: crate::ProcessInput,
307    pub target_identity: crate::ProcessIdentity,
308    #[serde(default)]
309    pub event_types: Vec<crate::ProcessEventType>,
310    #[serde(default)]
311    pub input_template: BTreeMap<String, TriggerInputBinding>,
312    #[serde(default, skip_serializing_if = "Option::is_none")]
313    pub target_label: Option<String>,
314    #[serde(default = "default_enabled")]
315    pub enabled: bool,
316    pub created_at_ms: u64,
317    pub updated_at_ms: u64,
318}
319
320impl TriggerSubscriptionRecord {
321    pub fn registrant_scope_id(&self) -> String {
322        self.registrant.scope_id()
323    }
324
325    pub fn registrant_session_id(&self) -> Option<&str> {
326        match &self.registrant {
327            crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
328            crate::ProcessOriginator::Host => None,
329        }
330    }
331}
332
333impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
334    fn from(route: &TriggerSubscriptionRecord) -> Self {
335        Self {
336            handle: route.handle.clone(),
337            source_key: route.source_key.clone(),
338            name: route.name.clone(),
339            source_type: TriggerEventType::new(route.source_type.clone()),
340            source: route.source.clone(),
341            target: TriggerTargetSummary {
342                label: route.target_label.clone(),
343                identity: route.target_identity.clone(),
344                input: route.target.clone(),
345                inputs: route.input_template.clone(),
346            },
347            enabled: route.enabled,
348        }
349    }
350}
351
352#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
353pub struct TriggerSubscriptionFilter {
354    #[serde(default, skip_serializing_if = "Option::is_none")]
355    pub session_id: Option<String>,
356    #[serde(default, skip_serializing_if = "Option::is_none")]
357    pub handle: Option<String>,
358    #[serde(default, skip_serializing_if = "Option::is_none")]
359    pub name: Option<String>,
360    #[serde(default, skip_serializing_if = "Option::is_none")]
361    pub source_type: Option<String>,
362    #[serde(default, skip_serializing_if = "Option::is_none")]
363    pub source_key: Option<String>,
364    #[serde(default, skip_serializing_if = "Option::is_none")]
365    pub target: Option<serde_json::Value>,
366    #[serde(default, skip_serializing_if = "Option::is_none")]
367    pub enabled: Option<bool>,
368}
369
370impl TriggerSubscriptionFilter {
371    pub fn for_session(session_id: impl Into<String>) -> Self {
372        Self {
373            session_id: Some(session_id.into()),
374            ..Self::default()
375        }
376    }
377
378    pub fn for_source_type(source_type: impl Into<String>) -> Self {
379        Self {
380            source_type: Some(source_type.into()),
381            ..Self::default()
382        }
383    }
384
385    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
386        self.session_id
387            .as_deref()
388            .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
389            && self
390                .handle
391                .as_deref()
392                .is_none_or(|handle| record.handle == handle)
393            && self
394                .name
395                .as_deref()
396                .is_none_or(|name| record.name.as_deref() == Some(name))
397            && self
398                .source_type
399                .as_deref()
400                .is_none_or(|source_type| record.source_type == source_type)
401            && self
402                .source_key
403                .as_deref()
404                .is_none_or(|source_key| record.source_key == source_key)
405            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
406            && self
407                .target
408                .as_ref()
409                .is_none_or(|target| record.target_identity.definition.as_ref() == Some(target))
410    }
411}
412
413#[derive(Clone, Debug, Serialize, Deserialize)]
414pub struct TriggerDeliveryReservation {
415    pub occurrence: TriggerOccurrenceRecord,
416    pub subscription: TriggerSubscriptionRecord,
417    pub process_id: String,
418}
419
420#[async_trait::async_trait]
421pub trait TriggerStore: Send + Sync {
422    fn durability_tier(&self) -> crate::DurabilityTier {
423        crate::DurabilityTier::Inline
424    }
425
426    async fn source_key_for_subscription(
427        &self,
428        source_type: &str,
429        source: &serde_json::Value,
430    ) -> Result<String, PluginError> {
431        default_trigger_source_key(source_type, source)
432    }
433
434    async fn register_subscription(
435        &self,
436        draft: TriggerSubscriptionDraft,
437    ) -> Result<TriggerSubscriptionRecord, PluginError>;
438
439    async fn list_subscriptions(
440        &self,
441        filter: TriggerSubscriptionFilter,
442    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
443
444    async fn cancel_subscription(
445        &self,
446        session_id: &str,
447        handle: &str,
448    ) -> Result<bool, PluginError>;
449
450    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;
451
452    async fn record_occurrence(
453        &self,
454        request: TriggerOccurrenceRequest,
455    ) -> Result<TriggerOccurrenceRecord, PluginError>;
456
457    async fn reserve_matching_deliveries(
458        &self,
459        occurrence_id: &str,
460    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
461}
462
463#[derive(Default)]
464pub struct InMemoryTriggerStore {
465    state: Mutex<InMemoryTriggerEventState>,
466}
467
468#[derive(Default)]
469struct InMemoryTriggerEventState {
470    next_subscription_seq: u64,
471    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
472    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
473    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
474    occurrence_hashes: BTreeMap<String, String>,
475    deliveries: BTreeSet<(String, String)>,
476}
477
478#[async_trait::async_trait]
479impl TriggerStore for InMemoryTriggerStore {
480    async fn register_subscription(
481        &self,
482        draft: TriggerSubscriptionDraft,
483    ) -> Result<TriggerSubscriptionRecord, PluginError> {
484        let mut state = self
485            .state
486            .lock()
487            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
488        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
489        let handle = format!("trigger:{}", state.next_subscription_seq);
490        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
491        let now = crate::runtime::current_epoch_ms();
492        let record = TriggerSubscriptionRecord {
493            subscription_id: subscription_id.clone(),
494            registrant: draft.registrant,
495            env_ref: draft.env_ref,
496            wake_target: draft.wake_target,
497            handle,
498            name: draft.name,
499            source_type: draft.source_type,
500            source_key: draft.source_key,
501            source: draft.source,
502            payload_schema: draft.payload_schema,
503            target: draft.target,
504            target_identity: draft.target_identity,
505            event_types: draft.event_types,
506            input_template: draft.input_template,
507            target_label: draft.target_label,
508            enabled: true,
509            created_at_ms: now,
510            updated_at_ms: now,
511        };
512        state.subscriptions.insert(subscription_id, record.clone());
513        Ok(record)
514    }
515
516    async fn list_subscriptions(
517        &self,
518        filter: TriggerSubscriptionFilter,
519    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
520        let state = self
521            .state
522            .lock()
523            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
524        let mut records = state
525            .subscriptions
526            .values()
527            .filter(|record| filter.matches(record))
528            .cloned()
529            .collect::<Vec<_>>();
530        records.sort_by(|left, right| {
531            left.registrant_scope_id()
532                .cmp(&right.registrant_scope_id())
533                .then_with(|| left.handle.cmp(&right.handle))
534        });
535        Ok(records)
536    }
537
538    async fn cancel_subscription(
539        &self,
540        session_id: &str,
541        handle: &str,
542    ) -> Result<bool, PluginError> {
543        let mut state = self
544            .state
545            .lock()
546            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
547        let now = crate::runtime::current_epoch_ms();
548        let Some(record) = state.subscriptions.values_mut().find(|record| {
549            record.registrant_session_id() == Some(session_id) && record.handle == handle
550        }) else {
551            return Ok(false);
552        };
553        let changed = record.enabled;
554        record.enabled = false;
555        record.updated_at_ms = now;
556        Ok(changed)
557    }
558
559    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
560        let mut state = self
561            .state
562            .lock()
563            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
564        let before = state.subscriptions.len();
565        state
566            .subscriptions
567            .retain(|_, record| record.registrant_session_id() != Some(session_id));
568        Ok(before.saturating_sub(state.subscriptions.len()))
569    }
570
571    async fn record_occurrence(
572        &self,
573        request: TriggerOccurrenceRequest,
574    ) -> Result<TriggerOccurrenceRecord, PluginError> {
575        validate_trigger_occurrence_request(&request)?;
576        let mut state = self
577            .state
578            .lock()
579            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
580        let request_hash = trigger_occurrence_request_hash(&request)?;
581        if let Some(existing_id) = state
582            .occurrence_id_by_idempotency_key
583            .get(&request.idempotency_key)
584            .cloned()
585        {
586            let existing_hash = state
587                .occurrence_hashes
588                .get(&existing_id)
589                .cloned()
590                .unwrap_or_default();
591            if existing_hash != request_hash {
592                return Err(PluginError::Session(format!(
593                    "trigger occurrence idempotency conflict for `{}`",
594                    request.idempotency_key
595                )));
596            }
597            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
598                PluginError::Session(format!(
599                    "missing trigger occurrence `{existing_id}` for idempotency key"
600                ))
601            });
602        }
603        let occurrence_id = deterministic_occurrence_id(&request)?;
604        let record = TriggerOccurrenceRecord {
605            occurrence_id: occurrence_id.clone(),
606            source_type: request.source_type,
607            source_key: request.source_key,
608            payload: request.payload,
609            idempotency_key: request.idempotency_key.clone(),
610            source: request.source,
611            occurred_at_ms: crate::runtime::current_epoch_ms(),
612        };
613        state
614            .occurrence_id_by_idempotency_key
615            .insert(request.idempotency_key, occurrence_id.clone());
616        state
617            .occurrence_hashes
618            .insert(occurrence_id.clone(), request_hash);
619        state.occurrences.insert(occurrence_id, record.clone());
620        Ok(record)
621    }
622
623    async fn reserve_matching_deliveries(
624        &self,
625        occurrence_id: &str,
626    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
627        let mut state = self
628            .state
629            .lock()
630            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
631        let occurrence = state
632            .occurrences
633            .get(occurrence_id)
634            .cloned()
635            .ok_or_else(|| {
636                PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
637            })?;
638        let subscriptions = state
639            .subscriptions
640            .values()
641            .filter(|record| {
642                record.enabled
643                    && record.source_type == occurrence.source_type
644                    && record.source_key == occurrence.source_key
645            })
646            .cloned()
647            .collect::<Vec<_>>();
648        let mut deliveries = Vec::new();
649        for subscription in subscriptions {
650            let key = (
651                occurrence.occurrence_id.clone(),
652                subscription.subscription_id.clone(),
653            );
654            if !state.deliveries.insert(key) {
655                continue;
656            }
657            let process_id = deterministic_delivery_process_id(
658                &occurrence.occurrence_id,
659                &subscription.subscription_id,
660            )?;
661            deliveries.push(TriggerDeliveryReservation {
662                occurrence: occurrence.clone(),
663                subscription,
664                process_id,
665            });
666        }
667        Ok(deliveries)
668    }
669}
670
671fn default_enabled() -> bool {
672    true
673}
674
675pub fn default_trigger_source_key(
676    source_type: &str,
677    source: &serde_json::Value,
678) -> Result<String, PluginError> {
679    let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
680        .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
681    Ok(format!("source:{source_type}:sha256:{digest}"))
682}
683
684pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
685    default_trigger_source_key(source_type, &serde_json::json!({}))
686}
687
688pub fn deterministic_occurrence_id(
689    request: &TriggerOccurrenceRequest,
690) -> Result<String, PluginError> {
691    let digest = crate::stable_hash::stable_json_sha256_hex(&(
692        request.source_type.as_str(),
693        request.source_key.as_str(),
694        request.idempotency_key.as_str(),
695    ))
696    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
697    Ok(format!("trigger:{digest}"))
698}
699
700pub fn deterministic_delivery_process_id(
701    occurrence_id: &str,
702    subscription_id: &str,
703) -> Result<String, PluginError> {
704    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
705        .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
706    Ok(format!("process:trigger:{digest}"))
707}
708
709#[derive(Clone)]
710pub struct TriggerRouter {
711    store: Arc<dyn TriggerStore>,
712    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
713    process_work_poke: Option<crate::ProcessWorkPoke>,
714}
715
716impl TriggerRouter {
717    pub fn new(
718        store: Arc<dyn TriggerStore>,
719        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
720        process_work_poke: Option<crate::ProcessWorkPoke>,
721    ) -> Self {
722        Self {
723            store,
724            process_registry,
725            process_work_poke,
726        }
727    }
728
729    pub fn store(&self) -> Arc<dyn TriggerStore> {
730        Arc::clone(&self.store)
731    }
732
733    pub async fn emit(
734        &self,
735        request: TriggerOccurrenceRequest,
736        effect_controller: &dyn crate::RuntimeEffectController,
737    ) -> Result<TriggerEmitReport, PluginError> {
738        let occurrence = self.store.record_occurrence(request).await?;
739        let reservations = self
740            .store
741            .reserve_matching_deliveries(&occurrence.occurrence_id)
742            .await?;
743        let Some(process_registry) = self.process_registry.as_ref() else {
744            if reservations.is_empty() {
745                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
746            }
747            return Err(PluginError::Session(
748                "trigger delivery requires a process registry".to_string(),
749            ));
750        };
751        let mut started_process_ids = Vec::new();
752        let mut start_errors = Vec::new();
753        for reservation in reservations {
754            let process_id = reservation.process_id.clone();
755            if let Err(err) = self
756                .start_delivery(
757                    &reservation,
758                    Arc::clone(process_registry),
759                    effect_controller,
760                )
761                .await
762            {
763                start_errors.push(format!(
764                    "{}: {err}",
765                    reservation.subscription.subscription_id
766                ));
767                continue;
768            }
769            started_process_ids.push(process_id);
770        }
771        if !started_process_ids.is_empty()
772            && let Some(poke) = self.process_work_poke.as_ref()
773        {
774            poke.poke();
775        }
776        if started_process_ids.is_empty()
777            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
778        {
779            return Err(PluginError::Session(message));
780        }
781        Ok(TriggerEmitReport::new(
782            occurrence.occurrence_id,
783            started_process_ids,
784        ))
785    }
786
787    async fn start_delivery(
788        &self,
789        reservation: &TriggerDeliveryReservation,
790        process_registry: Arc<dyn crate::ProcessRegistry>,
791        effect_controller: &dyn crate::RuntimeEffectController,
792    ) -> Result<(), PluginError> {
793        let subscription = &reservation.subscription;
794        let occurrence = &reservation.occurrence;
795        subscription
796            .payload_schema
797            .validate(&occurrence.payload)
798            .map_err(|err| {
799                PluginError::Session(format!(
800                    "invalid payload for trigger `{}`: {err}",
801                    subscription.handle
802                ))
803            })?;
804        let args =
805            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
806        let target = apply_trigger_inputs(subscription.target.clone(), args)?;
807        let originator_scope_id = subscription.registrant_scope_id();
808        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
809            &originator_scope_id,
810            &occurrence.occurrence_id,
811        );
812        let registration = crate::ProcessRegistration::new(
813            reservation.process_id.clone(),
814            target.clone(),
815            crate::ProcessProvenance::new(subscription.registrant.clone())
816                .with_caused_by(trigger_occurrence_invocation.causal_ref()),
817        )
818        .with_identity(subscription.target_identity.clone())
819        .with_extra_event_types(subscription.event_types.clone())
820        .with_execution_env_ref(Some(subscription.env_ref.clone()))
821        .with_wake_target(subscription.wake_target.clone());
822        let descriptor_kind = subscription.target_identity.kind.clone();
823        let grant =
824            subscription
825                .wake_target
826                .clone()
827                .map(|session_scope| crate::ProcessStartGrant {
828                    session_scope,
829                    descriptor: crate::ProcessHandleDescriptor::new(
830                        Some(descriptor_kind.as_str()),
831                        subscription.target_label.as_deref(),
832                    ),
833                });
834        let execution_context = crate::ProcessExecutionContext::default()
835            .with_causal_invocation(Some(trigger_occurrence_invocation));
836        let command = crate::ProcessCommand::Start {
837            registration,
838            grant,
839            execution_context: Box::new(execution_context),
840        };
841        let effect_id = command.effect_id();
842        let invocation = crate::RuntimeInvocation::effect(
843            crate::RuntimeScope::new(originator_scope_id),
844            effect_id.clone(),
845            crate::RuntimeEffectKind::Process,
846            format!(
847                "trigger:{}:{}",
848                occurrence.occurrence_id, subscription.subscription_id
849            ),
850        )
851        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
852            occurrence_id: occurrence.occurrence_id.clone(),
853        }));
854        let outcome = effect_controller
855            .execute_effect(
856                crate::RuntimeEffectEnvelope::new(
857                    invocation,
858                    crate::RuntimeEffectCommand::process(command),
859                ),
860                crate::RuntimeEffectLocalExecutor::processes(process_registry),
861            )
862            .await?;
863        match outcome {
864            crate::RuntimeEffectOutcome::Process {
865                result: crate::ProcessEffectOutcome::Start { .. },
866            } => Ok(()),
867            other => Err(PluginError::Session(format!(
868                "trigger process start returned the wrong outcome: {}",
869                other.kind().as_str()
870            ))),
871        }
872    }
873}
874
875fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
876    match errors {
877        [] => None,
878        [only] => Some(format!("trigger delivery failed: {only}")),
879        [first, rest @ ..] => Some(format!(
880            "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
881            errors.len(),
882            rest.len()
883        )),
884    }
885}
886
887fn materialize_trigger_process_args(
888    input_template: &BTreeMap<String, TriggerInputBinding>,
889    event_payload: &serde_json::Value,
890) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
891    let mut args = serde_json::Map::new();
892    for (input_name, input) in input_template {
893        let value = match input {
894            TriggerInputBinding::Event => event_payload.clone(),
895            TriggerInputBinding::Fixed { value } => value.clone(),
896        };
897        args.insert(input_name.to_string(), value);
898    }
899    Ok(args)
900}
901
902fn apply_trigger_inputs(
903    mut target: crate::ProcessInput,
904    args: serde_json::Map<String, serde_json::Value>,
905) -> Result<crate::ProcessInput, PluginError> {
906    match &mut target {
907        crate::ProcessInput::Engine { payload, .. } => {
908            let object = payload.as_object_mut().ok_or_else(|| {
909                PluginError::Session(
910                    "trigger engine target payload must be a JSON object".to_string(),
911                )
912            })?;
913            object.insert("args".to_string(), serde_json::Value::Object(args));
914            Ok(target)
915        }
916        other => Err(PluginError::Session(format!(
917            "trigger target must be an engine process, got {}",
918            other.engine_kind()
919        ))),
920    }
921}
922
923pub fn validate_trigger_occurrence_request(
924    request: &TriggerOccurrenceRequest,
925) -> Result<(), PluginError> {
926    if request.source_type.trim().is_empty() {
927        return Err(PluginError::Session(
928            "trigger occurrence requires source_type".to_string(),
929        ));
930    }
931    if request.source_key.trim().is_empty() {
932        return Err(PluginError::Session(
933            "trigger occurrence requires source_key".to_string(),
934        ));
935    }
936    if request.idempotency_key.trim().is_empty() {
937        return Err(PluginError::Session(
938            "trigger occurrence requires idempotency_key".to_string(),
939        ));
940    }
941    Ok(())
942}
943
944pub fn trigger_occurrence_request_hash(
945    request: &TriggerOccurrenceRequest,
946) -> Result<String, PluginError> {
947    crate::stable_hash::stable_json_sha256_hex(&(
948        request.source_type.as_str(),
949        request.source_key.as_str(),
950        &request.payload,
951        &request.source,
952    ))
953    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
954}
955
956#[cfg(test)]
957mod tests {
958    use super::*;
959
960    fn button_payload_schema() -> crate::LashSchema {
961        crate::LashSchema::any()
962    }
963
964    #[test]
965    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
966        let mut catalog = TriggerEventCatalog::new();
967        catalog
968            .declare(TriggerEvent::new(
969                "Button",
970                "ui.button",
971                "pressed",
972                button_payload_schema(),
973            ))
974            .expect("first trigger occurrence");
975
976        let err = catalog
977            .declare(TriggerEvent::new(
978                "AlternateButton",
979                "ui.button",
980                "pressed",
981                button_payload_schema(),
982            ))
983            .expect_err("duplicate public source identity should be rejected");
984
985        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
986    }
987}