Skip to main content

lash_core/
host_events.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::plugin::PluginError;
7
8#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
9pub struct HostEvent {
10    pub resource_type: String,
11    pub alias: String,
12    pub event: String,
13    pub payload_ty: lashlang::NamedDataType,
14}
15
16impl HostEvent {
17    pub fn new(
18        resource_type: impl Into<String>,
19        alias: impl Into<String>,
20        event: impl Into<String>,
21        payload_ty: lashlang::NamedDataType,
22    ) -> Self {
23        Self {
24            resource_type: resource_type.into(),
25            alias: alias.into(),
26            event: event.into(),
27            payload_ty,
28        }
29    }
30
31    pub fn payload_type(&self) -> &lashlang::NamedDataType {
32        &self.payload_ty
33    }
34
35    pub fn key(&self) -> HostEventKey {
36        HostEventKey {
37            resource_type: self.resource_type.clone(),
38            alias: self.alias.clone(),
39            event: self.event.clone(),
40        }
41    }
42
43    pub fn source_type(&self) -> String {
44        host_event_source_type(&self.alias, &self.event)
45    }
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49pub struct HostEventKey {
50    pub resource_type: String,
51    pub alias: String,
52    pub event: String,
53}
54
55impl HostEventKey {
56    pub fn new(
57        resource_type: impl Into<String>,
58        alias: impl Into<String>,
59        event: impl Into<String>,
60    ) -> Self {
61        Self {
62            resource_type: resource_type.into(),
63            alias: alias.into(),
64            event: event.into(),
65        }
66    }
67
68    pub fn source_type(&self) -> String {
69        host_event_source_type(&self.alias, &self.event)
70    }
71}
72
/// Joins `alias` and `event` with a single `.` to form a host event source type.
///
/// No escaping is applied, so components that already contain dots are
/// concatenated verbatim.
pub fn host_event_source_type(alias: &str, event: &str) -> String {
    [alias, event].join(".")
}
76
77#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
78pub struct HostEventCatalog {
79    events: BTreeMap<HostEventKey, HostEvent>,
80}
81
82impl HostEventCatalog {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn declare(&mut self, event: HostEvent) -> Result<(), String> {
88        let key = event.key();
89        if self.events.contains_key(&key) {
90            return Err(format!(
91                "duplicate host event `{}.{}.{}`",
92                key.resource_type, key.alias, key.event
93            ));
94        }
95        let source_type = event.source_type();
96        if let Some(existing) = self
97            .events
98            .values()
99            .find(|existing| existing.source_type() == source_type)
100        {
101            return Err(format!(
102                "duplicate host event source `{source_type}` declared by `{}.{}.{}` and `{}.{}.{}`",
103                existing.resource_type,
104                existing.alias,
105                existing.event,
106                key.resource_type,
107                key.alias,
108                key.event
109            ));
110        }
111        self.events.insert(key, event);
112        Ok(())
113    }
114
115    pub fn from_events(events: impl IntoIterator<Item = HostEvent>) -> Result<Self, String> {
116        let mut catalog = Self::new();
117        for event in events {
118            catalog.declare(event)?;
119        }
120        Ok(catalog)
121    }
122
123    pub fn get(&self, resource_type: &str, alias: &str, event: &str) -> Option<&HostEvent> {
124        self.events
125            .get(&HostEventKey::new(resource_type, alias, event))
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.events.is_empty()
130    }
131
132    pub fn events(&self) -> impl Iterator<Item = &HostEvent> {
133        self.events.values()
134    }
135}
136
137#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
138pub struct HostEventEmitReport {
139    #[serde(default, skip_serializing_if = "String::is_empty")]
140    pub occurrence_id: String,
141    pub started_process_ids: Vec<String>,
142}
143
144impl HostEventEmitReport {
145    pub fn empty() -> Self {
146        Self::default()
147    }
148
149    fn new(occurrence_id: String, started_process_ids: Vec<String>) -> Self {
150        Self {
151            occurrence_id,
152            started_process_ids,
153        }
154    }
155}
156
157#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
158pub struct HostEventOccurrenceRequest {
159    pub source_type: String,
160    pub source_key: String,
161    #[serde(default)]
162    pub payload: serde_json::Value,
163    pub idempotency_key: String,
164    #[serde(default, skip_serializing_if = "Option::is_none")]
165    pub source: Option<serde_json::Value>,
166}
167
168impl HostEventOccurrenceRequest {
169    pub fn new(
170        source_type: impl Into<String>,
171        source_key: impl Into<String>,
172        payload: serde_json::Value,
173        idempotency_key: impl Into<String>,
174    ) -> Self {
175        Self {
176            source_type: source_type.into(),
177            source_key: source_key.into(),
178            payload,
179            idempotency_key: idempotency_key.into(),
180            source: None,
181        }
182    }
183
184    pub fn with_source(mut self, source: serde_json::Value) -> Self {
185        self.source = Some(source);
186        self
187    }
188}
189
190#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
191pub struct HostEventOccurrenceRecord {
192    pub occurrence_id: String,
193    pub source_type: String,
194    pub source_key: String,
195    #[serde(default)]
196    pub payload: serde_json::Value,
197    pub idempotency_key: String,
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub source: Option<serde_json::Value>,
200    pub occurred_at_ms: u64,
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
204#[serde(transparent)]
205pub struct TriggerSourceType(String);
206
207impl TriggerSourceType {
208    pub fn new(value: impl Into<String>) -> Self {
209        Self(value.into())
210    }
211
212    pub fn as_str(&self) -> &str {
213        &self.0
214    }
215}
216
217impl From<String> for TriggerSourceType {
218    fn from(value: String) -> Self {
219        Self::new(value)
220    }
221}
222
223impl From<&str> for TriggerSourceType {
224    fn from(value: &str) -> Self {
225        Self::new(value)
226    }
227}
228
229impl AsRef<str> for TriggerSourceType {
230    fn as_ref(&self) -> &str {
231        self.as_str()
232    }
233}
234
235impl std::fmt::Display for TriggerSourceType {
236    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        formatter.write_str(self.as_str())
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct TriggerRegistration {
243    pub handle: String,
244    pub source_key: String,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub name: Option<String>,
247    pub source_type: TriggerSourceType,
248    pub source: serde_json::Value,
249    pub target: TriggerTargetSummary,
250    #[serde(default = "default_enabled")]
251    pub enabled: bool,
252}
253
254#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
255pub struct TriggerTargetSummary {
256    pub process_name: String,
257    pub inputs: lashlang::TriggerInputTemplate,
258}
259
260#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
261pub struct TriggerSubscriptionDraft {
262    pub session_id: String,
263    #[serde(default, skip_serializing_if = "Option::is_none")]
264    pub name: Option<String>,
265    pub source_type: String,
266    pub source_key: String,
267    pub source: serde_json::Value,
268    pub event_ty: lashlang::TypeExpr,
269    pub module_ref: lashlang::ModuleRef,
270    pub required_surface_ref: lashlang::RequiredSurfaceRef,
271    pub process_ref: lashlang::ProcessRef,
272    pub process_name: String,
273    pub input_template: lashlang::TriggerInputTemplate,
274}
275
276#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
277pub struct TriggerSubscriptionRecord {
278    pub subscription_id: String,
279    pub session_id: String,
280    pub handle: String,
281    #[serde(default, skip_serializing_if = "Option::is_none")]
282    pub name: Option<String>,
283    pub source_type: String,
284    pub source_key: String,
285    pub source: serde_json::Value,
286    pub event_ty: lashlang::TypeExpr,
287    pub module_ref: lashlang::ModuleRef,
288    pub required_surface_ref: lashlang::RequiredSurfaceRef,
289    pub process_ref: lashlang::ProcessRef,
290    pub process_name: String,
291    pub input_template: lashlang::TriggerInputTemplate,
292    #[serde(default = "default_enabled")]
293    pub enabled: bool,
294    pub created_at_ms: u64,
295    pub updated_at_ms: u64,
296}
297
298impl From<&TriggerSubscriptionRecord> for TriggerRegistration {
299    fn from(route: &TriggerSubscriptionRecord) -> Self {
300        Self {
301            handle: route.handle.clone(),
302            source_key: route.source_key.clone(),
303            name: route.name.clone(),
304            source_type: TriggerSourceType::new(route.source_type.clone()),
305            source: route.source.clone(),
306            target: TriggerTargetSummary {
307                process_name: route.process_name.clone(),
308                inputs: route.input_template.clone(),
309            },
310            enabled: route.enabled,
311        }
312    }
313}
314
315#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
316pub struct TriggerSubscriptionFilter {
317    #[serde(default, skip_serializing_if = "Option::is_none")]
318    pub session_id: Option<String>,
319    #[serde(default, skip_serializing_if = "Option::is_none")]
320    pub handle: Option<String>,
321    #[serde(default, skip_serializing_if = "Option::is_none")]
322    pub name: Option<String>,
323    #[serde(default, skip_serializing_if = "Option::is_none")]
324    pub source_type: Option<String>,
325    #[serde(default, skip_serializing_if = "Option::is_none")]
326    pub source_key: Option<String>,
327    #[serde(default, skip_serializing_if = "Option::is_none")]
328    pub target: Option<lashlang::TriggerTargetIdentity>,
329    #[serde(default, skip_serializing_if = "Option::is_none")]
330    pub enabled: Option<bool>,
331}
332
333impl TriggerSubscriptionFilter {
334    pub fn for_session(session_id: impl Into<String>) -> Self {
335        Self {
336            session_id: Some(session_id.into()),
337            ..Self::default()
338        }
339    }
340
341    pub fn for_source_type(source_type: impl Into<String>) -> Self {
342        Self {
343            source_type: Some(source_type.into()),
344            ..Self::default()
345        }
346    }
347
348    pub fn matches(&self, record: &TriggerSubscriptionRecord) -> bool {
349        self.session_id
350            .as_deref()
351            .is_none_or(|session_id| record.session_id == session_id)
352            && self
353                .handle
354                .as_deref()
355                .is_none_or(|handle| record.handle == handle)
356            && self
357                .name
358                .as_deref()
359                .is_none_or(|name| record.name.as_deref() == Some(name))
360            && self
361                .source_type
362                .as_deref()
363                .is_none_or(|source_type| record.source_type == source_type)
364            && self
365                .source_key
366                .as_deref()
367                .is_none_or(|source_key| record.source_key == source_key)
368            && self.enabled.is_none_or(|enabled| record.enabled == enabled)
369            && self.target.as_ref().is_none_or(|target| {
370                target.matches(
371                    &record.module_ref,
372                    &record.required_surface_ref,
373                    &record.process_ref,
374                    &record.process_name,
375                )
376            })
377    }
378}
379
380#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
381pub struct TriggerDeliveryReservation {
382    pub occurrence: HostEventOccurrenceRecord,
383    pub subscription: TriggerSubscriptionRecord,
384    pub process_id: String,
385}
386
387#[async_trait::async_trait]
388pub trait HostEventStore: Send + Sync {
389    fn durability_tier(&self) -> crate::DurabilityTier {
390        crate::DurabilityTier::Inline
391    }
392
393    async fn source_key_for_subscription(
394        &self,
395        source_type: &str,
396        source: &serde_json::Value,
397    ) -> Result<String, PluginError> {
398        default_host_event_source_key(source_type, source)
399    }
400
401    async fn register_subscription(
402        &self,
403        draft: TriggerSubscriptionDraft,
404    ) -> Result<TriggerSubscriptionRecord, PluginError>;
405
406    async fn list_subscriptions(
407        &self,
408        filter: TriggerSubscriptionFilter,
409    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError>;
410
411    async fn cancel_subscription(
412        &self,
413        session_id: &str,
414        handle: &str,
415    ) -> Result<bool, PluginError>;
416
417    async fn record_occurrence(
418        &self,
419        request: HostEventOccurrenceRequest,
420    ) -> Result<HostEventOccurrenceRecord, PluginError>;
421
422    async fn reserve_matching_deliveries(
423        &self,
424        occurrence_id: &str,
425    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError>;
426}
427
428#[derive(Default)]
429pub struct InMemoryHostEventStore {
430    state: Mutex<InMemoryHostEventState>,
431}
432
433#[derive(Default)]
434struct InMemoryHostEventState {
435    next_subscription_seq: u64,
436    subscriptions: BTreeMap<String, TriggerSubscriptionRecord>,
437    occurrences: BTreeMap<String, HostEventOccurrenceRecord>,
438    occurrence_id_by_idempotency_key: BTreeMap<String, String>,
439    occurrence_hashes: BTreeMap<String, String>,
440    deliveries: BTreeSet<(String, String)>,
441}
442
443#[async_trait::async_trait]
444impl HostEventStore for InMemoryHostEventStore {
445    async fn register_subscription(
446        &self,
447        draft: TriggerSubscriptionDraft,
448    ) -> Result<TriggerSubscriptionRecord, PluginError> {
449        let mut state = self
450            .state
451            .lock()
452            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
453        state.next_subscription_seq = state.next_subscription_seq.saturating_add(1);
454        let handle = format!("trigger:{}", state.next_subscription_seq);
455        let subscription_id = format!("subscription:{}", state.next_subscription_seq);
456        let now = crate::runtime::current_epoch_ms();
457        let record = TriggerSubscriptionRecord {
458            subscription_id: subscription_id.clone(),
459            session_id: draft.session_id,
460            handle,
461            name: draft.name,
462            source_type: draft.source_type,
463            source_key: draft.source_key,
464            source: draft.source,
465            event_ty: draft.event_ty,
466            module_ref: draft.module_ref,
467            required_surface_ref: draft.required_surface_ref,
468            process_ref: draft.process_ref,
469            process_name: draft.process_name,
470            input_template: draft.input_template,
471            enabled: true,
472            created_at_ms: now,
473            updated_at_ms: now,
474        };
475        state.subscriptions.insert(subscription_id, record.clone());
476        Ok(record)
477    }
478
479    async fn list_subscriptions(
480        &self,
481        filter: TriggerSubscriptionFilter,
482    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
483        let state = self
484            .state
485            .lock()
486            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
487        let mut records = state
488            .subscriptions
489            .values()
490            .filter(|record| filter.matches(record))
491            .cloned()
492            .collect::<Vec<_>>();
493        records.sort_by(|left, right| {
494            left.session_id
495                .cmp(&right.session_id)
496                .then_with(|| left.handle.cmp(&right.handle))
497        });
498        Ok(records)
499    }
500
501    async fn cancel_subscription(
502        &self,
503        session_id: &str,
504        handle: &str,
505    ) -> Result<bool, PluginError> {
506        let mut state = self
507            .state
508            .lock()
509            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
510        let now = crate::runtime::current_epoch_ms();
511        let Some(record) = state
512            .subscriptions
513            .values_mut()
514            .find(|record| record.session_id == session_id && record.handle == handle)
515        else {
516            return Ok(false);
517        };
518        let changed = record.enabled;
519        record.enabled = false;
520        record.updated_at_ms = now;
521        Ok(changed)
522    }
523
524    async fn record_occurrence(
525        &self,
526        request: HostEventOccurrenceRequest,
527    ) -> Result<HostEventOccurrenceRecord, PluginError> {
528        validate_host_event_occurrence_request(&request)?;
529        let mut state = self
530            .state
531            .lock()
532            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
533        let request_hash = host_event_occurrence_request_hash(&request)?;
534        if let Some(existing_id) = state
535            .occurrence_id_by_idempotency_key
536            .get(&request.idempotency_key)
537            .cloned()
538        {
539            let existing_hash = state
540                .occurrence_hashes
541                .get(&existing_id)
542                .cloned()
543                .unwrap_or_default();
544            if existing_hash != request_hash {
545                return Err(PluginError::Session(format!(
546                    "host event occurrence idempotency conflict for `{}`",
547                    request.idempotency_key
548                )));
549            }
550            return state.occurrences.get(&existing_id).cloned().ok_or_else(|| {
551                PluginError::Session(format!(
552                    "missing host event occurrence `{existing_id}` for idempotency key"
553                ))
554            });
555        }
556        let occurrence_id = deterministic_occurrence_id(&request)?;
557        let record = HostEventOccurrenceRecord {
558            occurrence_id: occurrence_id.clone(),
559            source_type: request.source_type,
560            source_key: request.source_key,
561            payload: request.payload,
562            idempotency_key: request.idempotency_key.clone(),
563            source: request.source,
564            occurred_at_ms: crate::runtime::current_epoch_ms(),
565        };
566        state
567            .occurrence_id_by_idempotency_key
568            .insert(request.idempotency_key, occurrence_id.clone());
569        state
570            .occurrence_hashes
571            .insert(occurrence_id.clone(), request_hash);
572        state.occurrences.insert(occurrence_id, record.clone());
573        Ok(record)
574    }
575
576    async fn reserve_matching_deliveries(
577        &self,
578        occurrence_id: &str,
579    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
580        let mut state = self
581            .state
582            .lock()
583            .map_err(|_| PluginError::Session("host event store lock poisoned".to_string()))?;
584        let occurrence = state
585            .occurrences
586            .get(occurrence_id)
587            .cloned()
588            .ok_or_else(|| {
589                PluginError::Session(format!("unknown host event occurrence `{occurrence_id}`"))
590            })?;
591        let subscriptions = state
592            .subscriptions
593            .values()
594            .filter(|record| {
595                record.enabled
596                    && record.source_type == occurrence.source_type
597                    && record.source_key == occurrence.source_key
598            })
599            .cloned()
600            .collect::<Vec<_>>();
601        let mut deliveries = Vec::new();
602        for subscription in subscriptions {
603            let key = (
604                occurrence.occurrence_id.clone(),
605                subscription.subscription_id.clone(),
606            );
607            if !state.deliveries.insert(key) {
608                continue;
609            }
610            let process_id = deterministic_delivery_process_id(
611                &occurrence.occurrence_id,
612                &subscription.subscription_id,
613            )?;
614            deliveries.push(TriggerDeliveryReservation {
615                occurrence: occurrence.clone(),
616                subscription,
617                process_id,
618            });
619        }
620        Ok(deliveries)
621    }
622}
623
/// Serde default for `enabled` fields: records without the field are enabled.
fn default_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
627
628pub fn default_host_event_source_key(
629    source_type: &str,
630    source: &serde_json::Value,
631) -> Result<String, PluginError> {
632    let digest =
633        crate::stable_hash::stable_json_sha256_hex(&(source_type, source)).map_err(|err| {
634            PluginError::Session(format!("failed to hash host event source key: {err}"))
635        })?;
636    Ok(format!("source:{source_type}:sha256:{digest}"))
637}
638
639pub fn empty_host_event_source_key(source_type: &str) -> Result<String, PluginError> {
640    default_host_event_source_key(source_type, &serde_json::json!({}))
641}
642
643pub fn deterministic_occurrence_id(
644    request: &HostEventOccurrenceRequest,
645) -> Result<String, PluginError> {
646    let digest = crate::stable_hash::stable_json_sha256_hex(&(
647        request.source_type.as_str(),
648        request.source_key.as_str(),
649        request.idempotency_key.as_str(),
650    ))
651    .map_err(|err| PluginError::Session(format!("failed to hash host event occurrence: {err}")))?;
652    Ok(format!("host_event:{digest}"))
653}
654
655pub fn deterministic_delivery_process_id(
656    occurrence_id: &str,
657    subscription_id: &str,
658) -> Result<String, PluginError> {
659    let digest = crate::stable_hash::stable_json_sha256_hex(&(occurrence_id, subscription_id))
660        .map_err(|err| {
661            PluginError::Session(format!("failed to hash host event delivery: {err}"))
662        })?;
663    Ok(format!("process:host-event:{digest}"))
664}
665
666#[derive(Clone)]
667pub struct HostEventRouter {
668    store: Arc<dyn HostEventStore>,
669    process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
670    process_work_poke: Option<crate::ProcessWorkPoke>,
671    host_profile_id: String,
672}
673
674impl HostEventRouter {
675    pub fn new(
676        store: Arc<dyn HostEventStore>,
677        process_registry: Option<Arc<dyn crate::ProcessRegistry>>,
678        process_work_poke: Option<crate::ProcessWorkPoke>,
679        host_profile_id: impl Into<String>,
680    ) -> Self {
681        Self {
682            store,
683            process_registry,
684            process_work_poke,
685            host_profile_id: host_profile_id.into(),
686        }
687    }
688
689    pub fn store(&self) -> Arc<dyn HostEventStore> {
690        Arc::clone(&self.store)
691    }
692
693    pub async fn emit(
694        &self,
695        request: HostEventOccurrenceRequest,
696        effect_controller: &dyn crate::RuntimeEffectController,
697    ) -> Result<HostEventEmitReport, PluginError> {
698        let occurrence = self.store.record_occurrence(request).await?;
699        let reservations = self
700            .store
701            .reserve_matching_deliveries(&occurrence.occurrence_id)
702            .await?;
703        let Some(process_registry) = self.process_registry.as_ref() else {
704            if reservations.is_empty() {
705                return Ok(HostEventEmitReport::new(
706                    occurrence.occurrence_id,
707                    Vec::new(),
708                ));
709            }
710            return Err(PluginError::Session(
711                "host event delivery requires a process registry".to_string(),
712            ));
713        };
714        let mut started_process_ids = Vec::new();
715        for reservation in reservations {
716            let process_id = reservation.process_id.clone();
717            self.start_delivery(
718                &reservation,
719                Arc::clone(process_registry),
720                effect_controller,
721            )
722            .await?;
723            started_process_ids.push(process_id);
724        }
725        if !started_process_ids.is_empty()
726            && let Some(poke) = self.process_work_poke.as_ref()
727        {
728            poke.poke();
729        }
730        Ok(HostEventEmitReport::new(
731            occurrence.occurrence_id,
732            started_process_ids,
733        ))
734    }
735
736    async fn start_delivery(
737        &self,
738        reservation: &TriggerDeliveryReservation,
739        process_registry: Arc<dyn crate::ProcessRegistry>,
740        effect_controller: &dyn crate::RuntimeEffectController,
741    ) -> Result<(), PluginError> {
742        let subscription = &reservation.subscription;
743        let occurrence = &reservation.occurrence;
744        validate_payload(&occurrence.payload, &subscription.event_ty).map_err(|message| {
745            PluginError::Session(format!(
746                "invalid payload for trigger `{}`: {message}",
747                subscription.handle
748            ))
749        })?;
750        let args =
751            materialize_trigger_process_args(&subscription.input_template, &occurrence.payload)?;
752        let owner_scope = crate::ProcessScope::new(subscription.session_id.clone());
753        let host_event_invocation = crate::runtime::causal::host_event_invocation(
754            &subscription.session_id,
755            &occurrence.occurrence_id,
756        );
757        let registration = crate::ProcessRegistration::new(
758            reservation.process_id.clone(),
759            crate::ProcessInput::LashlangProcess {
760                module_ref: subscription.module_ref.clone(),
761                process_ref: subscription.process_ref.clone(),
762                required_surface_ref: subscription.required_surface_ref.clone(),
763                process_name: subscription.process_name.clone(),
764                args,
765            },
766        )
767        .with_extra_event_types(crate::lashlang_process_event_types())
768        .with_process_provenance(
769            crate::ProcessProvenance::new(owner_scope.clone(), self.host_profile_id.clone())
770                .with_caused_by(host_event_invocation.causal_ref()),
771        );
772        let grant = crate::ProcessStartGrant {
773            owner_scope: owner_scope.clone(),
774            descriptor: crate::ProcessHandleDescriptor::new(
775                Some("lashlang"),
776                Some(subscription.process_name.as_str()),
777            ),
778        };
779        let execution_context = crate::ProcessExecutionContext::default()
780            .with_causal_invocation(Some(host_event_invocation))
781            .with_wake_target_scope(owner_scope);
782        let command = crate::ProcessCommand::Start {
783            registration,
784            grant: Some(grant),
785            execution_context: Box::new(execution_context),
786        };
787        let effect_id = command.effect_id();
788        let invocation = crate::RuntimeInvocation::effect(
789            crate::RuntimeScope::new(subscription.session_id.clone()),
790            effect_id.clone(),
791            crate::RuntimeEffectKind::Process,
792            format!(
793                "host_event:{}:{}",
794                occurrence.occurrence_id, subscription.subscription_id
795            ),
796        )
797        .with_caused_by(Some(crate::CausalRef::HostEvent {
798            occurrence_id: occurrence.occurrence_id.clone(),
799        }));
800        let outcome = effect_controller
801            .execute_effect(
802                crate::RuntimeEffectEnvelope::new(
803                    invocation,
804                    crate::RuntimeEffectCommand::Process { command },
805                ),
806                crate::RuntimeEffectLocalExecutor::process_control(process_registry),
807            )
808            .await?;
809        match outcome {
810            crate::RuntimeEffectOutcome::Process {
811                result: crate::ProcessEffectOutcome::Start { .. },
812            } => Ok(()),
813            other => Err(PluginError::Session(format!(
814                "host event process start returned the wrong outcome: {}",
815                other.kind().as_str()
816            ))),
817        }
818    }
819}
820
821fn materialize_trigger_process_args(
822    input_template: &lashlang::TriggerInputTemplate,
823    event_payload: &serde_json::Value,
824) -> Result<serde_json::Map<String, serde_json::Value>, PluginError> {
825    let mut args = lashlang::Record::default();
826    for (input_name, input) in input_template.entries() {
827        let value = match input {
828            lashlang::TriggerInputBinding::Event => event_payload.clone(),
829            lashlang::TriggerInputBinding::Fixed { value } => value.clone(),
830        };
831        args.insert(input_name.to_string(), lashlang::from_json(value));
832    }
833    match serde_json::to_value(lashlang::Value::Record(Arc::new(args)))
834        .map_err(|err| PluginError::Session(format!("serialize trigger process args: {err}")))?
835    {
836        serde_json::Value::Object(map) => Ok(map),
837        _ => Err(PluginError::Session(
838            "trigger process args must serialize as an object".to_string(),
839        )),
840    }
841}
842
843pub(crate) fn validate_payload(
844    value: &serde_json::Value,
845    ty: &lashlang::TypeExpr,
846) -> Result<(), String> {
847    if json_matches_type(value, ty) {
848        Ok(())
849    } else {
850        Err(format!("expected {}", lashlang::format_type_expr(ty)))
851    }
852}
853
854fn json_matches_type(value: &serde_json::Value, ty: &lashlang::TypeExpr) -> bool {
855    match ty {
856        lashlang::TypeExpr::Any => true,
857        lashlang::TypeExpr::Ref(_) => false,
858        lashlang::TypeExpr::Str => value.is_string(),
859        lashlang::TypeExpr::Int => value.as_i64().is_some() || value.as_u64().is_some(),
860        lashlang::TypeExpr::Float => value.is_number(),
861        lashlang::TypeExpr::Bool => value.is_boolean(),
862        lashlang::TypeExpr::Dict => value.is_object(),
863        lashlang::TypeExpr::Null => value.is_null(),
864        lashlang::TypeExpr::Enum(values) => value
865            .as_str()
866            .is_some_and(|value| values.iter().any(|candidate| candidate.as_str() == value)),
867        lashlang::TypeExpr::List(item) => value.as_array().is_some_and(|items| {
868            items
869                .iter()
870                .all(|item_value| json_matches_type(item_value, item))
871        }),
872        lashlang::TypeExpr::Object(fields) => {
873            let Some(map) = value.as_object() else {
874                return false;
875            };
876            fields
877                .iter()
878                .all(|field| match map.get(field.name.as_str()) {
879                    Some(field_value) => json_matches_type(field_value, &field.ty),
880                    None => field.optional,
881                })
882        }
883        lashlang::TypeExpr::Union(items) => items.iter().any(|item| json_matches_type(value, item)),
884        lashlang::TypeExpr::Process { .. } | lashlang::TypeExpr::TriggerHandle(_) => {
885            value.is_object()
886        }
887    }
888}
889
890pub fn validate_host_event_occurrence_request(
891    request: &HostEventOccurrenceRequest,
892) -> Result<(), PluginError> {
893    if request.source_type.trim().is_empty() {
894        return Err(PluginError::Session(
895            "host event occurrence requires source_type".to_string(),
896        ));
897    }
898    if request.source_key.trim().is_empty() {
899        return Err(PluginError::Session(
900            "host event occurrence requires source_key".to_string(),
901        ));
902    }
903    if request.idempotency_key.trim().is_empty() {
904        return Err(PluginError::Session(
905            "host event occurrence requires idempotency_key".to_string(),
906        ));
907    }
908    Ok(())
909}
910
911pub fn host_event_occurrence_request_hash(
912    request: &HostEventOccurrenceRequest,
913) -> Result<String, PluginError> {
914    crate::stable_hash::stable_json_sha256_hex(&(
915        request.source_type.as_str(),
916        request.source_key.as_str(),
917        &request.payload,
918        &request.source,
919    ))
920    .map_err(|err| PluginError::Session(format!("failed to hash host event occurrence: {err}")))
921}
922
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload type with a single required string field named `button`.
    fn button_payload_type() -> lashlang::NamedDataType {
        let fields = vec![lashlang::TypeField {
            name: "button".into(),
            ty: lashlang::TypeExpr::Str,
            optional: false,
        }];
        lashlang::NamedDataType::object("ui.button.Pressed", fields)
            .expect("valid host event payload")
    }

    /// `ui.button` / `pressed` event declared under the given resource type.
    fn pressed_event(resource_type: &str) -> HostEvent {
        HostEvent::new(resource_type, "ui.button", "pressed", button_payload_type())
    }

    /// Two events with different resource types but the same alias and event
    /// name share a source type, so the second declaration must be rejected.
    #[test]
    fn host_event_catalog_rejects_duplicate_trigger_source_identity() {
        let mut catalog = HostEventCatalog::new();
        catalog
            .declare(pressed_event("Button"))
            .expect("first host event");

        let err = catalog
            .declare(pressed_event("AlternateButton"))
            .expect_err("duplicate public source identity should be rejected");

        assert!(err.contains("duplicate host event source `ui.button.pressed`"));
    }
}