// lash_core/runtime/effect/envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17    ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22/// Durable category for a runtime effect.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26    LlmCall,
27    Direct,
28    ToolCall,
29    Process,
30    ExecCode,
31    Checkpoint,
32    SyncExecutionEnvironment,
33    Sleep,
34    AwaitEvent,
35    DurableStep,
36}
37
38impl RuntimeEffectKind {
39    pub fn as_str(self) -> &'static str {
40        match self {
41            Self::LlmCall => "llm_call",
42            Self::Direct => "direct",
43            Self::ToolCall => "tool_call",
44            Self::Process => "process",
45            Self::ExecCode => "exec_code",
46            Self::Checkpoint => "checkpoint",
47            Self::SyncExecutionEnvironment => "sync_execution_environment",
48            Self::Sleep => "sleep",
49            Self::AwaitEvent => "await_event",
50            Self::DurableStep => "durable_step",
51        }
52    }
53}
54
55/// Canonical lineage for a runtime-side invocation.
56#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
57pub struct RuntimeInvocation {
58    pub scope: RuntimeScope,
59    pub subject: RuntimeSubject,
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub caused_by: Option<CausalRef>,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub replay: Option<RuntimeReplay>,
64}
65
66impl RuntimeInvocation {
67    pub fn effect(
68        scope: RuntimeScope,
69        effect_id: impl Into<String>,
70        kind: RuntimeEffectKind,
71        replay_key: impl Into<String>,
72    ) -> Self {
73        Self {
74            scope,
75            subject: RuntimeSubject::Effect {
76                effect_id: effect_id.into(),
77                kind,
78            },
79            caused_by: None,
80            replay: Some(RuntimeReplay {
81                key: replay_key.into(),
82            }),
83        }
84    }
85
86    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
87        self.caused_by = caused_by;
88        self
89    }
90
91    pub fn effect_id(&self) -> Option<&str> {
92        match &self.subject {
93            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
94            _ => None,
95        }
96    }
97
98    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
99        match &self.subject {
100            RuntimeSubject::Effect { kind, .. } => Some(*kind),
101            _ => None,
102        }
103    }
104
105    pub fn replay_key(&self) -> Option<&str> {
106        self.replay.as_ref().map(|replay| replay.key.as_str())
107    }
108
109    pub fn causal_ref(&self) -> Option<CausalRef> {
110        match &self.subject {
111            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
112                session_id: self.scope.session_id.clone(),
113                turn_id: self.scope.turn_id.clone(),
114                effect_id: effect_id.clone(),
115            }),
116            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
117                process_id: process_id.clone(),
118            }),
119            RuntimeSubject::ProcessEvent {
120                process_id,
121                sequence,
122                ..
123            } => Some(CausalRef::ProcessEvent {
124                process_id: process_id.clone(),
125                sequence: *sequence,
126            }),
127            RuntimeSubject::TriggerOccurrence { occurrence_id } => {
128                Some(CausalRef::TriggerOccurrence {
129                    occurrence_id: occurrence_id.clone(),
130                })
131            }
132            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
133                session_id: self.scope.session_id.clone(),
134                node_id: node_id.clone(),
135            }),
136        }
137    }
138}
139
140#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
141pub struct RuntimeScope {
142    pub session_id: String,
143    #[serde(default, skip_serializing_if = "Option::is_none")]
144    pub turn_id: Option<String>,
145    #[serde(default, skip_serializing_if = "Option::is_none")]
146    pub turn_index: Option<usize>,
147    #[serde(default, skip_serializing_if = "Option::is_none")]
148    pub protocol_iteration: Option<usize>,
149}
150
151impl RuntimeScope {
152    pub fn new(session_id: impl Into<String>) -> Self {
153        Self {
154            session_id: session_id.into(),
155            turn_id: None,
156            turn_index: None,
157            protocol_iteration: None,
158        }
159    }
160
161    pub fn for_turn(
162        session_id: impl Into<String>,
163        turn_id: impl Into<String>,
164        turn_index: usize,
165        protocol_iteration: usize,
166    ) -> Self {
167        Self {
168            session_id: session_id.into(),
169            turn_id: Some(turn_id.into()),
170            turn_index: Some(turn_index),
171            protocol_iteration: Some(protocol_iteration),
172        }
173    }
174}
175
176#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
177pub struct RuntimeReplay {
178    pub key: String,
179}
180
181#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
182#[serde(tag = "type", rename_all = "snake_case")]
183pub enum RuntimeSubject {
184    Effect {
185        effect_id: String,
186        kind: RuntimeEffectKind,
187    },
188    Process {
189        process_id: String,
190    },
191    ProcessEvent {
192        process_id: String,
193        sequence: u64,
194        event_type: String,
195    },
196    TriggerOccurrence {
197        occurrence_id: String,
198    },
199    SessionNode {
200        node_id: String,
201    },
202}
203
204/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
205#[derive(Clone, Debug, Serialize, Deserialize)]
206pub struct RuntimeEffectEnvelope {
207    pub invocation: RuntimeInvocation,
208    pub command: RuntimeEffectCommand,
209}
210
211impl RuntimeEffectEnvelope {
212    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
213        Self::try_new(invocation, command).expect("valid runtime effect invocation")
214    }
215
216    pub fn try_new(
217        invocation: RuntimeInvocation,
218        command: RuntimeEffectCommand,
219    ) -> Result<Self, RuntimeEffectControllerError> {
220        validate_effect_invocation(&invocation, command.kind())?;
221        validate_effect_command(&command)?;
222        Ok(Self {
223            invocation,
224            command,
225        })
226    }
227
228    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
229        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
230            RuntimeEffectControllerError::new(
231                "runtime_effect_envelope_hash",
232                format!("failed to serialize runtime effect envelope: {err}"),
233            )
234        })
235    }
236}
237
238fn validate_effect_invocation(
239    invocation: &RuntimeInvocation,
240    command_kind: RuntimeEffectKind,
241) -> Result<(), RuntimeEffectControllerError> {
242    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
243        return Err(RuntimeEffectControllerError::new(
244            "runtime_effect_invocation_subject",
245            "runtime effect envelope subject must be an effect",
246        ));
247    };
248    if effect_id.trim().is_empty() {
249        return Err(RuntimeEffectControllerError::new(
250            "runtime_effect_invocation_subject",
251            "runtime effect envelope effect id must be non-empty",
252        ));
253    }
254    if *kind != command_kind {
255        return Err(RuntimeEffectControllerError::new(
256            "runtime_effect_invocation_kind",
257            format!(
258                "runtime effect invocation kind {} does not match command kind {}",
259                kind.as_str(),
260                command_kind.as_str()
261            ),
262        ));
263    }
264    if invocation
265        .replay
266        .as_ref()
267        .is_none_or(|replay| replay.key.is_empty())
268    {
269        return Err(RuntimeEffectControllerError::new(
270            "runtime_effect_replay_required",
271            "runtime effect envelope requires replay.key",
272        ));
273    }
274    Ok(())
275}
276
277fn validate_effect_command(
278    command: &RuntimeEffectCommand,
279) -> Result<(), RuntimeEffectControllerError> {
280    if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
281        && step_id.trim().is_empty()
282    {
283        return Err(RuntimeEffectControllerError::new(
284            "runtime_effect_durable_step_id",
285            "runtime effect durable step id must be non-empty",
286        ));
287    }
288    Ok(())
289}
290
291/// Serializable command emitted at Lash's nondeterministic runtime boundary.
292#[derive(Clone, Debug, Serialize, Deserialize)]
293#[serde(tag = "type", rename_all = "snake_case")]
294pub enum RuntimeEffectCommand {
295    LlmCall {
296        request: Box<LlmRequestSpec>,
297    },
298    Direct {
299        request: Box<LlmRequestSpec>,
300        usage_source: String,
301    },
302    ToolCall {
303        call: crate::PreparedToolCall,
304    },
305    Process {
306        command: Box<ProcessCommand>,
307    },
308    ExecCode {
309        language: String,
310        code: String,
311    },
312    Checkpoint {
313        checkpoint: CheckpointKind,
314    },
315    SyncExecutionEnvironment {
316        update_machine_config: bool,
317    },
318    Sleep {
319        duration_ms: u64,
320    },
321    AwaitEvent {
322        key: crate::AwaitEventKey,
323    },
324    DurableStep {
325        step_id: String,
326        input: serde_json::Value,
327    },
328}
329
330impl RuntimeEffectCommand {
331    pub fn process(command: ProcessCommand) -> Self {
332        Self::Process {
333            command: Box::new(command),
334        }
335    }
336
337    pub fn kind(&self) -> RuntimeEffectKind {
338        match self {
339            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
340            Self::Direct { .. } => RuntimeEffectKind::Direct,
341            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
342            Self::Process { .. } => RuntimeEffectKind::Process,
343            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
344            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
345            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
346            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
347            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
348            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
349        }
350    }
351}
352
353/// Serializable operation against the process admin plane.
354#[derive(Clone, Debug, Serialize, Deserialize)]
355#[serde(tag = "op", rename_all = "snake_case")]
356pub enum ProcessCommand {
357    Start {
358        registration: ProcessRegistration,
359        #[serde(default, skip_serializing_if = "Option::is_none")]
360        grant: Option<ProcessStartGrant>,
361        #[serde(
362            default,
363            skip_serializing_if = "boxed_process_execution_context_is_empty"
364        )]
365        execution_context: Box<ProcessExecutionContext>,
366    },
367    List {
368        session_scope: SessionScope,
369        #[serde(default)]
370        mode: ProcessListMode,
371    },
372    Transfer {
373        from_scope: SessionScope,
374        to_scope: SessionScope,
375        process_ids: Vec<String>,
376    },
377    DeleteSession {
378        session_id: String,
379    },
380    Await {
381        process_id: String,
382    },
383    Cancel {
384        process_id: String,
385        reason: Option<String>,
386    },
387    Signal {
388        process_id: String,
389        signal_name: String,
390        signal_id: String,
391        request: crate::ProcessEventAppendRequest,
392    },
393}
394
395fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
396    context.is_empty()
397}
398
399type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
400
401impl ProcessCommand {
402    pub fn effect_id(&self) -> String {
403        match self {
404            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
405            Self::List {
406                session_scope,
407                mode,
408            } => {
409                format!("process:list:{}:{}", session_scope.id(), mode.as_str())
410            }
411            Self::Transfer {
412                from_scope,
413                to_scope,
414                process_ids,
415            } => {
416                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
417                    .unwrap_or_else(|_| "unhashable".to_string());
418                format!(
419                    "process:transfer:{}:{}:{digest}",
420                    from_scope.id(),
421                    to_scope.id()
422                )
423            }
424            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
425            Self::Await { process_id } => format!("process:await:{process_id}"),
426            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
427            Self::Signal {
428                process_id,
429                signal_name,
430                signal_id,
431                ..
432            } => {
433                format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
434            }
435        }
436    }
437}
438
439/// Serializable result of a process operation.
440#[derive(Clone, Debug, Serialize, Deserialize)]
441#[serde(tag = "op", rename_all = "snake_case")]
442pub enum ProcessEffectOutcome {
443    Start {
444        record: ProcessRecord,
445    },
446    List {
447        entries: Vec<ProcessHandleGrantEntry>,
448    },
449    Transfer,
450    DeleteSession {
451        report: crate::ProcessSessionDeleteReport,
452    },
453    Await {
454        output: ProcessAwaitOutput,
455    },
456    Cancel {
457        record: ProcessRecord,
458    },
459    Signal {
460        event: crate::ProcessEvent,
461    },
462}
463
464#[derive(Clone, Debug, Serialize, Deserialize)]
465pub struct ToolCallEffectOutcome {
466    pub launch: ToolCallLaunch,
467    #[serde(default, skip_serializing_if = "Vec::is_empty")]
468    pub triggers: Vec<ToolTriggerEffectOutcome>,
469}
470
471#[derive(Clone, Debug, Serialize, Deserialize)]
472#[serde(tag = "status", rename_all = "snake_case")]
473pub enum ToolCallLaunch {
474    Done {
475        result: CompletedToolCall,
476    },
477    Pending {
478        key: crate::AwaitEventKey,
479        pending: crate::PendingCompletion,
480        duration_ms: u64,
481    },
482}
483
484/// Serializable result of a runtime effect command.
485#[derive(Clone, Debug, Serialize, Deserialize)]
486#[serde(tag = "type", rename_all = "snake_case")]
487pub enum RuntimeEffectOutcome {
488    LlmCall {
489        result: Result<LlmResponse, LlmCallError>,
490        text_streamed: bool,
491    },
492    Direct {
493        result: Result<LlmResponse, LlmCallError>,
494    },
495    ToolCall {
496        launch: ToolCallLaunch,
497        #[serde(default, skip_serializing_if = "Vec::is_empty")]
498        triggers: Vec<ToolTriggerEffectOutcome>,
499    },
500    Process {
501        result: ProcessEffectOutcome,
502    },
503    ExecCode {
504        result: Result<ExecResponse, String>,
505    },
506    Checkpoint {
507        result: CheckpointOutcome,
508    },
509    SyncExecutionEnvironment {
510        result: Result<Option<ExecutionEnvironmentSync>, String>,
511    },
512    Sleep,
513    AwaitEvent {
514        resolution: crate::Resolution,
515    },
516    DurableStep {
517        value: serde_json::Value,
518    },
519}
520
// =============================================================================
// Request specs (serializable forms of LLM/Direct requests)
// =============================================================================
524
525/// Serializable attachment data for runtime effect envelopes.
526///
527/// Effect envelopes carry attachment references only. Local executors resolve
528/// bytes from the configured attachment store when a provider request is
529/// actually executed.
530#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
531pub struct LlmAttachmentSpec {
532    pub reference: AttachmentRef,
533}
534
535impl LlmAttachmentSpec {
536    fn into_attachment(self) -> LlmAttachment {
537        LlmAttachment::reference(self.reference)
538    }
539}
540
541/// Serializable LLM request data. Live stream and provider-trace callbacks are
542/// attached by the local executor, and attachment bytes are resolved locally
543/// from refs rather than persisted in the effect envelope.
544#[derive(Clone, Debug, Serialize, Deserialize)]
545pub struct LlmRequestSpec {
546    pub model: String,
547    pub messages: Vec<LlmMessage>,
548    pub attachments: Vec<LlmAttachmentSpec>,
549    pub tools: Arc<Vec<LlmToolSpec>>,
550    pub tool_choice: LlmToolChoice,
551    pub model_variant: Option<String>,
552    #[serde(default)]
553    pub generation: crate::GenerationOptions,
554    pub session_id: Option<String>,
555    pub output_spec: Option<LlmOutputSpec>,
556}
557
558impl LlmRequestSpec {
559    pub async fn from_request(
560        request: &CoreLlmRequest,
561        attachment_store: &dyn AttachmentStore,
562    ) -> Result<Self, RuntimeEffectControllerError> {
563        Ok(Self {
564            model: request.model.clone(),
565            messages: request.messages.clone(),
566            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
567                .await?,
568            tools: Arc::clone(&request.tools),
569            tool_choice: request.tool_choice.clone(),
570            model_variant: request.model_variant.clone(),
571            generation: request.generation.clone(),
572            session_id: request.session_id.clone(),
573            output_spec: request.output_spec.clone(),
574        })
575    }
576
577    pub fn into_request(
578        self,
579        stream_events: Option<LlmEventSender>,
580        provider_trace: Option<LlmProviderTraceSender>,
581    ) -> CoreLlmRequest {
582        CoreLlmRequest {
583            model: self.model,
584            messages: self.messages,
585            attachments: self
586                .attachments
587                .into_iter()
588                .map(LlmAttachmentSpec::into_attachment)
589                .collect(),
590            tools: self.tools,
591            tool_choice: self.tool_choice,
592            model_variant: self.model_variant,
593            generation: self.generation,
594            session_id: self.session_id,
595            output_spec: self.output_spec,
596            stream_events,
597            provider_trace,
598        }
599    }
600}
601
602async fn attachment_specs_from_attachments(
603    attachments: &[LlmAttachment],
604    attachment_store: &dyn AttachmentStore,
605) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
606    let mut specs = Vec::with_capacity(attachments.len());
607    for attachment in attachments {
608        specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
609    }
610    Ok(specs)
611}
612
613async fn attachment_spec_from_attachment(
614    attachment: &LlmAttachment,
615    attachment_store: &dyn AttachmentStore,
616) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
617    if let Some(reference) = attachment.reference.as_ref() {
618        return Ok(LlmAttachmentSpec {
619            reference: reference.clone(),
620        });
621    }
622    if attachment.data.is_empty() {
623        return Err(RuntimeEffectControllerError::new(
624            "runtime_effect_attachment_missing_reference",
625            "runtime effect attachment has neither a durable reference nor inline bytes",
626        ));
627    }
628    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
629        RuntimeEffectControllerError::new(
630            "runtime_effect_attachment_media_type",
631            format!(
632                "attachment media type `{}` cannot be represented durably",
633                attachment.mime
634            ),
635        )
636    })?;
637    let reference = attachment_store
638        .put(
639            attachment.data.clone(),
640            AttachmentCreateMeta::new(media_type, None, None, None),
641        )
642        .await
643        .map_err(|err| {
644            RuntimeEffectControllerError::new(
645                "runtime_effect_attachment_store",
646                format!("failed to store attachment before runtime effect invocation: {err}"),
647            )
648        })?;
649    Ok(LlmAttachmentSpec { reference })
650}
651
652impl RuntimeEffectOutcome {
653    pub fn into_llm_call(
654        self,
655    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
656        match self {
657            Self::LlmCall {
658                result,
659                text_streamed,
660            } => Ok((result, text_streamed)),
661            other => Err(RuntimeEffectControllerError::wrong_outcome(
662                RuntimeEffectKind::LlmCall,
663                other.kind(),
664            )),
665        }
666    }
667
668    pub fn into_direct_response(
669        self,
670    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
671        match self {
672            Self::Direct { result } => Ok(result),
673            other => Err(RuntimeEffectControllerError::wrong_outcome(
674                RuntimeEffectKind::Direct,
675                other.kind(),
676            )),
677        }
678    }
679
680    pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
681        match self {
682            Self::ToolCall {
683                launch: ToolCallLaunch::Done { result },
684                ..
685            } => Ok(result),
686            Self::ToolCall {
687                launch: ToolCallLaunch::Pending { .. },
688                ..
689            } => Err(RuntimeEffectControllerError::new(
690                "runtime_effect_tool_call_pending",
691                "tool call launch is pending and has no completed output yet",
692            )),
693            other => Err(RuntimeEffectControllerError::wrong_outcome(
694                RuntimeEffectKind::ToolCall,
695                other.kind(),
696            )),
697        }
698    }
699
700    pub fn into_tool_call_effect(
701        self,
702    ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
703        match self {
704            Self::ToolCall { launch, triggers } => Ok(ToolCallEffectOutcome { launch, triggers }),
705            other => Err(RuntimeEffectControllerError::wrong_outcome(
706                RuntimeEffectKind::ToolCall,
707                other.kind(),
708            )),
709        }
710    }
711
712    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
713        match self {
714            Self::Process { result } => Ok(result),
715            other => Err(RuntimeEffectControllerError::wrong_outcome(
716                RuntimeEffectKind::Process,
717                other.kind(),
718            )),
719        }
720    }
721
722    pub fn into_exec_code(
723        self,
724    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
725        match self {
726            Self::ExecCode { result } => Ok(result),
727            other => Err(RuntimeEffectControllerError::wrong_outcome(
728                RuntimeEffectKind::ExecCode,
729                other.kind(),
730            )),
731        }
732    }
733
734    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
735        match self {
736            Self::Checkpoint { result } => Ok(result),
737            other => Err(RuntimeEffectControllerError::wrong_outcome(
738                RuntimeEffectKind::Checkpoint,
739                other.kind(),
740            )),
741        }
742    }
743
744    pub fn into_sync_execution_environment(
745        self,
746    ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
747    {
748        match self {
749            Self::SyncExecutionEnvironment { result } => Ok(result),
750            other => Err(RuntimeEffectControllerError::wrong_outcome(
751                RuntimeEffectKind::SyncExecutionEnvironment,
752                other.kind(),
753            )),
754        }
755    }
756
757    pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
758        match self {
759            Self::AwaitEvent { resolution } => Ok(resolution),
760            other => Err(RuntimeEffectControllerError::wrong_outcome(
761                RuntimeEffectKind::AwaitEvent,
762                other.kind(),
763            )),
764        }
765    }
766
767    pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
768        match self {
769            Self::DurableStep { value } => Ok(value),
770            other => Err(RuntimeEffectControllerError::wrong_outcome(
771                RuntimeEffectKind::DurableStep,
772                other.kind(),
773            )),
774        }
775    }
776
777    pub fn kind(&self) -> RuntimeEffectKind {
778        match self {
779            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
780            Self::Direct { .. } => RuntimeEffectKind::Direct,
781            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
782            Self::Process { .. } => RuntimeEffectKind::Process,
783            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
784            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
785            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
786            Self::Sleep => RuntimeEffectKind::Sleep,
787            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
788            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
789        }
790    }
791}