Skip to main content

lash_core/
triggers.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_schema: crate::LashSchema,
14}
15
16impl TriggerEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_schema: crate::LashSchema,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_schema,
28        }
29    }
30
31    pub fn payload_schema(&self) -> &crate::LashSchema {
32        &self.payload_schema
33    }
34
35    pub fn key(&self) -> TriggerEventKey {
36        TriggerEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        trigger_event_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl TriggerEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        trigger_event_type(&self.alias, &self.event)
70    }
71}
72
73pub fn trigger_event_type(alias: &str, event: &str) -> String {
74    format!("{alias}.{event}")
75}
76
77#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79    events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate trigger occurrence `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124        self.events
125            .get(&TriggerEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct TriggerOccurrenceRecord {
192    pub occurrence_id: String,
193    pub source_type: String,
194    pub source_key: String,
195    #[serde(default)]
196    pub payload: serde_json::Value,
197    pub idempotency_key: String,
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub source: Option<serde_json::Value>,
200    pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerEventType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerEventType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerEventType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerEventType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243    pub handle: String,
244    pub source_key: String,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub name: Option<String>,
247    pub source_type: TriggerEventType,
248    pub source: serde_json::Value,
249    pub target: TriggerTargetSummary,
250    #[serde(default = "default_enabled")]
251    pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256    pub label: Option<String>,
257    pub identity: crate::ProcessIdentity,
258    pub input: crate::ProcessInput,
259    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
260    pub inputs: BTreeMap<String, TriggerInputBinding>,
261}
262
263#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
264#[serde(tag = "type", rename_all = "snake_case")]
265pub enum TriggerInputBinding {
266    Event,
267    Fixed { value: serde_json::Value },
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271pub struct TriggerSubscriptionDraft {
272    pub registrant: crate::ProcessOriginator,
273    pub env_ref: crate::ProcessExecutionEnvRef,
274    #[serde(default, skip_serializing_if = "Option::is_none")]
275    pub wake_target: Option<crate::SessionScope>,
276    #[serde(default, skip_serializing_if = "Option::is_none")]
277    pub name: Option<String>,
278    pub source_type: String,
279    pub source_key: String,
280    pub source: serde_json::Value,
281    pub payload_schema: crate::LashSchema,
282    pub target: crate::ProcessInput,
283    pub target_identity: crate::ProcessIdentity,
284    #[serde(default)]
285    pub event_types: Vec<crate::ProcessEventType>,
286    #[serde(default)]
287    pub input_template: BTreeMap<String, TriggerInputBinding>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    pub target_label: Option<String>,
290}
291
292impl TriggerSubscriptionDraft {
293    pub fn for_process(
294        registrant: crate::ProcessOriginator,
295        env_ref: crate::ProcessExecutionEnvRef,
296        source_type: impl Into<String>,
297        source_key: impl Into<String>,
298        target: crate::ProcessInput,
299        target_identity: crate::ProcessIdentity,
300    ) -> Self {
301        let target_label = target_identity.label.clone();
302        Self {
303            registrant,
304            env_ref,
305            wake_target: None,
306            name: None,
307            source_type: source_type.into(),
308            source_key: source_key.into(),
309            source: serde_json::Value::Object(serde_json::Map::new()),
310            payload_schema: crate::LashSchema::new(serde_json::Value::Object(
311                serde_json::Map::new(),
312            )),
313            target,
314            target_identity,
315            event_types: Vec::new(),
316            input_template: BTreeMap::new(),
317            target_label,
318        }
319    }
320
321    pub fn with_name(mut self, name: impl Into<String>) -> Self {
322        self.name = Some(name.into());
323        self
324    }
325
326    pub fn with_source(mut self, source: serde_json::Value) -> Self {
327        self.source = source;
328        self
329    }
330
331    pub fn with_payload_schema(mut self, payload_schema: crate::LashSchema) -> Self {
332        self.payload_schema = payload_schema;
333        self
334    }
335
336    pub fn with_wake_target(mut self, wake_target: crate::SessionScope) -> Self {
337        self.wake_target = Some(wake_target);
338        self
339    }
340
341    pub fn with_event_types(
342        mut self,
343        event_types: impl IntoIterator<Item = crate::ProcessEventType>,
344    ) -> Self {
345        self.event_types = event_types.into_iter().collect();
346        self
347    }
348
349    pub fn with_input_template(
350        mut self,
351        input_template: BTreeMap<String, TriggerInputBinding>,
352    ) -> Self {
353        self.input_template = input_template;
354        self
355    }
356
357    pub fn with_target_label(mut self, target_label: impl Into<String>) -> Self {
358        self.target_label = Some(target_label.into());
359        self
360    }
361
362    pub fn validate(&self) -> Result<(), PluginError> {
363        validate_trigger_subscription_target_label(
364            self.target_label.as_deref(),
365            self.target_identity.label.as_deref(),
366        )
367    }
368}
369
370#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
371pub struct TriggerSubscriptionRecord {
372    pub subscription_id: String,
373    pub registrant: crate::ProcessOriginator,
374    pub env_ref: crate::ProcessExecutionEnvRef,
375    #[serde(default, skip_serializing_if = "Option::is_none")]
376    pub wake_target: Option<crate::SessionScope>,
377    pub handle: String,
378    #[serde(default, skip_serializing_if = "Option::is_none")]
379    pub name: Option<String>,
380    pub source_type: String,
381    pub source_key: String,
382    pub source: serde_json::Value,
383    pub payload_schema: crate::LashSchema,
384    pub target: crate::ProcessInput,
385    pub target_identity: crate::ProcessIdentity,
386    #[serde(default)]
387    pub event_types: Vec<crate::ProcessEventType>,
388    #[serde(default)]
389    pub input_template: BTreeMap<String, TriggerInputBinding>,
390    #[serde(default, skip_serializing_if = "Option::is_none")]
391    pub target_label: Option<String>,
392    #[serde(default = "default_enabled")]
393    pub enabled: bool,
394    pub created_at_ms: u64,
395    pub updated_at_ms: u64,
396}
397
398impl TriggerSubscriptionRecord {
399    pub fn registrant_scope_id(&self) -> String {
400        self.registrant.scope_id()
401    }
402
403    pub fn registrant_session_id(&self) -> Option<&str> {
404        match &self.registrant {
405            crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
406            crate::ProcessOriginator::Host => None,
407        }
408    }
409}
410
411fn validate_trigger_subscription_target_label(
412    target_label: Option<&str>,
413    identity_label: Option<&str>,
414) -> Result<(), PluginError> {
415    match (target_label, identity_label) {
416        (Some(target_label), Some(identity_label)) if target_label != identity_label => {
417            Err(PluginError::Session(
418                "trigger target_label must match target_identity.label when both are present"
419                    .to_string(),
420            ))
421        }
422        _ => Ok(()),
423    }
424}
425
426impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
427    fn from(route: &TriggerSubscriptionRecord) -> Self {
428        Self {
429            handle: route.handle.clone(),
430            source_key: route.source_key.clone(),
431            name: route.name.clone(),
432            source_type: TriggerEventType::new(route.source_type.clone()),
433            source: route.source.clone(),
434            target: TriggerTargetSummary {
435                label: route.target_label.clone(),
436                identity: route.target_identity.clone(),
437                input: route.target.clone(),
438                inputs: route.input_template.clone(),
439            },
440            enabled: route.enabled,
441        }
442    }
443}
444
445#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
446pub struct TriggerSubscriptionFilter {
447    #[serde(default, skip_serializing_if = "Option::is_none")]
448    pub session_id: Option<String>,
449    #[serde(default, skip_serializing_if = "Option::is_none")]
450    pub handle: Option<String>,
451    #[serde(default, skip_serializing_if = "Option::is_none")]
452    pub name: Option<String>,
453    #[serde(default, skip_serializing_if = "Option::is_none")]
454    pub source_type: Option<String>,
455    #[serde(default, skip_serializing_if = "Option::is_none")]
456    pub source_key: Option<String>,
457    #[serde(default, skip_serializing_if = "Option::is_none")]
458    pub target: Option<serde_json::Value>,
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    pub enabled: Option<bool>,
461}
462
463impl TriggerSubscriptionFilter {
464    pub fn for_session(session_id: impl Into<String>) -> Self {
465        Self {
466            session_id: Some(session_id.into()),
467            ..Self::default()
468        }
469    }
470
471    pub fn for_source_type(source_type: impl Into<String>) -> Self {
472        Self {
473            source_type: Some(source_type.into()),
474            ..Self::default()
475        }
476    }
477
478    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
479        self.session_id
480            .as_deref()
481            .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
482            && self
483                .handle
484                .as_deref()
485                .is_none_or(|handle| record.handle == handle)
486            && self
487                .name
488                .as_deref()
489                .is_none_or(|name| record.name.as_deref() == Some(name))
490            && self
491                .source_type
492                .as_deref()
493                .is_none_or(|source_type| record.source_type == source_type)
494            && self
495                .source_key
496                .as_deref()
497                .is_none_or(|source_key| record.source_key == source_key)
498            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
499            && self
500                .target
501                .as_ref()
502                .is_none_or(|target| record.target_identity.definition.as_ref() == Some(target))
503    }
504}
505
506#[derive(Clone, Debug, Serialize, Deserialize)]
507pub struct TriggerDeliveryReservation {
508    pub occurrence: TriggerOccurrenceRecord,
509    pub subscription: TriggerSubscriptionRecord,
510    pub process_id: String,
511}
512
513#[async_trait::async_trait]
514pub trait TriggerStore: Send + Sync {
515    fn durability_tier(&self) -> crate::DurabilityTier {
516        crate::DurabilityTier::Inline
517    }
518
519    async fn source_key_for_subscription(
520        &self,
521        source_type: &str,
522        source: &serde_json::Value,
523    ) -> Result<String, PluginError> {
524        default_trigger_source_key(source_type, source)
525    }
526
527    async fn register_subscription(
528        &self,
529        draft: TriggerSubscriptionDraft,
530    ) -> Result<TriggerSubscriptionRecord, PluginError>;
531
532    async fn list_subscriptions(
533        &self,
534        filter: TriggerSubscriptionFilter,
535    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
536
537    async fn cancel_subscription(
538        &self,
539        session_id: &str,
540        handle: &str,
541    ) -> Result<bool, PluginError>;
542
543    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;
544
545    async fn record_occurrence(
546        &self,
547        request: TriggerOccurrenceRequest,
548    ) -> Result<TriggerOccurrenceRecord, PluginError>;
549
550    async fn reserve_matching_deliveries(
551        &self,
552        occurrence_id: &str,
553    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
554}
555
556#[derive(Default)]
557pub struct InMemoryTriggerStore {
558    state: Mutex<InMemoryTriggerEventState>,
559}
560
561#[derive(Default)]
562struct InMemoryTriggerEventState {
563    next_subscription_seq: u64,
564    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
565    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
566    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
567    occurrence_hashes: BTreeMap<String, String>,
568    deliveries: BTreeSet<(String, String)>,
569}
570
571#[async_trait::async_trait]
572impl TriggerStore for InMemoryTriggerStore {
573    async fn register_subscription(
574        &self,
575        draft: TriggerSubscriptionDraft,
576    ) -> Result<TriggerSubscriptionRecord, PluginError> {
577        draft.validate()?;
578        let mut state = self
579            .state
580            .lock()
581            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
582        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
583        let handle = format!("trigger:{}", state.next_subscription_seq);
584        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
585        let now = crate::runtime::current_epoch_ms();
586        let record = TriggerSubscriptionRecord {
587            subscription_id: subscription_id.clone(),
588            registrant: draft.registrant,
589            env_ref: draft.env_ref,
590            wake_target: draft.wake_target,
591            handle,
592            name: draft.name,
593            source_type: draft.source_type,
594            source_key: draft.source_key,
595            source: draft.source,
596            payload_schema: draft.payload_schema,
597            target: draft.target,
598            target_identity: draft.target_identity,
599            event_types: draft.event_types,
600            input_template: draft.input_template,
601            target_label: draft.target_label,
602            enabled: true,
603            created_at_ms: now,
604            updated_at_ms: now,
605        };
606        state.subscriptions.insert(subscription_id, record.clone());
607        Ok(record)
608    }
609
610    async fn list_subscriptions(
611        &self,
612        filter: TriggerSubscriptionFilter,
613    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
614        let state = self
615            .state
616            .lock()
617            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
618        let mut records = state
619            .subscriptions
620            .values()
621            .filter(|record| filter.matches(record))
622            .cloned()
623            .collect::<Vec<_>>();
624        records.sort_by(|left, right| {
625            left.registrant_scope_id()
626                .cmp(&right.registrant_scope_id())
627                .then_with(|| left.handle.cmp(&right.handle))
628        });
629        Ok(records)
630    }
631
632    async fn cancel_subscription(
633        &self,
634        session_id: &str,
635        handle: &str,
636    ) -> Result<bool, PluginError> {
637        let mut state = self
638            .state
639            .lock()
640            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
641        let now = crate::runtime::current_epoch_ms();
642        let Some(record) = state.subscriptions.values_mut().find(|record| {
643            record.registrant_session_id() == Some(session_id) && record.handle == handle
644        }) else {
645            return Ok(false);
646        };
647        let changed = record.enabled;
648        record.enabled = false;
649        record.updated_at_ms = now;
650        Ok(changed)
651    }
652
653    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
654        let mut state = self
655            .state
656            .lock()
657            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
658        let before = state.subscriptions.len();
659        state
660            .subscriptions
661            .retain(|_, record| record.registrant_session_id() != Some(session_id));
662        Ok(before.saturating_sub(state.subscriptions.len()))
663    }
664
665    async fn record_occurrence(
666        &self,
667        request: TriggerOccurrenceRequest,
668    ) -> Result<TriggerOccurrenceRecord, PluginError> {
669        validate_trigger_occurrence_request(&request)?;
670        let mut state = self
671            .state
672            .lock()
673            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
674        let request_hash = trigger_occurrence_request_hash(&request)?;
675        if let Some(existing_id) = state
676            .occurrence_id_by_idempotency_key
677            .get(&request.idempotency_key)
678            .cloned()
679        {
680            let existing_hash = state
681                .occurrence_hashes
682                .get(&existing_id)
683                .cloned()
684                .unwrap_or_default();
685            if existing_hash != request_hash {
686                return Err(PluginError::Session(format!(
687                    "trigger occurrence idempotency conflict for `{}`",
688                    request.idempotency_key
689                )));
690            }
691            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
692                PluginError::Session(format!(
693                    "missing trigger occurrence `{existing_id}` for idempotency key"
694                ))
695            });
696        }
697        let occurrence_id = deterministic_occurrence_id(&request)?;
698        let record = TriggerOccurrenceRecord {
699            occurrence_id: occurrence_id.clone(),
700            source_type: request.source_type,
701            source_key: request.source_key,
702            payload: request.payload,
703            idempotency_key: request.idempotency_key.clone(),
704            source: request.source,
705            occurred_at_ms: crate::runtime::current_epoch_ms(),
706        };
707        state
708            .occurrence_id_by_idempotency_key
709            .insert(request.idempotency_key, occurrence_id.clone());
710        state
711            .occurrence_hashes
712            .insert(occurrence_id.clone(), request_hash);
713        state.occurrences.insert(occurrence_id, record.clone());
714        Ok(record)
715    }
716
717    async fn reserve_matching_deliveries(
718        &self,
719        occurrence_id: &str,
720    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
721        let mut state = self
722            .state
723            .lock()
724            .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
725        let occurrence = state
726            .occurrences
727            .get(occurrence_id)
728            .cloned()
729            .ok_or_else(|| {
730                PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
731            })?;
732        let subscriptions = state
733            .subscriptions
734            .values()
735            .filter(|record| {
736                record.enabled
737                    && record.source_type == occurrence.source_type
738                    && record.source_key == occurrence.source_key
739            })
740            .cloned()
741            .collect::<Vec<_>>();
742        let mut deliveries = Vec::new();
743        for subscription in subscriptions {
744            let key = (
745                occurrence.occurrence_id.clone(),
746                subscription.subscription_id.clone(),
747            );
748            if !state.deliveries.insert(key) {
749                continue;
750            }
751            let process_id = deterministic_delivery_process_id(
752                &occurrence.occurrence_id,
753                &subscription.subscription_id,
754            )?;
755            deliveries.push(TriggerDeliveryReservation {
756                occurrence: occurrence.clone(),
757                subscription,
758                process_id,
759            });
760        }
761        Ok(deliveries)
762    }
763}
764
765fn default_enabled() -> bool {
766    true
767}
768
769pub fn default_trigger_source_key(
770    source_type: &str,
771    source: &serde_json::Value,
772) -> Result<String, PluginError> {
773    let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
774        .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
775    Ok(format!("source:{source_type}:sha256:{digest}"))
776}
777
778pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
779    default_trigger_source_key(source_type, &serde_json::json!({}))
780}
781
782pub fn deterministic_occurrence_id(
783    request: &TriggerOccurrenceRequest,
784) -> Result<String, PluginError> {
785    let digest = crate::stable_hash::stable_json_sha256_hex(&(
786        request.source_type.as_str(),
787        request.source_key.as_str(),
788        request.idempotency_key.as_str(),
789    ))
790    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
791    Ok(format!("trigger:{digest}"))
792}
793
794pub fn deterministic_delivery_process_id(
795    occurrence_id: &str,
796    subscription_id: &str,
797) -> Result<String, PluginError> {
798    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
799        .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
800    Ok(format!("process:trigger:{digest}"))
801}
802
803#[derive(Clone)]
804pub struct TriggerRouter {
805    store: Arc<dyn TriggerStore>,
806    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
807    process_work_poke: Option<crate::ProcessWorkPoke>,
808}
809
810impl TriggerRouter {
811    pub fn new(
812        store: Arc<dyn TriggerStore>,
813        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
814        process_work_poke: Option<crate::ProcessWorkPoke>,
815    ) -> Self {
816        Self {
817            store,
818            process_registry,
819            process_work_poke,
820        }
821    }
822
823    pub fn store(&self) -> Arc<dyn TriggerStore> {
824        Arc::clone(&self.store)
825    }
826
827    pub async fn emit(
828        &self,
829        request: TriggerOccurrenceRequest,
830        effect_controller: &dyn crate::RuntimeEffectController,
831    ) -> Result<TriggerEmitReport, PluginError> {
832        let occurrence = self.store.record_occurrence(request).await?;
833        let reservations = self
834            .store
835            .reserve_matching_deliveries(&occurrence.occurrence_id)
836            .await?;
837        let Some(process_registry) = self.process_registry.as_ref() else {
838            if reservations.is_empty() {
839                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
840            }
841            return Err(PluginError::Session(
842                "trigger delivery requires a process registry".to_string(),
843            ));
844        };
845        let mut started_process_ids = Vec::new();
846        let mut start_errors = Vec::new();
847        for reservation in reservations {
848            let process_id = reservation.process_id.clone();
849            if let Err(err) = self
850                .start_delivery(
851                    &reservation,
852                    Arc::clone(process_registry),
853                    effect_controller,
854                )
855                .await
856            {
857                start_errors.push(format!(
858                    "{}: {err}",
859                    reservation.subscription.subscription_id
860                ));
861                continue;
862            }
863            started_process_ids.push(process_id);
864        }
865        if !started_process_ids.is_empty()
866            && let Some(poke) = self.process_work_poke.as_ref()
867        {
868            poke.poke();
869        }
870        if started_process_ids.is_empty()
871            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
872        {
873            return Err(PluginError::Session(message));
874        }
875        Ok(TriggerEmitReport::new(
876            occurrence.occurrence_id,
877            started_process_ids,
878        ))
879    }
880
881    async fn start_delivery(
882        &self,
883        reservation: &TriggerDeliveryReservation,
884        process_registry: Arc<dyn crate::ProcessRegistry>,
885        effect_controller: &dyn crate::RuntimeEffectController,
886    ) -> Result<(), PluginError> {
887        let subscription = &reservation.subscription;
888        let occurrence = &reservation.occurrence;
889        subscription
890            .payload_schema
891            .validate(&occurrence.payload)
892            .map_err(|err| {
893                PluginError::Session(format!(
894                    "invalid payload for trigger `{}`: {err}",
895                    subscription.handle
896                ))
897            })?;
898        let args =
899            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
900        let target = apply_trigger_inputs(subscription.target.clone(), args)?;
901        let originator_scope_id = subscription.registrant_scope_id();
902        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
903            &originator_scope_id,
904            &occurrence.occurrence_id,
905        );
906        let registration = crate::ProcessRegistration::new(
907            reservation.process_id.clone(),
908            target.clone(),
909            crate::ProcessProvenance::new(subscription.registrant.clone())
910                .with_caused_by(trigger_occurrence_invocation.causal_ref()),
911        )
912        .with_identity(subscription.target_identity.clone())
913        .with_extra_event_types(subscription.event_types.clone())
914        .with_execution_env_ref(Some(subscription.env_ref.clone()))
915        .with_wake_target(subscription.wake_target.clone());
916        let descriptor_kind = subscription.target_identity.kind.clone();
917        let grant =
918            subscription
919                .wake_target
920                .clone()
921                .map(|session_scope| crate::ProcessStartGrant {
922                    session_scope,
923                    descriptor: crate::ProcessHandleDescriptor::new(
924                        Some(descriptor_kind.as_str()),
925                        subscription.target_label.as_deref(),
926                    ),
927                });
928        let execution_context = crate::ProcessExecutionContext::default()
929            .with_causal_invocation(Some(trigger_occurrence_invocation));
930        let command = crate::ProcessCommand::Start {
931            registration,
932            grant,
933            execution_context: Box::new(execution_context),
934        };
935        let effect_id = command.effect_id();
936        let invocation = crate::RuntimeInvocation::effect(
937            crate::RuntimeScope::new(originator_scope_id),
938            effect_id.clone(),
939            crate::RuntimeEffectKind::Process,
940            format!(
941                "trigger:{}:{}",
942                occurrence.occurrence_id, subscription.subscription_id
943            ),
944        )
945        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
946            occurrence_id: occurrence.occurrence_id.clone(),
947        }));
948        let outcome = effect_controller
949            .execute_effect(
950                crate::RuntimeEffectEnvelope::new(
951                    invocation,
952                    crate::RuntimeEffectCommand::process(command),
953                ),
954                crate::RuntimeEffectLocalExecutor::processes(process_registry),
955            )
956            .await?;
957        match outcome {
958            crate::RuntimeEffectOutcome::Process {
959                result: crate::ProcessEffectOutcome::Start { .. },
960            } => Ok(()),
961            other => Err(PluginError::Session(format!(
962                "trigger process start returned the wrong outcome: {}",
963                other.kind().as_str()
964            ))),
965        }
966    }
967}
968
969fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
970    match errors {
971        [] => None,
972        [only] => Some(format!("trigger delivery failed: {only}")),
973        [first, rest @ ..] => Some(format!(
974            "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
975            errors.len(),
976            rest.len()
977        )),
978    }
979}
980
981fn materialize_trigger_process_args(
982    input_template: &BTreeMap<String, TriggerInputBinding>,
983    event_payload: &serde_json::Value,
984) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
985    let mut args = serde_json::Map::new();
986    for (input_name, input) in input_template {
987        let value = match input {
988            TriggerInputBinding::Event => event_payload.clone(),
989            TriggerInputBinding::Fixed { value } => value.clone(),
990        };
991        args.insert(input_name.to_string(), value);
992    }
993    Ok(args)
994}
995
996fn apply_trigger_inputs(
997    mut target: crate::ProcessInput,
998    args: serde_json::Map<String, serde_json::Value>,
999) -> Result<crate::ProcessInput, PluginError> {
1000    match &mut target {
1001        crate::ProcessInput::Engine { payload, .. } => {
1002            let object = payload.as_object_mut().ok_or_else(|| {
1003                PluginError::Session(
1004                    "trigger engine target payload must be a JSON object".to_string(),
1005                )
1006            })?;
1007            object.insert("args".to_string(), serde_json::Value::Object(args));
1008            Ok(target)
1009        }
1010        other => Err(PluginError::Session(format!(
1011            "trigger target must be an engine process, got {}",
1012            other.engine_kind()
1013        ))),
1014    }
1015}
1016
1017pub fn validate_trigger_occurrence_request(
1018    request: &TriggerOccurrenceRequest,
1019) -> Result<(), PluginError> {
1020    if request.source_type.trim().is_empty() {
1021        return Err(PluginError::Session(
1022            "trigger occurrence requires source_type".to_string(),
1023        ));
1024    }
1025    if request.source_key.trim().is_empty() {
1026        return Err(PluginError::Session(
1027            "trigger occurrence requires source_key".to_string(),
1028        ));
1029    }
1030    if request.idempotency_key.trim().is_empty() {
1031        return Err(PluginError::Session(
1032            "trigger occurrence requires idempotency_key".to_string(),
1033        ));
1034    }
1035    Ok(())
1036}
1037
1038pub fn trigger_occurrence_request_hash(
1039    request: &TriggerOccurrenceRequest,
1040) -> Result<String, PluginError> {
1041    crate::stable_hash::stable_json_sha256_hex(&(
1042        request.source_type.as_str(),
1043        request.source_key.as_str(),
1044        &request.payload,
1045        &request.source,
1046    ))
1047    .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
1048}
1049
1050#[cfg(test)]
1051mod tests {
1052    use super::*;
1053
1054    fn button_payload_schema() -> crate::LashSchema {
1055        crate::LashSchema::any()
1056    }
1057
1058    #[test]
1059    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
1060        let mut catalog = TriggerEventCatalog::new();
1061        catalog
1062            .declare(TriggerEvent::new(
1063                "Button",
1064                "ui.button",
1065                "pressed",
1066                button_payload_schema(),
1067            ))
1068            .expect("first trigger occurrence");
1069
1070        let err = catalog
1071            .declare(TriggerEvent::new(
1072                "AlternateButton",
1073                "ui.button",
1074                "pressed",
1075                button_payload_schema(),
1076            ))
1077            .expect_err("duplicate public source identity should be rejected");
1078
1079        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
1080    }
1081
1082    #[tokio::test]
1083    async fn trigger_store_rejects_mismatched_target_label() {
1084        let store = InMemoryTriggerStore::default();
1085        let draft = TriggerSubscriptionDraft::for_process(
1086            crate::ProcessOriginator::host(),
1087            crate::ProcessExecutionEnvRef::new("process-env:test"),
1088            "ui.button.pressed",
1089            "source-key",
1090            crate::ProcessInput::External {
1091                metadata: serde_json::json!({}),
1092            },
1093            crate::ProcessIdentity::new("external").with_label(Some("expected")),
1094        )
1095        .with_target_label("other");
1096
1097        let err = store
1098            .register_subscription(draft)
1099            .await
1100            .expect_err("mismatched target labels should be rejected");
1101        assert!(err.to_string().contains("target_label must match"));
1102    }
1103}