1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
/// Declaration of a trigger event together with the schema of its payload.
///
/// The `(resource_type, alias, event)` triple identifies the declaration (see
/// [`TriggerEvent::key`]); `alias` and `event` alone form the dotted source
/// type returned by [`TriggerEvent::source_type`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerEvent {
    /// Resource type component of the declaration key.
    pub resource_type: String,
    /// Alias component; the part before the `.` in the source type.
    pub alias: String,
    /// Event name component; the part after the `.` in the source type.
    pub event: String,
    /// Schema associated with this event's payload.
    pub payload_schema: crate::LashSchema,
}
15
16impl TriggerEvent {
17 pub fn new(
18 resource_type: impl Into<String>,
19 alias: impl Into<String>,
20 event: impl Into<String>,
21 payload_schema: crate::LashSchema,
22 ) -> Self {
23 Self {
24 resource_type: resource_type.into(),
25 alias: alias.into(),
26 event: event.into(),
27 payload_schema,
28 }
29 }
30
31 pub fn payload_schema(&self) -> &crate::LashSchema {
32 &self.payload_schema
33 }
34
35 pub fn key(&self) -> TriggerEventKey {
36 TriggerEventKey {
37 resource_type: self.resource_type.clone(),
38 alias: self.alias.clone(),
39 event: self.event.clone(),
40 }
41 }
42
43 pub fn source_type(&self) -> String {
44 trigger_event_type(&self.alias, &self.event)
45 }
46}
47
/// Ordered lookup key for a [`TriggerEvent`].
///
/// The derived `Ord` compares the fields in declaration order
/// (`resource_type`, then `alias`, then `event`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct TriggerEventKey {
    /// Resource type component of the key.
    pub resource_type: String,
    /// Alias component of the key.
    pub alias: String,
    /// Event name component of the key.
    pub event: String,
}
54
55impl TriggerEventKey {
56 pub fn new(
57 resource_type: impl Into<String>,
58 alias: impl Into<String>,
59 event: impl Into<String>,
60 ) -> Self {
61 Self {
62 resource_type: resource_type.into(),
63 alias: alias.into(),
64 event: event.into(),
65 }
66 }
67
68 pub fn source_type(&self) -> String {
69 trigger_event_type(&self.alias, &self.event)
70 }
71}
72
/// Joins an alias and an event name into the dotted source type `alias.event`.
///
/// Neither part is validated or escaped, so a `.` inside either argument is
/// carried through unchanged.
pub fn trigger_event_type(alias: &str, event: &str) -> String {
    [alias, event].join(".")
}
76
/// Set of declared trigger events, indexed by [`TriggerEventKey`].
///
/// NOTE(review): the map is keyed by a struct. Formats that require string map
/// keys (JSON among them) cannot represent such a map directly; confirm which
/// serializer, if any, is used with the derived `Serialize`/`Deserialize`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct TriggerEventCatalog {
    // Kept private so that entries can only be added through `declare`.
    events: BTreeMap<TriggerEventKey, TriggerEvent>,
}
81
82impl TriggerEventCatalog {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88 let key = event.key();
89 if self.events.contains_key(&key) {
90 return Err(format!(
91 "duplicate trigger occurrence `{}.{}.{}`",
92 key.resource_type, key.alias, key.event
93 ));
94 }
95 let source_type = event.source_type();
96 if let Some(existing) = self
97 .events
98 .values()
99 .find(|existing| existing.source_type() == source_type)
100 {
101 return Err(format!(
102 "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103 existing.resource_type,
104 existing.alias,
105 existing.event,
106 key.resource_type,
107 key.alias,
108 key.event
109 ));
110 }
111 self.events.insert(key, event);
112 Ok(())
113 }
114
115 pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116 let mut catalog = Self::new();
117 for event in events {
118 catalog.declare(event)?;
119 }
120 Ok(catalog)
121 }
122
123 pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124 self.events
125 .get(&TriggerEventKey::new(resource_type, alias, event))
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.events.is_empty()
130 }
131
132 pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133 self.events.values()
134 }
135}
136
/// Result of emitting a trigger occurrence: the occurrence id and the ids of
/// the processes that were started for it.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerEmitReport {
    /// Occurrence id; omitted from serialized output when empty and defaulted
    /// to the empty string when absent on input.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub occurrence_id: String,
    /// Ids of the processes started for the occurrence.
    pub started_process_ids: Vec<String>,
}
143
144impl TriggerEmitReport {
145 pub fn empty() -> Self {
146 Self::default()
147 }
148
149 fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150 Self {
151 occurrence_id,
152 started_process_ids,
153 }
154 }
155}
156
/// Request to record one occurrence of a trigger source.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerOccurrenceRequest {
    /// Source type the occurrence belongs to; matched against subscriptions.
    pub source_type: String,
    /// Source key the occurrence belongs to; matched against subscriptions.
    pub source_key: String,
    /// Event payload; takes `serde_json::Value`'s default when absent on input.
    #[serde(default)]
    pub payload: serde_json::Value,
    /// Key used to de-duplicate repeated submissions of the same occurrence.
    pub idempotency_key: String,
    /// Optional source description; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<serde_json::Value>,
}
167
168impl TriggerOccurrenceRequest {
169 pub fn new(
170 source_type: impl Into<String>,
171 source_key: impl Into<String>,
172 payload: serde_json::Value,
173 idempotency_key: impl Into<String>,
174 ) -> Self {
175 Self {
176 source_type: source_type.into(),
177 source_key: source_key.into(),
178 payload,
179 idempotency_key: idempotency_key.into(),
180 source: None,
181 }
182 }
183
184 pub fn with_source(mut self, source: serde_json::Value) -> Self {
185 self.source = Some(source);
186 self
187 }
188}
189
/// Stored form of a trigger occurrence: the request fields plus an id and a timestamp.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerOccurrenceRecord {
    /// Identifier assigned to the occurrence by the store.
    pub occurrence_id: String,
    /// Source type copied from the request.
    pub source_type: String,
    /// Source key copied from the request.
    pub source_key: String,
    /// Event payload copied from the request.
    #[serde(default)]
    pub payload: serde_json::Value,
    /// Idempotency key copied from the request.
    pub idempotency_key: String,
    /// Optional source description copied from the request.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<serde_json::Value>,
    /// Time the occurrence was recorded, in milliseconds; the in-memory store
    /// fills it from `crate::runtime::current_epoch_ms()`.
    pub occurred_at_ms: u64,
}
202
/// String newtype for a trigger source type.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208 pub fn new(value: impl Into<String>) -> Self {
209 Self(value.into())
210 }
211
212 pub fn as_str(&self) -> &str {
213 &self.0
214 }
215}
216
217impl From<String> for TriggerEventType {
218 fn from(value: String) -> Self {
219 Self::new(value)
220 }
221}
222
223impl From<&str> for TriggerEventType {
224 fn from(value: &str) -> Self {
225 Self::new(value)
226 }
227}
228
229impl AsRef<str> for TriggerEventType {
230 fn as_ref(&self) -> &str {
231 self.as_str()
232 }
233}
234
235impl std::fmt::Display for TriggerEventType {
236 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 formatter.write_str(self.as_str())
238 }
239}
240
/// Summary view of a trigger subscription, built from a
/// [`TriggerSubscriptionRecord`] through the `From` impl in this file.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerRegistration {
    /// Handle of the subscription.
    pub handle: String,
    /// Source key the subscription listens on.
    pub source_key: String,
    /// Optional name; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type the subscription listens on.
    pub source_type: TriggerEventType,
    /// Source description stored with the subscription.
    pub source: serde_json::Value,
    /// What gets started when the trigger fires.
    pub target: TriggerTargetSummary,
    /// Whether the subscription is active; defaults to `true` when absent on input.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}
