Skip to main content

lash_remote_protocol/
triggers.rs

1//! Trigger envelopes: occurrence emission, subscriptions, and registrations.
2
3use std::collections::BTreeMap;
4
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8use crate::processes::{
9    RemoteProcessDefinitionIdentity, RemoteProcessEventType, RemoteProcessExecutionEnvRef,
10    RemoteProcessIdentity, RemoteProcessInput, RemoteProcessOriginator, RemoteSessionScope,
11};
12use crate::registry_errors::{RemoteProtocolError, require_non_empty};
13use crate::{REMOTE_PROTOCOL_VERSION, ensure_protocol_version};
14
15#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
16pub struct RemoteTriggerOccurrenceRequest {
17    pub protocol_version: u32,
18    pub source_type: String,
19    pub source_key: String,
20    #[serde(default)]
21    pub payload: serde_json::Value,
22    pub idempotency_key: String,
23    #[serde(default, skip_serializing_if = "Option::is_none")]
24    pub source: Option<serde_json::Value>,
25}
26
27impl RemoteTriggerOccurrenceRequest {
28    pub fn new(
29        source_type: impl Into<String>,
30        source_key: impl Into<String>,
31        payload: serde_json::Value,
32        idempotency_key: impl Into<String>,
33    ) -> Self {
34        Self {
35            protocol_version: REMOTE_PROTOCOL_VERSION,
36            source_type: source_type.into(),
37            source_key: source_key.into(),
38            payload,
39            idempotency_key: idempotency_key.into(),
40            source: None,
41        }
42    }
43
44    pub fn with_source(mut self, source: serde_json::Value) -> Self {
45        self.source = Some(source);
46        self
47    }
48
49    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
50        ensure_protocol_version(self.protocol_version)?;
51        require_non_empty(
52            "RemoteTriggerOccurrenceRequest",
53            "source_type",
54            &self.source_type,
55        )?;
56        require_non_empty(
57            "RemoteTriggerOccurrenceRequest",
58            "source_key",
59            &self.source_key,
60        )?;
61        require_non_empty(
62            "RemoteTriggerOccurrenceRequest",
63            "idempotency_key",
64            &self.idempotency_key,
65        )
66    }
67}
68
69#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
70pub struct RemoteTriggerOccurrenceRecord {
71    pub occurrence_id: String,
72    pub source_type: String,
73    pub source_key: String,
74    #[serde(default)]
75    pub payload: serde_json::Value,
76    pub idempotency_key: String,
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub source: Option<serde_json::Value>,
79    pub occurred_at_ms: u64,
80}
81
82#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
83pub struct RemoteTriggerEmitReport {
84    pub protocol_version: u32,
85    #[serde(default, skip_serializing_if = "String::is_empty")]
86    pub occurrence_id: String,
87    #[serde(default, skip_serializing_if = "Vec::is_empty")]
88    pub started_process_ids: Vec<String>,
89}
90
91impl RemoteTriggerEmitReport {
92    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
93        ensure_protocol_version(self.protocol_version)
94    }
95}
96
97#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
98pub struct RemoteTriggerSubscriptionFilter {
99    pub protocol_version: u32,
100    #[serde(default, skip_serializing_if = "Option::is_none")]
101    pub session_id: Option<String>,
102    #[serde(default, skip_serializing_if = "Option::is_none")]
103    pub handle: Option<String>,
104    #[serde(default, skip_serializing_if = "Option::is_none")]
105    pub name: Option<String>,
106    #[serde(default, skip_serializing_if = "Option::is_none")]
107    pub source_type: Option<String>,
108    #[serde(default, skip_serializing_if = "Option::is_none")]
109    pub source_key: Option<String>,
110    #[serde(default, skip_serializing_if = "Option::is_none")]
111    pub target: Option<RemoteProcessDefinitionIdentity>,
112    #[serde(default, skip_serializing_if = "Option::is_none")]
113    pub enabled: Option<bool>,
114}
115
116impl Default for RemoteTriggerSubscriptionFilter {
117    fn default() -> Self {
118        Self {
119            protocol_version: REMOTE_PROTOCOL_VERSION,
120            session_id: None,
121            handle: None,
122            name: None,
123            source_type: None,
124            source_key: None,
125            target: None,
126            enabled: None,
127        }
128    }
129}
130
131impl RemoteTriggerSubscriptionFilter {
132    pub fn for_session(session_id: impl Into<String>) -> Self {
133        Self {
134            protocol_version: REMOTE_PROTOCOL_VERSION,
135            session_id: Some(session_id.into()),
136            ..Self::default()
137        }
138    }
139
140    pub fn for_source_type(source_type: impl Into<String>) -> Self {
141        Self {
142            protocol_version: REMOTE_PROTOCOL_VERSION,
143            source_type: Some(source_type.into()),
144            ..Self::default()
145        }
146    }
147
148    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
149        ensure_protocol_version(self.protocol_version)
150    }
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
154pub struct RemoteTriggerRegistration {
155    pub handle: String,
156    pub source_key: String,
157    #[serde(default, skip_serializing_if = "Option::is_none")]
158    pub name: Option<String>,
159    pub source_type: String,
160    #[serde(default)]
161    pub source: serde_json::Value,
162    pub target: RemoteTriggerTargetSummary,
163    #[serde(default = "default_true")]
164    pub enabled: bool,
165}
166
167#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
168pub struct RemoteTriggerTargetSummary {
169    #[serde(default, skip_serializing_if = "Option::is_none")]
170    pub label: Option<String>,
171    pub identity: RemoteProcessIdentity,
172    pub input: RemoteProcessInput,
173    #[serde(default)]
174    pub inputs: RemoteTriggerInputTemplate,
175}
176
/// Serde default hook for `enabled` fields declared with
/// `#[serde(default = "default_true")]`: a missing field deserializes as `true`.
fn default_true() -> bool {
    true
}
180
181#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
182#[serde(transparent)]
183pub struct RemoteTriggerInputTemplate {
184    pub entries: BTreeMap<String, RemoteTriggerInputBinding>,
185}
186
187impl RemoteTriggerInputTemplate {
188    pub fn new(entries: BTreeMap<String, RemoteTriggerInputBinding>) -> Self {
189        Self { entries }
190    }
191
192    pub fn validate(&self, type_name: &'static str) -> Result<(), RemoteProtocolError> {
193        for name in self.entries.keys() {
194            require_non_empty(type_name, "input_template key", name)?;
195        }
196        Ok(())
197    }
198}
199
200#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
201#[serde(tag = "kind", rename_all = "snake_case")]
202pub enum RemoteTriggerInputBinding {
203    Event,
204    Fixed { value: serde_json::Value },
205}
206
207#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
208pub struct RemoteTriggerSubscriptionDraft {
209    pub protocol_version: u32,
210    pub registrant: RemoteProcessOriginator,
211    pub env_ref: RemoteProcessExecutionEnvRef,
212    #[serde(default, skip_serializing_if = "Option::is_none")]
213    pub wake_target: Option<RemoteSessionScope>,
214    #[serde(default, skip_serializing_if = "Option::is_none")]
215    pub name: Option<String>,
216    pub source_type: String,
217    pub source_key: String,
218    #[serde(default)]
219    pub source: serde_json::Value,
220    #[serde(default)]
221    pub payload_schema: serde_json::Value,
222    pub target: RemoteProcessInput,
223    pub target_identity: RemoteProcessIdentity,
224    #[serde(default, skip_serializing_if = "Vec::is_empty")]
225    pub event_types: Vec<RemoteProcessEventType>,
226    #[serde(default)]
227    pub input_template: RemoteTriggerInputTemplate,
228    #[serde(default, skip_serializing_if = "Option::is_none")]
229    pub target_label: Option<String>,
230}
231
232impl RemoteTriggerSubscriptionDraft {
233    pub fn for_process(
234        registrant: RemoteProcessOriginator,
235        env_ref: RemoteProcessExecutionEnvRef,
236        source_type: impl Into<String>,
237        source_key: impl Into<String>,
238        target: RemoteProcessInput,
239        target_identity: RemoteProcessIdentity,
240    ) -> Self {
241        let target_label = target_identity.label.clone();
242        Self {
243            protocol_version: REMOTE_PROTOCOL_VERSION,
244            registrant,
245            env_ref,
246            wake_target: None,
247            name: None,
248            source_type: source_type.into(),
249            source_key: source_key.into(),
250            source: serde_json::Value::Object(serde_json::Map::new()),
251            payload_schema: serde_json::Value::Object(serde_json::Map::new()),
252            target,
253            target_identity,
254            event_types: Vec::new(),
255            input_template: RemoteTriggerInputTemplate::default(),
256            target_label,
257        }
258    }
259
260    pub fn with_name(mut self, name: impl Into<String>) -> Self {
261        self.name = Some(name.into());
262        self
263    }
264
265    pub fn with_source(mut self, source: serde_json::Value) -> Self {
266        self.source = source;
267        self
268    }
269
270    pub fn with_payload_schema(mut self, payload_schema: serde_json::Value) -> Self {
271        self.payload_schema = payload_schema;
272        self
273    }
274
275    pub fn with_wake_target(mut self, wake_target: RemoteSessionScope) -> Self {
276        self.wake_target = Some(wake_target);
277        self
278    }
279
280    pub fn with_event_types(
281        mut self,
282        event_types: impl IntoIterator<Item = RemoteProcessEventType>,
283    ) -> Self {
284        self.event_types = event_types.into_iter().collect();
285        self
286    }
287
288    pub fn with_input_template(mut self, input_template: RemoteTriggerInputTemplate) -> Self {
289        self.input_template = input_template;
290        self
291    }
292
293    pub fn with_target_label(mut self, target_label: impl Into<String>) -> Self {
294        self.target_label = Some(target_label.into());
295        self
296    }
297
298    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
299        ensure_protocol_version(self.protocol_version)?;
300        self.registrant.validate("RemoteTriggerSubscriptionDraft")?;
301        self.env_ref.validate("RemoteTriggerSubscriptionDraft")?;
302        if let Some(wake_target) = &self.wake_target {
303            wake_target.validate("RemoteTriggerSubscriptionDraft")?;
304        }
305        require_non_empty(
306            "RemoteTriggerSubscriptionDraft",
307            "source_type",
308            &self.source_type,
309        )?;
310        require_non_empty(
311            "RemoteTriggerSubscriptionDraft",
312            "source_key",
313            &self.source_key,
314        )?;
315        self.target.validate("RemoteTriggerSubscriptionDraft")?;
316        self.target_identity
317            .validate("RemoteTriggerSubscriptionDraft")?;
318        for event_type in &self.event_types {
319            event_type.validate("RemoteTriggerSubscriptionDraft")?;
320        }
321        validate_remote_trigger_target_label(
322            "RemoteTriggerSubscriptionDraft",
323            self.target_label.as_deref(),
324            self.target_identity.label.as_deref(),
325        )?;
326        self.input_template
327            .validate("RemoteTriggerSubscriptionDraft")
328    }
329}
330
331#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
332pub struct RemoteTriggerSubscriptionRecord {
333    pub subscription_id: String,
334    pub registrant: RemoteProcessOriginator,
335    pub env_ref: RemoteProcessExecutionEnvRef,
336    #[serde(default, skip_serializing_if = "Option::is_none")]
337    pub wake_target: Option<RemoteSessionScope>,
338    pub handle: String,
339    #[serde(default, skip_serializing_if = "Option::is_none")]
340    pub name: Option<String>,
341    pub source_type: String,
342    pub source_key: String,
343    #[serde(default)]
344    pub source: serde_json::Value,
345    #[serde(default)]
346    pub payload_schema: serde_json::Value,
347    pub target: RemoteProcessInput,
348    pub target_identity: RemoteProcessIdentity,
349    #[serde(default, skip_serializing_if = "Vec::is_empty")]
350    pub event_types: Vec<RemoteProcessEventType>,
351    #[serde(default)]
352    pub input_template: RemoteTriggerInputTemplate,
353    #[serde(default, skip_serializing_if = "Option::is_none")]
354    pub target_label: Option<String>,
355    #[serde(default = "default_true")]
356    pub enabled: bool,
357    pub created_at_ms: u64,
358    pub updated_at_ms: u64,
359}
360
361impl RemoteTriggerSubscriptionRecord {
362    pub fn validate(&self, type_name: &'static str) -> Result<(), RemoteProtocolError> {
363        require_non_empty(type_name, "subscription_id", &self.subscription_id)?;
364        self.registrant.validate(type_name)?;
365        self.env_ref.validate(type_name)?;
366        if let Some(wake_target) = &self.wake_target {
367            wake_target.validate(type_name)?;
368        }
369        require_non_empty(type_name, "handle", &self.handle)?;
370        require_non_empty(type_name, "source_type", &self.source_type)?;
371        require_non_empty(type_name, "source_key", &self.source_key)?;
372        self.target.validate(type_name)?;
373        self.target_identity.validate(type_name)?;
374        for event_type in &self.event_types {
375            event_type.validate(type_name)?;
376        }
377        validate_remote_trigger_target_label(
378            type_name,
379            self.target_label.as_deref(),
380            self.target_identity.label.as_deref(),
381        )?;
382        self.input_template.validate(type_name)
383    }
384}
385
386fn validate_remote_trigger_target_label(
387    type_name: &'static str,
388    target_label: Option<&str>,
389    identity_label: Option<&str>,
390) -> Result<(), RemoteProtocolError> {
391    match (target_label, identity_label) {
392        (Some(target_label), Some(identity_label)) if target_label != identity_label => {
393            Err(RemoteProtocolError::InvalidEnvelope {
394                type_name,
395                message: "target_label must match target_identity.label when both are present"
396                    .to_string(),
397            })
398        }
399        _ => Ok(()),
400    }
401}
402
403#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
404pub struct RemoteTriggerRegisterSubscriptionRequest {
405    pub protocol_version: u32,
406    pub draft: RemoteTriggerSubscriptionDraft,
407}
408
409impl RemoteTriggerRegisterSubscriptionRequest {
410    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
411        ensure_protocol_version(self.protocol_version)?;
412        if self.draft.protocol_version != self.protocol_version {
413            return Err(RemoteProtocolError::MismatchedNestedProtocolVersion {
414                parent: "RemoteTriggerRegisterSubscriptionRequest",
415                child: "draft",
416                parent_version: self.protocol_version,
417                child_version: self.draft.protocol_version,
418            });
419        }
420        self.draft.validate()
421    }
422}
423
424#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
425pub struct RemoteTriggerRegisterSubscriptionResult {
426    pub protocol_version: u32,
427    pub record: RemoteTriggerSubscriptionRecord,
428}
429
430impl RemoteTriggerRegisterSubscriptionResult {
431    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
432        ensure_protocol_version(self.protocol_version)?;
433        self.record
434            .validate("RemoteTriggerRegisterSubscriptionResult")
435    }
436}
437
438#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
439pub struct RemoteTriggerListSubscriptionsResponse {
440    pub protocol_version: u32,
441    #[serde(default)]
442    pub subscriptions: Vec<RemoteTriggerSubscriptionRecord>,
443}
444
445impl RemoteTriggerListSubscriptionsResponse {
446    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
447        ensure_protocol_version(self.protocol_version)?;
448        for record in &self.subscriptions {
449            record.validate("RemoteTriggerListSubscriptionsResponse")?;
450        }
451        Ok(())
452    }
453}
454
455#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
456pub struct RemoteTriggerCancelSubscriptionRequest {
457    pub protocol_version: u32,
458    pub session_id: String,
459    pub handle: String,
460}
461
462impl RemoteTriggerCancelSubscriptionRequest {
463    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
464        ensure_protocol_version(self.protocol_version)?;
465        require_non_empty(
466            "RemoteTriggerCancelSubscriptionRequest",
467            "session_id",
468            &self.session_id,
469        )?;
470        require_non_empty(
471            "RemoteTriggerCancelSubscriptionRequest",
472            "handle",
473            &self.handle,
474        )
475    }
476}
477
478#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
479pub struct RemoteTriggerCancelSubscriptionResult {
480    pub protocol_version: u32,
481    pub session_id: String,
482    pub handle: String,
483    pub cancelled: bool,
484}
485
486impl RemoteTriggerCancelSubscriptionResult {
487    pub fn validate(&self) -> Result<(), RemoteProtocolError> {
488        ensure_protocol_version(self.protocol_version)?;
489        require_non_empty(
490            "RemoteTriggerCancelSubscriptionResult",
491            "session_id",
492            &self.session_id,
493        )?;
494        require_non_empty(
495            "RemoteTriggerCancelSubscriptionResult",
496            "handle",
497            &self.handle,
498        )
499    }
500}