1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
/// Declaration of a single trigger event together with the schema of its payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerEvent {
    /// Resource type component of the event's catalog key.
    pub resource_type: String,
    /// Alias component; joined with `event` to form the source type.
    pub alias: String,
    /// Event name component; joined with `alias` to form the source type.
    pub event: String,
    /// Schema associated with the event payload.
    pub payload_schema: crate::LashSchema,
}
15
16impl TriggerEvent {
17 pub fn new(
18 resource_type: impl Into<String>,
19 alias: impl Into<String>,
20 event: impl Into<String>,
21 payload_schema: crate::LashSchema,
22 ) -> Self {
23 Self {
24 resource_type: resource_type.into(),
25 alias: alias.into(),
26 event: event.into(),
27 payload_schema,
28 }
29 }
30
31 pub fn payload_schema(&self) -> &crate::LashSchema {
32 &self.payload_schema
33 }
34
35 pub fn key(&self) -> TriggerEventKey {
36 TriggerEventKey {
37 resource_type: self.resource_type.clone(),
38 alias: self.alias.clone(),
39 event: self.event.clone(),
40 }
41 }
42
43 pub fn source_type(&self) -> String {
44 trigger_event_type(&self.alias, &self.event)
45 }
46}
47
/// Ordered key identifying a trigger event by resource type, alias and event name.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct TriggerEventKey {
    /// Resource type component of the key.
    pub resource_type: String,
    /// Alias component; joined with `event` to form the source type.
    pub alias: String,
    /// Event name component; joined with `alias` to form the source type.
    pub event: String,
}
54
55impl TriggerEventKey {
56 pub fn new(
57 resource_type: impl Into<String>,
58 alias: impl Into<String>,
59 event: impl Into<String>,
60 ) -> Self {
61 Self {
62 resource_type: resource_type.into(),
63 alias: alias.into(),
64 event: event.into(),
65 }
66 }
67
68 pub fn source_type(&self) -> String {
69 trigger_event_type(&self.alias, &self.event)
70 }
71}
72
/// Joins `alias` and `event` with a single `.` to form a trigger source type.
pub fn trigger_event_type(alias: &str, event: &str) -> String {
    [alias, event].join(".")
}
76
/// Collection of declared trigger events, keyed by [`TriggerEventKey`].
///
/// NOTE(review): the map key is a struct, so serializers that only accept
/// string map keys (for example JSON) may reject this type at runtime —
/// confirm which formats this catalog is actually serialized with.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct TriggerEventCatalog {
    // Declared events in key order.
    events: BTreeMap<TriggerEventKey, TriggerEvent>,
}
81
82impl TriggerEventCatalog {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88 let key = event.key();
89 if self.events.contains_key(&key) {
90 return Err(format!(
91 "duplicate trigger occurrence `{}.{}.{}`",
92 key.resource_type, key.alias, key.event
93 ));
94 }
95 let source_type = event.source_type();
96 if let Some(existing) = self
97 .events
98 .values()
99 .find(|existing| existing.source_type() == source_type)
100 {
101 return Err(format!(
102 "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103 existing.resource_type,
104 existing.alias,
105 existing.event,
106 key.resource_type,
107 key.alias,
108 key.event
109 ));
110 }
111 self.events.insert(key, event);
112 Ok(())
113 }
114
115 pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116 let mut catalog = Self::new();
117 for event in events {
118 catalog.declare(event)?;
119 }
120 Ok(catalog)
121 }
122
123 pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124 self.events
125 .get(&TriggerEventKey::new(resource_type, alias, event))
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.events.is_empty()
130 }
131
132 pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133 self.events.values()
134 }
135}
136
/// Result of emitting a trigger occurrence.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerEmitReport {
    /// Id of the recorded occurrence; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub occurrence_id: String,
    /// Ids of the processes started for this occurrence.
    pub started_process_ids: Vec<String>,
}
143
144impl TriggerEmitReport {
145 pub fn empty() -> Self {
146 Self::default()
147 }
148
149 fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150 Self {
151 occurrence_id,
152 started_process_ids,
153 }
154 }
155}
156
/// Request to record one occurrence of a trigger source.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerOccurrenceRequest {
    /// Source type of the occurrence.
    pub source_type: String,
    /// Source key of the occurrence.
    pub source_key: String,
    /// Occurrence payload; defaults to the JSON default value when absent.
    #[serde(default)]
    pub payload: serde_json::Value,
    /// Key used to deduplicate repeated requests.
    pub idempotency_key: String,
    /// Optional source description; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<serde_json::Value>,
}
167
168impl TriggerOccurrenceRequest {
169 pub fn new(
170 source_type: impl Into<String>,
171 source_key: impl Into<String>,
172 payload: serde_json::Value,
173 idempotency_key: impl Into<String>,
174 ) -> Self {
175 Self {
176 source_type: source_type.into(),
177 source_key: source_key.into(),
178 payload,
179 idempotency_key: idempotency_key.into(),
180 source: None,
181 }
182 }
183
184 pub fn with_source(mut self, source: serde_json::Value) -> Self {
185 self.source = Some(source);
186 self
187 }
188}
189
/// Stored form of a trigger occurrence.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerOccurrenceRecord {
    /// Identifier assigned to the occurrence.
    pub occurrence_id: String,
    /// Source type copied from the request.
    pub source_type: String,
    /// Source key copied from the request.
    pub source_key: String,
    /// Payload copied from the request; defaults when absent on deserialize.
    #[serde(default)]
    pub payload: serde_json::Value,
    /// Idempotency key copied from the request.
    pub idempotency_key: String,
    /// Optional source description; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<serde_json::Value>,
    /// Timestamp in milliseconds taken when the occurrence was recorded.
    pub occurred_at_ms: u64,
}
202
/// String newtype for a trigger event/source type; serialized as the bare string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208 pub fn new(value: impl Into<String>) -> Self {
209 Self(value.into())
210 }
211
212 pub fn as_str(&self) -> &str {
213 &self.0
214 }
215}
216
217impl From<String> for TriggerEventType {
218 fn from(value: String) -> Self {
219 Self::new(value)
220 }
221}
222
223impl From<&str> for TriggerEventType {
224 fn from(value: &str) -> Self {
225 Self::new(value)
226 }
227}
228
229impl AsRef<str> for TriggerEventType {
230 fn as_ref(&self) -> &str {
231 self.as_str()
232 }
233}
234
235impl std::fmt::Display for TriggerEventType {
236 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 formatter.write_str(self.as_str())
238 }
239}
240
/// Summary view of a trigger subscription (see the `From<&TriggerSubscriptionRecord>` impl).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerRegistration {
    /// Handle of the subscription.
    pub handle: String,
    /// Source key the subscription listens on.
    pub source_key: String,
    /// Optional name; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type the subscription listens on.
    pub source_type: TriggerEventType,
    /// Source description attached to the subscription.
    pub source: serde_json::Value,
    /// Summary of the process target.
    pub target: TriggerTargetSummary,
    /// Whether the subscription is enabled; defaults to `true` when absent.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}
