1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
9pub struct HostEvent {
10 pub resource_type: String,
11 pub alias: String,
12 pub event: String,
13 pub payload_ty: lashlang::NamedDataType,
14}
15
16impl HostEvent {
17 pub fn new(
18 resource_type: impl Into<String>,
19 alias: impl Into<String>,
20 event: impl Into<String>,
21 payload_ty: lashlang::NamedDataType,
22 ) -> Self {
23 Self {
24 resource_type: resource_type.into(),
25 alias: alias.into(),
26 event: event.into(),
27 payload_ty,
28 }
29 }
30
31 pub fn payload_type(&self) -> &lashlang::NamedDataType {
32 &self.payload_ty
33 }
34
35 pub fn key(&self) -> HostEventKey {
36 HostEventKey {
37 resource_type: self.resource_type.clone(),
38 alias: self.alias.clone(),
39 event: self.event.clone(),
40 }
41 }
42
43 pub fn source_type(&self) -> String {
44 host_event_source_type(&self.alias, &self.event)
45 }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct HostEventKey {
50 pub resource_type: String,
51 pub alias: String,
52 pub event: String,
53}
54
55impl HostEventKey {
56 pub fn new(
57 resource_type: impl Into<String>,
58 alias: impl Into<String>,
59 event: impl Into<String>,
60 ) -> Self {
61 Self {
62 resource_type: resource_type.into(),
63 alias: alias.into(),
64 event: event.into(),
65 }
66 }
67
68 pub fn source_type(&self) -> String {
69 host_event_source_type(&self.alias, &self.event)
70 }
71}
72
/// Joins an alias and an event name with a single `.` to form the source type
/// string (for example `ui.button` + `pressed` gives `ui.button.pressed`).
pub fn host_event_source_type(alias: &str, event: &str) -> String {
    [alias, event].join(".")
}
76
77#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
78pub struct HostEventCatalog {
79 events: BTreeMap<HostEventKey, HostEvent>,
80}
81
82impl HostEventCatalog {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn declare(&mut self, event: HostEvent) -> Result<(), String> {
88 let key = event.key();
89 if self.events.contains_key(&key) {
90 return Err(format!(
91 "duplicate host event `{}.{}.{}`",
92 key.resource_type, key.alias, key.event
93 ));
94 }
95 let source_type = event.source_type();
96 if let Some(existing) = self
97 .events
98 .values()
99 .find(|existing| existing.source_type() == source_type)
100 {
101 return Err(format!(
102 "duplicate host event source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103 existing.resource_type,
104 existing.alias,
105 existing.event,
106 key.resource_type,
107 key.alias,
108 key.event
109 ));
110 }
111 self.events.insert(key, event);
112 Ok(())
113 }
114
115 pub fn from_events(events: impl IntoIterator<Item = HostEvent>) -> Result<Self, String> {
116 let mut catalog = Self::new();
117 for event in events {
118 catalog.declare(event)?;
119 }
120 Ok(catalog)
121 }
122
123 pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&HostEvent> {
124 self.events
125 .get(&HostEventKey::new(resource_type, alias, event))
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.events.is_empty()
130 }
131
132 pub fn events(&self) -> impl Iterator<Item = &HostEvent> {
133 self.events.values()
134 }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct HostEventEmitReport {
139 #[serde(default, skip_serializing_if = "String::is_empty")]
140 pub occurrence_id: String,
141 pub started_process_ids: Vec<String>,
142}
143
144impl HostEventEmitReport {
145 pub fn empty() -> Self {
146 Self::default()
147 }
148
149 fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150 Self {
151 occurrence_id,
152 started_process_ids,
153 }
154 }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct HostEventOccurrenceRequest {
159 pub source_type: String,
160 pub source_key: String,
161 #[serde(default)]
162 pub payload: serde_json::Value,
163 pub idempotency_key: String,
164 #[serde(default, skip_serializing_if = "Option::is_none")]
165 pub source: Option<serde_json::Value>,
166}
167
168impl HostEventOccurrenceRequest {
169 pub fn new(
170 source_type: impl Into<String>,
171 source_key: impl Into<String>,
172 payload: serde_json::Value,
173 idempotency_key: impl Into<String>,
174 ) -> Self {
175 Self {
176 source_type: source_type.into(),
177 source_key: source_key.into(),
178 payload,
179 idempotency_key: idempotency_key.into(),
180 source: None,
181 }
182 }
183
184 pub fn with_source(mut self, source: serde_json::Value) -> Self {
185 self.source = Some(source);
186 self
187 }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct HostEventOccurrenceRecord {
192 pub occurrence_id: String,
193 pub source_type: String,
194 pub source_key: String,
195 #[serde(default)]
196 pub payload: serde_json::Value,
197 pub idempotency_key: String,
198 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub source: Option<serde_json::Value>,
200 pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerSourceType(String);
206
207impl TriggerSourceType {
208 pub fn new(value: impl Into<String>) -> Self {
209 Self(value.into())
210 }
211
212 pub fn as_str(&self) -> &str {
213 &self.0
214 }
215}
216
217impl From<String> for TriggerSourceType {
218 fn from(value: String) -> Self {
219 Self::new(value)
220 }
221}
222
223impl From<&str> for TriggerSourceType {
224 fn from(value: &str) -> Self {
225 Self::new(value)
226 }
227}
228
229impl AsRef<str> for TriggerSourceType {
230 fn as_ref(&self) -> &str {
231 self.as_str()
232 }
233}
234
235impl std::fmt::Display for TriggerSourceType {
236 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 formatter.write_str(self.as_str())
238 }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243 pub handle: String,
244 pub source_key: String,
245 #[serde(default, skip_serializing_if = "Option::is_none")]
246 pub name: Option<String>,
247 pub source_type: TriggerSourceType,
248 pub source: serde_json::Value,
249 pub target: TriggerTargetSummary,
250 #[serde(default = "default_enabled")]
251 pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256 pub process_name: String,
257 pub inputs: lashlang::TriggerInputTemplate,
258}
259
260#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
261pub struct TriggerSubscriptionDraft {
262 pub session_id: String,
263 #[serde(default, skip_serializing_if = "Option::is_none")]
264 pub name: Option<String>,
265 pub source_type: String,
266 pub source_key: String,
267 pub source: serde_json::Value,
268 pub event_ty: lashlang::TypeExpr,
269 pub module_ref: lashlang::ModuleRef,
270 pub required_surface_ref: lashlang::RequiredSurfaceRef,
271 pub process_ref: lashlang::ProcessRef,
272 pub process_name: String,
273 pub input_template: lashlang::TriggerInputTemplate,
274}
275
276#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
277pub struct TriggerSubscriptionRecord {
278 pub subscription_id: String,
279 pub session_id: String,
280 pub handle: String,
281 #[serde(default, skip_serializing_if = "Option::is_none")]
282 pub name: Option<String>,
283 pub source_type: String,
284 pub source_key: String,
285 pub source: serde_json::Value,
286 pub event_ty: lashlang::TypeExpr,
287 pub module_ref: lashlang::ModuleRef,
288 pub required_surface_ref: lashlang::RequiredSurfaceRef,
289 pub process_ref: lashlang::ProcessRef,
290 pub process_name: String,
291 pub input_template: lashlang::TriggerInputTemplate,
292 #[serde(default = "default_enabled")]
293 pub enabled: bool,
294 pub created_at_ms: u64,
295 pub updated_at_ms: u64,
296}
297
298impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
299 fn from(route: &TriggerSubscriptionRecord) -> Self {
300 Self {
301 handle: route.handle.clone(),
302 source_key: route.source_key.clone(),
303 name: route.name.clone(),
304 source_type: TriggerSourceType::new(route.source_type.clone()),
305 source: route.source.clone(),
306 target: TriggerTargetSummary {
307 process_name: route.process_name.clone(),
308 inputs: route.input_template.clone(),
309 },
310 enabled: route.enabled,
311 }
312 }
313}
314
315#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
316pub struct TriggerSubscriptionFilter {
317 #[serde(default, skip_serializing_if = "Option::is_none")]
318 pub session_id: Option<String>,
319 #[serde(default, skip_serializing_if = "Option::is_none")]
320 pub handle: Option<String>,
321 #[serde(default, skip_serializing_if = "Option::is_none")]
322 pub name: Option<String>,
323 #[serde(default, skip_serializing_if = "Option::is_none")]
324 pub source_type: Option<String>,
325 #[serde(default, skip_serializing_if = "Option::is_none")]
326 pub source_key: Option<String>,
327 #[serde(default, skip_serializing_if = "Option::is_none")]
328 pub target: Option<lashlang::TriggerTargetIdentity>,
329 #[serde(default, skip_serializing_if = "Option::is_none")]
330 pub enabled: Option<bool>,
331}
332
333impl TriggerSubscriptionFilter {
334 pub fn for_session(session_id: impl Into<String>) -> Self {
335 Self {
336 session_id: Some(session_id.into()),
337 ..Self::default()
338 }
339 }
340
341 pub fn for_source_type(source_type: impl Into<String>) -> Self {
342 Self {
343 source_type: Some(source_type.into()),
344 ..Self::default()
345 }
346 }
347
348 pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
349 self.session_id
350 .as_deref()
351 .is_none_or(|session_id| record.session_id == session_id)
352 && self
353 .handle
354 .as_deref()
355 .is_none_or(|handle| record.handle == handle)
356 && self
357 .name
358 .as_deref()
359 .is_none_or(|name| record.name.as_deref() == Some(name))
360 && self
361 .source_type
362 .as_deref()
363 .is_none_or(|source_type| record.source_type == source_type)
364 && self
365 .source_key
366 .as_deref()
367 .is_none_or(|source_key| record.source_key == source_key)
368 && self.enabled.is_none_or(|enabled| record.enabled == enabled)
369 && self.target.as_ref().is_none_or(|target| {
370 target.matches(
371 &record.module_ref,
372 &record.required_surface_ref,
373 &record.process_ref,
374 &record.process_name,
375 )
376 })
377 }
378}
379
380#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
381pub struct TriggerDeliveryReservation {
382 pub occurrence: HostEventOccurrenceRecord,
383 pub subscription: TriggerSubscriptionRecord,
384 pub process_id: String,
385}
386
387#[async_trait::async_trait]
388pub trait HostEventStore: Send + Sync {
389 fn durability_tier(&self) -> crate::DurabilityTier {
390 crate::DurabilityTier::Inline
391 }
392
393 async fn source_key_for_subscription(
394 &self,
395 source_type: &str,
396 source: &serde_json::Value,
397 ) -> Result<String, PluginError> {
398 default_host_event_source_key(source_type, source)
399 }
400
401 async fn register_subscription(
402 &self,
403 draft: TriggerSubscriptionDraft,
404 ) -> Result<TriggerSubscriptionRecord, PluginError>;
405
406 async fn list_subscriptions(
407 &self,
408 filter: TriggerSubscriptionFilter,
409 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
410
411 async fn cancel_subscription(
412 &self,
413 session_id: &str,
414 handle: &str,
415 ) -> Result<bool, PluginError>;
416
417 async fn record_occurrence(
418 &self,
419 request: HostEventOccurrenceRequest,
420 ) -> Result<HostEventOccurrenceRecord, PluginError>;
421
422 async fn reserve_matching_deliveries(
423 &self,
424 occurrence_id: &str,
425 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
426}
427
428#[derive(Default)]
429pub struct InMemoryHostEventStore {
430 state: Mutex<InMemoryHostEventState>,
431}
432
433#[derive(Default)]
434struct InMemoryHostEventState {
435 next_subscription_seq: u64,
436 subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
437 occurrences: BTreeMap<String, HostEventOccurrenceRecord>,
438 occurrence_id_by_idempotency_key: BTreeMap<String, String>,
439 occurrence_hashes: BTreeMap<String, String>,
440 deliveries: BTreeSet<(String, String)>,
441}
442
443#[async_trait::async_trait]
444impl HostEventStore for InMemoryHostEventStore {
445 async fn register_subscription(
446 &self,
447 draft: TriggerSubscriptionDraft,
448 ) -> Result<TriggerSubscriptionRecord, PluginError> {
449 let mut state = self
450 .state
451 .lock()
452 .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
453 state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
454 let handle = format!("trigger:{}", state.next_subscription_seq);
455 let subscription_id = format!("subscription:{}", state.next_subscription_seq);
456 let now = crate::runtime::current_epoch_ms();
457 let record = TriggerSubscriptionRecord {
458 subscription_id: subscription_id.clone(),
459 session_id: draft.session_id,
460 handle,
461 name: draft.name,
462 source_type: draft.source_type,
463 source_key: draft.source_key,
464 source: draft.source,
465 event_ty: draft.event_ty,
466 module_ref: draft.module_ref,
467 required_surface_ref: draft.required_surface_ref,
468 process_ref: draft.process_ref,
469 process_name: draft.process_name,
470 input_template: draft.input_template,
471 enabled: true,
472 created_at_ms: now,
473 updated_at_ms: now,
474 };
475 state.subscriptions.insert(subscription_id, record.clone());
476 Ok(record)
477 }
478
479 async fn list_subscriptions(
480 &self,
481 filter: TriggerSubscriptionFilter,
482 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
483 let state = self
484 .state
485 .lock()
486 .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
487 let mut records = state
488 .subscriptions
489 .values()
490 .filter(|record| filter.matches(record))
491 .cloned()
492 .collect::<Vec<_>>();
493 records.sort_by(|left, right| {
494 left.session_id
495 .cmp(&right.session_id)
496 .then_with(|| left.handle.cmp(&right.handle))
497 });
498 Ok(records)
499 }
500
501 async fn cancel_subscription(
502 &self,
503 session_id: &str,
504 handle: &str,
505 ) -> Result<bool, PluginError> {
506 let mut state = self
507 .state
508 .lock()
509 .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
510 let now = crate::runtime::current_epoch_ms();
511 let Some(record) = state
512 .subscriptions
513 .values_mut()
514 .find(|record| record.session_id == session_id && record.handle == handle)
515 else {
516 return Ok(false);
517 };
518 let changed = record.enabled;
519 record.enabled = false;
520 record.updated_at_ms = now;
521 Ok(changed)
522 }
523
524 async fn record_occurrence(
525 &self,
526 request: HostEventOccurrenceRequest,
527 ) -> Result<HostEventOccurrenceRecord, PluginError> {
528 validate_host_event_occurrence_request(&request)?;
529 let mut state = self
530 .state
531 .lock()
532 .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
533 let request_hash = host_event_occurrence_request_hash(&request)?;
534 if let Some(existing_id) = state
535 .occurrence_id_by_idempotency_key
536 .get(&request.idempotency_key)
537 .cloned()
538 {
539 let existing_hash = state
540 .occurrence_hashes
541 .get(&existing_id)
542 .cloned()
543 .unwrap_or_default();
544 if existing_hash != request_hash {
545 return Err(PluginError::Session(format!(
546 "host event occurrence idempotency conflict for `{}`",
547 request.idempotency_key
548 )));
549 }
550 return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
551 PluginError::Session(format!(
552 "missing host event occurrence `{existing_id}` for idempotency key"
553 ))
554 });
555 }
556 let occurrence_id = deterministic_occurrence_id(&request)?;
557 let record = HostEventOccurrenceRecord {
558 occurrence_id: occurrence_id.clone(),
559 source_type: request.source_type,
560 source_key: request.source_key,
561 payload: request.payload,
562 idempotency_key: request.idempotency_key.clone(),
563 source: request.source,
564 occurred_at_ms: crate::runtime::current_epoch_ms(),
565 };
566 state
567 .occurrence_id_by_idempotency_key
568 .insert(request.idempotency_key, occurrence_id.clone());
569 state
570 .occurrence_hashes
571 .insert(occurrence_id.clone(), request_hash);
572 state.occurrences.insert(occurrence_id, record.clone());
573 Ok(record)
574 }
575
576 async fn reserve_matching_deliveries(
577 &self,
578 occurrence_id: &str,
579 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
580 let mut state = self
581 .state
582 .lock()
583 .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
584 let occurrence = state
585 .occurrences
586 .get(occurrence_id)
587 .cloned()
588 .ok_or_else(|| {
589 PluginError::Session(format!("unknown host event occurrence `{occurrence_id}`"))
590 })?;
591 let subscriptions = state
592 .subscriptions
593 .values()
594 .filter(|record| {
595 record.enabled
596 && record.source_type == occurrence.source_type
597 && record.source_key == occurrence.source_key
598 })
599 .cloned()
600 .collect::<Vec<_>>();
601 let mut deliveries = Vec::new();
602 for subscription in subscriptions {
603 let key = (
604 occurrence.occurrence_id.clone(),
605 subscription.subscription_id.clone(),
606 );
607 if !state.deliveries.insert(key) {
608 continue;
609 }
610 let process_id = deterministic_delivery_process_id(
611 &occurrence.occurrence_id,
612 &subscription.subscription_id,
613 )?;
614 deliveries.push(TriggerDeliveryReservation {
615 occurrence: occurrence.clone(),
616 subscription,
617 process_id,
618 });
619 }
620 Ok(deliveries)
621 }
622}
623
/// Serde default for `enabled` fields: absent on input means enabled.
fn default_enabled() -> bool {
    let enabled_by_default = true;
    enabled_by_default
}
627
628pub fn default_host_event_source_key(
629 source_type: &str,
630 source: &serde_json::Value,
631) -> Result<String, PluginError> {
632 let digest =
633 crate::stable_hash::stable_json_sha256_hex(&(source_type, source)).map_err(|err| {
634 PluginError::Session(format!("failed to hash host event source key: {err}"))
635 })?;
636 Ok(format!("source:{source_type}:sha256:{digest}"))
637}
638
639pub fn empty_host_event_source_key(source_type: &str) -> Result<String, PluginError> {
640 default_host_event_source_key(source_type, &serde_json::json!({}))
641}
642
643pub fn deterministic_occurrence_id(
644 request: &HostEventOccurrenceRequest,
645) -> Result<String, PluginError> {
646 let digest = crate::stable_hash::stable_json_sha256_hex(&(
647 request.source_type.as_str(),
648 request.source_key.as_str(),
649 request.idempotency_key.as_str(),
650 ))
651 .map_err(|err| PluginError::Session(format!("failed to hash host event occurrence: {err}")))?;
652 Ok(format!("host_event:{digest}"))
653}
654
655pub fn deterministic_delivery_process_id(
656 occurrence_id: &str,
657 subscription_id: &str,
658) -> Result<String, PluginError> {
659 let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
660 .map_err(|err| {
661 PluginError::Session(format!("failed to hash host event delivery: {err}"))
662 })?;
663 Ok(format!("process:host-event:{digest}"))
664}
665
666#[derive(Clone)]
667pub struct HostEventRouter {
668 store: Arc<dyn HostEventStore>,
669 process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
670 process_work_poke: Option<crate::ProcessWorkPoke>,
671 host_profile_id: String,
672}
673
674impl HostEventRouter {
675 pub fn new(
676 store: Arc<dyn HostEventStore>,
677 process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
678 process_work_poke: Option<crate::ProcessWorkPoke>,
679 host_profile_id: impl Into<String>,
680 ) -> Self {
681 Self {
682 store,
683 process_registry,
684 process_work_poke,
685 host_profile_id: host_profile_id.into(),
686 }
687 }
688
689 pub fn store(&self) -> Arc<dyn HostEventStore> {
690 Arc::clone(&self.store)
691 }
692
693 pub async fn emit(
694 &self,
695 request: HostEventOccurrenceRequest,
696 effect_controller: &dyn crate::RuntimeEffectController,
697 ) -> Result<HostEventEmitReport, PluginError> {
698 let occurrence = self.store.record_occurrence(request).await?;
699 let reservations = self
700 .store
701 .reserve_matching_deliveries(&occurrence.occurrence_id)
702 .await?;
703 let Some(process_registry) = self.process_registry.as_ref() else {
704 if reservations.is_empty() {
705 return Ok(HostEventEmitReport::new(
706 occurrence.occurrence_id,
707 Vec::new(),
708 ));
709 }
710 return Err(PluginError::Session(
711 "host event delivery requires a process registry".to_string(),
712 ));
713 };
714 let mut started_process_ids = Vec::new();
715 for reservation in reservations {
716 let process_id = reservation.process_id.clone();
717 self.start_delivery(
718 &reservation,
719 Arc::clone(process_registry),
720 effect_controller,
721 )
722 .await?;
723 started_process_ids.push(process_id);
724 }
725 if !started_process_ids.is_empty()
726 && let Some(poke) = self.process_work_poke.as_ref()
727 {
728 poke.poke();
729 }
730 Ok(HostEventEmitReport::new(
731 occurrence.occurrence_id,
732 started_process_ids,
733 ))
734 }
735
736 async fn start_delivery(
737 &self,
738 reservation: &TriggerDeliveryReservation,
739 process_registry: Arc<dyn crate::ProcessRegistry>,
740 effect_controller: &dyn crate::RuntimeEffectController,
741 ) -> Result<(), PluginError> {
742 let subscription = &reservation.subscription;
743 let occurrence = &reservation.occurrence;
744 validate_payload(&occurrence.payload, &subscription.event_ty).map_err(|message| {
745 PluginError::Session(format!(
746 "invalid payload for trigger `{}`: {message}",
747 subscription.handle
748 ))
749 })?;
750 let args =
751 materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
752 let owner_scope = crate::ProcessScope::new(subscription.session_id.clone());
753 let host_event_invocation = crate::runtime::causal::host_event_invocation(
754 &subscription.session_id,
755 &occurrence.occurrence_id,
756 );
757 let registration = crate::ProcessRegistration::new(
758 reservation.process_id.clone(),
759 crate::ProcessInput::LashlangProcess {
760 module_ref: subscription.module_ref.clone(),
761 process_ref: subscription.process_ref.clone(),
762 required_surface_ref: subscription.required_surface_ref.clone(),
763 process_name: subscription.process_name.clone(),
764 args,
765 },
766 )
767 .with_extra_event_types(crate::lashlang_process_event_types())
768 .with_process_provenance(
769 crate::ProcessProvenance::new(owner_scope.clone(), self.host_profile_id.clone())
770 .with_caused_by(host_event_invocation.causal_ref()),
771 );
772 let grant = crate::ProcessStartGrant {
773 owner_scope: owner_scope.clone(),
774 descriptor: crate::ProcessHandleDescriptor::new(
775 Some("lashlang"),
776 Some(subscription.process_name.as_str()),
777 ),
778 };
779 let execution_context = crate::ProcessExecutionContext::default()
780 .with_causal_invocation(Some(host_event_invocation))
781 .with_wake_target_scope(owner_scope);
782 let command = crate::ProcessCommand::Start {
783 registration,
784 grant: Some(grant),
785 execution_context: Box::new(execution_context),
786 };
787 let effect_id = command.effect_id();
788 let invocation = crate::RuntimeInvocation::effect(
789 crate::RuntimeScope::new(subscription.session_id.clone()),
790 effect_id.clone(),
791 crate::RuntimeEffectKind::Process,
792 format!(
793 "host_event:{}:{}",
794 occurrence.occurrence_id, subscription.subscription_id
795 ),
796 )
797 .with_caused_by(Some(crate::CausalRef::HostEvent {
798 occurrence_id: occurrence.occurrence_id.clone(),
799 }));
800 let outcome = effect_controller
801 .execute_effect(
802 crate::RuntimeEffectEnvelope::new(
803 invocation,
804 crate::RuntimeEffectCommand::Process { command },
805 ),
806 crate::RuntimeEffectLocalExecutor::process_control(process_registry),
807 )
808 .await?;
809 match outcome {
810 crate::RuntimeEffectOutcome::Process {
811 result: crate::ProcessEffectOutcome::Start { .. },
812 } => Ok(()),
813 other => Err(PluginError::Session(format!(
814 "host event process start returned the wrong outcome: {}",
815 other.kind().as_str()
816 ))),
817 }
818 }
819}
820
821fn materialize_trigger_process_args(
822 input_template: &lashlang::TriggerInputTemplate,
823 event_payload: &serde_json::Value,
824) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
825 let mut args = lashlang::Record::default();
826 for (input_name, input) in input_template.entries() {
827 let value = match input {
828 lashlang::TriggerInputBinding::Event => event_payload.clone(),
829 lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
830 };
831 args.insert(input_name.to_string(), lashlang::from_json(value));
832 }
833 match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
834 .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
835 {
836 serde_json::Value::Object(map) => Ok(map),
837 _ => Err(PluginError::Session(
838 "trigger process args must serialize as an object".to_string(),
839 )),
840 }
841}
842
843pub(crate) fn validate_payload(
844 value: &serde_json::Value,
845 ty: &lashlang::TypeExpr,
846) -> Result<(), String> {
847 if json_matches_type(value, ty) {
848 Ok(())
849 } else {
850 Err(format!("expected {}", lashlang::format_type_expr(ty)))
851 }
852}
853
854fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
855 match ty {
856 lashlang::TypeExpr::Any => true,
857 lashlang::TypeExpr::Ref(_) => false,
858 lashlang::TypeExpr::Str => value.is_string(),
859 lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
860 lashlang::TypeExpr::Float => value.is_number(),
861 lashlang::TypeExpr::Bool => value.is_boolean(),
862 lashlang::TypeExpr::Dict => value.is_object(),
863 lashlang::TypeExpr::Null => value.is_null(),
864 lashlang::TypeExpr::Enum(values) => value
865 .as_str()
866 .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
867 lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
868 items
869 .iter()
870 .all(|item_value| json_matches_type(item_value, item))
871 }),
872 lashlang::TypeExpr::Object(fields) => {
873 let Some(map) = value.as_object() else {
874 return false;
875 };
876 fields
877 .iter()
878 .all(|field| match map.get(field.name.as_str()) {
879 Some(field_value) => json_matches_type(field_value, &field.ty),
880 None => field.optional,
881 })
882 }
883 lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
884 lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
885 value.is_object()
886 }
887 }
888}
889
890pub fn validate_host_event_occurrence_request(
891 request: &HostEventOccurrenceRequest,
892) -> Result<(), PluginError> {
893 if request.source_type.trim().is_empty() {
894 return Err(PluginError::Session(
895 "host event occurrence requires source_type".to_string(),
896 ));
897 }
898 if request.source_key.trim().is_empty() {
899 return Err(PluginError::Session(
900 "host event occurrence requires source_key".to_string(),
901 ));
902 }
903 if request.idempotency_key.trim().is_empty() {
904 return Err(PluginError::Session(
905 "host event occurrence requires idempotency_key".to_string(),
906 ));
907 }
908 Ok(())
909}
910
911pub fn host_event_occurrence_request_hash(
912 request: &HostEventOccurrenceRequest,
913) -> Result<String, PluginError> {
914 crate::stable_hash::stable_json_sha256_hex(&(
915 request.source_type.as_str(),
916 request.source_key.as_str(),
917 &request.payload,
918 &request.source,
919 ))
920 .map_err(|err| PluginError::Session(format!("failed to hash host event occurrence: {err}")))
921}
922
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload type fixture: an object with one required string field
    /// named `button`.
    fn button_payload_type() -> lashlang::NamedDataType {
        let button_field = lashlang::TypeField {
            name: "button".into(),
            ty: lashlang::TypeExpr::Str,
            optional: false,
        };
        lashlang::NamedDataType::object("ui.button.Pressed", vec![button_field])
            .expect("valid host event payload")
    }

    /// Two events with different resource types but the same alias and event
    /// name share a source type, so the second declaration must be rejected.
    #[test]
    fn host_event_catalog_rejects_duplicate_trigger_source_identity() {
        let original = HostEvent::new("Button", "ui.button", "pressed", button_payload_type());
        let lookalike = HostEvent::new(
            "AlternateButton",
            "ui.button",
            "pressed",
            button_payload_type(),
        );

        let mut catalog = HostEventCatalog::new();
        catalog.declare(original).expect("first host event");
        let err = catalog
            .declare(lookalike)
            .expect_err("duplicate public source identity should be rejected");

        assert!(err.contains("duplicate host event source `ui.button.pressed`"));
    }
}