1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
9pub struct TriggerEvent {
10 pub resource_type: String,
11 pub alias: String,
12 pub event: String,
13 pub payload_ty: lashlang::NamedDataType,
14}
15
16impl TriggerEvent {
17 pub fn new(
18 resource_type: impl Into<String>,
19 alias: impl Into<String>,
20 event: impl Into<String>,
21 payload_ty: lashlang::NamedDataType,
22 ) -> Self {
23 Self {
24 resource_type: resource_type.into(),
25 alias: alias.into(),
26 event: event.into(),
27 payload_ty,
28 }
29 }
30
31 pub fn payload_type(&self) -> &lashlang::NamedDataType {
32 &self.payload_ty
33 }
34
35 pub fn key(&self) -> TriggerEventKey {
36 TriggerEventKey {
37 resource_type: self.resource_type.clone(),
38 alias: self.alias.clone(),
39 event: self.event.clone(),
40 }
41 }
42
43 pub fn source_type(&self) -> String {
44 trigger_event_type(&self.alias, &self.event)
45 }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct TriggerEventKey {
50 pub resource_type: String,
51 pub alias: String,
52 pub event: String,
53}
54
55impl TriggerEventKey {
56 pub fn new(
57 resource_type: impl Into<String>,
58 alias: impl Into<String>,
59 event: impl Into<String>,
60 ) -> Self {
61 Self {
62 resource_type: resource_type.into(),
63 alias: alias.into(),
64 event: event.into(),
65 }
66 }
67
68 pub fn source_type(&self) -> String {
69 trigger_event_type(&self.alias, &self.event)
70 }
71}
72
/// Joins an alias and an event name with a single `.` to form a source type
/// string, e.g. `ui.button` + `pressed` -> `ui.button.pressed`.
pub fn trigger_event_type(alias: &str, event: &str) -> String {
    let mut source_type = String::with_capacity(alias.len() + event.len() + 1);
    source_type.push_str(alias);
    source_type.push('.');
    source_type.push_str(event);
    source_type
}
76
77#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
78pub struct TriggerEventCatalog {
79 events: BTreeMap<TriggerEventKey, TriggerEvent>,
80}
81
82impl TriggerEventCatalog {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn declare(&mut self, event: TriggerEvent) -> Result<(), String> {
88 let key = event.key();
89 if self.events.contains_key(&key) {
90 return Err(format!(
91 "duplicate trigger occurrence `{}.{}.{}`",
92 key.resource_type, key.alias, key.event
93 ));
94 }
95 let source_type = event.source_type();
96 if let Some(existing) = self
97 .events
98 .values()
99 .find(|existing| existing.source_type() == source_type)
100 {
101 return Err(format!(
102 "duplicate trigger source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103 existing.resource_type,
104 existing.alias,
105 existing.event,
106 key.resource_type,
107 key.alias,
108 key.event
109 ));
110 }
111 self.events.insert(key, event);
112 Ok(())
113 }
114
115 pub fn from_events(events: impl IntoIterator<Item = TriggerEvent>) -> Result<Self, String> {
116 let mut catalog = Self::new();
117 for event in events {
118 catalog.declare(event)?;
119 }
120 Ok(catalog)
121 }
122
123 pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&TriggerEvent> {
124 self.events
125 .get(&TriggerEventKey::new(resource_type, alias, event))
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.events.is_empty()
130 }
131
132 pub fn events(&self) -> impl Iterator<Item = &TriggerEvent> {
133 self.events.values()
134 }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct TriggerEmitReport {
139 #[serde(default, skip_serializing_if = "String::is_empty")]
140 pub occurrence_id: String,
141 pub started_process_ids: Vec<String>,
142}
143
144impl TriggerEmitReport {
145 pub fn empty() -> Self {
146 Self::default()
147 }
148
149 fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150 Self {
151 occurrence_id,
152 started_process_ids,
153 }
154 }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct TriggerOccurrenceRequest {
159 pub source_type: String,
160 pub source_key: String,
161 #[serde(default)]
162 pub payload: serde_json::Value,
163 pub idempotency_key: String,
164 #[serde(default, skip_serializing_if = "Option::is_none")]
165 pub source: Option<serde_json::Value>,
166}
167
168impl TriggerOccurrenceRequest {
169 pub fn new(
170 source_type: impl Into<String>,
171 source_key: impl Into<String>,
172 payload: serde_json::Value,
173 idempotency_key: impl Into<String>,
174 ) -> Self {
175 Self {
176 source_type: source_type.into(),
177 source_key: source_key.into(),
178 payload,
179 idempotency_key: idempotency_key.into(),
180 source: None,
181 }
182 }
183
184 pub fn with_source(mut self, source: serde_json::Value) -> Self {
185 self.source = Some(source);
186 self
187 }
188}
189
/// Stored form of a trigger occurrence: the request fields plus the assigned
/// occurrence id and a millisecond timestamp.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerOccurrenceRecord {
    pub occurrence_id: String,
    pub source_type: String,
    pub source_key: String,
    // Missing payloads deserialize to the default JSON value.
    #[serde(default)]
    pub payload: serde_json::Value,
    pub idempotency_key: String,
    // Left out of serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<serde_json::Value>,
    pub occurred_at_ms: u64,
}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerEventType(String);
206
207impl TriggerEventType {
208 pub fn new(value: impl Into<String>) -> Self {
209 Self(value.into())
210 }
211
212 pub fn as_str(&self) -> &str {
213 &self.0
214 }
215}
216
217impl From<String> for TriggerEventType {
218 fn from(value: String) -> Self {
219 Self::new(value)
220 }
221}
222
223impl From<&str> for TriggerEventType {
224 fn from(value: &str) -> Self {
225 Self::new(value)
226 }
227}
228
229impl AsRef<str> for TriggerEventType {
230 fn as_ref(&self) -> &str {
231 self.as_str()
232 }
233}
234
235impl std::fmt::Display for TriggerEventType {
236 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 formatter.write_str(self.as_str())
238 }
239}
240
/// Summary view of a trigger subscription; built from a
/// `TriggerSubscriptionRecord` via the `From` impl in this file.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerRegistration {
    pub handle: String,
    pub source_key: String,
    // Left out of serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    pub source_type: TriggerEventType,
    pub source: serde_json::Value,
    pub target: TriggerTargetSummary,
    // A missing `enabled` field deserializes as `true` (see `default_enabled`).
    #[serde(default = "default_enabled")]
    pub enabled: bool,
}
253
/// The process a trigger targets, by name, together with its input template.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerTargetSummary {
    pub process_name: String,
    pub inputs: lashlang::TriggerInputTemplate,
}
259
/// Input to `TriggerStore::register_subscription`: every subscription field
/// except the ones the store assigns (id, handle, enabled flag, timestamps).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerSubscriptionDraft {
    pub registrant: crate::ProcessOriginator,
    pub env_ref: crate::ProcessExecutionEnvRef,
    // Left out of serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<crate::SessionScope>,
    // Left out of serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    pub source_type: String,
    pub source_key: String,
    pub source: serde_json::Value,
    pub event_ty: lashlang::TypeExpr,
    pub module_ref: lashlang::ModuleRef,
    pub host_requirements_ref: lashlang::HostRequirementsRef,
    pub process_ref: lashlang::ProcessRef,
    pub process_name: String,
    pub input_template: lashlang::TriggerInputTemplate,
}
278
279#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
280pub struct TriggerSubscriptionRecord {
281 pub subscription_id: String,
282 pub registrant: crate::ProcessOriginator,
283 pub env_ref: crate::ProcessExecutionEnvRef,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
285 pub wake_target: Option<crate::SessionScope>,
286 pub handle: String,
287 #[serde(default, skip_serializing_if = "Option::is_none")]
288 pub name: Option<String>,
289 pub source_type: String,
290 pub source_key: String,
291 pub source: serde_json::Value,
292 pub event_ty: lashlang::TypeExpr,
293 pub module_ref: lashlang::ModuleRef,
294 pub host_requirements_ref: lashlang::HostRequirementsRef,
295 pub process_ref: lashlang::ProcessRef,
296 pub process_name: String,
297 pub input_template: lashlang::TriggerInputTemplate,
298 #[serde(default = "default_enabled")]
299 pub enabled: bool,
300 pub created_at_ms: u64,
301 pub updated_at_ms: u64,
302}
303
304impl TriggerSubscriptionRecord {
305 pub fn registrant_scope_id(&self) -> String {
306 self.registrant.scope_id()
307 }
308
309 pub fn registrant_session_id(&self) -> Option<&str> {
310 match &self.registrant {
311 crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
312 crate::ProcessOriginator::Host => None,
313 }
314 }
315}
316
317impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
318 fn from(route: &TriggerSubscriptionRecord) -> Self {
319 Self {
320 handle: route.handle.clone(),
321 source_key: route.source_key.clone(),
322 name: route.name.clone(),
323 source_type: TriggerEventType::new(route.source_type.clone()),
324 source: route.source.clone(),
325 target: TriggerTargetSummary {
326 process_name: route.process_name.clone(),
327 inputs: route.input_template.clone(),
328 },
329 enabled: route.enabled,
330 }
331 }
332}
333
334#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
335pub struct TriggerSubscriptionFilter {
336 #[serde(default, skip_serializing_if = "Option::is_none")]
337 pub session_id: Option<String>,
338 #[serde(default, skip_serializing_if = "Option::is_none")]
339 pub handle: Option<String>,
340 #[serde(default, skip_serializing_if = "Option::is_none")]
341 pub name: Option<String>,
342 #[serde(default, skip_serializing_if = "Option::is_none")]
343 pub source_type: Option<String>,
344 #[serde(default, skip_serializing_if = "Option::is_none")]
345 pub source_key: Option<String>,
346 #[serde(default, skip_serializing_if = "Option::is_none")]
347 pub target: Option<lashlang::TriggerTargetIdentity>,
348 #[serde(default, skip_serializing_if = "Option::is_none")]
349 pub enabled: Option<bool>,
350}
351
352impl TriggerSubscriptionFilter {
353 pub fn for_session(session_id: impl Into<String>) -> Self {
354 Self {
355 session_id: Some(session_id.into()),
356 ..Self::default()
357 }
358 }
359
360 pub fn for_source_type(source_type: impl Into<String>) -> Self {
361 Self {
362 source_type: Some(source_type.into()),
363 ..Self::default()
364 }
365 }
366
367 pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
368 self.session_id
369 .as_deref()
370 .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
371 && self
372 .handle
373 .as_deref()
374 .is_none_or(|handle| record.handle == handle)
375 && self
376 .name
377 .as_deref()
378 .is_none_or(|name| record.name.as_deref() == Some(name))
379 && self
380 .source_type
381 .as_deref()
382 .is_none_or(|source_type| record.source_type == source_type)
383 && self
384 .source_key
385 .as_deref()
386 .is_none_or(|source_key| record.source_key == source_key)
387 && self.enabled.is_none_or(|enabled| record.enabled == enabled)
388 && self.target.as_ref().is_none_or(|target| {
389 target.matches(
390 &record.module_ref,
391 &record.host_requirements_ref,
392 &record.process_ref,
393 &record.process_name,
394 )
395 })
396 }
397}
398
/// One reserved delivery: an occurrence paired with a matching subscription
/// and the id of the process to start for that pair.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerDeliveryReservation {
    pub occurrence: TriggerOccurrenceRecord,
    pub subscription: TriggerSubscriptionRecord,
    pub process_id: String,
}
405
/// Storage interface for trigger subscriptions, occurrences, and delivery
/// reservations.
#[async_trait::async_trait]
pub trait TriggerStore: Send + Sync {
    /// Durability tier of this store; defaults to `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Derives the source key for a subscription; the default delegates to
    /// `default_trigger_source_key`.
    async fn source_key_for_subscription(
        &self,
        source_type: &str,
        source: &serde_json::Value,
    ) -> Result<String, PluginError> {
        default_trigger_source_key(source_type, source)
    }