253
/// Description of the process a trigger subscription targets.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerTargetSummary {
    /// Optional target label.
    pub label: Option<String>,
    /// Identity of the target process.
    pub identity: crate::ProcessIdentity,
    /// Process input used as the target.
    pub input: crate::ProcessInput,
    /// Named input bindings; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub inputs: BTreeMap<String, TriggerInputBinding>,
}
262
/// How a named process input is filled; serialized with a snake_case `type` tag.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TriggerInputBinding {
    /// Bind the input to the event payload.
    Event,
    /// Bind the input to a fixed JSON value.
    Fixed { value: serde_json::Value },
}
269
/// Not-yet-registered trigger subscription, as handed to a trigger store.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionDraft {
    /// Originator registering the subscription.
    pub registrant: crate::ProcessOriginator,
    /// Execution environment reference for the subscription.
    pub env_ref: crate::ProcessExecutionEnvRef,
    /// Optional session scope to wake; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<crate::SessionScope>,
    /// Optional name; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type to subscribe to.
    pub source_type: String,
    /// Source key to subscribe to.
    pub source_key: String,
    /// Source description attached to the subscription.
    pub source: serde_json::Value,
    /// Schema associated with occurrence payloads.
    pub payload_schema: crate::LashSchema,
    /// Process input used as the target.
    pub target: crate::ProcessInput,
    /// Identity of the target process.
    pub target_identity: crate::ProcessIdentity,
    /// Process event types attached to the subscription; empty when absent.
    #[serde(default)]
    pub event_types: Vec<crate::ProcessEventType>,
    /// Named input bindings; empty when absent.
    #[serde(default)]
    pub input_template: BTreeMap<String, TriggerInputBinding>,
    /// Optional target label; `validate` requires it to equal
    /// `target_identity.label` when both are present.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target_label: Option<String>,
}
291
292impl TriggerSubscriptionDraft {
293 pub fn for_process(
294 registrant: crate::ProcessOriginator,
295 env_ref: crate::ProcessExecutionEnvRef,
296 source_type: impl Into<String>,
297 source_key: impl Into<String>,
298 target: crate::ProcessInput,
299 target_identity: crate::ProcessIdentity,
300 ) -> Self {
301 let target_label = target_identity.label.clone();
302 Self {
303 registrant,
304 env_ref,
305 wake_target: None,
306 name: None,
307 source_type: source_type.into(),
308 source_key: source_key.into(),
309 source: serde_json::Value::Object(serde_json::Map::new()),
310 payload_schema: crate::LashSchema::new(serde_json::Value::Object(
311 serde_json::Map::new(),
312 )),
313 target,
314 target_identity,
315 event_types: Vec::new(),
316 input_template: BTreeMap::new(),
317 target_label,
318 }
319 }
320
321 pub fn with_name(mut self, name: impl Into<String>) -> Self {
322 self.name = Some(name.into());
323 self
324 }
325
326 pub fn with_source(mut self, source: serde_json::Value) -> Self {
327 self.source = source;
328 self
329 }
330
331 pub fn with_payload_schema(mut self, payload_schema: crate::LashSchema) -> Self {
332 self.payload_schema = payload_schema;
333 self
334 }
335
336 pub fn with_wake_target(mut self, wake_target: crate::SessionScope) -> Self {
337 self.wake_target = Some(wake_target);
338 self
339 }
340
341 pub fn with_event_types(
342 mut self,
343 event_types: impl IntoIterator<Item = crate::ProcessEventType>,
344 ) -> Self {
345 self.event_types = event_types.into_iter().collect();
346 self
347 }
348
349 pub fn with_input_template(
350 mut self,
351 input_template: BTreeMap<String, TriggerInputBinding>,
352 ) -> Self {
353 self.input_template = input_template;
354 self
355 }
356
357 pub fn with_target_label(mut self, target_label: impl Into<String>) -> Self {
358 self.target_label = Some(target_label.into());
359 self
360 }
361
362 pub fn validate(&self) -> Result<(), PluginError> {
363 validate_trigger_subscription_target_label(
364 self.target_label.as_deref(),
365 self.target_identity.label.as_deref(),
366 )
367 }
368}
369
/// Registered trigger subscription as kept by a trigger store.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionRecord {
    /// Identifier of the subscription.
    pub subscription_id: String,
    /// Originator that registered the subscription.
    pub registrant: crate::ProcessOriginator,
    /// Execution environment reference for started processes.
    pub env_ref: crate::ProcessExecutionEnvRef,
    /// Optional session scope to wake; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<crate::SessionScope>,
    /// Handle of the subscription.
    pub handle: String,
    /// Optional name; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type the subscription listens on.
    pub source_type: String,
    /// Source key the subscription listens on.
    pub source_key: String,
    /// Source description attached to the subscription.
    pub source: serde_json::Value,
    /// Schema occurrence payloads are validated against before delivery.
    pub payload_schema: crate::LashSchema,
    /// Process input used as the target.
    pub target: crate::ProcessInput,
    /// Identity of the target process.
    pub target_identity: crate::ProcessIdentity,
    /// Extra process event types; empty when absent.
    #[serde(default)]
    pub event_types: Vec<crate::ProcessEventType>,
    /// Named input bindings; empty when absent.
    #[serde(default)]
    pub input_template: BTreeMap<String, TriggerInputBinding>,
    /// Optional target label; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target_label: Option<String>,
    /// Whether the subscription is enabled; defaults to `true` when absent.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds.
    pub updated_at_ms: u64,
}
397
398impl TriggerSubscriptionRecord {
399 pub fn registrant_scope_id(&self) -> String {
400 self.registrant.scope_id()
401 }
402
403 pub fn registrant_session_id(&self) -> Option<&str> {
404 match &self.registrant {
405 crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
406 crate::ProcessOriginator::Host => None,
407 }
408 }
409}
410
411fn validate_trigger_subscription_target_label(
412 target_label: Option<&str>,
413 identity_label: Option<&str>,
414) -> Result<(), PluginError> {
415 match (target_label, identity_label) {
416 (Some(target_label), Some(identity_label)) if target_label != identity_label => {
417 Err(PluginError::Session(
418 "trigger target_label must match target_identity.label when both are present"
419 .to_string(),
420 ))
421 }
422 _ => Ok(()),
423 }
424}
425
426impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
427 fn from(route: &TriggerSubscriptionRecord) -> Self {
428 Self {
429 handle: route.handle.clone(),
430 source_key: route.source_key.clone(),
431 name: route.name.clone(),
432 source_type: TriggerEventType::new(route.source_type.clone()),
433 source: route.source.clone(),
434 target: TriggerTargetSummary {
435 label: route.target_label.clone(),
436 identity: route.target_identity.clone(),
437 input: route.target.clone(),
438 inputs: route.input_template.clone(),
439 },
440 enabled: route.enabled,
441 }
442 }
443}
444
/// Optional criteria for selecting subscription records; unset fields match everything.
///
/// Every field is omitted from serialized output when `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionFilter {
    /// Match only records registered by this session id.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Match only records with this handle.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub handle: Option<String>,
    /// Match only records with this name.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Match only records with this source type.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_type: Option<String>,
    /// Match only records with this source key.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_key: Option<String>,
    /// Match only records whose `target_identity.definition` equals this value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target: Option<serde_json::Value>,
    /// Match only records with this enabled state.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub enabled: Option<bool>,
}
462
463impl TriggerSubscriptionFilter {
464 pub fn for_session(session_id: impl Into<String>) -> Self {
465 Self {
466 session_id: Some(session_id.into()),
467 ..Self::default()
468 }
469 }
470
471 pub fn for_source_type(source_type: impl Into<String>) -> Self {
472 Self {
473 source_type: Some(source_type.into()),
474 ..Self::default()
475 }
476 }
477
478 pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
479 self.session_id
480 .as_deref()
481 .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
482 && self
483 .handle
484 .as_deref()
485 .is_none_or(|handle| record.handle == handle)
486 && self
487 .name
488 .as_deref()
489 .is_none_or(|name| record.name.as_deref() == Some(name))
490 && self
491 .source_type
492 .as_deref()
493 .is_none_or(|source_type| record.source_type == source_type)
494 && self
495 .source_key
496 .as_deref()
497 .is_none_or(|source_key| record.source_key == source_key)
498 && self.enabled.is_none_or(|enabled| record.enabled == enabled)
499 && self
500 .target
501 .as_ref()
502 .is_none_or(|target| record.target_identity.definition.as_ref() == Some(target))
503 }
504}
505
/// Pairing of one occurrence with one subscription, plus the process id reserved for it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TriggerDeliveryReservation {
    /// Occurrence being delivered.
    pub occurrence: TriggerOccurrenceRecord,
    /// Subscription receiving the occurrence.
    pub subscription: TriggerSubscriptionRecord,
    /// Process id to use when starting the delivery.
    pub process_id: String,
}
512
/// Storage interface for trigger subscriptions, occurrences and delivery reservations.
#[async_trait::async_trait]
pub trait TriggerStore: Send + Sync {
    /// Durability tier of this store; the default is `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Derives the source key for a subscription's source.
    ///
    /// The default delegates to `default_trigger_source_key`.
    async fn source_key_for_subscription(
        &self,
        source_type: &str,
        source: &serde_json::Value,
    ) -> Result<String, PluginError> {
        default_trigger_source_key(source_type, source)
    }

