Skip to main content

lash_core/
host_events.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
9pub struct HostEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_ty: lashlang::NamedDataType,
14}
15
16impl HostEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_ty: lashlang::NamedDataType,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_ty,
28        }
29    }
30
31    pub fn payload_type(&self) -> &lashlang::NamedDataType {
32        &self.payload_ty
33    }
34
35    pub fn key(&self) -> HostEventKey {
36        HostEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        host_event_source_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct HostEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl HostEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        host_event_source_type(&self.alias, &self.event)
70    }
71}
72
73pub fn host_event_source_type(alias: &str, event: &str) -> String {
74    format!("{alias}.{event}")
75}
76
77#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
78pub struct HostEventCatalog {
79    events: BTreeMap<HostEventKey, HostEvent>,
80}
81
82impl HostEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: HostEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate host event `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate host event source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = HostEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&HostEvent> {
124        self.events
125            .get(&HostEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &HostEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct HostEventEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl HostEventEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct HostEventOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl HostEventOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct HostEventOccurrenceRecord {
192    pub occurrence_id: String,
193    pub source_type: String,
194    pub source_key: String,
195    #[serde(default)]
196    pub payload: serde_json::Value,
197    pub idempotency_key: String,
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub source: Option<serde_json::Value>,
200    pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerSourceType(String);
206
207impl TriggerSourceType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerSourceType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerSourceType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerSourceType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerSourceType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243    pub handle: String,
244    pub source_key: String,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub name: Option<String>,
247    pub source_type: TriggerSourceType,
248    pub source: serde_json::Value,
249    pub target: TriggerTargetSummary,
250    #[serde(default = "default_enabled")]
251    pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256    pub process_name: String,
257    pub inputs: lashlang::TriggerInputTemplate,
258}
259
260#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
261pub struct TriggerSubscriptionDraft {
262    pub registrant: crate::ProcessOriginator,
263    pub env_ref: crate::ProcessExecutionEnvRef,
264    #[serde(default, skip_serializing_if = "Option::is_none")]
265    pub wake_target: Option<crate::SessionScope>,
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub name: Option<String>,
268    pub source_type: String,
269    pub source_key: String,
270    pub source: serde_json::Value,
271    pub event_ty: lashlang::TypeExpr,
272    pub module_ref: lashlang::ModuleRef,
273    pub required_surface_ref: lashlang::RequiredSurfaceRef,
274    pub process_ref: lashlang::ProcessRef,
275    pub process_name: String,
276    pub input_template: lashlang::TriggerInputTemplate,
277}
278
279#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
280pub struct TriggerSubscriptionRecord {
281    pub subscription_id: String,
282    pub registrant: crate::ProcessOriginator,
283    pub env_ref: crate::ProcessExecutionEnvRef,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    pub wake_target: Option<crate::SessionScope>,
286    pub handle: String,
287    #[serde(default, skip_serializing_if = "Option::is_none")]
288    pub name: Option<String>,
289    pub source_type: String,
290    pub source_key: String,
291    pub source: serde_json::Value,
292    pub event_ty: lashlang::TypeExpr,
293    pub module_ref: lashlang::ModuleRef,
294    pub required_surface_ref: lashlang::RequiredSurfaceRef,
295    pub process_ref: lashlang::ProcessRef,
296    pub process_name: String,
297    pub input_template: lashlang::TriggerInputTemplate,
298    #[serde(default = "default_enabled")]
299    pub enabled: bool,
300    pub created_at_ms: u64,
301    pub updated_at_ms: u64,
302}
303
304impl TriggerSubscriptionRecord {
305    pub fn registrant_scope_id(&self) -> String {
306        self.registrant.scope_id()
307    }
308
309    pub fn registrant_session_id(&self) -> Option<&str> {
310        match &self.registrant {
311            crate::ProcessOriginator::Session { scope } => Some(scope.session_id.as_str()),
312            crate::ProcessOriginator::Host => None,
313        }
314    }
315}
316
317impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
318    fn from(route: &TriggerSubscriptionRecord) -> Self {
319        Self {
320            handle: route.handle.clone(),
321            source_key: route.source_key.clone(),
322            name: route.name.clone(),
323            source_type: TriggerSourceType::new(route.source_type.clone()),
324            source: route.source.clone(),
325            target: TriggerTargetSummary {
326                process_name: route.process_name.clone(),
327                inputs: route.input_template.clone(),
328            },
329            enabled: route.enabled,
330        }
331    }
332}
333
334#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
335pub struct TriggerSubscriptionFilter {
336    #[serde(default, skip_serializing_if = "Option::is_none")]
337    pub session_id: Option<String>,
338    #[serde(default, skip_serializing_if = "Option::is_none")]
339    pub handle: Option<String>,
340    #[serde(default, skip_serializing_if = "Option::is_none")]
341    pub name: Option<String>,
342    #[serde(default, skip_serializing_if = "Option::is_none")]
343    pub source_type: Option<String>,
344    #[serde(default, skip_serializing_if = "Option::is_none")]
345    pub source_key: Option<String>,
346    #[serde(default, skip_serializing_if = "Option::is_none")]
347    pub target: Option<lashlang::TriggerTargetIdentity>,
348    #[serde(default, skip_serializing_if = "Option::is_none")]
349    pub enabled: Option<bool>,
350}
351
352impl TriggerSubscriptionFilter {
353    pub fn for_session(session_id: impl Into<String>) -> Self {
354        Self {
355            session_id: Some(session_id.into()),
356            ..Self::default()
357        }
358    }
359
360    pub fn for_source_type(source_type: impl Into<String>) -> Self {
361        Self {
362            source_type: Some(source_type.into()),
363            ..Self::default()
364        }
365    }
366
367    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
368        self.session_id
369            .as_deref()
370            .is_none_or(|session_id| record.registrant_session_id() == Some(session_id))
371            && self
372                .handle
373                .as_deref()
374                .is_none_or(|handle| record.handle == handle)
375            && self
376                .name
377                .as_deref()
378                .is_none_or(|name| record.name.as_deref() == Some(name))
379            && self
380                .source_type
381                .as_deref()
382                .is_none_or(|source_type| record.source_type == source_type)
383            && self
384                .source_key
385                .as_deref()
386                .is_none_or(|source_key| record.source_key == source_key)
387            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
388            && self.target.as_ref().is_none_or(|target| {
389                target.matches(
390                    &record.module_ref,
391                    &record.required_surface_ref,
392                    &record.process_ref,
393                    &record.process_name,
394                )
395            })
396    }
397}
398
399#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
400pub struct TriggerDeliveryReservation {
401    pub occurrence: HostEventOccurrenceRecord,
402    pub subscription: TriggerSubscriptionRecord,
403    pub process_id: String,
404}
405
406#[async_trait::async_trait]
407pub trait HostEventStore: Send + Sync {
408    fn durability_tier(&self) -> crate::DurabilityTier {
409        crate::DurabilityTier::Inline
410    }
411
412    async fn source_key_for_subscription(
413        &self,
414        source_type: &str,
415        source: &serde_json::Value,
416    ) -> Result<String, PluginError> {
417        default_host_event_source_key(source_type, source)
418    }
419
420    async fn register_subscription(
421        &self,
422        draft: TriggerSubscriptionDraft,
423    ) -> Result<TriggerSubscriptionRecord, PluginError>;
424
425    async fn list_subscriptions(
426        &self,
427        filter: TriggerSubscriptionFilter,
428    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
429
430    async fn cancel_subscription(
431        &self,
432        session_id: &str,
433        handle: &str,
434    ) -> Result<bool, PluginError>;
435
436    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError>;
437
438    async fn record_occurrence(
439        &self,
440        request: HostEventOccurrenceRequest,
441    ) -> Result<HostEventOccurrenceRecord, PluginError>;
442
443    async fn reserve_matching_deliveries(
444        &self,
445        occurrence_id: &str,
446    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
447}
448
449#[derive(Default)]
450pub struct InMemoryHostEventStore {
451    state: Mutex<InMemoryHostEventState>,
452}
453
454#[derive(Default)]
455struct InMemoryHostEventState {
456    next_subscription_seq: u64,
457    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
458    occurrences: BTreeMap<String, HostEventOccurrenceRecord>,
459    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
460    occurrence_hashes: BTreeMap<String, String>,
461    deliveries: BTreeSet<(String, String)>,
462}
463
464#[async_trait::async_trait]
465impl HostEventStore for InMemoryHostEventStore {
466    async fn register_subscription(
467        &self,
468        draft: TriggerSubscriptionDraft,
469    ) -> Result<TriggerSubscriptionRecord, PluginError> {
470        let mut state = self
471            .state
472            .lock()
473            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
474        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
475        let handle = format!("trigger:{}", state.next_subscription_seq);
476        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
477        let now = crate::runtime::current_epoch_ms();
478        let record = TriggerSubscriptionRecord {
479            subscription_id: subscription_id.clone(),
480            registrant: draft.registrant,
481            env_ref: draft.env_ref,
482            wake_target: draft.wake_target,
483            handle,
484            name: draft.name,
485            source_type: draft.source_type,
486            source_key: draft.source_key,
487            source: draft.source,
488            event_ty: draft.event_ty,
489            module_ref: draft.module_ref,
490            required_surface_ref: draft.required_surface_ref,
491            process_ref: draft.process_ref,
492            process_name: draft.process_name,
493            input_template: draft.input_template,
494            enabled: true,
495            created_at_ms: now,
496            updated_at_ms: now,
497        };
498        state.subscriptions.insert(subscription_id, record.clone());
499        Ok(record)
500    }
501
502    async fn list_subscriptions(
503        &self,
504        filter: TriggerSubscriptionFilter,
505    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
506        let state = self
507            .state
508            .lock()
509            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
510        let mut records = state
511            .subscriptions
512            .values()
513            .filter(|record| filter.matches(record))
514            .cloned()
515            .collect::<Vec<_>>();
516        records.sort_by(|left, right| {
517            left.registrant_scope_id()
518                .cmp(&right.registrant_scope_id())
519                .then_with(|| left.handle.cmp(&right.handle))
520        });
521        Ok(records)
522    }
523
524    async fn cancel_subscription(
525        &self,
526        session_id: &str,
527        handle: &str,
528    ) -> Result<bool, PluginError> {
529        let mut state = self
530            .state
531            .lock()
532            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
533        let now = crate::runtime::current_epoch_ms();
534        let Some(record) = state.subscriptions.values_mut().find(|record| {
535            record.registrant_session_id() == Some(session_id) && record.handle == handle
536        }) else {
537            return Ok(false);
538        };
539        let changed = record.enabled;
540        record.enabled = false;
541        record.updated_at_ms = now;
542        Ok(changed)
543    }
544
545    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
546        let mut state = self
547            .state
548            .lock()
549            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
550        let before = state.subscriptions.len();
551        state
552            .subscriptions
553            .retain(|_, record| record.registrant_session_id() != Some(session_id));
554        Ok(before.saturating_sub(state.subscriptions.len()))
555    }
556
557    async fn record_occurrence(
558        &self,
559        request: HostEventOccurrenceRequest,
560    ) -> Result<HostEventOccurrenceRecord, PluginError> {
561        validate_host_event_occurrence_request(&request)?;
562        let mut state = self
563            .state
564            .lock()
565            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
566        let request_hash = host_event_occurrence_request_hash(&request)?;
567        if let Some(existing_id) = state
568            .occurrence_id_by_idempotency_key
569            .get(&request.idempotency_key)
570            .cloned()
571        {
572            let existing_hash = state
573                .occurrence_hashes
574                .get(&existing_id)
575                .cloned()
576                .unwrap_or_default();
577            if existing_hash != request_hash {
578                return Err(PluginError::Session(format!(
579                    "host event occurrence idempotency conflict for `{}`",
580                    request.idempotency_key
581                )));
582            }
583            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
584                PluginError::Session(format!(
585                    "missing host event occurrence `{existing_id}` for idempotency key"
586                ))
587            });
588        }
589        let occurrence_id = deterministic_occurrence_id(&request)?;
590        let record = HostEventOccurrenceRecord {
591            occurrence_id: occurrence_id.clone(),
592            source_type: request.source_type,
593            source_key: request.source_key,
594            payload: request.payload,
595            idempotency_key: request.idempotency_key.clone(),
596            source: request.source,
597            occurred_at_ms: crate::runtime::current_epoch_ms(),
598        };
599        state
600            .occurrence_id_by_idempotency_key
601            .insert(request.idempotency_key, occurrence_id.clone());
602        state
603            .occurrence_hashes
604            .insert(occurrence_id.clone(), request_hash);
605        state.occurrences.insert(occurrence_id, record.clone());
606        Ok(record)
607    }
608
609    async fn reserve_matching_deliveries(
610        &self,
611        occurrence_id: &str,
612    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
613        let mut state = self
614            .state
615            .lock()
616            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
617        let occurrence = state
618            .occurrences
619            .get(occurrence_id)
620            .cloned()
621            .ok_or_else(|| {
622                PluginError::Session(format!("unknown host event occurrence `{occurrence_id}`"))
623            })?;
624        let subscriptions = state
625            .subscriptions
626            .values()
627            .filter(|record| {
628                record.enabled
629                    && record.source_type == occurrence.source_type
630                    && record.source_key == occurrence.source_key
631            })
632            .cloned()
633            .collect::<Vec<_>>();
634        let mut deliveries = Vec::new();
635        for subscription in subscriptions {
636            let key = (
637                occurrence.occurrence_id.clone(),
638                subscription.subscription_id.clone(),
639            );
640            if !state.deliveries.insert(key) {
641                continue;
642            }
643            let process_id = deterministic_delivery_process_id(
644                &occurrence.occurrence_id,
645                &subscription.subscription_id,
646            )?;
647            deliveries.push(TriggerDeliveryReservation {
648                occurrence: occurrence.clone(),
649                subscription,
650                process_id,
651            });
652        }
653        Ok(deliveries)
654    }
655}
656
657fn default_enabled() -> bool {
658    true
659}
660
661pub fn default_host_event_source_key(
662    source_type: &str,
663    source: &serde_json::Value,
664) -> Result<String, PluginError> {
665    let digest =
666        crate::stable_hash::stable_json_sha256_hex(&(source_type, source)).map_err(|err| {
667            PluginError::Session(format!("failed to hash host event source key: {err}"))
668        })?;
669    Ok(format!("source:{source_type}:sha256:{digest}"))
670}
671
672pub fn empty_host_event_source_key(source_type: &str) -> Result<String, PluginError> {
673    default_host_event_source_key(source_type, &serde_json::json!({}))
674}
675
676pub fn deterministic_occurrence_id(
677    request: &HostEventOccurrenceRequest,
678) -> Result<String, PluginError> {
679    let digest = crate::stable_hash::stable_json_sha256_hex(&(
680        request.source_type.as_str(),
681        request.source_key.as_str(),
682        request.idempotency_key.as_str(),
683    ))
684    .map_err(|err| PluginError::Session(format!("failed to hash host event occurrence: {err}")))?;
685    Ok(format!("host_event:{digest}"))
686}
687
688pub fn deterministic_delivery_process_id(
689    occurrence_id: &str,
690    subscription_id: &str,
691) -> Result<String, PluginError> {
692    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
693        .map_err(|err| {
694            PluginError::Session(format!("failed to hash host event delivery: {err}"))
695        })?;
696    Ok(format!("process:host-event:{digest}"))
697}
698
699#[derive(Clone)]
700pub struct HostEventRouter {
701    store: Arc<dyn HostEventStore>,
702    artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
703    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
704    process_work_poke: Option<crate::ProcessWorkPoke>,
705    host_profile_id: String,
706}
707
708impl HostEventRouter {
709    pub fn new(
710        store: Arc<dyn HostEventStore>,
711        artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
712        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
713        process_work_poke: Option<crate::ProcessWorkPoke>,
714        host_profile_id: impl Into<String>,
715    ) -> Self {
716        Self {
717            store,
718            artifact_store,
719            process_registry,
720            process_work_poke,
721            host_profile_id: host_profile_id.into(),
722        }
723    }
724
725    pub fn store(&self) -> Arc<dyn HostEventStore> {
726        Arc::clone(&self.store)
727    }
728
729    pub async fn emit(
730        &self,
731        request: HostEventOccurrenceRequest,
732        effect_controller: &dyn crate::RuntimeEffectController,
733    ) -> Result<HostEventEmitReport, PluginError> {
734        let occurrence = self.store.record_occurrence(request).await?;
735        let reservations = self
736            .store
737            .reserve_matching_deliveries(&occurrence.occurrence_id)
738            .await?;
739        let Some(process_registry) = self.process_registry.as_ref() else {
740            if reservations.is_empty() {
741                return Ok(HostEventEmitReport::new(
742                    occurrence.occurrence_id,
743                    Vec::new(),
744                ));
745            }
746            return Err(PluginError::Session(
747                "host event delivery requires a process registry".to_string(),
748            ));
749        };
750        let mut started_process_ids = Vec::new();
751        for reservation in reservations {
752            let process_id = reservation.process_id.clone();
753            self.start_delivery(
754                &reservation,
755                Arc::clone(process_registry),
756                effect_controller,
757            )
758            .await?;
759            started_process_ids.push(process_id);
760        }
761        if !started_process_ids.is_empty()
762            && let Some(poke) = self.process_work_poke.as_ref()
763        {
764            poke.poke();
765        }
766        Ok(HostEventEmitReport::new(
767            occurrence.occurrence_id,
768            started_process_ids,
769        ))
770    }
771
772    async fn start_delivery(
773        &self,
774        reservation: &TriggerDeliveryReservation,
775        process_registry: Arc<dyn crate::ProcessRegistry>,
776        effect_controller: &dyn crate::RuntimeEffectController,
777    ) -> Result<(), PluginError> {
778        let subscription = &reservation.subscription;
779        let occurrence = &reservation.occurrence;
780        validate_payload(&occurrence.payload, &subscription.event_ty).map_err(|message| {
781            PluginError::Session(format!(
782                "invalid payload for trigger `{}`: {message}",
783                subscription.handle
784            ))
785        })?;
786        let artifact = self
787            .artifact_store
788            .get_module_artifact(&subscription.module_ref)
789            .await
790            .map_err(|err| {
791                PluginError::Session(format!(
792                    "failed to load trigger target module `{}`: {err}",
793                    subscription.module_ref
794                ))
795            })?
796            .ok_or_else(|| {
797                PluginError::Session(format!(
798                    "missing trigger target module `{}`",
799                    subscription.module_ref
800                ))
801            })?;
802        let signal_event_types = artifact
803            .canonical_ir
804            .process(&subscription.process_name)
805            .map(crate::lashlang_process_signal_event_types)
806            .unwrap_or_default();
807        let args =
808            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
809        let originator_scope_id = subscription.registrant_scope_id();
810        let host_event_invocation = crate::runtime::causal::host_event_invocation(
811            &originator_scope_id,
812            &occurrence.occurrence_id,
813        );
814        let registration = crate::ProcessRegistration::new(
815            reservation.process_id.clone(),
816            crate::ProcessInput::LashlangProcess {
817                module_ref: subscription.module_ref.clone(),
818                process_ref: subscription.process_ref.clone(),
819                required_surface_ref: subscription.required_surface_ref.clone(),
820                process_name: subscription.process_name.clone(),
821                args,
822            },
823            crate::ProcessProvenance::new(
824                subscription.registrant.clone(),
825                self.host_profile_id.clone(),
826            )
827            .with_caused_by(host_event_invocation.causal_ref()),
828        )
829        .with_extra_event_types(
830            crate::lashlang_process_event_types()
831                .into_iter()
832                .chain(signal_event_types),
833        )
834        .with_execution_env_ref(Some(subscription.env_ref.clone()))
835        .with_wake_target(subscription.wake_target.clone());
836        let grant =
837            subscription
838                .wake_target
839                .clone()
840                .map(|session_scope| crate::ProcessStartGrant {
841                    session_scope,
842                    descriptor: crate::ProcessHandleDescriptor::new(
843                        Some("lashlang"),
844                        Some(subscription.process_name.as_str()),
845                    ),
846                });
847        let execution_context = crate::ProcessExecutionContext::default()
848            .with_causal_invocation(Some(host_event_invocation));
849        let command = crate::ProcessCommand::Start {
850            registration,
851            grant,
852            execution_context: Box::new(execution_context),
853        };
854        let effect_id = command.effect_id();
855        let invocation = crate::RuntimeInvocation::effect(
856            crate::RuntimeScope::new(originator_scope_id),
857            effect_id.clone(),
858            crate::RuntimeEffectKind::Process,
859            format!(
860                "host_event:{}:{}",
861                occurrence.occurrence_id, subscription.subscription_id
862            ),
863        )
864        .with_caused_by(Some(crate::CausalRef::HostEvent {
865            occurrence_id: occurrence.occurrence_id.clone(),
866        }));
867        let outcome = effect_controller
868            .execute_effect(
869                crate::RuntimeEffectEnvelope::new(
870                    invocation,
871                    crate::RuntimeEffectCommand::process(command),
872                ),
873                crate::RuntimeEffectLocalExecutor::process_control(process_registry),
874            )
875            .await?;
876        match outcome {
877            crate::RuntimeEffectOutcome::Process {
878                result: crate::ProcessEffectOutcome::Start { .. },
879            } => Ok(()),
880            other => Err(PluginError::Session(format!(
881                "host event process start returned the wrong outcome: {}",
882                other.kind().as_str()
883            ))),
884        }
885    }
886}
887
888fn materialize_trigger_process_args(
889    input_template: &lashlang::TriggerInputTemplate,
890    event_payload: &serde_json::Value,
891) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
892    let mut args = lashlang::Record::default();
893    for (input_name, input) in input_template.entries() {
894        let value = match input {
895            lashlang::TriggerInputBinding::Event => event_payload.clone(),
896            lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
897        };
898        args.insert(input_name.to_string(), lashlang::from_json(value));
899    }
900    match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
901        .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
902    {
903        serde_json::Value::Object(map) => Ok(map),
904        _ => Err(PluginError::Session(
905            "trigger process args must serialize as an object".to_string(),
906        )),
907    }
908}
909
910pub(crate) fn validate_payload(
911    value: &serde_json::Value,
912    ty: &lashlang::TypeExpr,
913) -> Result<(), String> {
914    if json_matches_type(value, ty) {
915        Ok(())
916    } else {
917        Err(format!("expected {}", lashlang::format_type_expr(ty)))
918    }
919}
920
921fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
922    match ty {
923        lashlang::TypeExpr::Any => true,
924        lashlang::TypeExpr::Ref(_) => false,
925        lashlang::TypeExpr::Str => value.is_string(),
926        lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
927        lashlang::TypeExpr::Float => value.is_number(),
928        lashlang::TypeExpr::Bool => value.is_boolean(),
929        lashlang::TypeExpr::Dict => value.is_object(),
930        lashlang::TypeExpr::Null => value.is_null(),
931        lashlang::TypeExpr::Enum(values) => value
932            .as_str()
933            .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
934        lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
935            items
936                .iter()
937                .all(|item_value| json_matches_type(item_value, item))
938        }),
939        lashlang::TypeExpr::Object(fields) => {
940            let Some(map) = value.as_object() else {
941                return false;
942            };
943            fields
944                .iter()
945                .all(|field| match map.get(field.name.as_str()) {
946                    Some(field_value) => json_matches_type(field_value, &field.ty),
947                    None => field.optional,
948                })
949        }
950        lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
951        lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
952            value.is_object()
953        }
954    }
955}
956
957pub fn validate_host_event_occurrence_request(
958    request: &HostEventOccurrenceRequest,
959) -> Result<(), PluginError> {
960    if request.source_type.trim().is_empty() {
961        return Err(PluginError::Session(
962            "host event occurrence requires source_type".to_string(),
963        ));
964    }
965    if request.source_key.trim().is_empty() {
966        return Err(PluginError::Session(
967            "host event occurrence requires source_key".to_string(),
968        ));
969    }
970    if request.idempotency_key.trim().is_empty() {
971        return Err(PluginError::Session(
972            "host event occurrence requires idempotency_key".to_string(),
973        ));
974    }
975    Ok(())
976}
977
978pub fn host_event_occurrence_request_hash(
979    request: &HostEventOccurrenceRequest,
980) -> Result<String, PluginError> {
981    crate::stable_hash::stable_json_sha256_hex(&(
982        request.source_type.as_str(),
983        request.source_key.as_str(),
984        &request.payload,
985        &request.source,
986    ))
987    .map_err(|err| PluginError::Session(format!("failed to hash host event occurrence: {err}")))
988}
989
990#[cfg(test)]
991mod tests {
992    use super::*;
993
994    fn button_payload_type() -> lashlang::NamedDataType {
995        lashlang::NamedDataType::object(
996            "ui.button.Pressed",
997            vec![lashlang::TypeField {
998                name: "button".into(),
999                ty: lashlang::TypeExpr::Str,
1000                optional: false,
1001            }],
1002        )
1003        .expect("valid host event payload")
1004    }
1005
1006    #[test]
1007    fn host_event_catalog_rejects_duplicate_trigger_source_identity() {
1008        let mut catalog = HostEventCatalog::new();
1009        catalog
1010            .declare(HostEvent::new(
1011                "Button",
1012                "ui.button",
1013                "pressed",
1014                button_payload_type(),
1015            ))
1016            .expect("first host event");
1017
1018        let err = catalog
1019            .declare(HostEvent::new(
1020                "AlternateButton",
1021                "ui.button",
1022                "pressed",
1023                button_payload_type(),
1024            ))
1025            .expect_err("duplicate public source identity should be rejected");
1026
1027        assert!(err.contains("duplicate host event source `ui.button.pressed`"));
1028    }
1029}