253
/// Description of the process a trigger subscription starts.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerTargetSummary {
    /// Optional label of the target.
    pub label: Option<String>,
    /// Identity of the target process.
    pub identity: crate::ProcessIdentity,
    /// Process input stored with the subscription.
    pub input: crate::ProcessInput,
    /// Named input bindings; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub inputs: BTreeMap<String, TriggerInputBinding>,
}
262
/// How one named process input is filled when a trigger fires.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// snake_case variant name (`event` or `fixed`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TriggerInputBinding {
    /// The input receives the whole event payload.
    Event,
    /// The input receives the given constant value.
    Fixed { value: serde_json::Value },
}
269
/// Everything needed to register a trigger subscription; the store turns it
/// into a [`TriggerSubscriptionRecord`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionDraft {
    /// Originator registering the subscription.
    pub registrant: crate::ProcessOriginator,
    /// Execution environment reference passed on to started processes.
    pub env_ref: crate::ProcessExecutionEnvRef,
    /// Optional session scope passed on as the wake target of started processes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<crate::SessionScope>,
    /// Optional name for the subscription.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type to listen on.
    pub source_type: String,
    /// Source key to listen on.
    pub source_key: String,
    /// Source description.
    pub source: serde_json::Value,
    /// Schema that occurrence payloads are validated against before delivery.
    pub payload_schema: crate::LashSchema,
    /// Process input used as the template for started processes.
    pub target: crate::ProcessInput,
    /// Identity given to started processes.
    pub target_identity: crate::ProcessIdentity,
    /// Extra process event types attached to started processes.
    #[serde(default)]
    pub event_types: Vec<crate::ProcessEventType>,
    /// Named input bindings materialized into the process arguments.
    #[serde(default)]
    pub input_template: BTreeMap<String, TriggerInputBinding>,
    /// Optional label; when both are set it must equal `target_identity.label`
    /// (checked by `validate`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target_label: Option<String>,
}
291
292impl TriggerSubscriptionDraft {
293 pub fn for_process(
294 registrant: crate::ProcessOriginator,
295 env_ref: crate::ProcessExecutionEnvRef,
296 source_type: impl Into<String>,
297 source_key: impl Into<String>,
298 target: crate::ProcessInput,
299 target_identity: crate::ProcessIdentity,
300 ) -> Self {
301 let target_label = target_identity.label.clone();
302 Self {
303 registrant,
304 env_ref,
305 wake_target: None,
306 name: None,
307 source_type: source_type.into(),
308 source_key: source_key.into(),
309 source: serde_json::Value::Object(serde_json::Map::new()),
310 payload_schema: crate::LashSchema::new(serde_json::Value::Object(
311 serde_json::Map::new(),
312 )),
313 target,
314 target_identity,
315 event_types: Vec::new(),
316 input_template: BTreeMap::new(),
317 target_label,
318 }
319 }
320
321 pub fn with_name(mut self, name: impl Into<String>) -> Self {
322 self.name = Some(name.into());
323 self
324 }
325
326 pub fn with_source(mut self, source: serde_json::Value) -> Self {
327 self.source = source;
328 self
329 }
330
331 pub fn with_payload_schema(mut self, payload_schema: crate::LashSchema) -> Self {
332 self.payload_schema = payload_schema;
333 self
334 }
335
336 pub fn with_wake_target(mut self, wake_target: crate::SessionScope) -> Self {
337 self.wake_target = Some(wake_target);
338 self
339 }
340
341 pub fn with_event_types(
342 mut self,
343 event_types: impl IntoIterator<Item = crate::ProcessEventType>,
344 ) -> Self {
345 self.event_types = event_types.into_iter().collect();
346 self
347 }
348
349 pub fn with_input_template(
350 mut self,
351 input_template: BTreeMap<String, TriggerInputBinding>,
352 ) -> Self {
353 self.input_template = input_template;
354 self
355 }
356
357 pub fn with_target_label(mut self, target_label: impl Into<String>) -> Self {
358 self.target_label = Some(target_label.into());
359 self
360 }
361
362 pub fn validate(&self) -> Result<(), PluginError> {
363 validate_trigger_subscription_target_label(
364 self.target_label.as_deref(),
365 self.target_identity.label.as_deref(),
366 )
367 }
368}
369
/// Stored trigger subscription: the draft fields plus ids, the enabled flag
/// and timestamps assigned by the store.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionRecord {
    /// Store-assigned identifier of the subscription.
    pub subscription_id: String,
    /// Originator that registered the subscription.
    pub registrant: crate::ProcessOriginator,
    /// Execution environment reference passed on to started processes.
    pub env_ref: crate::ProcessExecutionEnvRef,
    /// Optional session scope passed on as the wake target of started processes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<crate::SessionScope>,
    /// Store-assigned handle used to look up and cancel the subscription.
    pub handle: String,
    /// Optional name of the subscription.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Source type the subscription listens on.
    pub source_type: String,
    /// Source key the subscription listens on.
    pub source_key: String,
    /// Source description.
    pub source: serde_json::Value,
    /// Schema that occurrence payloads are validated against before delivery.
    pub payload_schema: crate::LashSchema,
    /// Process input used as the template for started processes.
    pub target: crate::ProcessInput,
    /// Identity given to started processes.
    pub target_identity: crate::ProcessIdentity,
    /// Extra process event types attached to started processes.
    #[serde(default)]
    pub event_types: Vec<crate::ProcessEventType>,
    /// Named input bindings materialized into the process arguments.
    #[serde(default)]
    pub input_template: BTreeMap<String, TriggerInputBinding>,
    /// Optional label of the target.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target_label: Option<String>,
    /// Whether the subscription still receives deliveries; defaults to `true`
    /// when absent on input.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
    /// Creation time in milliseconds.
    pub created_at_ms: u64,
    /// Time of the last modification in milliseconds.
    pub updated_at_ms: u64,
}
397
398impl TriggerSubscriptionRecord {
399 pub fn registrant_scope_id(&self) -> String {
400 self.registrant.scope_id()
401 }
402
403 pub fn registrant_session_id(&self) -> Option<&str> {
404 match &self.registrant {
405 crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
406 crate::ProcessOriginator::Host => None,
407 }
408 }
409}
410
411fn validate_trigger_subscription_target_label(
412 target_label: Option<&str>,
413 identity_label: Option<&str>,
414) -> Result<(), PluginError> {
415 match (target_label, identity_label) {
416 (Some(target_label), Some(identity_label)) if target_label != identity_label => {
417 Err(PluginError::Session(
418 "trigger target_label must match target_identity.label when both are present"
419 .to_string(),
420 ))
421 }
422 _ => Ok(()),
423 }
424}
425
426impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
427 fn from(route: &TriggerSubscriptionRecord) -> Self {
428 Self {
429 handle: route.handle.clone(),
430 source_key: route.source_key.clone(),
431 name: route.name.clone(),
432 source_type: TriggerEventType::new(route.source_type.clone()),
433 source: route.source.clone(),
434 target: TriggerTargetSummary {
435 label: route.target_label.clone(),
436 identity: route.target_identity.clone(),
437 input: route.target.clone(),
438 inputs: route.input_template.clone(),
439 },
440 enabled: route.enabled,
441 }
442 }
443}
444
/// Criteria for selecting subscriptions; unset fields match everything and set
/// fields are combined with logical AND (see `matches`).
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionFilter {
    /// Matches subscriptions registered by this session.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Matches the subscription handle exactly.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub handle: Option<String>,
    /// Matches the subscription name exactly.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Matches the source type exactly.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_type: Option<String>,
    /// Matches the source key exactly.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_key: Option<String>,
    /// Compared against `target_identity.definition` of the record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target: Option<serde_json::Value>,
    /// Matches the enabled flag.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub enabled: Option<bool>,
}
462
463impl TriggerSubscriptionFilter {
464 pub fn for_session(session_id: impl Into<String>) -> Self {
465 Self {
466 session_id: Some(session_id.into()),
467 ..Self::default()
468 }
469 }
470
471 pub fn for_source_type(source_type: impl Into<String>) -> Self {
472 Self {
473 source_type: Some(source_type.into()),
474 ..Self::default()
475 }
476 }
477
478 pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
479 self.session_id
480 .as_deref()
481 .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
482 && self
483 .handle
484 .as_deref()
485 .is_none_or(|handle| record.handle == handle)
486 && self
487 .name
488 .as_deref()
489 .is_none_or(|name| record.name.as_deref() == Some(name))
490 && self
491 .source_type
492 .as_deref()
493 .is_none_or(|source_type| record.source_type == source_type)
494 && self
495 .source_key
496 .as_deref()
497 .is_none_or(|source_key| record.source_key == source_key)
498 && self.enabled.is_none_or(|enabled| record.enabled == enabled)
499 && self
500 .target
501 .as_ref()
502 .is_none_or(|target| record.target_identity.definition.as_ref() == Some(target))
503 }
504}
505
/// One reserved delivery: an occurrence paired with a matching subscription
/// and the id of the process to start for that pair.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TriggerDeliveryReservation {
    /// The occurrence being delivered.
    pub occurrence: TriggerOccurrenceRecord,
    /// The subscription receiving the occurrence.
    pub subscription: TriggerSubscriptionRecord,
    /// Id of the process to start for this delivery.
    pub process_id: String,
}
512
/// Storage interface for trigger subscriptions, occurrences and delivery reservations.
///
/// The behaviour notes on the methods below describe what the signatures and
/// the in-memory implementation in this file show; other implementations
/// should be checked against their own documentation.
#[async_trait::async_trait]
pub trait TriggerStore: Send + Sync {
    /// Durability tier of the store; `Inline` unless overridden.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Derives the source key for a subscription; by default delegates to
    /// [`default_trigger_source_key`].
    async fn source_key_for_subscription(
        &self,
        source_type: &str,
        source: &serde_json::Value,
    ) -> Result<String, PluginError> {
        default_trigger_source_key(source_type, source)
    }