    /// Registers `draft` and returns the resulting record.
    async fn register_subscription(
        &self,
        draft: TriggerSubscriptionDraft,
    ) -> Result<TriggerSubscriptionRecord, PluginError>;

    /// Lists subscription records selected by `filter`.
    async fn list_subscriptions(
        &self,
        filter: TriggerSubscriptionFilter,
    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;

    /// Cancels the subscription identified by `session_id` and `handle`.
    ///
    /// NOTE(review): the meaning of the returned `bool` is implementation
    /// defined here; the in-memory store returns whether the subscription
    /// was enabled before the call.
    async fn cancel_subscription(
        &self,
        session_id: &str,
        handle: &str,
    ) -> Result<bool, PluginError>;

    /// Deletes the subscriptions of `session_id`, returning how many were removed.
    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;

    /// Records an occurrence for `request` and returns the stored record.
    async fn record_occurrence(
        &self,
        request: TriggerOccurrenceRequest,
    ) -> Result<TriggerOccurrenceRecord, PluginError>;

    /// Reserves deliveries of the given occurrence to matching subscriptions.
    async fn reserve_matching_deliveries(
        &self,
        occurrence_id: &str,
    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
}
555
/// `TriggerStore` implementation that keeps all state in memory behind a mutex.
pub struct InMemoryTriggerStore {
    // Source of timestamps for records.
    clock: Arc<dyn crate::Clock>,
    // All subscriptions, occurrences and delivery reservations.
    state: Mutex<InMemoryTriggerEventState>,
}
560
561impl InMemoryTriggerStore {
562 pub fn new() -> Self {
563 Self::with_clock(Arc::new(crate::SystemClock))
564 }
565
566 pub fn with_clock(clock: Arc<dyn crate::Clock>) -> Self {
567 Self {
568 clock,
569 state: Mutex::new(InMemoryTriggerEventState::default()),
570 }
571 }
572}
573
574impl Default for InMemoryTriggerStore {
575 fn default() -> Self {
576 Self::new()
577 }
578}
579
/// Mutable state guarded by the mutex in `InMemoryTriggerStore`.
#[derive(Default)]
struct InMemoryTriggerEventState {
    // Counter used to build subscription ids and handles.
    next_subscription_seq: u64,
    // Subscription records keyed by subscription id.
    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
    // Occurrence records keyed by occurrence id.
    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
    // Occurrence id recorded for each idempotency key.
    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
    // Request hash recorded for each occurrence id.
    occurrence_hashes: BTreeMap<String, String>,
    // `(occurrence_id, subscription_id)` pairs that have already been reserved.
    deliveries: BTreeSet<(String, String)>,
}
589
590#[async_trait::async_trait]
591impl TriggerStore for InMemoryTriggerStore {
592 async fn register_subscription(
593 &self,
594 draft: TriggerSubscriptionDraft,
595 ) -> Result<TriggerSubscriptionRecord, PluginError> {
596 draft.validate()?;
597 let mut state = self
598 .state
599 .lock()
600 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
601 state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
602 let handle = format!("trigger:{}", state.next_subscription_seq);
603 let subscription_id = format!("subscription:{}", state.next_subscription_seq);
604 let now = self.clock.timestamp_ms();
605 let record = TriggerSubscriptionRecord {
606 subscription_id: subscription_id.clone(),
607 registrant: draft.registrant,
608 env_ref: draft.env_ref,
609 wake_target: draft.wake_target,
610 handle,
611 name: draft.name,
612 source_type: draft.source_type,
613 source_key: draft.source_key,
614 source: draft.source,
615 payload_schema: draft.payload_schema,
616 target: draft.target,
617 target_identity: draft.target_identity,
618 event_types: draft.event_types,
619 input_template: draft.input_template,
620 target_label: draft.target_label,
621 enabled: true,
622 created_at_ms: now,
623 updated_at_ms: now,
624 };
625 state.subscriptions.insert(subscription_id, record.clone());
626 Ok(record)
627 }
628
629 async fn list_subscriptions(
630 &self,
631 filter: TriggerSubscriptionFilter,
632 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
633 let state = self
634 .state
635 .lock()
636 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
637 let mut records = state
638 .subscriptions
639 .values()
640 .filter(|record| filter.matches(record))
641 .cloned()
642 .collect::<Vec<_>>();
643 records.sort_by(|left, right| {
644 left.registrant_scope_id()
645 .cmp(&right.registrant_scope_id())
646 .then_with(|| left.handle.cmp(&right.handle))
647 });
648 Ok(records)
649 }
650
651 async fn cancel_subscription(
652 &self,
653 session_id: &str,
654 handle: &str,
655 ) -> Result<bool, PluginError> {
656 let mut state = self
657 .state
658 .lock()
659 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
660 let now = self.clock.timestamp_ms();
661 let Some(record) = state.subscriptions.values_mut().find(|record| {
662 record.registrant_session_id() == Some(session_id) && record.handle == handle
663 }) else {
664 return Ok(false);
665 };
666 let changed = record.enabled;
667 record.enabled = false;
668 record.updated_at_ms = now;
669 Ok(changed)
670 }
671
672 async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
673 let mut state = self
674 .state
675 .lock()
676 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
677 let before = state.subscriptions.len();
678 state
679 .subscriptions
680 .retain(|_, record| record.registrant_session_id() != Some(session_id));
681 Ok(before.saturating_sub(state.subscriptions.len()))
682 }
683
684 async fn record_occurrence(
685 &self,
686 request: TriggerOccurrenceRequest,
687 ) -> Result<TriggerOccurrenceRecord, PluginError> {
688 validate_trigger_occurrence_request(&request)?;
689 let mut state = self
690 .state
691 .lock()
692 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
693 let request_hash = trigger_occurrence_request_hash(&request)?;
694 if let Some(existing_id) = state
695 .occurrence_id_by_idempotency_key
696 .get(&request.idempotency_key)
697 .cloned()
698 {
699 let existing_hash = state
700 .occurrence_hashes
701 .get(&existing_id)
702 .cloned()
703 .unwrap_or_default();
704 if existing_hash != request_hash {
705 return Err(PluginError::Session(format!(
706 "trigger occurrence idempotency conflict for `{}`",
707 request.idempotency_key
708 )));
709 }
710 return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
711 PluginError::Session(format!(
712 "missing trigger occurrence `{existing_id}` for idempotency key"
713 ))
714 });
715 }
716 let occurrence_id = deterministic_occurrence_id(&request)?;
717 let record = TriggerOccurrenceRecord {
718 occurrence_id: occurrence_id.clone(),
719 source_type: request.source_type,
720 source_key: request.source_key,
721 payload: request.payload,
722 idempotency_key: request.idempotency_key.clone(),
723 source: request.source,
724 occurred_at_ms: self.clock.timestamp_ms(),
725 };
726 state
727 .occurrence_id_by_idempotency_key
728 .insert(request.idempotency_key, occurrence_id.clone());
729 state
730 .occurrence_hashes
731 .insert(occurrence_id.clone(), request_hash);
732 state.occurrences.insert(occurrence_id, record.clone());
733 Ok(record)
734 }
735
736 async fn reserve_matching_deliveries(
737 &self,
738 occurrence_id: &str,
739 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
740 let mut state = self
741 .state
742 .lock()
743 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
744 let occurrence = state
745 .occurrences
746 .get(occurrence_id)
747 .cloned()
748 .ok_or_else(|| {
749 PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
750 })?;
751 let subscriptions = state
752 .subscriptions
753 .values()
754 .filter(|record| {
755 record.enabled
756 && record.source_type == occurrence.source_type
757 && record.source_key == occurrence.source_key
758 })
759 .cloned()
760 .collect::<Vec<_>>();
761 let mut deliveries = Vec::new();
762 for subscription in subscriptions {
763 let key = (
764 occurrence.occurrence_id.clone(),
765 subscription.subscription_id.clone(),
766 );
767 if !state.deliveries.insert(key) {
768 continue;
769 }
770 let process_id = deterministic_delivery_process_id(
771 &occurrence.occurrence_id,
772 &subscription.subscription_id,
773 )?;
774 deliveries.push(TriggerDeliveryReservation {
775 occurrence: occurrence.clone(),
776 subscription,
777 process_id,
778 });
779 }
780 Ok(deliveries)
781 }
782}
783
/// Serde default for `enabled` fields: subscriptions are enabled unless stated otherwise.
fn default_enabled() -> bool {
    true
}
787
788pub fn default_trigger_source_key(
789 source_type: &str,
790 source: &serde_json::Value,
791) -> Result<String, PluginError> {
792 let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
793 .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
794 Ok(format!("source:{source_type}:sha256:{digest}"))
795}
796
797pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
798 default_trigger_source_key(source_type, &serde_json::json!({}))
799}
800
801pub fn deterministic_occurrence_id(
802 request: &TriggerOccurrenceRequest,
803) -> Result<String, PluginError> {
804 let digest = crate::stable_hash::stable_json_sha256_hex(&(
805 request.source_type.as_str(),
806 request.source_key.as_str(),
807 request.idempotency_key.as_str(),
808 ))
809 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
810 Ok(format!("trigger:{digest}"))
811}
812
813pub fn deterministic_delivery_process_id(
814 occurrence_id: &str,
815 subscription_id: &str,
816) -> Result<String, PluginError> {
817 let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
818 .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
819 Ok(format!("process:trigger:{digest}"))
820}
821
/// Records trigger occurrences in a store and starts a process per matching subscription.
#[derive(Clone)]
pub struct TriggerRouter {
    // Store holding subscriptions, occurrences and delivery reservations.
    store: Arc<dyn TriggerStore>,
    // Registry used to start processes; delivery fails without one.
    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
    // Optional driver asked to run pending work after processes were started.
    process_work_driver: Option<crate::ProcessWorkDriver>,
}
828
829impl TriggerRouter {
830 pub fn new(
831 store: Arc<dyn TriggerStore>,
832 process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
833 process_work_driver: Option<crate::ProcessWorkDriver>,
834 ) -> Self {
835 Self {
836 store,
837 process_registry,
838 process_work_driver,
839 }
840 }
841
842 pub fn store(&self) -> Arc<dyn TriggerStore> {
843 Arc::clone(&self.store)
844 }
845
846 pub async fn emit(
847 &self,
848 request: TriggerOccurrenceRequest,
849 effect_controller: &dyn crate::RuntimeEffectController,
850 ) -> Result<TriggerEmitReport, PluginError> {
851 let occurrence = self.store.record_occurrence(request).await?;
852 let reservations = self
853 .store
854 .reserve_matching_deliveries(&occurrence.occurrence_id)
855 .await?;
856 let Some(process_registry) = self.process_registry.as_ref() else {
857 if reservations.is_empty() {
858 return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
859 }
860 return Err(PluginError::Session(
861 "trigger delivery requires a process registry".to_string(),
862 ));
863 };
864 let mut started_process_ids = Vec::new();
865 let mut start_errors = Vec::new();
866 for reservation in reservations {
867 let process_id = reservation.process_id.clone();
868 if let Err(err) = self
869 .start_delivery(
870 &reservation,
871 Arc::clone(process_registry),
872 effect_controller,
873 )
874 .await
875 {
876 start_errors.push(format!(
877 "{}: {err}",
878 reservation.subscription.subscription_id
879 ));
880 continue;
881 }
882 started_process_ids.push(process_id);
883 }
884 if !started_process_ids.is_empty()
885 && let Some(driver) = self.process_work_driver.as_ref()
886 {
887 driver.claim_and_run_pending("trigger_delivery").await?;
888 }
889 if started_process_ids.is_empty()
890 && let Some(message) = trigger_delivery_failure_summary(&start_errors)
891 {
892 return Err(PluginError::Session(message));
893 }
894 Ok(TriggerEmitReport::new(
895 occurrence.occurrence_id,
896 started_process_ids,
897 ))
898 }
899
900 async fn start_delivery(
901 &self,
902 reservation: &TriggerDeliveryReservation,
903 process_registry: Arc<dyn crate::ProcessRegistry>,
904 effect_controller: &dyn crate::RuntimeEffectController,
905 ) -> Result<(), PluginError> {
906 let subscription = &reservation.subscription;
907 let occurrence = &reservation.occurrence;
908 subscription
909 .payload_schema
910 .validate(&occurrence.payload)
911 .map_err(|err| {
912 PluginError::Session(format!(
913 "invalid payload for trigger `{}`: {err}",
914 subscription.handle
915 ))
916 })?;
917 let args =
918 materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
919 let target = apply_trigger_inputs(subscription.target.clone(), args)?;
920 let originator_scope_id = subscription.registrant_scope_id();
921 let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
922 &originator_scope_id,
923 &occurrence.occurrence_id,
924 );
925 let registration = crate::ProcessRegistration::new(
926 reservation.process_id.clone(),
927 target.clone(),
928 crate::ProcessProvenance::new(subscription.registrant.clone())
929 .with_caused_by(trigger_occurrence_invocation.causal_ref()),
930 )
931 .with_identity(subscription.target_identity.clone())
932 .with_extra_event_types(subscription.event_types.clone())
933 .with_execution_env_ref(Some(subscription.env_ref.clone()))
934 .with_wake_target(subscription.wake_target.clone());
935 let descriptor_kind = subscription.target_identity.kind.clone();
936 let grant =
937 subscription
938 .wake_target
939 .clone()
940 .map(|session_scope| crate::ProcessStartGrant {
941 session_scope,
942 descriptor: crate::ProcessHandleDescriptor::new(
943 Some(descriptor_kind.as_str()),
944 subscription.target_label.as_deref(),
945 ),
946 });
947 let execution_context = crate::ProcessExecutionContext::default()
948 .with_causal_invocation(Some(trigger_occurrence_invocation));
949 let command = crate::ProcessCommand::Start {
950 registration,
951 grant,
952 execution_context: Box::new(execution_context),
953 };
954 let effect_id = command.effect_id();
955 let invocation = crate::RuntimeInvocation::effect(
956 crate::RuntimeScope::new(originator_scope_id),
957 effect_id.clone(),
958 crate::RuntimeEffectKind::Process,
959 format!(
960 "trigger:{}:{}",
961 occurrence.occurrence_id, subscription.subscription_id
962 ),
963 )
964 .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
965 occurrence_id: occurrence.occurrence_id.clone(),
966 }));
967 let outcome = effect_controller
968 .execute_effect(
969 crate::RuntimeEffectEnvelope::new(
970 invocation,
971 crate::RuntimeEffectCommand::process(command),
972 ),
973 crate::RuntimeEffectLocalExecutor::processes(process_registry),
974 )
975 .await?;
976 match outcome {
977 crate::RuntimeEffectOutcome::Process {
978 result: crate::ProcessEffectOutcome::Start { .. },
979 } => Ok(()),
980 other => Err(PluginError::Session(format!(
981 "trigger process start returned the wrong outcome: {}",
982 other.kind().as_str()
983 ))),
984 }
985 }
986}
987
/// Condenses delivery errors into one message, or `None` when there are no errors.
///
/// A single error is reported as-is; several errors report the total count, the
/// first error, and how many more failed.
fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
    let (first, rest) = errors.split_first()?;
    if rest.is_empty() {
        return Some(format!("trigger delivery failed: {first}"));
    }
    let total = errors.len();
    let remaining = rest.len();
    Some(format!(
        "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
        total, remaining
    ))
}
999
1000fn materialize_trigger_process_args(
1001 input_template: &BTreeMap<String, TriggerInputBinding>,
1002 event_payload: &serde_json::Value,
1003) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
1004 let mut args = serde_json::Map::new();
1005 for (input_name, input) in input_template {
1006 let value = match input {
1007 TriggerInputBinding::Event => event_payload.clone(),
1008 TriggerInputBinding::Fixed { value } => value.clone(),
1009 };
1010 args.insert(input_name.to_string(), value);
1011 }
1012 Ok(args)
1013}
1014
1015fn apply_trigger_inputs(
1016 mut target: crate::ProcessInput,
1017 args: serde_json::Map<String, serde_json::Value>,
1018) -> Result<crate::ProcessInput, PluginError> {
1019 match &mut target {
1020 crate::ProcessInput::Engine { payload, .. } => {
1021 let object = payload.as_object_mut().ok_or_else(|| {
1022 PluginError::Session(
1023 "trigger engine target payload must be a JSON object".to_string(),
1024 )
1025 })?;
1026 object.insert("args".to_string(), serde_json::Value::Object(args));
1027 Ok(target)
1028 }
1029 other => Err(PluginError::Session(format!(
1030 "trigger target must be an engine process, got {}",
1031 other.engine_kind()
1032 ))),
1033 }
1034}
1035
1036pub fn validate_trigger_occurrence_request(
1037 request: &TriggerOccurrenceRequest,
1038) -> Result<(), PluginError> {
1039 if request.source_type.trim().is_empty() {
1040 return Err(PluginError::Session(
1041 "trigger occurrence requires source_type".to_string(),
1042 ));
1043 }
1044 if request.source_key.trim().is_empty() {
1045 return Err(PluginError::Session(
1046 "trigger occurrence requires source_key".to_string(),
1047 ));
1048 }
1049 if request.idempotency_key.trim().is_empty() {
1050 return Err(PluginError::Session(
1051 "trigger occurrence requires idempotency_key".to_string(),
1052 ));
1053 }
1054 Ok(())
1055}
1056
1057pub fn trigger_occurrence_request_hash(
1058 request: &TriggerOccurrenceRequest,
1059) -> Result<String, PluginError> {
1060 crate::stable_hash::stable_json_sha256_hex(&(
1061 request.source_type.as_str(),
1062 request.source_key.as_str(),
1063 &request.payload,
1064 &request.source,
1065 ))
1066 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
1067}
1068
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `ui.button` / `pressed` event declared by `resource_type`.
    fn button_pressed(resource_type: &str) -> TriggerEvent {
        TriggerEvent::new(
            resource_type,
            "ui.button",
            "pressed",
            crate::LashSchema::any(),
        )
    }

    /// Two resource types must not declare the same `alias.event` source type.
    #[test]
    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
        let mut catalog = TriggerEventCatalog::new();
        let first = catalog.declare(button_pressed("Button"));
        first.expect("first trigger occurrence");

        let second = catalog.declare(button_pressed("AlternateButton"));
        let err = second.expect_err("duplicate public source identity should be rejected");

        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
    }

    /// Registering a draft whose target label disagrees with the identity label fails.
    #[tokio::test]
    async fn trigger_store_rejects_mismatched_target_label() {
        let identity = crate::ProcessIdentity::new("external").with_label(Some("expected"));
        let target = crate::ProcessInput::External {
            metadata: serde_json::json!({}),
        };
        let draft = TriggerSubscriptionDraft::for_process(
            crate::ProcessOriginator::host(),
            crate::ProcessExecutionEnvRef::new("process-env:test"),
            "ui.button.pressed",
            "source-key",
            target,
            identity,
        )
        .with_target_label("other");

        let store = InMemoryTriggerStore::default();
        let outcome = store.register_subscription(draft).await;

        let err = outcome.expect_err("mismatched target labels should be rejected");
        assert!(err.to_string().contains("target_label must match"));
    }
}