Skip to main content

lash_core/
triggers.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_schema: crate::LashSchema,
14}
15
16impl TriggerEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_schema: crate::LashSchema,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_schema,
28        }
29    }
30
31    pub fn payload_schema(&self) -> &crate::LashSchema {
32        &self.payload_schema
33    }
34
35    pub fn key(&self) -> TriggerEventKey {
36        TriggerEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        trigger_event_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl TriggerEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        trigger_event_type(&self.alias, &self.event)
70    }
71}
72
73pub fn trigger_event_type(alias: &str, event: &str) -> String {
74    format!("{alias}.{event}")
75}
76
77#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79    events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate trigger occurrence `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124        self.events
125            .get(&TriggerEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct TriggerOccurrenceRecord {
192    pub occurrence_id: String,
193    pub source_type: String,
194    pub source_key: String,
195    #[serde(default)]
196    pub payload: serde_json::Value,
197    pub idempotency_key: String,
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub source: Option<serde_json::Value>,
200    pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerEventType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerEventType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerEventType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerEventType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243    pub handle: String,
244    pub source_key: String,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub name: Option<String>,
247    pub source_type: TriggerEventType,
248    pub source: serde_json::Value,
249    pub target: TriggerTargetSummary,
250    #[serde(default = "default_enabled")]
251    pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256    pub label: Option<String>,
257    pub identity: crate::ProcessIdentity,
258    pub input: crate::ProcessInput,
259    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
260    pub inputs: BTreeMap<String, TriggerInputBinding>,
261}
262
263#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
264#[serde(tag = "type", rename_all = "snake_case")]
265pub enum TriggerInputBinding {
266    Event,
267    Fixed { value: serde_json::Value },
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271pub struct TriggerSubscriptionDraft {
272    pub registrant: crate::ProcessOriginator,
273    pub env_ref: crate::ProcessExecutionEnvRef,
274    #[serde(default, skip_serializing_if = "Option::is_none")]
275    pub wake_target: Option<crate::SessionScope>,
276    #[serde(default, skip_serializing_if = "Option::is_none")]
277    pub name: Option<String>,
278    pub source_type: String,
279    pub source_key: String,
280    pub source: serde_json::Value,
281    pub payload_schema: crate::LashSchema,
282    pub target: crate::ProcessInput,
283    pub target_identity: crate::ProcessIdentity,
284    #[serde(default)]
285    pub event_types: Vec<crate::ProcessEventType>,
286    #[serde(default)]
287    pub input_template: BTreeMap<String, TriggerInputBinding>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    pub target_label: Option<String>,
290}
291
292impl TriggerSubscriptionDraft {
293    pub fn for_process(
294        registrant: crate::ProcessOriginator,
295        env_ref: crate::ProcessExecutionEnvRef,
296        source_type: impl Into<String>,
297        source_key: impl Into<String>,
298        target: crate::ProcessInput,
299        target_identity: crate::ProcessIdentity,
300    ) -> Self {
301        let target_label = target_identity.label.clone();
302        Self {
303            registrant,
304            env_ref,
305            wake_target: None,
306            name: None,
307            source_type: source_type.into(),
308            source_key: source_key.into(),
309            source: serde_json::Value::Object(serde_json::Map::new()),
310            payload_schema: crate::LashSchema::new(serde_json::Value::Object(
311                serde_json::Map::new(),
312            )),
313            target,
314            target_identity,
315            event_types: Vec::new(),
316            input_template: BTreeMap::new(),
317            target_label,
318        }
319    }
320
321    pub fn with_name(mut self, name: impl Into<String>) -> Self {
322        self.name = Some(name.into());
323        self
324    }
325
326    pub fn with_source(mut self, source: serde_json::Value) -> Self {
327        self.source = source;
328        self
329    }
330
331    pub fn with_payload_schema(mut self, payload_schema: crate::LashSchema) -> Self {
332        self.payload_schema = payload_schema;
333        self
334    }
335
336    pub fn with_wake_target(mut self, wake_target: crate::SessionScope) -> Self {
337        self.wake_target = Some(wake_target);
338        self
339    }
340
341    pub fn with_event_types(
342        mut self,
343        event_types: impl IntoIterator<Item = crate::ProcessEventType>,
344    ) -> Self {
345        self.event_types = event_types.into_iter().collect();
346        self
347    }
348
349    pub fn with_input_template(
350        mut self,
351        input_template: BTreeMap<String, TriggerInputBinding>,
352    ) -> Self {
353        self.input_template = input_template;
354        self
355    }
356
357    pub fn with_target_label(mut self, target_label: impl Into<String>) -> Self {
358        self.target_label = Some(target_label.into());
359        self
360    }
361
362    pub fn validate(&self) -> Result<(), PluginError> {
363        validate_trigger_subscription_target_label(
364            self.target_label.as_deref(),
365            self.target_identity.label.as_deref(),
366        )
367    }
368}
369
370#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
371pub struct TriggerSubscriptionRecord {
372    pub subscription_id: String,
373    pub registrant: crate::ProcessOriginator,
374    pub env_ref: crate::ProcessExecutionEnvRef,
375    #[serde(default, skip_serializing_if = "Option::is_none")]
376    pub wake_target: Option<crate::SessionScope>,
377    pub handle: String,
378    #[serde(default, skip_serializing_if = "Option::is_none")]
379    pub name: Option<String>,
380    pub source_type: String,
381    pub source_key: String,
382    pub source: serde_json::Value,
383    pub payload_schema: crate::LashSchema,
384    pub target: crate::ProcessInput,
385    pub target_identity: crate::ProcessIdentity,
386    #[serde(default)]
387    pub event_types: Vec<crate::ProcessEventType>,
388    #[serde(default)]
389    pub input_template: BTreeMap<String, TriggerInputBinding>,
390    #[serde(default, skip_serializing_if = "Option::is_none")]
391    pub target_label: Option<String>,
392    #[serde(default = "default_enabled")]
393    pub enabled: bool,
394    pub created_at_ms: u64,
395    pub updated_at_ms: u64,
396}
397
398impl TriggerSubscriptionRecord {
399    pub fn registrant_scope_id(&self) -> String {
400        self.registrant.scope_id()
401    }
402
403    pub fn registrant_session_id(&self) -> Option<&str> {
404        match &self.registrant {
405            crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
406            crate::ProcessOriginator::Host => None,
407        }
408    }
409}
410
411fn validate_trigger_subscription_target_label(
412    target_label: Option<&str>,
413    identity_label: Option<&str>,
414) -> Result<(), PluginError> {
415    match (target_label, identity_label) {
416        (Some(target_label), Some(identity_label)) if target_label != identity_label => {
417            Err(PluginError::Session(
418                "trigger target_label must match target_identity.label when both are present"
419                    .to_string(),
420            ))
421        }
422        _ => Ok(()),
423    }
424}
425
426impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
427    fn from(route: &TriggerSubscriptionRecord) -> Self {
428        Self {
429            handle: route.handle.clone(),
430            source_key: route.source_key.clone(),
431            name: route.name.clone(),
432            source_type: TriggerEventType::new(route.source_type.clone()),
433            source: route.source.clone(),
434            target: TriggerTargetSummary {
435                label: route.target_label.clone(),
436                identity: route.target_identity.clone(),
437                input: route.target.clone(),
438                inputs: route.input_template.clone(),
439            },
440            enabled: route.enabled,
441        }
442    }
443}
444
445#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
446pub struct TriggerSubscriptionFilter {
447    #[serde(default, skip_serializing_if = "Option::is_none")]
448    pub session_id: Option<String>,
449    #[serde(default, skip_serializing_if = "Option::is_none")]
450    pub handle: Option<String>,
451    #[serde(default, skip_serializing_if = "Option::is_none")]
452    pub name: Option<String>,
453    #[serde(default, skip_serializing_if = "Option::is_none")]
454    pub source_type: Option<String>,
455    #[serde(default, skip_serializing_if = "Option::is_none")]
456    pub source_key: Option<String>,
457    #[serde(default, skip_serializing_if = "Option::is_none")]
458    pub target: Option<serde_json::Value>,
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    pub enabled: Option<bool>,
461}
462
463impl TriggerSubscriptionFilter {
464    pub fn for_session(session_id: impl Into<String>) -> Self {
465        Self {
466            session_id: Some(session_id.into()),
467            ..Self::default()
468        }
469    }
470
471    pub fn for_source_type(source_type: impl Into<String>) -> Self {
472        Self {
473            source_type: Some(source_type.into()),
474            ..Self::default()
475        }
476    }
477
478    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
479        self.session_id
480            .as_deref()
481            .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
482            && self
483                .handle
484                .as_deref()
485                .is_none_or(|handle| record.handle == handle)
486            && self
487                .name
488                .as_deref()
489                .is_none_or(|name| record.name.as_deref() == Some(name))
490            && self
491                .source_type
492                .as_deref()
493                .is_none_or(|source_type| record.source_type == source_type)
494            && self
495                .source_key
496                .as_deref()
497                .is_none_or(|source_key| record.source_key == source_key)
498            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
499            && self
500                .target
501                .as_ref()
502                .is_none_or(|target| record.target_identity.definition.as_ref() == Some(target))
503    }
504}
505
506#[derive(Clone, Debug, Serialize, Deserialize)]
507pub struct TriggerDeliveryReservation {
508    pub occurrence: TriggerOccurrenceRecord,
509    pub subscription: TriggerSubscriptionRecord,
510    pub process_id: String,
511}
512
513#[async_trait::async_trait]
514pub trait TriggerStore: Send + Sync {
515    fn durability_tier(&self) -> crate::DurabilityTier {
516        crate::DurabilityTier::Inline
517    }
518
519    async fn source_key_for_subscription(
520        &self,
521        source_type: &str,
522        source: &serde_json::Value,
523    ) -> Result<String, PluginError> {
524        default_trigger_source_key(source_type, source)
525    }
526
527    async fn register_subscription(
528        &self,
529        draft: TriggerSubscriptionDraft,
530    ) -> Result<TriggerSubscriptionRecord, PluginError>;
531
532    async fn list_subscriptions(
533        &self,
534        filter: TriggerSubscriptionFilter,
535    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
536
537    async fn cancel_subscription(
538        &self,
539        session_id: &str,
540        handle: &str,
541    ) -> Result<bool, PluginError>;
542
543    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;
544
545    async fn record_occurrence(
546        &self,
547        request: TriggerOccurrenceRequest,
548    ) -> Result<TriggerOccurrenceRecord, PluginError>;
549
550    async fn reserve_matching_deliveries(
551        &self,
552        occurrence_id: &str,
553    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
554}
555
556pub struct InMemoryTriggerStore {
557    clock: Arc<dyn crate::Clock>,
558    state: Mutex<InMemoryTriggerEventState>,
559}
560
561impl InMemoryTriggerStore {
562    pub fn new() -> Self {
563        Self::with_clock(Arc::new(crate::SystemClock))
564    }
565
566    pub fn with_clock(clock: Arc<dyn crate::Clock>) -> Self {
567        Self {
568            clock,
569            state: Mutex::new(InMemoryTriggerEventState::default()),
570        }
571    }
572}
573
574impl Default for InMemoryTriggerStore {
575    fn default() -> Self {
576        Self::new()
577    }
578}
579
580#[derive(Default)]
581struct InMemoryTriggerEventState {
582    next_subscription_seq: u64,
583    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
584    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
585    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
586    occurrence_hashes: BTreeMap<String, String>,
587    deliveries: BTreeSet<(String, String)>,
588}
589
590#[async_trait::async_trait]
591impl TriggerStore for InMemoryTriggerStore {
592    async fn register_subscription(
593        &self,
594        draft: TriggerSubscriptionDraft,
595    ) -> Result<TriggerSubscriptionRecord, PluginError> {
596        draft.validate()?;
597        let mut state = self
598            .state
599            .lock()
600            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
601        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
602        let handle = format!("trigger:{}", state.next_subscription_seq);
603        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
604        let now = self.clock.timestamp_ms();
605        let record = TriggerSubscriptionRecord {
606            subscription_id: subscription_id.clone(),
607            registrant: draft.registrant,
608            env_ref: draft.env_ref,
609            wake_target: draft.wake_target,
610            handle,
611            name: draft.name,
612            source_type: draft.source_type,
613            source_key: draft.source_key,
614            source: draft.source,
615            payload_schema: draft.payload_schema,
616            target: draft.target,
617            target_identity: draft.target_identity,
618            event_types: draft.event_types,
619            input_template: draft.input_template,
620            target_label: draft.target_label,
621            enabled: true,
622            created_at_ms: now,
623            updated_at_ms: now,
624        };
625        state.subscriptions.insert(subscription_id, record.clone());
626        Ok(record)
627    }
628
629    async fn list_subscriptions(
630        &self,
631        filter: TriggerSubscriptionFilter,
632    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
633        let state = self
634            .state
635            .lock()
636            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
637        let mut records = state
638            .subscriptions
639            .values()
640            .filter(|record| filter.matches(record))
641            .cloned()
642            .collect::<Vec<_>>();
643        records.sort_by(|left, right| {
644            left.registrant_scope_id()
645                .cmp(&right.registrant_scope_id())
646                .then_with(|| left.handle.cmp(&right.handle))
647        });
648        Ok(records)
649    }
650
651    async fn cancel_subscription(
652        &self,
653        session_id: &str,
654        handle: &str,
655    ) -> Result<bool, PluginError> {
656        let mut state = self
657            .state
658            .lock()
659            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
660        let now = self.clock.timestamp_ms();
661        let Some(record) = state.subscriptions.values_mut().find(|record| {
662            record.registrant_session_id() == Some(session_id) && record.handle == handle
663        }) else {
664            return Ok(false);
665        };
666        let changed = record.enabled;
667        record.enabled = false;
668        record.updated_at_ms = now;
669        Ok(changed)
670    }
671
672    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
673        let mut state = self
674            .state
675            .lock()
676            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
677        let before = state.subscriptions.len();
678        state
679            .subscriptions
680            .retain(|_, record| record.registrant_session_id() != Some(session_id));
681        Ok(before.saturating_sub(state.subscriptions.len()))
682    }
683
684    async fn record_occurrence(
685        &self,
686        request: TriggerOccurrenceRequest,
687    ) -> Result<TriggerOccurrenceRecord, PluginError> {
688        validate_trigger_occurrence_request(&request)?;
689        let mut state = self
690            .state
691            .lock()
692            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
693        let request_hash = trigger_occurrence_request_hash(&request)?;
694        if let Some(existing_id) = state
695            .occurrence_id_by_idempotency_key
696            .get(&request.idempotency_key)
697            .cloned()
698        {
699            let existing_hash = state
700                .occurrence_hashes
701                .get(&existing_id)
702                .cloned()
703                .unwrap_or_default();
704            if existing_hash != request_hash {
705                return Err(PluginError::Session(format!(
706                    "trigger occurrence idempotency conflict for `{}`",
707                    request.idempotency_key
708                )));
709            }
710            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
711                PluginError::Session(format!(
712                    "missing trigger occurrence `{existing_id}` for idempotency key"
713                ))
714            });
715        }
716        let occurrence_id = deterministic_occurrence_id(&request)?;
717        let record = TriggerOccurrenceRecord {
718            occurrence_id: occurrence_id.clone(),
719            source_type: request.source_type,
720            source_key: request.source_key,
721            payload: request.payload,
722            idempotency_key: request.idempotency_key.clone(),
723            source: request.source,
724            occurred_at_ms: self.clock.timestamp_ms(),
725        };
726        state
727            .occurrence_id_by_idempotency_key
728            .insert(request.idempotency_key, occurrence_id.clone());
729        state
730            .occurrence_hashes
731            .insert(occurrence_id.clone(), request_hash);
732        state.occurrences.insert(occurrence_id, record.clone());
733        Ok(record)
734    }
735
736    async fn reserve_matching_deliveries(
737        &self,
738        occurrence_id: &str,
739    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
740        let mut state = self
741            .state
742            .lock()
743            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
744        let occurrence = state
745            .occurrences
746            .get(occurrence_id)
747            .cloned()
748            .ok_or_else(|| {
749                PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
750            })?;
751        let subscriptions = state
752            .subscriptions
753            .values()
754            .filter(|record| {
755                record.enabled
756                    && record.source_type == occurrence.source_type
757                    && record.source_key == occurrence.source_key
758            })
759            .cloned()
760            .collect::<Vec<_>>();
761        let mut deliveries = Vec::new();
762        for subscription in subscriptions {
763            let key = (
764                occurrence.occurrence_id.clone(),
765                subscription.subscription_id.clone(),
766            );
767            if !state.deliveries.insert(key) {
768                continue;
769            }
770            let process_id = deterministic_delivery_process_id(
771                &occurrence.occurrence_id,
772                &subscription.subscription_id,
773            )?;
774            deliveries.push(TriggerDeliveryReservation {
775                occurrence: occurrence.clone(),
776                subscription,
777                process_id,
778            });
779        }
780        Ok(deliveries)
781    }
782}
783
784fn default_enabled() -> bool {
785    true
786}
787
788pub fn default_trigger_source_key(
789    source_type: &str,
790    source: &serde_json::Value,
791) -> Result<String, PluginError> {
792    let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
793        .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
794    Ok(format!("source:{source_type}:sha256:{digest}"))
795}
796
797pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
798    default_trigger_source_key(source_type, &serde_json::json!({}))
799}
800
801pub fn deterministic_occurrence_id(
802    request: &TriggerOccurrenceRequest,
803) -> Result<String, PluginError> {
804    let digest = crate::stable_hash::stable_json_sha256_hex(&(
805        request.source_type.as_str(),
806        request.source_key.as_str(),
807        request.idempotency_key.as_str(),
808    ))
809    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
810    Ok(format!("trigger:{digest}"))
811}
812
813pub fn deterministic_delivery_process_id(
814    occurrence_id: &str,
815    subscription_id: &str,
816) -> Result<String, PluginError> {
817    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
818        .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
819    Ok(format!("process:trigger:{digest}"))
820}
821
822#[derive(Clone)]
823pub struct TriggerRouter {
824    store: Arc<dyn TriggerStore>,
825    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
826    process_work_driver: Option<crate::ProcessWorkDriver>,
827}
828
829impl TriggerRouter {
830    pub fn new(
831        store: Arc<dyn TriggerStore>,
832        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
833        process_work_driver: Option<crate::ProcessWorkDriver>,
834    ) -> Self {
835        Self {
836            store,
837            process_registry,
838            process_work_driver,
839        }
840    }
841
842    pub fn store(&self) -> Arc<dyn TriggerStore> {
843        Arc::clone(&self.store)
844    }
845
846    pub async fn emit(
847        &self,
848        request: TriggerOccurrenceRequest,
849        effect_controller: &dyn crate::RuntimeEffectController,
850    ) -> Result<TriggerEmitReport, PluginError> {
851        let occurrence = self.store.record_occurrence(request).await?;
852        let reservations = self
853            .store
854            .reserve_matching_deliveries(&occurrence.occurrence_id)
855            .await?;
856        let Some(process_registry) = self.process_registry.as_ref() else {
857            if reservations.is_empty() {
858                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
859            }
860            return Err(PluginError::Session(
861                "trigger delivery requires a process registry".to_string(),
862            ));
863        };
864        let mut started_process_ids = Vec::new();
865        let mut start_errors = Vec::new();
866        for reservation in reservations {
867            let process_id = reservation.process_id.clone();
868            if let Err(err) = self
869                .start_delivery(
870                    &reservation,
871                    Arc::clone(process_registry),
872                    effect_controller,
873                )
874                .await
875            {
876                start_errors.push(format!(
877                    "{}: {err}",
878                    reservation.subscription.subscription_id
879                ));
880                continue;
881            }
882            started_process_ids.push(process_id);
883        }
884        if !started_process_ids.is_empty()
885            && let Some(driver) = self.process_work_driver.as_ref()
886        {
887            driver.claim_and_run_pending("trigger_delivery").await?;
888        }
889        if started_process_ids.is_empty()
890            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
891        {
892            return Err(PluginError::Session(message));
893        }
894        Ok(TriggerEmitReport::new(
895            occurrence.occurrence_id,
896            started_process_ids,
897        ))
898    }
899
900    async fn start_delivery(
901        &self,
902        reservation: &TriggerDeliveryReservation,
903        process_registry: Arc<dyn crate::ProcessRegistry>,
904        effect_controller: &dyn crate::RuntimeEffectController,
905    ) -> Result<(), PluginError> {
906        let subscription = &reservation.subscription;
907        let occurrence = &reservation.occurrence;
908        subscription
909            .payload_schema
910            .validate(&occurrence.payload)
911            .map_err(|err| {
912                PluginError::Session(format!(
913                    "invalid payload for trigger `{}`: {err}",
914                    subscription.handle
915                ))
916            })?;
917        let args =
918            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
919        let target = apply_trigger_inputs(subscription.target.clone(), args)?;
920        let originator_scope_id = subscription.registrant_scope_id();
921        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
922            &originator_scope_id,
923            &occurrence.occurrence_id,
924        );
925        let registration = crate::ProcessRegistration::new(
926            reservation.process_id.clone(),
927            target.clone(),
928            crate::ProcessProvenance::new(subscription.registrant.clone())
929                .with_caused_by(trigger_occurrence_invocation.causal_ref()),
930        )
931        .with_identity(subscription.target_identity.clone())
932        .with_extra_event_types(subscription.event_types.clone())
933        .with_execution_env_ref(Some(subscription.env_ref.clone()))
934        .with_wake_target(subscription.wake_target.clone());
935        let descriptor_kind = subscription.target_identity.kind.clone();
936        let grant =
937            subscription
938                .wake_target
939                .clone()
940                .map(|session_scope| crate::ProcessStartGrant {
941                    session_scope,
942                    descriptor: crate::ProcessHandleDescriptor::new(
943                        Some(descriptor_kind.as_str()),
944                        subscription.target_label.as_deref(),
945                    ),
946                });
947        let execution_context = crate::ProcessExecutionContext::default()
948            .with_causal_invocation(Some(trigger_occurrence_invocation));
949        let command = crate::ProcessCommand::Start {
950            registration,
951            grant,
952            execution_context: Box::new(execution_context),
953        };
954        let effect_id = command.effect_id();
955        let invocation = crate::RuntimeInvocation::effect(
956            crate::RuntimeScope::new(originator_scope_id),
957            effect_id.clone(),
958            crate::RuntimeEffectKind::Process,
959            format!(
960                "trigger:{}:{}",
961                occurrence.occurrence_id, subscription.subscription_id
962            ),
963        )
964        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
965            occurrence_id: occurrence.occurrence_id.clone(),
966        }));
967        let outcome = effect_controller
968            .execute_effect(
969                crate::RuntimeEffectEnvelope::new(
970                    invocation,
971                    crate::RuntimeEffectCommand::process(command),
972                ),
973                crate::RuntimeEffectLocalExecutor::processes(process_registry),
974            )
975            .await?;
976        match outcome {
977            crate::RuntimeEffectOutcome::Process {
978                result: crate::ProcessEffectOutcome::Start { .. },
979            } => Ok(()),
980            other => Err(PluginError::Session(format!(
981                "trigger process start returned the wrong outcome: {}",
982                other.kind().as_str()
983            ))),
984        }
985    }
986}
987
988fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
989    match errors {
990        [] => None,
991        [only] => Some(format!("trigger delivery failed: {only}")),
992        [first, rest @ ..] => Some(format!(
993            "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
994            errors.len(),
995            rest.len()
996        )),
997    }
998}
999
1000fn materialize_trigger_process_args(
1001    input_template: &BTreeMap<String, TriggerInputBinding>,
1002    event_payload: &serde_json::Value,
1003) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
1004    let mut args = serde_json::Map::new();
1005    for (input_name, input) in input_template {
1006        let value = match input {
1007            TriggerInputBinding::Event => event_payload.clone(),
1008            TriggerInputBinding::Fixed { value } => value.clone(),
1009        };
1010        args.insert(input_name.to_string(), value);
1011    }
1012    Ok(args)
1013}
1014
1015fn apply_trigger_inputs(
1016    mut target: crate::ProcessInput,
1017    args: serde_json::Map<String, serde_json::Value>,
1018) -> Result<crate::ProcessInput, PluginError> {
1019    match &mut target {
1020        crate::ProcessInput::Engine { payload, .. } => {
1021            let object = payload.as_object_mut().ok_or_else(|| {
1022                PluginError::Session(
1023                    "trigger engine target payload must be a JSON object".to_string(),
1024                )
1025            })?;
1026            object.insert("args".to_string(), serde_json::Value::Object(args));
1027            Ok(target)
1028        }
1029        other => Err(PluginError::Session(format!(
1030            "trigger target must be an engine process, got {}",
1031            other.engine_kind()
1032        ))),
1033    }
1034}
1035
1036pub fn validate_trigger_occurrence_request(
1037    request: &TriggerOccurrenceRequest,
1038) -> Result<(), PluginError> {
1039    if request.source_type.trim().is_empty() {
1040        return Err(PluginError::Session(
1041            "trigger occurrence requires source_type".to_string(),
1042        ));
1043    }
1044    if request.source_key.trim().is_empty() {
1045        return Err(PluginError::Session(
1046            "trigger occurrence requires source_key".to_string(),
1047        ));
1048    }
1049    if request.idempotency_key.trim().is_empty() {
1050        return Err(PluginError::Session(
1051            "trigger occurrence requires idempotency_key".to_string(),
1052        ));
1053    }
1054    Ok(())
1055}
1056
1057pub fn trigger_occurrence_request_hash(
1058    request: &TriggerOccurrenceRequest,
1059) -> Result<String, PluginError> {
1060    crate::stable_hash::stable_json_sha256_hex(&(
1061        request.source_type.as_str(),
1062        request.source_key.as_str(),
1063        &request.payload,
1064        &request.source,
1065    ))
1066    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072
1073    fn button_payload_schema() -> crate::LashSchema {
1074        crate::LashSchema::any()
1075    }
1076
1077    #[test]
1078    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
1079        let mut catalog = TriggerEventCatalog::new();
1080        catalog
1081            .declare(TriggerEvent::new(
1082                "Button",
1083                "ui.button",
1084                "pressed",
1085                button_payload_schema(),
1086            ))
1087            .expect("first trigger occurrence");
1088
1089        let err = catalog
1090            .declare(TriggerEvent::new(
1091                "AlternateButton",
1092                "ui.button",
1093                "pressed",
1094                button_payload_schema(),
1095            ))
1096            .expect_err("duplicate public source identity should be rejected");
1097
1098        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
1099    }
1100
1101    #[tokio::test]
1102    async fn trigger_store_rejects_mismatched_target_label() {
1103        let store = InMemoryTriggerStore::default();
1104        let draft = TriggerSubscriptionDraft::for_process(
1105            crate::ProcessOriginator::host(),
1106            crate::ProcessExecutionEnvRef::new("process-env:test"),
1107            "ui.button.pressed",
1108            "source-key",
1109            crate::ProcessInput::External {
1110                metadata: serde_json::json!({}),
1111            },
1112            crate::ProcessIdentity::new("external").with_label(Some("expected")),
1113        )
1114        .with_target_label("other");
1115
1116        let err = store
1117            .register_subscription(draft)
1118            .await
1119            .expect_err("mismatched target labels should be rejected");
1120        assert!(err.to_string().contains("target_label must match"));
1121    }
1122}