    /// Stores a new subscription built from `draft` and returns the stored record.
    async fn register_subscription(
        &self,
        draft: TriggerSubscriptionDraft,
    ) -> Result<TriggerSubscriptionRecord, PluginError>;

    /// Returns the subscriptions selected by `filter`.
    async fn list_subscriptions(
        &self,
        filter: TriggerSubscriptionFilter,
    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;

    /// Cancels the subscription identified by `session_id` and `handle`.
    ///
    /// The in-memory store returns `true` only when the subscription existed
    /// and was still enabled.
    async fn cancel_subscription(
        &self,
        session_id: &str,
        handle: &str,
    ) -> Result<bool, PluginError>;

    /// Removes every subscription registered by `session_id` and returns how
    /// many were removed.
    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;

    /// Records an occurrence and returns the stored record.
    async fn record_occurrence(
        &self,
        request: TriggerOccurrenceRequest,
    ) -> Result<TriggerOccurrenceRecord, PluginError>;

    /// Reserves deliveries of the occurrence to the subscriptions that match it.
    async fn reserve_matching_deliveries(
        &self,
        occurrence_id: &str,
    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
}
555
/// [`TriggerStore`] implementation that keeps all state in memory behind a mutex.
#[derive(Default)]
pub struct InMemoryTriggerStore {
    // All mutable state lives behind one lock so each operation sees a consistent view.
    state: Mutex<InMemoryTriggerEventState>,
}
560
/// Mutable state guarded by the lock in [`InMemoryTriggerStore`].
#[derive(Default)]
struct InMemoryTriggerEventState {
    // Counter used to mint subscription ids and handles.
    next_subscription_seq: u64,
    // Subscriptions keyed by subscription id.
    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
    // Occurrences keyed by occurrence id.
    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
    // Idempotency key -> occurrence id recorded for it.
    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
    // Occurrence id -> hash of the request that created it.
    occurrence_hashes: BTreeMap<String, String>,
    // (occurrence id, subscription id) pairs that were already reserved.
    deliveries: BTreeSet<(String, String)>,
}
570
571#[async_trait::async_trait]
572impl TriggerStore for InMemoryTriggerStore {
573 async fn register_subscription(
574 &self,
575 draft: TriggerSubscriptionDraft,
576 ) -> Result<TriggerSubscriptionRecord, PluginError> {
577 draft.validate()?;
578 let mut state = self
579 .state
580 .lock()
581 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
582 state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
583 let handle = format!("trigger:{}", state.next_subscription_seq);
584 let subscription_id = format!("subscription:{}", state.next_subscription_seq);
585 let now = crate::runtime::current_epoch_ms();
586 let record = TriggerSubscriptionRecord {
587 subscription_id: subscription_id.clone(),
588 registrant: draft.registrant,
589 env_ref: draft.env_ref,
590 wake_target: draft.wake_target,
591 handle,
592 name: draft.name,
593 source_type: draft.source_type,
594 source_key: draft.source_key,
595 source: draft.source,
596 payload_schema: draft.payload_schema,
597 target: draft.target,
598 target_identity: draft.target_identity,
599 event_types: draft.event_types,
600 input_template: draft.input_template,
601 target_label: draft.target_label,
602 enabled: true,
603 created_at_ms: now,
604 updated_at_ms: now,
605 };
606 state.subscriptions.insert(subscription_id, record.clone());
607 Ok(record)
608 }
609
610 async fn list_subscriptions(
611 &self,
612 filter: TriggerSubscriptionFilter,
613 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
614 let state = self
615 .state
616 .lock()
617 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
618 let mut records = state
619 .subscriptions
620 .values()
621 .filter(|record| filter.matches(record))
622 .cloned()
623 .collect::<Vec<_>>();
624 records.sort_by(|left, right| {
625 left.registrant_scope_id()
626 .cmp(&right.registrant_scope_id())
627 .then_with(|| left.handle.cmp(&right.handle))
628 });
629 Ok(records)
630 }
631
632 async fn cancel_subscription(
633 &self,
634 session_id: &str,
635 handle: &str,
636 ) -> Result<bool, PluginError> {
637 let mut state = self
638 .state
639 .lock()
640 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
641 let now = crate::runtime::current_epoch_ms();
642 let Some(record) = state.subscriptions.values_mut().find(|record| {
643 record.registrant_session_id() == Some(session_id) && record.handle == handle
644 }) else {
645 return Ok(false);
646 };
647 let changed = record.enabled;
648 record.enabled = false;
649 record.updated_at_ms = now;
650 Ok(changed)
651 }
652
653 async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
654 let mut state = self
655 .state
656 .lock()
657 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
658 let before = state.subscriptions.len();
659 state
660 .subscriptions
661 .retain(|_, record| record.registrant_session_id() != Some(session_id));
662 Ok(before.saturating_sub(state.subscriptions.len()))
663 }
664
665 async fn record_occurrence(
666 &self,
667 request: TriggerOccurrenceRequest,
668 ) -> Result<TriggerOccurrenceRecord, PluginError> {
669 validate_trigger_occurrence_request(&request)?;
670 let mut state = self
671 .state
672 .lock()
673 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
674 let request_hash = trigger_occurrence_request_hash(&request)?;
675 if let Some(existing_id) = state
676 .occurrence_id_by_idempotency_key
677 .get(&request.idempotency_key)
678 .cloned()
679 {
680 let existing_hash = state
681 .occurrence_hashes
682 .get(&existing_id)
683 .cloned()
684 .unwrap_or_default();
685 if existing_hash != request_hash {
686 return Err(PluginError::Session(format!(
687 "trigger occurrence idempotency conflict for `{}`",
688 request.idempotency_key
689 )));
690 }
691 return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
692 PluginError::Session(format!(
693 "missing trigger occurrence `{existing_id}` for idempotency key"
694 ))
695 });
696 }
697 let occurrence_id = deterministic_occurrence_id(&request)?;
698 let record = TriggerOccurrenceRecord {
699 occurrence_id: occurrence_id.clone(),
700 source_type: request.source_type,
701 source_key: request.source_key,
702 payload: request.payload,
703 idempotency_key: request.idempotency_key.clone(),
704 source: request.source,
705 occurred_at_ms: crate::runtime::current_epoch_ms(),
706 };
707 state
708 .occurrence_id_by_idempotency_key
709 .insert(request.idempotency_key, occurrence_id.clone());
710 state
711 .occurrence_hashes
712 .insert(occurrence_id.clone(), request_hash);
713 state.occurrences.insert(occurrence_id, record.clone());
714 Ok(record)
715 }
716
717 async fn reserve_matching_deliveries(
718 &self,
719 occurrence_id: &str,
720 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
721 let mut state = self
722 .state
723 .lock()
724 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
725 let occurrence = state
726 .occurrences
727 .get(occurrence_id)
728 .cloned()
729 .ok_or_else(|| {
730 PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
731 })?;
732 let subscriptions = state
733 .subscriptions
734 .values()
735 .filter(|record| {
736 record.enabled
737 && record.source_type == occurrence.source_type
738 && record.source_key == occurrence.source_key
739 })
740 .cloned()
741 .collect::<Vec<_>>();
742 let mut deliveries = Vec::new();
743 for subscription in subscriptions {
744 let key = (
745 occurrence.occurrence_id.clone(),
746 subscription.subscription_id.clone(),
747 );
748 if !state.deliveries.insert(key) {
749 continue;
750 }
751 let process_id = deterministic_delivery_process_id(
752 &occurrence.occurrence_id,
753 &subscription.subscription_id,
754 )?;
755 deliveries.push(TriggerDeliveryReservation {
756 occurrence: occurrence.clone(),
757 subscription,
758 process_id,
759 });
760 }
761 Ok(deliveries)
762 }
763}
764
/// Serde default for the `enabled` fields in this file: a missing flag means enabled.
fn default_enabled() -> bool {
    true
}
768
769pub fn default_trigger_source_key(
770 source_type: &str,
771 source: &serde_json::Value,
772) -> Result<String, PluginError> {
773 let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
774 .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
775 Ok(format!("source:{source_type}:sha256:{digest}"))
776}
777
778pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
779 default_trigger_source_key(source_type, &serde_json::json!({}))
780}
781
782pub fn deterministic_occurrence_id(
783 request: &TriggerOccurrenceRequest,
784) -> Result<String, PluginError> {
785 let digest = crate::stable_hash::stable_json_sha256_hex(&(
786 request.source_type.as_str(),
787 request.source_key.as_str(),
788 request.idempotency_key.as_str(),
789 ))
790 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
791 Ok(format!("trigger:{digest}"))
792}
793
794pub fn deterministic_delivery_process_id(
795 occurrence_id: &str,
796 subscription_id: &str,
797) -> Result<String, PluginError> {
798 let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
799 .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
800 Ok(format!("process:trigger:{digest}"))
801}
802
/// Records trigger occurrences in a [`TriggerStore`] and starts a process for
/// every matching subscription.
#[derive(Clone)]
pub struct TriggerRouter {
    // Where subscriptions, occurrences and reservations are kept.
    store: Arc<dyn TriggerStore>,
    // Needed to start processes; `emit` fails without it when there is something to deliver.
    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
    // Poked by `emit` after at least one process was started.
    process_work_poke: Option<crate::ProcessWorkPoke>,
}
809
impl TriggerRouter {
    /// Builds a router from its store and optional process collaborators.
    pub fn new(
        store: Arc<dyn TriggerStore>,
        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
        process_work_poke: Option<crate::ProcessWorkPoke>,
    ) -> Self {
        Self {
            store,
            process_registry,
            process_work_poke,
        }
    }