    /// Stores a new subscription built from `draft` and returns the record.
    async fn register_subscription(
        &self,
        draft: TriggerSubscriptionDraft,
    ) -> Result<TriggerSubscriptionRecord, PluginError>;

    /// Returns the subscriptions selected by `filter`.
    async fn list_subscriptions(
        &self,
        filter: TriggerSubscriptionFilter,
    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;

    /// Cancels the subscription identified by session id and handle.
    /// NOTE(review): the meaning of the returned `bool` is set by each
    /// implementation; the in-memory store returns `true` only when an enabled
    /// subscription was disabled.
    async fn cancel_subscription(
        &self,
        session_id: &str,
        handle: &str,
    ) -> Result<bool, PluginError>;

    /// Removes the subscriptions of a session and returns a count
    /// (presumably the number removed — the in-memory store does this).
    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;

    /// Records an occurrence for `request` and returns the stored record.
    async fn record_occurrence(
        &self,
        request: TriggerOccurrenceRequest,
    ) -> Result<TriggerOccurrenceRecord, PluginError>;

    /// Reserves deliveries of the given occurrence to matching subscriptions.
    async fn reserve_matching_deliveries(
        &self,
        occurrence_id: &str,
    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
}
448
/// `TriggerStore` implementation that keeps all state in a mutex-guarded
/// in-process structure.
#[derive(Default)]
pub struct InMemoryTriggerStore {
    state: Mutex<InMemoryTriggerEventState>,
}
453
/// Mutable state behind `InMemoryTriggerStore`.
#[derive(Default)]
struct InMemoryTriggerEventState {
    // Counter used to mint subscription ids and handles.
    next_subscription_seq: u64,
    // Subscriptions keyed by subscription id.
    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
    // Occurrences keyed by occurrence id.
    occurrences: BTreeMap<String, TriggerOccurrenceRecord>,
    // Idempotency key -> occurrence id.
    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
    // Occurrence id -> hash of the request that created it.
    occurrence_hashes: BTreeMap<String, String>,
    // (occurrence id, subscription id) pairs that have been reserved.
    deliveries: BTreeSet<(String, String)>,
}
463
464#[async_trait::async_trait]
465impl TriggerStore for InMemoryTriggerStore {
466 async fn register_subscription(
467 &self,
468 draft: TriggerSubscriptionDraft,
469 ) -> Result<TriggerSubscriptionRecord, PluginError> {
470 let mut state = self
471 .state
472 .lock()
473 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
474 state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
475 let handle = format!("trigger:{}", state.next_subscription_seq);
476 let subscription_id = format!("subscription:{}", state.next_subscription_seq);
477 let now = crate::runtime::current_epoch_ms();
478 let record = TriggerSubscriptionRecord {
479 subscription_id: subscription_id.clone(),
480 registrant: draft.registrant,
481 env_ref: draft.env_ref,
482 wake_target: draft.wake_target,
483 handle,
484 name: draft.name,
485 source_type: draft.source_type,
486 source_key: draft.source_key,
487 source: draft.source,
488 event_ty: draft.event_ty,
489 module_ref: draft.module_ref,
490 host_requirements_ref: draft.host_requirements_ref,
491 process_ref: draft.process_ref,
492 process_name: draft.process_name,
493 input_template: draft.input_template,
494 enabled: true,
495 created_at_ms: now,
496 updated_at_ms: now,
497 };
498 state.subscriptions.insert(subscription_id, record.clone());
499 Ok(record)
500 }
501
502 async fn list_subscriptions(
503 &self,
504 filter: TriggerSubscriptionFilter,
505 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
506 let state = self
507 .state
508 .lock()
509 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
510 let mut records = state
511 .subscriptions
512 .values()
513 .filter(|record| filter.matches(record))
514 .cloned()
515 .collect::<Vec<_>>();
516 records.sort_by(|left, right| {
517 left.registrant_scope_id()
518 .cmp(&right.registrant_scope_id())
519 .then_with(|| left.handle.cmp(&right.handle))
520 });
521 Ok(records)
522 }
523
524 async fn cancel_subscription(
525 &self,
526 session_id: &str,
527 handle: &str,
528 ) -> Result<bool, PluginError> {
529 let mut state = self
530 .state
531 .lock()
532 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
533 let now = crate::runtime::current_epoch_ms();
534 let Some(record) = state.subscriptions.values_mut().find(|record| {
535 record.registrant_session_id() == Some(session_id) && record.handle == handle
536 }) else {
537 return Ok(false);
538 };
539 let changed = record.enabled;
540 record.enabled = false;
541 record.updated_at_ms = now;
542 Ok(changed)
543 }
544
545 async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
546 let mut state = self
547 .state
548 .lock()
549 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
550 let before = state.subscriptions.len();
551 state
552 .subscriptions
553 .retain(|_, record| record.registrant_session_id() != Some(session_id));
554 Ok(before.saturating_sub(state.subscriptions.len()))
555 }
556
557 async fn record_occurrence(
558 &self,
559 request: TriggerOccurrenceRequest,
560 ) -> Result<TriggerOccurrenceRecord, PluginError> {
561 validate_trigger_occurrence_request(&request)?;
562 let mut state = self
563 .state
564 .lock()
565 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
566 let request_hash = trigger_occurrence_request_hash(&request)?;
567 if let Some(existing_id) = state
568 .occurrence_id_by_idempotency_key
569 .get(&request.idempotency_key)
570 .cloned()
571 {
572 let existing_hash = state
573 .occurrence_hashes
574 .get(&existing_id)
575 .cloned()
576 .unwrap_or_default();
577 if existing_hash != request_hash {
578 return Err(PluginError::Session(format!(
579 "trigger occurrence idempotency conflict for `{}`",
580 request.idempotency_key
581 )));
582 }
583 return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
584 PluginError::Session(format!(
585 "missing trigger occurrence `{existing_id}` for idempotency key"
586 ))
587 });
588 }
589 let occurrence_id = deterministic_occurrence_id(&request)?;
590 let record = TriggerOccurrenceRecord {
591 occurrence_id: occurrence_id.clone(),
592 source_type: request.source_type,
593 source_key: request.source_key,
594 payload: request.payload,
595 idempotency_key: request.idempotency_key.clone(),
596 source: request.source,
597 occurred_at_ms: crate::runtime::current_epoch_ms(),
598 };
599 state
600 .occurrence_id_by_idempotency_key
601 .insert(request.idempotency_key, occurrence_id.clone());
602 state
603 .occurrence_hashes
604 .insert(occurrence_id.clone(), request_hash);
605 state.occurrences.insert(occurrence_id, record.clone());
606 Ok(record)
607 }
608
609 async fn reserve_matching_deliveries(
610 &self,
611 occurrence_id: &str,
612 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
613 let mut state = self
614 .state
615 .lock()
616 .map_err(|_| PluginError::Session("trigger store lock poisoned".to_string()))?;
617 let occurrence = state
618 .occurrences
619 .get(occurrence_id)
620 .cloned()
621 .ok_or_else(|| {
622 PluginError::Session(format!("unknown trigger occurrence `{occurrence_id}`"))
623 })?;
624 let subscriptions = state
625 .subscriptions
626 .values()
627 .filter(|record| {
628 record.enabled
629 && record.source_type == occurrence.source_type
630 && record.source_key == occurrence.source_key
631 })
632 .cloned()
633 .collect::<Vec<_>>();
634 let mut deliveries = Vec::new();
635 for subscription in subscriptions {
636 let key = (
637 occurrence.occurrence_id.clone(),
638 subscription.subscription_id.clone(),
639 );
640 if !state.deliveries.insert(key) {
641 continue;
642 }
643 let process_id = deterministic_delivery_process_id(
644 &occurrence.occurrence_id,
645 &subscription.subscription_id,
646 )?;
647 deliveries.push(TriggerDeliveryReservation {
648 occurrence: occurrence.clone(),
649 subscription,
650 process_id,
651 });
652 }
653 Ok(deliveries)
654 }
655}
656
/// Serde default for `enabled` fields: a missing field deserializes as `true`.
fn default_enabled() -> bool {
    true
}
660
661pub fn default_trigger_source_key(
662 source_type: &str,
663 source: &serde_json::Value,
664) -> Result<String, PluginError> {
665 let digest = crate::stable_hash::stable_json_sha256_hex(&(source_type, source))
666 .map_err(|err| PluginError::Session(format!("failed to hash trigger source key: {err}")))?;
667 Ok(format!("source:{source_type}:sha256:{digest}"))
668}
669
670pub fn empty_trigger_source_key(source_type: &str) -> Result<String, PluginError> {
671 default_trigger_source_key(source_type, &serde_json::json!({}))
672}
673
674pub fn deterministic_occurrence_id(
675 request: &TriggerOccurrenceRequest,
676) -> Result<String, PluginError> {
677 let digest = crate::stable_hash::stable_json_sha256_hex(&(
678 request.source_type.as_str(),
679 request.source_key.as_str(),
680 request.idempotency_key.as_str(),
681 ))
682 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))?;
683 Ok(format!("trigger:{digest}"))
684}
685
686pub fn deterministic_delivery_process_id(
687 occurrence_id: &str,
688 subscription_id: &str,
689) -> Result<String, PluginError> {
690 let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
691 .map_err(|err| PluginError::Session(format!("failed to hash trigger delivery: {err}")))?;
692 Ok(format!("process:trigger:{digest}"))
693}
694
/// Records trigger occurrences in a `TriggerStore` and starts a process for
/// each reserved delivery.
#[derive(Clone)]
pub struct TriggerRouter {
    store: Arc<dyn TriggerStore>,
    artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
    // When absent, `emit` can only succeed if no delivery was reserved.
    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
    // Poked by `emit` after at least one process was started.
    process_work_poke: Option<crate::ProcessWorkPoke>,
    host_profile_id: String,
}

