1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10 pub resource_type: String,
11 pub alias: String,
12 pub event: String,
13 pub payload_schema: crate::LashSchema,
14}
15
16impl TriggerEvent {
17 pub fn new(
18 resource_type: impl Into<String>,
19 alias: impl Into<String>,
20 event: impl Into<String>,
21 payload_schema: crate::LashSchema,
22 ) -> Self {
23 Self {
24 resource_type: resource_type.into(),
25 alias: alias.into(),
26 event: event.into(),
27 payload_schema,
28 }
29 }
30
31 pub fn payload_schema(&self) -> &crate::LashSchema {
32 &self.payload_schema
33 }
34
35 pub fn key(&self) -> TriggerEventKey {
36 TriggerEventKey {
37 resource_type: self.resource_type.clone(),
38 alias: self.alias.clone(),
39 event: self.event.clone(),
40 }
41 }
42
43 pub fn source_type(&self) -> String {
44 trigger_event_type(&self.alias, &self.event)
45 }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50 pub resource_type: String,
51 pub alias: String,
52 pub event: String,
53}
54
55impl TriggerEventKey {
56 pub fn new(
57 resource_type: impl Into<String>,
58 alias: impl Into<String>,
59 event: impl Into<String>,
60 ) -> Self {
61 Self {
62 resource_type: resource_type.into(),
63 alias: alias.into(),
64 event: event.into(),
65 }
66 }
67
68 pub fn source_type(&self) -> String {
69 trigger_event_type(&self.alias, &self.event)
70 }
71}
72
73pub fn trigger_event_type(alias: &str, event: &str) -> String {
74 format!("{alias}.{event}")
75}
76
77#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79 events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88 let key = event.key();
89 if self.events.contains_key(&key) {
90 return Err(format!(
91 "duplicate trigger occurrence `{}.{}.{}`",
92 key.resource_type, key.alias, key.event
93 ));
94 }
95 let source_type = event.source_type();
96 if let Some(existing) = self
97 .events
98 .values()
99 .find(|existing| existing.source_type() == source_type)
100 {
101 return Err(format!(
102 "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103 existing.resource_type,
104 existing.alias,
105 existing.event,
106 key.resource_type,
107 key.alias,
108 key.event
109 ));
110 }
111 self.events.insert(key, event);
112 Ok(())
113 }
114
115 pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116 let mut catalog = Self::new();
117 for event in events {
118 catalog.declare(event)?;
119 }
120 Ok(catalog)
121 }
122
123 pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124 self.events
125 .get(&TriggerEventKey::new(resource_type, alias, event))
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.events.is_empty()
130 }
131
132 pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133 self.events.values()
134 }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139 #[serde(default, skip_serializing_if = "String::is_empty")]
140 pub occurrence_id: String,
141 pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145 pub fn empty() -> Self {
146 Self::default()
147 }
148
149 fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150 Self {
151 occurrence_id,
152 started_process_ids,
153 }
154 }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159 pub source_type: String,
160 pub source_key: String,
161 #[serde(default)]
162 pub payload: serde_json::Value,
163 pub idempotency_key: String,
164 #[serde(default, skip_serializing_if = "Option::is_none")]
165 pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169 pub fn new(
170 source_type: impl Into<String>,
171 source_key: impl Into<String>,
172 payload: serde_json::Value,
173 idempotency_key: impl Into<String>,
174 ) -> Self {
175 Self {
176 source_type: source_type.into(),
177 source_key: source_key.into(),
178 payload,
179 idempotency_key: idempotency_key.into(),
180 source: None,
181 }
182 }
183
184 pub fn with_source(mut self, source: serde_json::Value) -> Self {
185 self.source = Some(source);
186 self
187 }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct TriggerOccurrenceRecord {
192 pub occurrence_id: String,
193 pub source_type: String,
194 pub source_key: String,
195 #[serde(default)]
196 pub payload: serde_json::Value,
197 pub idempotency_key: String,
198 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub source: Option<serde_json::Value>,
200 pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208 pub fn new(value: impl Into<String>) -> Self {
209 Self(value.into())
210 }
211
212 pub fn as_str(&self) -> &str {
213 &self.0
214 }
215}
216
217impl From<String> for TriggerEventType {
218 fn from(value: String) -> Self {
219 Self::new(value)
220 }
221}
222
223impl From<&str> for TriggerEventType {
224 fn from(value: &str) -> Self {
225 Self::new(value)
226 }
227}
228
229impl AsRef<str> for TriggerEventType {
230 fn as_ref(&self) -> &str {
231 self.as_str()
232 }
233}
234
235impl std::fmt::Display for TriggerEventType {
236 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 formatter.write_str(self.as_str())
238 }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243 pub handle: String,
244 pub source_key: String,
245 #[serde(default, skip_serializing_if = "Option::is_none")]
246 pub name: Option<String>,
247 pub source_type: TriggerEventType,
248 pub source: serde_json::Value,
249 pub target: TriggerTargetSummary,
250 #[serde(default = "default_enabled")]
251 pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256 pub label: Option<String>,
257 pub identity: crate::ProcessIdentity,
258 pub input: crate::ProcessInput,
259 #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
260 pub inputs: BTreeMap<String, TriggerInputBinding>,
261}
262
263#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
264#[serde(tag = "type", rename_all = "snake_case")]
265pub enum TriggerInputBinding {
266 Event,
267 Fixed { value: serde_json::Value },
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271pub struct TriggerSubscriptionDraft {
272 pub registrant: crate::ProcessOriginator,
273 pub env_ref: crate::ProcessExecutionEnvRef,
274 #[serde(default, skip_serializing_if = "Option::is_none")]
275 pub wake_target: Option<crate::SessionScope>,
276 #[serde(default, skip_serializing_if = "Option::is_none")]
277 pub name: Option<String>,
278 pub source_type: String,
279 pub source_key: String,
280 pub source: serde_json::Value,
281 pub payload_schema: crate::LashSchema,
282 pub target: crate::ProcessInput,
283 pub target_identity: crate::ProcessIdentity,
284 #[serde(default)]
285 pub event_types: Vec<crate::ProcessEventType>,
286 #[serde(default)]
287 pub input_template: BTreeMap<String, TriggerInputBinding>,
288 #[serde(default, skip_serializing_if = "Option::is_none")]
289 pub target_label: Option<String>,
290}
291
292#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
293pub struct TriggerSubscriptionRecord {
294 pub subscription_id: String,
295 pub registrant: crate::ProcessOriginator,
296 pub env_ref: crate::ProcessExecutionEnvRef,
297 #[serde(default, skip_serializing_if = "Option::is_none")]
298 pub wake_target: Option<crate::SessionScope>,
299 pub handle: String,
300 #[serde(default, skip_serializing_if = "Option::is_none")]
301 pub name: Option<String>,
302 pub source_type: String,
303 pub source_key: String,
304 pub source: serde_json::Value,
305 pub payload_schema: crate::LashSchema,
306 pub target: crate::ProcessInput,
307 pub target_identity: crate::ProcessIdentity,
308 #[serde(default)]
309 pub event_types: Vec<crate::ProcessEventType>,
310 #[serde(default)]
311 pub input_template: BTreeMap<String, TriggerInputBinding>,
312 #[serde(default, skip_serializing_if = "Option::is_none")]
313 pub target_label: Option<String>,
314 #[serde(default = "default_enabled")]
315 pub enabled: bool,
316 pub created_at_ms: u64,
317 pub updated_at_ms: u64,
318}
319
320impl TriggerSubscriptionRecord {
321 pub fn registrant_scope_id(&self) -> String {
322 self.registrant.scope_id()
323 }
324
325 pub fn registrant_session_id(&self) -> Option<&str> {
326 match &self.registrant {
327 crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
328 crate::ProcessOriginator::Host => None,
329 }
330 }
331}
332
333impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
334 fn from(route: &TriggerSubscriptionRecord) -> Self {
335 Self {
336 handle: route.handle.clone(),
337 source_key: route.source_key.clone(),
338 name: route.name.clone(),
339 source_type: TriggerEventType::new(route.source_type.clone()),
340 source: route.source.clone(),
341 target: TriggerTargetSummary {
342 label: route.target_label.clone(),
343 identity: route.target_identity.clone(),
344 input: route.target.clone(),
345 inputs: route.input_template.clone(),
346 },
347 enabled: route.enabled,
348 }
349 }
350}
351
352#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
353pub struct TriggerSubscriptionFilter {
354 #[serde(default, skip_serializing_if = "Option::is_none")]
355 pub session_id: Option<String>,
356 #[serde(default, skip_serializing_if = "Option::is_none")]
357 pub handle: Option<String>,
358 #[serde(default, skip_serializing_if = "Option::is_none")]
359 pub name: Option<String>,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
361 pub source_type: Option<String>,
362 #[serde(default, skip_serializing_if = "Option::is_none")]
363 pub source_key: Option<String>,
364 #[serde(default, skip_serializing_if = "Option::is_none")]
365 pub target: Option<serde_json::Value>,
366 #[serde(default, skip_serializing_if = "Option::is_none")]
367 pub enabled: Option<bool>,
368}
369
370impl TriggerSubscriptionFilter {
371 pub fn for_session(session_id: impl Into<String>) -> Self {
372 Self {
373 session_id: Some(session_id.into()),
374 ..Self::default()
375 }
376 }
377
378 pub fn for_source_type(source_type: impl Into<String>) -> Self {
379 Self {
380 source_type: Some(source_type.into()),
381 ..Self::default()
382 }
383 }
384
385 pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
386 self.session_id
387 .as_deref()
388 .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
389 && self
390 .handle
391 .as_deref()
392 .is_none_or(|handle| record.handle == handle)
393 && self
394 .name
395 .as_deref()
396 .is_none_or(|name| record.name.as_deref() == Some(name))
397 && self
398 .source_type
399 .as_deref()
400 .is_none_or(|source_type| record.source_type == source_type)
401 && self
402 .source_key
403 .as_deref()
404 .is_none_or(|source_key| record.source_key == source_key)
405 && self.enabled.is_none_or(|enabled| record.enabled == enabled)
406 && self
407 .target
408 .as_ref()
409 .is_none_or(|target| record.target_identity.definition.as_ref() == Some(target))
410 }
411}
412
413#[derive(Clone, Debug, Serialize, Deserialize)]
414pub struct TriggerDeliveryReservation {
415 pub occurrence: TriggerOccurrenceRecord,
416 pub subscription: TriggerSubscriptionRecord,
417 pub process_id: String,
418}
419
420#[async_trait::async_trait]
421pub trait TriggerStore: Send + Sync {
422 fn durability_tier(&self) -> crate::DurabilityTier {
423 crate::DurabilityTier::Inline
424 }
425
426 async fn source_key_for_subscription(
427 &self,
428 source_type: &str,
429 source: &serde_json::Value,
430 ) -> Result<String, PluginError> {
431 default_trigger_source_key(source_type, source)
432 }
433
434 async fn register_subscription(
435 &self,
436 draft: TriggerSubscriptionDraft,
437 ) -> Result<TriggerSubscriptionRecord, PluginError>;
438
439 async fn list_subscriptions(
440 &self,
441 filter: TriggerSubscriptionFilter,
442 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
443
444 async fn cancel_subscription(
445 &self,
446 session_id: &str,
447 handle: &str,
448 ) -> Result<bool, PluginError>;
449
450 async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;
451
452 async fn record_occurrence(
453 &self,
454 request: TriggerOccurrenceRequest,
455 ) -> Result<TriggerOccurrenceRecord, PluginError>;
456
457 async fn reserve_matching_deliveries(
458 &self,
459 occurrence_id: &str,
460 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
461}
462
463#[derive(Default)]
464pub struct InMemoryTriggerStore {
465 state: Mutex<InMemoryTriggerEventState>,
466}
467
468#[derive(Default)]
469struct InMemoryTriggerEventState {
470 next_subscription_seq: u64,
471 subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
472 occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
473 occurrence_id_by_idempotency_key: BTreeMap<String, String>,
474 occurrence_hashes: BTreeMap<String, String>,
475 deliveries: BTreeSet<(String, String)>,
476}
477
478#[async_trait::async_trait]
479impl TriggerStore for InMemoryTriggerStore {
480 async fn register_subscription(
481 &self,
482 draft: TriggerSubscriptionDraft,
483 ) -> Result<TriggerSubscriptionRecord, PluginError> {
484 let mut state = self
485 .state
486 .lock()
487 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
488 state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
489 let handle = format!("trigger:{}", state.next_subscription_seq);
490 let subscription_id = format!("subscription:{}", state.next_subscription_seq);
491 let now = crate::runtime::current_epoch_ms();
492 let record = TriggerSubscriptionRecord {
493 subscription_id: subscription_id.clone(),
494 registrant: draft.registrant,
495 env_ref: draft.env_ref,
496 wake_target: draft.wake_target,
497 handle,
498 name: draft.name,
499 source_type: draft.source_type,
500 source_key: draft.source_key,
501 source: draft.source,
502 payload_schema: draft.payload_schema,
503 target: draft.target,
504 target_identity: draft.target_identity,
505 event_types: draft.event_types,
506 input_template: draft.input_template,
507 target_label: draft.target_label,
508 enabled: true,
509 created_at_ms: now,
510 updated_at_ms: now,
511 };
512 state.subscriptions.insert(subscription_id, record.clone());
513 Ok(record)
514 }
515
516 async fn list_subscriptions(
517 &self,
518 filter: TriggerSubscriptionFilter,
519 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
520 let state = self
521 .state
522 .lock()
523 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
524 let mut records = state
525 .subscriptions
526 .values()
527 .filter(|record| filter.matches(record))
528 .cloned()
529 .collect::<Vec<_>>();
530 records.sort_by(|left, right| {
531 left.registrant_scope_id()
532 .cmp(&right.registrant_scope_id())
533 .then_with(|| left.handle.cmp(&right.handle))
534 });
535 Ok(records)
536 }
537
538 async fn cancel_subscription(
539 &self,
540 session_id: &str,
541 handle: &str,
542 ) -> Result<bool, PluginError> {
543 let mut state = self
544 .state
545 .lock()
546 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
547 let now = crate::runtime::current_epoch_ms();
548 let Some(record) = state.subscriptions.values_mut().find(|record| {
549 record.registrant_session_id() == Some(session_id) && record.handle == handle
550 }) else {
551 return Ok(false);
552 };
553 let changed = record.enabled;
554 record.enabled = false;
555 record.updated_at_ms = now;
556 Ok(changed)
557 }
558
559 async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
560 let mut state = self
561 .state
562 .lock()
563 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
564 let before = state.subscriptions.len();
565 state
566 .subscriptions
567 .retain(|_, record| record.registrant_session_id() != Some(session_id));
568 Ok(before.saturating_sub(state.subscriptions.len()))
569 }
570
571 async fn record_occurrence(
572 &self,
573 request: TriggerOccurrenceRequest,
574 ) -> Result<TriggerOccurrenceRecord, PluginError> {
575 validate_trigger_occurrence_request(&request)?;
576 let mut state = self
577 .state
578 .lock()
579 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
580 let request_hash = trigger_occurrence_request_hash(&request)?;
581 if let Some(existing_id) = state
582 .occurrence_id_by_idempotency_key
583 .get(&request.idempotency_key)
584 .cloned()
585 {
586 let existing_hash = state
587 .occurrence_hashes
588 .get(&existing_id)
589 .cloned()
590 .unwrap_or_default();
591 if existing_hash != request_hash {
592 return Err(PluginError::Session(format!(
593 "trigger occurrence idempotency conflict for `{}`",
594 request.idempotency_key
595 )));
596 }
597 return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
598 PluginError::Session(format!(
599 "missing trigger occurrence `{existing_id}` for idempotency key"
600 ))
601 });
602 }
603 let occurrence_id = deterministic_occurrence_id(&request)?;
604 let record = TriggerOccurrenceRecord {
605 occurrence_id: occurrence_id.clone(),
606 source_type: request.source_type,
607 source_key: request.source_key,
608 payload: request.payload,
609 idempotency_key: request.idempotency_key.clone(),
610 source: request.source,
611 occurred_at_ms: crate::runtime::current_epoch_ms(),
612 };
613 state
614 .occurrence_id_by_idempotency_key
615 .insert(request.idempotency_key, occurrence_id.clone());
616 state
617 .occurrence_hashes
618 .insert(occurrence_id.clone(), request_hash);
619 state.occurrences.insert(occurrence_id, record.clone());
620 Ok(record)
621 }
622
623 async fn reserve_matching_deliveries(
624 &self,
625 occurrence_id: &str,
626 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
627 let mut state = self
628 .state
629 .lock()
630 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
631 let occurrence = state
632 .occurrences
633 .get(occurrence_id)
634 .cloned()
635 .ok_or_else(|| {
636 PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
637 })?;
638 let subscriptions = state
639 .subscriptions
640 .values()
641 .filter(|record| {
642 record.enabled
643 && record.source_type == occurrence.source_type
644 && record.source_key == occurrence.source_key
645 })
646 .cloned()
647 .collect::<Vec<_>>();
648 let mut deliveries = Vec::new();
649 for subscription in subscriptions {
650 let key = (
651 occurrence.occurrence_id.clone(),
652 subscription.subscription_id.clone(),
653 );
654 if !state.deliveries.insert(key) {
655 continue;
656 }
657 let process_id = deterministic_delivery_process_id(
658 &occurrence.occurrence_id,
659 &subscription.subscription_id,
660 )?;
661 deliveries.push(TriggerDeliveryReservation {
662 occurrence: occurrence.clone(),
663 subscription,
664 process_id,
665 });
666 }
667 Ok(deliveries)
668 }
669}
670
671fn default_enabled() -> bool {
672 true
673}
674
675pub fn default_trigger_source_key(
676 source_type: &str,
677 source: &serde_json::Value,
678) -> Result<String, PluginError> {
679 let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
680 .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
681 Ok(format!("source:{source_type}:sha256:{digest}"))
682}
683
684pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
685 default_trigger_source_key(source_type, &serde_json::json!({}))
686}
687
688pub fn deterministic_occurrence_id(
689 request: &TriggerOccurrenceRequest,
690) -> Result<String, PluginError> {
691 let digest = crate::stable_hash::stable_json_sha256_hex(&(
692 request.source_type.as_str(),
693 request.source_key.as_str(),
694 request.idempotency_key.as_str(),
695 ))
696 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
697 Ok(format!("trigger:{digest}"))
698}
699
700pub fn deterministic_delivery_process_id(
701 occurrence_id: &str,
702 subscription_id: &str,
703) -> Result<String, PluginError> {
704 let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
705 .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
706 Ok(format!("process:trigger:{digest}"))
707}
708
709#[derive(Clone)]
710pub struct TriggerRouter {
711 store: Arc<dyn TriggerStore>,
712 process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
713 process_work_poke: Option<crate::ProcessWorkPoke>,
714}
715
716impl TriggerRouter {
717 pub fn new(
718 store: Arc<dyn TriggerStore>,
719 process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
720 process_work_poke: Option<crate::ProcessWorkPoke>,
721 ) -> Self {
722 Self {
723 store,
724 process_registry,
725 process_work_poke,
726 }
727 }
728
729 pub fn store(&self) -> Arc<dyn TriggerStore> {
730 Arc::clone(&self.store)
731 }
732
733 pub async fn emit(
734 &self,
735 request: TriggerOccurrenceRequest,
736 effect_controller: &dyn crate::RuntimeEffectController,
737 ) -> Result<TriggerEmitReport, PluginError> {
738 let occurrence = self.store.record_occurrence(request).await?;
739 let reservations = self
740 .store
741 .reserve_matching_deliveries(&occurrence.occurrence_id)
742 .await?;
743 let Some(process_registry) = self.process_registry.as_ref() else {
744 if reservations.is_empty() {
745 return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
746 }
747 return Err(PluginError::Session(
748 "trigger delivery requires a process registry".to_string(),
749 ));
750 };
751 let mut started_process_ids = Vec::new();
752 let mut start_errors = Vec::new();
753 for reservation in reservations {
754 let process_id = reservation.process_id.clone();
755 if let Err(err) = self
756 .start_delivery(
757 &reservation,
758 Arc::clone(process_registry),
759 effect_controller,
760 )
761 .await
762 {
763 start_errors.push(format!(
764 "{}: {err}",
765 reservation.subscription.subscription_id
766 ));
767 continue;
768 }
769 started_process_ids.push(process_id);
770 }
771 if !started_process_ids.is_empty()
772 && let Some(poke) = self.process_work_poke.as_ref()
773 {
774 poke.poke();
775 }
776 if started_process_ids.is_empty()
777 && let Some(message) = trigger_delivery_failure_summary(&start_errors)
778 {
779 return Err(PluginError::Session(message));
780 }
781 Ok(TriggerEmitReport::new(
782 occurrence.occurrence_id,
783 started_process_ids,
784 ))
785 }
786
787 async fn start_delivery(
788 &self,
789 reservation: &TriggerDeliveryReservation,
790 process_registry: Arc<dyn crate::ProcessRegistry>,
791 effect_controller: &dyn crate::RuntimeEffectController,
792 ) -> Result<(), PluginError> {
793 let subscription = &reservation.subscription;
794 let occurrence = &reservation.occurrence;
795 subscription
796 .payload_schema
797 .validate(&occurrence.payload)
798 .map_err(|err| {
799 PluginError::Session(format!(
800 "invalid payload for trigger `{}`: {err}",
801 subscription.handle
802 ))
803 })?;
804 let args =
805 materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
806 let target = apply_trigger_inputs(subscription.target.clone(), args)?;
807 let originator_scope_id = subscription.registrant_scope_id();
808 let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
809 &originator_scope_id,
810 &occurrence.occurrence_id,
811 );
812 let registration = crate::ProcessRegistration::new(
813 reservation.process_id.clone(),
814 target.clone(),
815 crate::ProcessProvenance::new(subscription.registrant.clone())
816 .with_caused_by(trigger_occurrence_invocation.causal_ref()),
817 )
818 .with_identity(subscription.target_identity.clone())
819 .with_extra_event_types(subscription.event_types.clone())
820 .with_execution_env_ref(Some(subscription.env_ref.clone()))
821 .with_wake_target(subscription.wake_target.clone());
822 let descriptor_kind = subscription.target_identity.kind.clone();
823 let grant =
824 subscription
825 .wake_target
826 .clone()
827 .map(|session_scope| crate::ProcessStartGrant {
828 session_scope,
829 descriptor: crate::ProcessHandleDescriptor::new(
830 Some(descriptor_kind.as_str()),
831 subscription.target_label.as_deref(),
832 ),
833 });
834 let execution_context = crate::ProcessExecutionContext::default()
835 .with_causal_invocation(Some(trigger_occurrence_invocation));
836 let command = crate::ProcessCommand::Start {
837 registration,
838 grant,
839 execution_context: Box::new(execution_context),
840 };
841 let effect_id = command.effect_id();
842 let invocation = crate::RuntimeInvocation::effect(
843 crate::RuntimeScope::new(originator_scope_id),
844 effect_id.clone(),
845 crate::RuntimeEffectKind::Process,
846 format!(
847 "trigger:{}:{}",
848 occurrence.occurrence_id, subscription.subscription_id
849 ),
850 )
851 .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
852 occurrence_id: occurrence.occurrence_id.clone(),
853 }));
854 let outcome = effect_controller
855 .execute_effect(
856 crate::RuntimeEffectEnvelope::new(
857 invocation,
858 crate::RuntimeEffectCommand::process(command),
859 ),
860 crate::RuntimeEffectLocalExecutor::processes(process_registry),
861 )
862 .await?;
863 match outcome {
864 crate::RuntimeEffectOutcome::Process {
865 result: crate::ProcessEffectOutcome::Start { .. },
866 } => Ok(()),
867 other => Err(PluginError::Session(format!(
868 "trigger process start returned the wrong outcome: {}",
869 other.kind().as_str()
870 ))),
871 }
872 }
873}
874
875fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
876 match errors {
877 [] => None,
878 [only] => Some(format!("trigger delivery failed: {only}")),
879 [first, rest @ ..] => Some(format!(
880 "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
881 errors.len(),
882 rest.len()
883 )),
884 }
885}
886
887fn materialize_trigger_process_args(
888 input_template: &BTreeMap<String, TriggerInputBinding>,
889 event_payload: &serde_json::Value,
890) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
891 let mut args = serde_json::Map::new();
892 for (input_name, input) in input_template {
893 let value = match input {
894 TriggerInputBinding::Event => event_payload.clone(),
895 TriggerInputBinding::Fixed { value } => value.clone(),
896 };
897 args.insert(input_name.to_string(), value);
898 }
899 Ok(args)
900}
901
902fn apply_trigger_inputs(
903 mut target: crate::ProcessInput,
904 args: serde_json::Map<String, serde_json::Value>,
905) -> Result<crate::ProcessInput, PluginError> {
906 match &mut target {
907 crate::ProcessInput::Engine { payload, .. } => {
908 let object = payload.as_object_mut().ok_or_else(|| {
909 PluginError::Session(
910 "trigger engine target payload must be a JSON object".to_string(),
911 )
912 })?;
913 object.insert("args".to_string(), serde_json::Value::Object(args));
914 Ok(target)
915 }
916 other => Err(PluginError::Session(format!(
917 "trigger target must be an engine process, got {}",
918 other.engine_kind()
919 ))),
920 }
921}
922
923pub fn validate_trigger_occurrence_request(
924 request: &TriggerOccurrenceRequest,
925) -> Result<(), PluginError> {
926 if request.source_type.trim().is_empty() {
927 return Err(PluginError::Session(
928 "trigger occurrence requires source_type".to_string(),
929 ));
930 }
931 if request.source_key.trim().is_empty() {
932 return Err(PluginError::Session(
933 "trigger occurrence requires source_key".to_string(),
934 ));
935 }
936 if request.idempotency_key.trim().is_empty() {
937 return Err(PluginError::Session(
938 "trigger occurrence requires idempotency_key".to_string(),
939 ));
940 }
941 Ok(())
942}
943
944pub fn trigger_occurrence_request_hash(
945 request: &TriggerOccurrenceRequest,
946) -> Result<String, PluginError> {
947 crate::stable_hash::stable_json_sha256_hex(&(
948 request.source_type.as_str(),
949 request.source_key.as_str(),
950 &request.payload,
951 &request.source,
952 ))
953 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
954}
955
956#[cfg(test)]
957mod tests {
958 use super::*;
959
960 fn button_payload_schema() -> crate::LashSchema {
961 crate::LashSchema::any()
962 }
963
964 #[test]
965 fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
966 let mut catalog = TriggerEventCatalog::new();
967 catalog
968 .declare(TriggerEvent::new(
969 "Button",
970 "ui.button",
971 "pressed",
972 button_payload_schema(),
973 ))
974 .expect("first trigger occurrence");
975
976 let err = catalog
977 .declare(TriggerEvent::new(
978 "AlternateButton",
979 "ui.button",
980 "pressed",
981 button_payload_schema(),
982 ))
983 .expect_err("duplicate public source identity should be rejected");
984
985 assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
986 }
987}