    /// Returns another handle to the underlying store.
    pub fn store(&self) -> Arc<dyn TriggerStore> {
        Arc::clone(&self.store)
    }

    /// Records `request` as an occurrence and starts one process per reserved delivery.
    ///
    /// Returns a report holding the occurrence id and the ids of the processes
    /// that were started.
    ///
    /// # Errors
    ///
    /// Returns `PluginError` when recording or reserving fails, when there are
    /// reservations but no process registry, or when every delivery failed to
    /// start. When at least one delivery starts, failures of the others are
    /// not reported.
    ///
    /// NOTE(review): reservations are taken from the store before the registry
    /// check and before any start attempt, and nothing here releases them on
    /// failure; confirm whether failed deliveries are meant to be retried.
    pub async fn emit(
        &self,
        request: TriggerOccurrenceRequest,
        effect_controller: &dyn crate::RuntimeEffectController,
    ) -> Result<TriggerEmitReport, PluginError> {
        let occurrence = self.store.record_occurrence(request).await?;
        let reservations = self
            .store
            .reserve_matching_deliveries(&occurrence.occurrence_id)
            .await?;
        // Without a registry only the "nothing to deliver" case can succeed.
        let Some(process_registry) = self.process_registry.as_ref() else {
            if reservations.is_empty() {
                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
            }
            return Err(PluginError::Session(
                "trigger delivery requires a process registry".to_string(),
            ));
        };
        let mut started_process_ids = Vec::new();
        let mut start_errors = Vec::new();
        for reservation in reservations {
            let process_id = reservation.process_id.clone();
            // A failed start is remembered and the remaining reservations are still attempted.
            if let Err(err) = self
                .start_delivery(
                    &reservation,
                    Arc::clone(process_registry),
                    effect_controller,
                )
                .await
            {
                start_errors.push(format!(
                    "{}: {err}",
                    reservation.subscription.subscription_id
                ));
                continue;
            }
            started_process_ids.push(process_id);
        }
        // Signal that there is new process work only when something was started.
        if !started_process_ids.is_empty()
            && let Some(poke) = self.process_work_poke.as_ref()
        {
            poke.poke();
        }
        // Only a total failure becomes an error; partial failures are dropped here.
        if started_process_ids.is_empty()
            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
        {
            return Err(PluginError::Session(message));
        }
        Ok(TriggerEmitReport::new(
            occurrence.occurrence_id,
            started_process_ids,
        ))
    }