impl TriggerRouter {
    /// Builds a router from its collaborators.
    pub fn new(
        store: Arc<dyn TriggerStore>,
        artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
        process_work_poke: Option<crate::ProcessWorkPoke>,
        host_profile_id: impl Into<String>,
    ) -> Self {
        Self {
            store,
            artifact_store,
            process_registry,
            process_work_poke,
            host_profile_id: host_profile_id.into(),
        }
    }

    /// Returns another handle to the underlying trigger store.
    pub fn store(&self) -> Arc<dyn TriggerStore> {
        Arc::clone(&self.store)
    }

    /// Records `request` as an occurrence, reserves its matching deliveries,
    /// and starts one process per reservation.
    ///
    /// # Errors
    ///
    /// - Propagates store errors from recording or reserving.
    /// - Fails when deliveries were reserved but no process registry is set.
    /// - Fails with a summary when no process started and at least one start
    ///   failed.
    ///
    /// Start failures are not reported when at least one delivery started;
    /// the report then lists only the started process ids.
    pub async fn emit(
        &self,
        request: TriggerOccurrenceRequest,
        effect_controller: &dyn crate::RuntimeEffectController,
    ) -> Result<TriggerEmitReport, PluginError> {
        let occurrence = self.store.record_occurrence(request).await?;
        let reservations = self
            .store
            .reserve_matching_deliveries(&occurrence.occurrence_id)
            .await?;
        let Some(process_registry) = self.process_registry.as_ref() else {
            // Without a registry, only an occurrence with no deliveries is OK.
            if reservations.is_empty() {
                return Ok(TriggerEmitReport::new(occurrence.occurrence_id, Vec::new()));
            }
            return Err(PluginError::Session(
                "trigger delivery requires a process registry".to_string(),
            ));
        };
        let mut started_process_ids = Vec::new();
        let mut start_errors = Vec::new();
        for reservation in reservations {
            let process_id = reservation.process_id.clone();
            if let Err(err) = self
                .start_delivery(
                    &reservation,
                    Arc::clone(process_registry),
                    effect_controller,
                )
                .await
            {
                // Collect the failure and keep going with the next delivery.
                start_errors.push(format!(
                    "{}: {err}",
                    reservation.subscription.subscription_id
                ));
                continue;
            }
            started_process_ids.push(process_id);
        }
        if !started_process_ids.is_empty()
            && let Some(poke) = self.process_work_poke.as_ref()
        {
            poke.poke();
        }
        // Only a total failure is surfaced as an error.
        if started_process_ids.is_empty()
            && let Some(message) = trigger_delivery_failure_summary(&start_errors)
        {
            return Err(PluginError::Session(message));
        }
        Ok(TriggerEmitReport::new(
            occurrence.occurrence_id,
            started_process_ids,
        ))
    }

