Skip to main content

lash_core/runtime/effect/
envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17    ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22/// Durable category for a runtime effect.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26    LlmCall,
27    Direct,
28    ToolCall,
29    ToolBatch,
30    Process,
31    ExecCode,
32    Checkpoint,
33    SyncExecutionEnvironment,
34    Sleep,
35    AwaitEvent,
36    DurableStep,
37}
38
39impl RuntimeEffectKind {
40    pub fn as_str(self) -> &'static str {
41        match self {
42            Self::LlmCall => "llm_call",
43            Self::Direct => "direct",
44            Self::ToolCall => "tool_call",
45            Self::ToolBatch => "tool_batch",
46            Self::Process => "process",
47            Self::ExecCode => "exec_code",
48            Self::Checkpoint => "checkpoint",
49            Self::SyncExecutionEnvironment => "sync_execution_environment",
50            Self::Sleep => "sleep",
51            Self::AwaitEvent => "await_event",
52            Self::DurableStep => "durable_step",
53        }
54    }
55}
56
57/// Canonical lineage for a runtime-side invocation.
58#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
59pub struct RuntimeInvocation {
60    pub scope: RuntimeScope,
61    pub subject: RuntimeSubject,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub caused_by: Option<CausalRef>,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub replay: Option<RuntimeReplay>,
66}
67
68impl RuntimeInvocation {
69    pub fn effect(
70        scope: RuntimeScope,
71        effect_id: impl Into<String>,
72        kind: RuntimeEffectKind,
73        replay_key: impl Into<String>,
74    ) -> Self {
75        Self {
76            scope,
77            subject: RuntimeSubject::Effect {
78                effect_id: effect_id.into(),
79                kind,
80            },
81            caused_by: None,
82            replay: Some(RuntimeReplay {
83                key: replay_key.into(),
84            }),
85        }
86    }
87
88    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89        self.caused_by = caused_by;
90        self
91    }
92
93    pub fn effect_id(&self) -> Option<&str> {
94        match &self.subject {
95            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96            _ => None,
97        }
98    }
99
100    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101        match &self.subject {
102            RuntimeSubject::Effect { kind, .. } => Some(*kind),
103            _ => None,
104        }
105    }
106
107    pub fn replay_key(&self) -> Option<&str> {
108        self.replay.as_ref().map(|replay| replay.key.as_str())
109    }
110
111    pub fn causal_ref(&self) -> Option<CausalRef> {
112        match &self.subject {
113            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114                session_id: self.scope.session_id.clone(),
115                turn_id: self.scope.turn_id.clone(),
116                effect_id: effect_id.clone(),
117            }),
118            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119                process_id: process_id.clone(),
120            }),
121            RuntimeSubject::ProcessEvent {
122                process_id,
123                sequence,
124                ..
125            } => Some(CausalRef::ProcessEvent {
126                process_id: process_id.clone(),
127                sequence: *sequence,
128            }),
129            RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130                Some(CausalRef::TriggerOccurrence {
131                    occurrence_id: occurrence_id.clone(),
132                })
133            }
134            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135                session_id: self.scope.session_id.clone(),
136                node_id: node_id.clone(),
137            }),
138        }
139    }
140}
141
142#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
143pub struct RuntimeScope {
144    pub session_id: String,
145    #[serde(default, skip_serializing_if = "Option::is_none")]
146    pub turn_id: Option<String>,
147    #[serde(default, skip_serializing_if = "Option::is_none")]
148    pub turn_index: Option<usize>,
149    #[serde(default, skip_serializing_if = "Option::is_none")]
150    pub protocol_iteration: Option<usize>,
151}
152
153impl RuntimeScope {
154    pub fn new(session_id: impl Into<String>) -> Self {
155        Self {
156            session_id: session_id.into(),
157            turn_id: None,
158            turn_index: None,
159            protocol_iteration: None,
160        }
161    }
162
163    pub fn for_turn(
164        session_id: impl Into<String>,
165        turn_id: impl Into<String>,
166        turn_index: usize,
167        protocol_iteration: usize,
168    ) -> Self {
169        Self {
170            session_id: session_id.into(),
171            turn_id: Some(turn_id.into()),
172            turn_index: Some(turn_index),
173            protocol_iteration: Some(protocol_iteration),
174        }
175    }
176}
177
178#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
179pub struct RuntimeReplay {
180    pub key: String,
181}
182
183#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(tag = "type", rename_all = "snake_case")]
185pub enum RuntimeSubject {
186    Effect {
187        effect_id: String,
188        kind: RuntimeEffectKind,
189    },
190    Process {
191        process_id: String,
192    },
193    ProcessEvent {
194        process_id: String,
195        sequence: u64,
196        event_type: String,
197    },
198    TriggerOccurrence {
199        occurrence_id: String,
200    },
201    SessionNode {
202        node_id: String,
203    },
204}
205
206/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
207#[derive(Clone, Debug, Serialize, Deserialize)]
208pub struct RuntimeEffectEnvelope {
209    pub invocation: RuntimeInvocation,
210    pub command: RuntimeEffectCommand,
211}
212
213impl RuntimeEffectEnvelope {
214    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215        Self::try_new(invocation, command).expect("valid runtime effect invocation")
216    }
217
218    pub fn try_new(
219        invocation: RuntimeInvocation,
220        command: RuntimeEffectCommand,
221    ) -> Result<Self, RuntimeEffectControllerError> {
222        validate_effect_invocation(&invocation, command.kind())?;
223        validate_effect_command(&command)?;
224        Ok(Self {
225            invocation,
226            command,
227        })
228    }
229
230    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232            RuntimeEffectControllerError::new(
233                "runtime_effect_envelope_hash",
234                format!("failed to serialize runtime effect envelope: {err}"),
235            )
236        })
237    }
238}
239
240fn validate_effect_invocation(
241    invocation: &RuntimeInvocation,
242    command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245        return Err(RuntimeEffectControllerError::new(
246            "runtime_effect_invocation_subject",
247            "runtime effect envelope subject must be an effect",
248        ));
249    };
250    if effect_id.trim().is_empty() {
251        return Err(RuntimeEffectControllerError::new(
252            "runtime_effect_invocation_subject",
253            "runtime effect envelope effect id must be non-empty",
254        ));
255    }
256    if *kind != command_kind {
257        return Err(RuntimeEffectControllerError::new(
258            "runtime_effect_invocation_kind",
259            format!(
260                "runtime effect invocation kind {} does not match command kind {}",
261                kind.as_str(),
262                command_kind.as_str()
263            ),
264        ));
265    }
266    if invocation
267        .replay
268        .as_ref()
269        .is_none_or(|replay| replay.key.is_empty())
270    {
271        return Err(RuntimeEffectControllerError::new(
272            "runtime_effect_replay_required",
273            "runtime effect envelope requires replay.key",
274        ));
275    }
276    Ok(())
277}
278
279fn validate_effect_command(
280    command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282    if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283        && step_id.trim().is_empty()
284    {
285        return Err(RuntimeEffectControllerError::new(
286            "runtime_effect_durable_step_id",
287            "runtime effect durable step id must be non-empty",
288        ));
289    }
290    if let RuntimeEffectCommand::ToolBatch { batch } = command {
291        if batch.batch_id.trim().is_empty() {
292            return Err(RuntimeEffectControllerError::new(
293                "runtime_effect_tool_batch_id",
294                "runtime effect tool batch id must be non-empty",
295            ));
296        }
297        if batch.calls.is_empty() {
298            return Err(RuntimeEffectControllerError::new(
299                "runtime_effect_tool_batch_empty",
300                "runtime effect tool batch must contain at least one prepared call",
301            ));
302        }
303        for (index, child) in batch.calls.iter().enumerate() {
304            if child.call.call_id.trim().is_empty() {
305                return Err(RuntimeEffectControllerError::new(
306                    "runtime_effect_tool_batch_child_call_id",
307                    format!("runtime effect tool batch child {index} has an empty call id"),
308                ));
309            }
310            if child.replay_suffix.trim().is_empty() {
311                return Err(RuntimeEffectControllerError::new(
312                    "runtime_effect_tool_batch_child_replay",
313                    format!("runtime effect tool batch child {index} has an empty replay suffix"),
314                ));
315            }
316        }
317    }
318    Ok(())
319}
320
321/// Serializable command emitted at Lash's nondeterministic runtime boundary.
322#[derive(Clone, Debug, Serialize, Deserialize)]
323#[serde(tag = "type", rename_all = "snake_case")]
324pub enum RuntimeEffectCommand {
325    LlmCall {
326        request: Box<LlmRequestSpec>,
327    },
328    Direct {
329        request: Box<LlmRequestSpec>,
330        usage_source: String,
331    },
332    ToolCall {
333        call: crate::PreparedToolCall,
334    },
335    ToolBatch {
336        batch: crate::PreparedToolBatch,
337    },
338    Process {
339        command: Box<ProcessCommand>,
340    },
341    ExecCode {
342        language: String,
343        code: String,
344    },
345    Checkpoint {
346        checkpoint: CheckpointKind,
347    },
348    SyncExecutionEnvironment {
349        update_machine_config: bool,
350    },
351    Sleep {
352        duration_ms: u64,
353    },
354    AwaitEvent {
355        key: crate::AwaitEventKey,
356    },
357    DurableStep {
358        step_id: String,
359        input: serde_json::Value,
360    },
361}
362
363impl RuntimeEffectCommand {
364    pub fn process(command: ProcessCommand) -> Self {
365        Self::Process {
366            command: Box::new(command),
367        }
368    }
369
370    pub fn kind(&self) -> RuntimeEffectKind {
371        match self {
372            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
373            Self::Direct { .. } => RuntimeEffectKind::Direct,
374            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
375            Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
376            Self::Process { .. } => RuntimeEffectKind::Process,
377            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
378            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
379            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
380            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
381            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
382            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
383        }
384    }
385}
386
387/// Serializable operation against the process admin plane.
388#[derive(Clone, Debug, Serialize, Deserialize)]
389#[serde(tag = "op", rename_all = "snake_case")]
390pub enum ProcessCommand {
391    Start {
392        registration: ProcessRegistration,
393        #[serde(default, skip_serializing_if = "Option::is_none")]
394        grant: Option<ProcessStartGrant>,
395        #[serde(
396            default,
397            skip_serializing_if = "boxed_process_execution_context_is_empty"
398        )]
399        execution_context: Box<ProcessExecutionContext>,
400    },
401    List {
402        session_scope: SessionScope,
403        #[serde(default)]
404        mode: ProcessListMode,
405    },
406    Transfer {
407        from_scope: SessionScope,
408        to_scope: SessionScope,
409        process_ids: Vec<String>,
410    },
411    DeleteSession {
412        session_id: String,
413    },
414    Await {
415        process_id: String,
416    },
417    Cancel {
418        process_id: String,
419        reason: Option<String>,
420    },
421    Signal {
422        process_id: String,
423        signal_name: String,
424        signal_id: String,
425        request: crate::ProcessEventAppendRequest,
426    },
427}
428
429fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
430    context.is_empty()
431}
432
433type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
434
435impl ProcessCommand {
436    pub fn effect_id(&self) -> String {
437        match self {
438            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
439            Self::List {
440                session_scope,
441                mode,
442            } => {
443                format!("process:list:{}:{}", session_scope.id(), mode.as_str())
444            }
445            Self::Transfer {
446                from_scope,
447                to_scope,
448                process_ids,
449            } => {
450                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
451                    .unwrap_or_else(|_| "unhashable".to_string());
452                format!(
453                    "process:transfer:{}:{}:{digest}",
454                    from_scope.id(),
455                    to_scope.id()
456                )
457            }
458            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
459            Self::Await { process_id } => format!("process:await:{process_id}"),
460            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
461            Self::Signal {
462                process_id,
463                signal_name,
464                signal_id,
465                ..
466            } => {
467                format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
468            }
469        }
470    }
471}
472
473/// Serializable result of a process operation.
474#[derive(Clone, Debug, Serialize, Deserialize)]
475#[serde(tag = "op", rename_all = "snake_case")]
476pub enum ProcessEffectOutcome {
477    Start {
478        record: ProcessRecord,
479    },
480    List {
481        entries: Vec<ProcessHandleGrantEntry>,
482    },
483    Transfer,
484    DeleteSession {
485        report: crate::ProcessSessionDeleteReport,
486    },
487    Await {
488        output: ProcessAwaitOutput,
489    },
490    Cancel {
491        record: ProcessRecord,
492    },
493    Signal {
494        event: crate::ProcessEvent,
495    },
496}
497
498#[derive(Clone, Debug, Serialize, Deserialize)]
499pub struct ToolCallEffectOutcome {
500    pub launch: ToolCallLaunch,
501    #[serde(default, skip_serializing_if = "Vec::is_empty")]
502    pub triggers: Vec<ToolTriggerEffectOutcome>,
503}
504
505#[derive(Clone, Debug, Serialize, Deserialize)]
506pub struct ToolBatchEffectOutcome {
507    pub launches: Vec<ToolCallLaunch>,
508    #[serde(default, skip_serializing_if = "Vec::is_empty")]
509    pub triggers: Vec<ToolTriggerEffectOutcome>,
510}
511
512#[derive(Clone, Debug, Serialize, Deserialize)]
513#[serde(tag = "status", rename_all = "snake_case")]
514pub enum ToolCallLaunch {
515    Done {
516        result: CompletedToolCall,
517    },
518    Pending {
519        key: crate::AwaitEventKey,
520        pending: crate::PendingCompletion,
521        duration_ms: u64,
522    },
523}
524
525/// Serializable result of a runtime effect command.
526#[derive(Clone, Debug, Serialize, Deserialize)]
527#[serde(tag = "type", rename_all = "snake_case")]
528pub enum RuntimeEffectOutcome {
529    LlmCall {
530        result: Result<LlmResponse, LlmCallError>,
531        text_streamed: bool,
532    },
533    Direct {
534        result: Result<LlmResponse, LlmCallError>,
535    },
536    ToolCall {
537        launch: ToolCallLaunch,
538        #[serde(default, skip_serializing_if = "Vec::is_empty")]
539        triggers: Vec<ToolTriggerEffectOutcome>,
540    },
541    ToolBatch {
542        launches: Vec<ToolCallLaunch>,
543        #[serde(default, skip_serializing_if = "Vec::is_empty")]
544        triggers: Vec<ToolTriggerEffectOutcome>,
545    },
546    Process {
547        result: ProcessEffectOutcome,
548    },
549    ExecCode {
550        result: Result<ExecResponse, String>,
551    },
552    Checkpoint {
553        result: CheckpointOutcome,
554    },
555    SyncExecutionEnvironment {
556        result: Result<Option<ExecutionEnvironmentSync>, String>,
557    },
558    Sleep,
559    AwaitEvent {
560        resolution: crate::Resolution,
561    },
562    DurableStep {
563        value: serde_json::Value,
564    },
565}
566
567// =============================================================================
568// Request specs (serializable forms of LLM/Direct requests)
569// =============================================================================
570
571/// Serializable attachment data for runtime effect envelopes.
572///
573/// Effect envelopes carry attachment references only. Local executors resolve
574/// bytes from the configured attachment store when a provider request is
575/// actually executed.
576#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
577pub struct LlmAttachmentSpec {
578    pub reference: AttachmentRef,
579}
580
581impl LlmAttachmentSpec {
582    fn into_attachment(self) -> LlmAttachment {
583        LlmAttachment::reference(self.reference)
584    }
585}
586
587/// Serializable LLM request data. Live stream and provider-trace callbacks are
588/// attached by the local executor, and attachment bytes are resolved locally
589/// from refs rather than persisted in the effect envelope.
590#[derive(Clone, Debug, Serialize, Deserialize)]
591pub struct LlmRequestSpec {
592    pub model: String,
593    pub messages: Vec<LlmMessage>,
594    pub attachments: Vec<LlmAttachmentSpec>,
595    pub tools: Arc<Vec<LlmToolSpec>>,
596    pub tool_choice: LlmToolChoice,
597    pub model_variant: Option<String>,
598    #[serde(default)]
599    pub generation: crate::GenerationOptions,
600    pub session_id: Option<String>,
601    pub output_spec: Option<LlmOutputSpec>,
602}
603
604impl LlmRequestSpec {
605    pub async fn from_request(
606        request: &CoreLlmRequest,
607        attachment_store: &dyn AttachmentStore,
608    ) -> Result<Self, RuntimeEffectControllerError> {
609        Ok(Self {
610            model: request.model.clone(),
611            messages: request.messages.clone(),
612            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
613                .await?,
614            tools: Arc::clone(&request.tools),
615            tool_choice: request.tool_choice.clone(),
616            model_variant: request.model_variant.clone(),
617            generation: request.generation.clone(),
618            session_id: request.session_id.clone(),
619            output_spec: request.output_spec.clone(),
620        })
621    }
622
623    pub fn into_request(
624        self,
625        stream_events: Option<LlmEventSender>,
626        provider_trace: Option<LlmProviderTraceSender>,
627    ) -> CoreLlmRequest {
628        CoreLlmRequest {
629            model: self.model,
630            messages: self.messages,
631            attachments: self
632                .attachments
633                .into_iter()
634                .map(LlmAttachmentSpec::into_attachment)
635                .collect(),
636            tools: self.tools,
637            tool_choice: self.tool_choice,
638            model_variant: self.model_variant,
639            generation: self.generation,
640            session_id: self.session_id,
641            output_spec: self.output_spec,
642            stream_events,
643            provider_trace,
644        }
645    }
646}
647
648async fn attachment_specs_from_attachments(
649    attachments: &[LlmAttachment],
650    attachment_store: &dyn AttachmentStore,
651) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
652    let mut specs = Vec::with_capacity(attachments.len());
653    for attachment in attachments {
654        specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
655    }
656    Ok(specs)
657}
658
659async fn attachment_spec_from_attachment(
660    attachment: &LlmAttachment,
661    attachment_store: &dyn AttachmentStore,
662) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
663    if let Some(reference) = attachment.reference.as_ref() {
664        return Ok(LlmAttachmentSpec {
665            reference: reference.clone(),
666        });
667    }
668    if attachment.data.is_empty() {
669        return Err(RuntimeEffectControllerError::new(
670            "runtime_effect_attachment_missing_reference",
671            "runtime effect attachment has neither a durable reference nor inline bytes",
672        ));
673    }
674    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
675        RuntimeEffectControllerError::new(
676            "runtime_effect_attachment_media_type",
677            format!(
678                "attachment media type `{}` cannot be represented durably",
679                attachment.mime
680            ),
681        )
682    })?;
683    let reference = attachment_store
684        .put(
685            attachment.data.clone(),
686            AttachmentCreateMeta::new(media_type, None, None, None),
687        )
688        .await
689        .map_err(|err| {
690            RuntimeEffectControllerError::new(
691                "runtime_effect_attachment_store",
692                format!("failed to store attachment before runtime effect invocation: {err}"),
693            )
694        })?;
695    Ok(LlmAttachmentSpec { reference })
696}
697
698impl RuntimeEffectOutcome {
699    pub fn into_llm_call(
700        self,
701    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
702        match self {
703            Self::LlmCall {
704                result,
705                text_streamed,
706            } => Ok((result, text_streamed)),
707            other => Err(RuntimeEffectControllerError::wrong_outcome(
708                RuntimeEffectKind::LlmCall,
709                other.kind(),
710            )),
711        }
712    }
713
714    pub fn into_direct_response(
715        self,
716    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
717        match self {
718            Self::Direct { result } => Ok(result),
719            other => Err(RuntimeEffectControllerError::wrong_outcome(
720                RuntimeEffectKind::Direct,
721                other.kind(),
722            )),
723        }
724    }
725
726    pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
727        match self {
728            Self::ToolCall {
729                launch: ToolCallLaunch::Done { result },
730                ..
731            } => Ok(result),
732            Self::ToolCall {
733                launch: ToolCallLaunch::Pending { .. },
734                ..
735            } => Err(RuntimeEffectControllerError::new(
736                "runtime_effect_tool_call_pending",
737                "tool call launch is pending and has no completed output yet",
738            )),
739            other => Err(RuntimeEffectControllerError::wrong_outcome(
740                RuntimeEffectKind::ToolCall,
741                other.kind(),
742            )),
743        }
744    }
745
746    pub fn into_tool_call_effect(
747        self,
748    ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
749        match self {
750            Self::ToolCall { launch, triggers } => Ok(ToolCallEffectOutcome { launch, triggers }),
751            other => Err(RuntimeEffectControllerError::wrong_outcome(
752                RuntimeEffectKind::ToolCall,
753                other.kind(),
754            )),
755        }
756    }
757
758    pub fn into_tool_batch_effect(
759        self,
760    ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
761        match self {
762            Self::ToolBatch { launches, triggers } => {
763                Ok(ToolBatchEffectOutcome { launches, triggers })
764            }
765            other => Err(RuntimeEffectControllerError::wrong_outcome(
766                RuntimeEffectKind::ToolBatch,
767                other.kind(),
768            )),
769        }
770    }
771
772    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
773        match self {
774            Self::Process { result } => Ok(result),
775            other => Err(RuntimeEffectControllerError::wrong_outcome(
776                RuntimeEffectKind::Process,
777                other.kind(),
778            )),
779        }
780    }
781
782    pub fn into_exec_code(
783        self,
784    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
785        match self {
786            Self::ExecCode { result } => Ok(result),
787            other => Err(RuntimeEffectControllerError::wrong_outcome(
788                RuntimeEffectKind::ExecCode,
789                other.kind(),
790            )),
791        }
792    }
793
794    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
795        match self {
796            Self::Checkpoint { result } => Ok(result),
797            other => Err(RuntimeEffectControllerError::wrong_outcome(
798                RuntimeEffectKind::Checkpoint,
799                other.kind(),
800            )),
801        }
802    }
803
804    pub fn into_sync_execution_environment(
805        self,
806    ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
807    {
808        match self {
809            Self::SyncExecutionEnvironment { result } => Ok(result),
810            other => Err(RuntimeEffectControllerError::wrong_outcome(
811                RuntimeEffectKind::SyncExecutionEnvironment,
812                other.kind(),
813            )),
814        }
815    }
816
817    pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
818        match self {
819            Self::AwaitEvent { resolution } => Ok(resolution),
820            other => Err(RuntimeEffectControllerError::wrong_outcome(
821                RuntimeEffectKind::AwaitEvent,
822                other.kind(),
823            )),
824        }
825    }
826
827    pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
828        match self {
829            Self::DurableStep { value } => Ok(value),
830            other => Err(RuntimeEffectControllerError::wrong_outcome(
831                RuntimeEffectKind::DurableStep,
832                other.kind(),
833            )),
834        }
835    }
836
837    pub fn kind(&self) -> RuntimeEffectKind {
838        match self {
839            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
840            Self::Direct { .. } => RuntimeEffectKind::Direct,
841            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
842            Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
843            Self::Process { .. } => RuntimeEffectKind::Process,
844            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
845            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
846            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
847            Self::Sleep => RuntimeEffectKind::Sleep,
848            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
849            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
850        }
851    }
852}