    /// Starts the process for one reservation through `effect_controller`.
    ///
    /// # Errors
    ///
    /// Returns `PluginError` when the payload fails the subscription's schema,
    /// the target cannot take trigger inputs, the effect fails, or the effect
    /// returns anything other than a process start outcome.
    async fn start_delivery(
        &self,
        reservation: &TriggerDeliveryReservation,
        process_registry: Arc<dyn crate::ProcessRegistry>,
        effect_controller: &dyn crate::RuntimeEffectController,
    ) -> Result<(), PluginError> {
        let subscription = &reservation.subscription;
        let occurrence = &reservation.occurrence;
        // Check the payload against the subscription's own schema before building anything.
        subscription
            .payload_schema
            .validate(&occurrence.payload)
            .map_err(|err| {
                PluginError::Session(format!(
                    "invalid payload for trigger `{}`: {err}",
                    subscription.handle
                ))
            })?;
        // Resolve the input template against the payload and put the result into the target.
        let args =
            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
        let target = apply_trigger_inputs(subscription.target.clone(), args)?;
        let originator_scope_id = subscription.registrant_scope_id();
        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
            &originator_scope_id,
            &occurrence.occurrence_id,
        );
        // The registration carries the registrant as provenance and the occurrence as its cause.
        let registration = crate::ProcessRegistration::new(
            reservation.process_id.clone(),
            target.clone(),
            crate::ProcessProvenance::new(subscription.registrant.clone())
                .with_caused_by(trigger_occurrence_invocation.causal_ref()),
        )
        .with_identity(subscription.target_identity.clone())
        .with_extra_event_types(subscription.event_types.clone())
        .with_execution_env_ref(Some(subscription.env_ref.clone()))
        .with_wake_target(subscription.wake_target.clone());
        let descriptor_kind = subscription.target_identity.kind.clone();
        // A start grant is only built when the subscription has a wake target.
        let grant =
            subscription
                .wake_target
                .clone()
                .map(|session_scope| crate::ProcessStartGrant {
                    session_scope,
                    descriptor: crate::ProcessHandleDescriptor::new(
                        Some(descriptor_kind.as_str()),
                        subscription.target_label.as_deref(),
                    ),
                });
        let execution_context = crate::ProcessExecutionContext::default()
            .with_causal_invocation(Some(trigger_occurrence_invocation));
        let command = crate::ProcessCommand::Start {
            registration,
            grant,
            execution_context: Box::new(execution_context),
        };
        let effect_id = command.effect_id();
        // The invocation runs in the registrant's scope and names the
        // occurrence/subscription pair.
        let invocation = crate::RuntimeInvocation::effect(
            crate::RuntimeScope::new(originator_scope_id),
            effect_id.clone(),
            crate::RuntimeEffectKind::Process,
            format!(
                "trigger:{}:{}",
                occurrence.occurrence_id, subscription.subscription_id
            ),
        )
        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
            occurrence_id: occurrence.occurrence_id.clone(),
        }));
        let outcome = effect_controller
            .execute_effect(
                crate::RuntimeEffectEnvelope::new(
                    invocation,
                    crate::RuntimeEffectCommand::process(command),
                ),
                crate::RuntimeEffectLocalExecutor::processes(process_registry),
            )
            .await?;
        // Anything other than a process start outcome is reported as an error.
        match outcome {
            crate::RuntimeEffectOutcome::Process {
                result: crate::ProcessEffectOutcome::Start { .. },
            } => Ok(()),
            other => Err(PluginError::Session(format!(
                "trigger process start returned the wrong outcome: {}",
                other.kind().as_str()
            ))),
        }
    }
}
968
/// Condenses delivery errors into one message, or `None` when there are none.
///
/// A single error is reported verbatim after a fixed prefix. With several,
/// the message shows the total count, the first error, and how many more failed.
fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
    let (first, rest) = errors.split_first()?;
    let summary = if rest.is_empty() {
        format!("trigger delivery failed: {first}")
    } else {
        format!(
            "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
            errors.len(),
            rest.len()
        )
    };
    Some(summary)
}
980
981fn materialize_trigger_process_args(
982 input_template: &BTreeMap<String, TriggerInputBinding>,
983 event_payload: &serde_json::Value,
984) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
985 let mut args = serde_json::Map::new();
986 for (input_name, input) in input_template {
987 let value = match input {
988 TriggerInputBinding::Event => event_payload.clone(),
989 TriggerInputBinding::Fixed { value } => value.clone(),
990 };
991 args.insert(input_name.to_string(), value);
992 }
993 Ok(args)
994}
995
996fn apply_trigger_inputs(
997 mut target: crate::ProcessInput,
998 args: serde_json::Map<String, serde_json::Value>,
999) -> Result<crate::ProcessInput, PluginError> {
1000 match &mut target {
1001 crate::ProcessInput::Engine { payload, .. } => {
1002 let object = payload.as_object_mut().ok_or_else(|| {
1003 PluginError::Session(
1004 "trigger engine target payload must be a JSON object".to_string(),
1005 )
1006 })?;
1007 object.insert("args".to_string(), serde_json::Value::Object(args));
1008 Ok(target)
1009 }
1010 other => Err(PluginError::Session(format!(
1011 "trigger target must be an engine process, got {}",
1012 other.engine_kind()
1013 ))),
1014 }
1015}
1016
1017pub fn validate_trigger_occurrence_request(
1018 request: &TriggerOccurrenceRequest,
1019) -> Result<(), PluginError> {
1020 if request.source_type.trim().is_empty() {
1021 return Err(PluginError::Session(
1022 "trigger occurrence requires source_type".to_string(),
1023 ));
1024 }
1025 if request.source_key.trim().is_empty() {
1026 return Err(PluginError::Session(
1027 "trigger occurrence requires source_key".to_string(),
1028 ));
1029 }
1030 if request.idempotency_key.trim().is_empty() {
1031 return Err(PluginError::Session(
1032 "trigger occurrence requires idempotency_key".to_string(),
1033 ));
1034 }
1035 Ok(())
1036}
1037
1038pub fn trigger_occurrence_request_hash(
1039 request: &TriggerOccurrenceRequest,
1040) -> Result<String, PluginError> {
1041 crate::stable_hash::stable_json_sha256_hex(&(
1042 request.source_type.as_str(),
1043 request.source_key.as_str(),
1044 &request.payload,
1045 &request.source,
1046 ))
1047 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
1048}
1049
#[cfg(test)]
mod tests {
    use super::*;