    /// Starts the process for one reservation through `effect_controller`.
    ///
    /// Validates the payload against the subscription's event type, loads the
    /// target module artifact, materializes the process arguments, and executes
    /// a `ProcessCommand::Start` effect.
    ///
    /// # Errors
    ///
    /// Fails on an invalid payload, a missing or unloadable module, argument
    /// serialization failure, an effect error, or an outcome other than a
    /// process start.
    async fn start_delivery(
        &self,
        reservation: &TriggerDeliveryReservation,
        process_registry: Arc<dyn crate::ProcessRegistry>,
        effect_controller: &dyn crate::RuntimeEffectController,
    ) -> Result<(), PluginError> {
        let subscription = &reservation.subscription;
        let occurrence = &reservation.occurrence;
        validate_payload(&occurrence.payload, &subscription.event_ty).map_err(|message| {
            PluginError::Session(format!(
                "invalid payload for trigger `{}`: {message}",
                subscription.handle
            ))
        })?;
        let artifact = self
            .artifact_store
            .get_module_artifact(&subscription.module_ref)
            .await
            .map_err(|err| {
                PluginError::Session(format!(
                    "failed to load trigger target module `{}`: {err}",
                    subscription.module_ref
                ))
            })?
            .ok_or_else(|| {
                PluginError::Session(format!(
                    "missing trigger target module `{}`",
                    subscription.module_ref
                ))
            })?;
        // Falls back to the default (no signal event types) when the process
        // is not found in the artifact.
        let signal_event_types = artifact
            .canonical_ir
            .process(&subscription.process_name)
            .map(crate::lashlang_process_signal_event_types)
            .unwrap_or_default();
        let args =
            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
        let originator_scope_id = subscription.registrant_scope_id();
        let trigger_occurrence_invocation = crate::runtime::causal::trigger_occurrence_invocation(
            &originator_scope_id,
            &occurrence.occurrence_id,
        );
        let registration = crate::ProcessRegistration::new(
            reservation.process_id.clone(),
            crate::ProcessInput::LashlangProcess {
                module_ref: subscription.module_ref.clone(),
                process_ref: subscription.process_ref.clone(),
                host_requirements_ref: subscription.host_requirements_ref.clone(),
                process_name: subscription.process_name.clone(),
                args,
            },
            crate::ProcessProvenance::new(
                subscription.registrant.clone(),
                self.host_profile_id.clone(),
            )
            .with_caused_by(trigger_occurrence_invocation.causal_ref()),
        )
        .with_extra_event_types(
            crate::lashlang_process_event_types()
                .into_iter()
                .chain(signal_event_types),
        )
        .with_execution_env_ref(Some(subscription.env_ref.clone()))
        .with_wake_target(subscription.wake_target.clone());
        // A start grant is only built when the subscription has a wake target.
        let grant =
            subscription
                .wake_target
                .clone()
                .map(|session_scope| crate::ProcessStartGrant {
                    session_scope,
                    descriptor: crate::ProcessHandleDescriptor::new(
                        Some("lashlang"),
                        Some(subscription.process_name.as_str()),
                    ),
                });
        let execution_context = crate::ProcessExecutionContext::default()
            .with_causal_invocation(Some(trigger_occurrence_invocation));
        let command = crate::ProcessCommand::Start {
            registration,
            grant,
            execution_context: Box::new(execution_context),
        };
        let effect_id = command.effect_id();
        let invocation = crate::RuntimeInvocation::effect(
            crate::RuntimeScope::new(originator_scope_id),
            effect_id.clone(),
            crate::RuntimeEffectKind::Process,
            format!(
                "trigger:{}:{}",
                occurrence.occurrence_id, subscription.subscription_id
            ),
        )
        .with_caused_by(Some(crate::CausalRef::TriggerOccurrence {
            occurrence_id: occurrence.occurrence_id.clone(),
        }));
        let outcome = effect_controller
            .execute_effect(
                crate::RuntimeEffectEnvelope::new(
                    invocation,
                    crate::RuntimeEffectCommand::process(command),
                ),
                crate::RuntimeEffectLocalExecutor::processes(process_registry),
            )
            .await?;
        // Any outcome other than a process start is reported as an error.
        match outcome {
            crate::RuntimeEffectOutcome::Process {
                result: crate::ProcessEffectOutcome::Start { .. },
            } => Ok(()),
            other => Err(PluginError::Session(format!(
                "trigger process start returned the wrong outcome: {}",
                other.kind().as_str()
            ))),
        }
    }
}
894
/// Condenses delivery start errors into one message.
///
/// Returns `None` for no errors, the single error prefixed with
/// `trigger delivery failed:` for one, and otherwise the total count, the
/// first error, and how many more failed.
fn trigger_delivery_failure_summary(errors: &[String]) -> Option<String> {
    let (first, rest) = errors.split_first()?;
    if rest.is_empty() {
        return Some(format!("trigger delivery failed: {first}"));
    }
    let total = errors.len();
    let remaining = rest.len();
    Some(format!(
        "trigger delivery failed for {} matching subscriptions: {first}; {} more failed",
        total, remaining
    ))
}
906
907fn materialize_trigger_process_args(
908 input_template: &lashlang::TriggerInputTemplate,
909 event_payload: &serde_json::Value,
910) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
911 let mut args = lashlang::Record::default();
912 for (input_name, input) in input_template.entries() {
913 let value = match input {
914 lashlang::TriggerInputBinding::Event => event_payload.clone(),
915 lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
916 };
917 args.insert(input_name.to_string(), lashlang::from_json(value));
918 }
919 match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
920 .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
921 {
922 serde_json::Value::Object(map) => Ok(map),
923 _ => Err(PluginError::Session(
924 "trigger process args must serialize as an object".to_string(),
925 )),
926 }
927}
928
929pub fn validate_payload(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> Result<(), String> {
930 if json_matches_type(value, ty) {
931 Ok(())
932 } else {
933 Err(format!("expected {}", lashlang::format_type_expr(ty)))
934 }
935}
936
937fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
938 match ty {
939 lashlang::TypeExpr::Any => true,
940 lashlang::TypeExpr::Ref(_) => false,
941 lashlang::TypeExpr::Str => value.is_string(),
942 lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
943 lashlang::TypeExpr::Float => value.is_number(),
944 lashlang::TypeExpr::Bool => value.is_boolean(),
945 lashlang::TypeExpr::Dict => value.is_object(),
946 lashlang::TypeExpr::Null => value.is_null(),
947 lashlang::TypeExpr::Enum(values) => value
948 .as_str()
949 .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
950 lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
951 items
952 .iter()
953 .all(|item_value| json_matches_type(item_value, item))
954 }),
955 lashlang::TypeExpr::Object(fields) => {
956 let Some(map) = value.as_object() else {
957 return false;
958 };
959 fields
960 .iter()
961 .all(|field| match map.get(field.name.as_str()) {
962 Some(field_value) => json_matches_type(field_value, &field.ty),
963 None => field.optional,
964 })
965 }
966 lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
967 lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
968 value.is_object()
969 }
970 }
971}
972
973pub fn validate_trigger_occurrence_request(
974 request: &TriggerOccurrenceRequest,
975) -> Result<(), PluginError> {
976 if request.source_type.trim().is_empty() {
977 return Err(PluginError::Session(
978 "trigger occurrence requires source_type".to_string(),
979 ));
980 }
981 if request.source_key.trim().is_empty() {
982 return Err(PluginError::Session(
983 "trigger occurrence requires source_key".to_string(),
984 ));
985 }
986 if request.idempotency_key.trim().is_empty() {
987 return Err(PluginError::Session(
988 "trigger occurrence requires idempotency_key".to_string(),
989 ));
990 }
991 Ok(())
992}
993
994pub fn trigger_occurrence_request_hash(
995 request: &TriggerOccurrenceRequest,
996) -> Result<String, PluginError> {
997 crate::stable_hash::stable_json_sha256_hex(&(
998 request.source_type.as_str(),
999 request.source_key.as_str(),
1000 &request.payload,
1001 &request.source,
1002 ))
1003 .map_err(|err| PluginError::Session(format!("failed to hash trigger occurrence: {err}")))
1004}
1005
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload type fixture: an object named `ui.button.Pressed` with one
    /// required string field `button`.
    fn button_payload_type() -> lashlang::NamedDataType {
        lashlang::NamedDataType::object(
            "ui.button.Pressed",
            vec![lashlang::TypeField {
                name: "button".into(),
                ty: lashlang::TypeExpr::Str,
                optional: false,
            }],
        )
        .expect("valid trigger occurrence payload")
    }

    /// Two events with different resource types but the same alias and event
    /// name share a source type, so the second declaration must fail.
    #[test]
    fn trigger_catalog_rejects_duplicate_trigger_source_identity() {
        let mut catalog = TriggerEventCatalog::new();
        catalog
            .declare(TriggerEvent::new(
                "Button",
                "ui.button",
                "pressed",
                button_payload_type(),
            ))
            .expect("first trigger occurrence");

        let err = catalog
            .declare(TriggerEvent::new(
                "AlternateButton",
                "ui.button",
                "pressed",
                button_payload_type(),
            ))
            .expect_err("duplicate public source identity should be rejected");

        assert!(err.contains("duplicate trigger source `ui.button.pressed`"));
    }

    /// A serialized record whose `host_requirements_ref` field was renamed to
    /// `required_surface_ref` must fail to deserialize, with an error that
    /// names the missing field.
    #[test]
    fn trigger_subscription_record_rejects_legacy_required_surface_ref() {
        let mut inputs = BTreeMap::new();
        inputs.insert("event".to_string(), lashlang::TriggerInputBinding::Event);
        let record = TriggerSubscriptionRecord {
            subscription_id: "subscription:1".to_string(),
            registrant: crate::ProcessOriginator::session(crate::SessionScope::new("session-a")),
            env_ref: crate::ProcessExecutionEnvRef::new("process-env:session-a"),
            wake_target: Some(crate::SessionScope::new("session-a")),
            handle: "trigger:1".to_string(),
            name: Some("button watcher".to_string()),
            source_type: "ui.button.pressed".to_string(),
            source_key: empty_trigger_source_key("ui.button.pressed").expect("source key"),
            source: serde_json::json!({}),
            event_ty: lashlang::TypeExpr::Object(vec![lashlang::TypeField {
                name: "button".into(),
                ty: lashlang::TypeExpr::Str,
                optional: false,
            }]),
            module_ref: lashlang::ModuleRef::new(&lashlang::ContentHash::new("module")),
            host_requirements_ref: lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new(
                "surface",
            )),
            process_ref: lashlang::ProcessRef::new(lashlang::ContentHash::new("process"), 0),
            process_name: "on_button".to_string(),
            input_template: lashlang::TriggerInputTemplate::new(inputs),
            enabled: true,
            created_at_ms: 1,
            updated_at_ms: 1,
        };
        // Rename the field in the JSON form to imitate the legacy layout.
        let mut value = serde_json::to_value(record).expect("record json");
        let object = value.as_object_mut().expect("record object");
        let legacy_ref = object
            .remove("host_requirements_ref")
            .expect("host requirements ref");
        object.insert("required_surface_ref".to_string(), legacy_ref);

        let err = serde_json::from_value::<TriggerSubscriptionRecord>(value)
            .expect_err("legacy record must be malformed");

        assert!(err.to_string().contains("host_requirements_ref"));
    }
}