    // Schema used by the catalog test; it accepts any payload.
    fn button_payload_schema() -> crate::LashSchema {
        crate::LashSchema::any()
    }

    // Two declarations with different resource types but the same alias and
    // event produce the same `alias.event` source type, so the second must be rejected.
    #[test]
    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
        let mut catalog = TriggerEventCatalog::new();
        catalog
            .declare(TriggerEvent::new(
                "Button",
                "ui.button",
                "pressed",
                button_payload_schema(),
            ))
            .expect("first trigger occurrence");

        let err = catalog
            .declare(TriggerEvent::new(
                "AlternateButton",
                "ui.button",
                "pressed",
                button_payload_schema(),
            ))
            .expect_err("duplicate public source identity should be rejected");

        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
    }

    // The draft's explicit target label ("other") disagrees with the label on
    // the identity ("expected"), so registration must fail validation.
    #[tokio::test]
    async fn trigger_store_rejects_mismatched_target_label() {
        let store = InMemoryTriggerStore::default();
        let draft = TriggerSubscriptionDraft::for_process(
            crate::ProcessOriginator::host(),
            crate::ProcessExecutionEnvRef::new("process-env:test"),
            "ui.button.pressed",
            "source-key",
            crate::ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            crate::ProcessIdentity::new("external").with_label(Some("expected")),
        )
        .with_target_label("other");

        let err = store
            .register_subscription(draft)
            .await
            .expect_err("mismatched target labels should be rejected");
        assert!(err.to_string().contains("target_label must match"));
    }
}