Skip to main content

lash_core/runtime/effect/
envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17    ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22/// Durable category for a runtime effect.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26    LlmCall,
27    Direct,
28    ToolAttempt,
29    ToolBatch,
30    Process,
31    ExecCode,
32    Checkpoint,
33    SyncExecutionEnvironment,
34    Sleep,
35    AwaitEvent,
36    DurableStep,
37}
38
39impl RuntimeEffectKind {
40    pub fn as_str(self) -> &'static str {
41        match self {
42            Self::LlmCall => "llm_call",
43            Self::Direct => "direct",
44            Self::ToolAttempt => "tool_attempt",
45            Self::ToolBatch => "tool_batch",
46            Self::Process => "process",
47            Self::ExecCode => "exec_code",
48            Self::Checkpoint => "checkpoint",
49            Self::SyncExecutionEnvironment => "sync_execution_environment",
50            Self::Sleep => "sleep",
51            Self::AwaitEvent => "await_event",
52            Self::DurableStep => "durable_step",
53        }
54    }
55}
56
57/// Canonical lineage for a runtime-side invocation.
58#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
59pub struct RuntimeInvocation {
60    pub scope: RuntimeScope,
61    pub subject: RuntimeSubject,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub caused_by: Option<CausalRef>,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub replay: Option<RuntimeReplay>,
66}
67
68impl RuntimeInvocation {
69    pub fn effect(
70        scope: RuntimeScope,
71        effect_id: impl Into<String>,
72        kind: RuntimeEffectKind,
73        replay_key: impl Into<String>,
74    ) -> Self {
75        Self {
76            scope,
77            subject: RuntimeSubject::Effect {
78                effect_id: effect_id.into(),
79                kind,
80            },
81            caused_by: None,
82            replay: Some(RuntimeReplay {
83                key: replay_key.into(),
84            }),
85        }
86    }
87
88    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89        self.caused_by = caused_by;
90        self
91    }
92
93    pub fn effect_id(&self) -> Option<&str> {
94        match &self.subject {
95            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96            _ => None,
97        }
98    }
99
100    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101        match &self.subject {
102            RuntimeSubject::Effect { kind, .. } => Some(*kind),
103            _ => None,
104        }
105    }
106
107    pub fn replay_key(&self) -> Option<&str> {
108        self.replay.as_ref().map(|replay| replay.key.as_str())
109    }
110
111    pub fn causal_ref(&self) -> Option<CausalRef> {
112        match &self.subject {
113            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114                session_id: self.scope.session_id.clone(),
115                turn_id: self.scope.turn_id.clone(),
116                effect_id: effect_id.clone(),
117            }),
118            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119                process_id: process_id.clone(),
120            }),
121            RuntimeSubject::ProcessEvent {
122                process_id,
123                sequence,
124                ..
125            } => Some(CausalRef::ProcessEvent {
126                process_id: process_id.clone(),
127                sequence: *sequence,
128            }),
129            RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130                Some(CausalRef::TriggerOccurrence {
131                    occurrence_id: occurrence_id.clone(),
132                })
133            }
134            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135                session_id: self.scope.session_id.clone(),
136                node_id: node_id.clone(),
137            }),
138        }
139    }
140}
141
142#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
143pub struct RuntimeScope {
144    pub session_id: String,
145    #[serde(default, skip_serializing_if = "Option::is_none")]
146    pub turn_id: Option<String>,
147    #[serde(default, skip_serializing_if = "Option::is_none")]
148    pub turn_index: Option<usize>,
149    #[serde(default, skip_serializing_if = "Option::is_none")]
150    pub protocol_iteration: Option<usize>,
151}
152
153impl RuntimeScope {
154    pub fn new(session_id: impl Into<String>) -> Self {
155        Self {
156            session_id: session_id.into(),
157            turn_id: None,
158            turn_index: None,
159            protocol_iteration: None,
160        }
161    }
162
163    pub fn for_turn(
164        session_id: impl Into<String>,
165        turn_id: impl Into<String>,
166        turn_index: usize,
167        protocol_iteration: usize,
168    ) -> Self {
169        Self {
170            session_id: session_id.into(),
171            turn_id: Some(turn_id.into()),
172            turn_index: Some(turn_index),
173            protocol_iteration: Some(protocol_iteration),
174        }
175    }
176}
177
178#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
179pub struct RuntimeReplay {
180    pub key: String,
181}
182
183#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(tag = "type", rename_all = "snake_case")]
185pub enum RuntimeSubject {
186    Effect {
187        effect_id: String,
188        kind: RuntimeEffectKind,
189    },
190    Process {
191        process_id: String,
192    },
193    ProcessEvent {
194        process_id: String,
195        sequence: u64,
196        event_type: String,
197    },
198    TriggerOccurrence {
199        occurrence_id: String,
200    },
201    SessionNode {
202        node_id: String,
203    },
204}
205
206/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
207#[derive(Clone, Debug, Serialize, Deserialize)]
208pub struct RuntimeEffectEnvelope {
209    pub invocation: RuntimeInvocation,
210    pub command: RuntimeEffectCommand,
211}
212
213impl RuntimeEffectEnvelope {
214    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215        Self::try_new(invocation, command).expect("valid runtime effect invocation")
216    }
217
218    pub fn try_new(
219        invocation: RuntimeInvocation,
220        command: RuntimeEffectCommand,
221    ) -> Result<Self, RuntimeEffectControllerError> {
222        validate_effect_invocation(&invocation, command.kind())?;
223        validate_effect_command(&command)?;
224        Ok(Self {
225            invocation,
226            command,
227        })
228    }
229
230    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232            RuntimeEffectControllerError::new(
233                "runtime_effect_envelope_hash",
234                format!("failed to serialize runtime effect envelope: {err}"),
235            )
236        })
237    }
238}
239
240fn validate_effect_invocation(
241    invocation: &RuntimeInvocation,
242    command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245        return Err(RuntimeEffectControllerError::new(
246            "runtime_effect_invocation_subject",
247            "runtime effect envelope subject must be an effect",
248        ));
249    };
250    if effect_id.trim().is_empty() {
251        return Err(RuntimeEffectControllerError::new(
252            "runtime_effect_invocation_subject",
253            "runtime effect envelope effect id must be non-empty",
254        ));
255    }
256    if *kind != command_kind {
257        return Err(RuntimeEffectControllerError::new(
258            "runtime_effect_invocation_kind",
259            format!(
260                "runtime effect invocation kind {} does not match command kind {}",
261                kind.as_str(),
262                command_kind.as_str()
263            ),
264        ));
265    }
266    if invocation
267        .replay
268        .as_ref()
269        .is_none_or(|replay| replay.key.is_empty())
270    {
271        return Err(RuntimeEffectControllerError::new(
272            "runtime_effect_replay_required",
273            "runtime effect envelope requires replay.key",
274        ));
275    }
276    Ok(())
277}
278
279fn validate_effect_command(
280    command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282    if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283        && step_id.trim().is_empty()
284    {
285        return Err(RuntimeEffectControllerError::new(
286            "runtime_effect_durable_step_id",
287            "runtime effect durable step id must be non-empty",
288        ));
289    }
290    if let RuntimeEffectCommand::ToolAttempt {
291        call,
292        attempt,
293        max_attempts,
294    } = command
295    {
296        if call.call_id.trim().is_empty() {
297            return Err(RuntimeEffectControllerError::new(
298                "runtime_effect_tool_attempt_call_id",
299                "runtime effect tool attempt requires a non-empty call id",
300            ));
301        }
302        if *attempt == 0 || *max_attempts == 0 || *attempt > *max_attempts {
303            return Err(RuntimeEffectControllerError::new(
304                "runtime_effect_tool_attempt_index",
305                format!(
306                    "runtime effect tool attempt must satisfy 1 <= attempt <= max_attempts, got {attempt}/{max_attempts}"
307                ),
308            ));
309        }
310    }
311    if let RuntimeEffectCommand::ToolBatch { batch } = command {
312        if batch.batch_id.trim().is_empty() {
313            return Err(RuntimeEffectControllerError::new(
314                "runtime_effect_tool_batch_id",
315                "runtime effect tool batch id must be non-empty",
316            ));
317        }
318        if batch.calls.is_empty() {
319            return Err(RuntimeEffectControllerError::new(
320                "runtime_effect_tool_batch_empty",
321                "runtime effect tool batch must contain at least one prepared call",
322            ));
323        }
324        for (index, call) in batch.calls.iter().enumerate() {
325            if call.call.call_id.trim().is_empty() {
326                return Err(RuntimeEffectControllerError::new(
327                    "runtime_effect_tool_batch_call_id",
328                    format!("runtime effect tool batch call {index} has an empty call id"),
329                ));
330            }
331            if call.replay_suffix.trim().is_empty() {
332                return Err(RuntimeEffectControllerError::new(
333                    "runtime_effect_tool_batch_call_replay",
334                    format!("runtime effect tool batch call {index} has an empty replay suffix"),
335                ));
336            }
337        }
338    }
339    Ok(())
340}
341
342/// Serializable command emitted at Lash's nondeterministic runtime boundary.
343#[derive(Clone, Debug, Serialize, Deserialize)]
344#[serde(tag = "type", rename_all = "snake_case")]
345pub enum RuntimeEffectCommand {
346    LlmCall {
347        request: Box<LlmRequestSpec>,
348    },
349    Direct {
350        request: Box<LlmRequestSpec>,
351        usage_source: String,
352    },
353    ToolAttempt {
354        call: crate::PreparedToolCall,
355        attempt: u32,
356        max_attempts: u32,
357    },
358    ToolBatch {
359        batch: crate::PreparedToolBatch,
360    },
361    Process {
362        command: Box<ProcessCommand>,
363    },
364    ExecCode {
365        language: String,
366        code: String,
367    },
368    Checkpoint {
369        checkpoint: CheckpointKind,
370    },
371    SyncExecutionEnvironment {
372        update_machine_config: bool,
373    },
374    Sleep {
375        duration_ms: u64,
376    },
377    AwaitEvent {
378        key: crate::AwaitEventKey,
379    },
380    DurableStep {
381        step_id: String,
382        input: serde_json::Value,
383    },
384}
385
386impl RuntimeEffectCommand {
387    pub fn process(command: ProcessCommand) -> Self {
388        Self::Process {
389            command: Box::new(command),
390        }
391    }
392
393    pub fn kind(&self) -> RuntimeEffectKind {
394        match self {
395            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
396            Self::Direct { .. } => RuntimeEffectKind::Direct,
397            Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
398            Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
399            Self::Process { .. } => RuntimeEffectKind::Process,
400            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
401            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
402            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
403            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
404            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
405            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
406        }
407    }
408}
409
410/// Serializable operation against the process admin plane.
411#[derive(Clone, Debug, Serialize, Deserialize)]
412#[serde(tag = "op", rename_all = "snake_case")]
413#[allow(clippy::large_enum_variant)]
414pub enum ProcessCommand {
415    Start {
416        registration: ProcessRegistration,
417        #[serde(default, skip_serializing_if = "Option::is_none")]
418        grant: Option<ProcessStartGrant>,
419        #[serde(
420            default,
421            skip_serializing_if = "boxed_process_execution_context_is_empty"
422        )]
423        execution_context: Box<ProcessExecutionContext>,
424    },
425    List {
426        session_scope: SessionScope,
427        #[serde(default)]
428        mode: ProcessListMode,
429    },
430    Transfer {
431        from_scope: SessionScope,
432        to_scope: SessionScope,
433        process_ids: Vec<String>,
434    },
435    DeleteSession {
436        session_id: String,
437    },
438    Await {
439        process_id: String,
440    },
441    Cancel {
442        process_id: String,
443        reason: Option<String>,
444    },
445    Signal {
446        process_id: String,
447        signal_name: String,
448        signal_id: String,
449        request: crate::ProcessEventAppendRequest,
450    },
451}
452
453fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
454    context.is_empty()
455}
456
457type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
458
459impl ProcessCommand {
460    pub fn effect_id(&self) -> String {
461        match self {
462            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
463            Self::List {
464                session_scope,
465                mode,
466            } => {
467                format!("process:list:{}:{}", session_scope.id(), mode.as_str())
468            }
469            Self::Transfer {
470                from_scope,
471                to_scope,
472                process_ids,
473            } => {
474                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
475                    .unwrap_or_else(|_| "unhashable".to_string());
476                format!(
477                    "process:transfer:{}:{}:{digest}",
478                    from_scope.id(),
479                    to_scope.id()
480                )
481            }
482            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
483            Self::Await { process_id } => format!("process:await:{process_id}"),
484            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
485            Self::Signal {
486                process_id,
487                signal_name,
488                signal_id,
489                ..
490            } => {
491                format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
492            }
493        }
494    }
495}
496
497/// Serializable result of a process operation.
498#[derive(Clone, Debug, Serialize, Deserialize)]
499#[serde(tag = "op", rename_all = "snake_case")]
500pub enum ProcessEffectOutcome {
501    Start {
502        record: ProcessRecord,
503    },
504    List {
505        entries: Vec<ProcessHandleGrantEntry>,
506    },
507    Transfer,
508    DeleteSession {
509        report: crate::ProcessSessionDeleteReport,
510    },
511    Await {
512        output: ProcessAwaitOutput,
513    },
514    Cancel {
515        record: ProcessRecord,
516    },
517    Signal {
518        event: crate::ProcessEvent,
519    },
520}
521
522#[derive(Clone, Debug, Serialize, Deserialize)]
523pub struct ToolAttemptEffectOutcome {
524    pub launch: ToolAttemptLaunch,
525    #[serde(default, skip_serializing_if = "Vec::is_empty")]
526    pub triggers: Vec<ToolTriggerEffectOutcome>,
527}
528
529#[derive(Clone, Debug, Serialize, Deserialize)]
530pub struct ToolBatchEffectOutcome {
531    pub launches: Vec<ToolCallLaunch>,
532    #[serde(default, skip_serializing_if = "Vec::is_empty")]
533    pub triggers: Vec<ToolTriggerEffectOutcome>,
534}
535
536#[derive(Clone, Debug, Serialize, Deserialize)]
537#[serde(tag = "status", rename_all = "snake_case")]
538#[allow(clippy::large_enum_variant)]
539pub enum ToolCallLaunch {
540    Done {
541        result: CompletedToolCall,
542    },
543    Pending {
544        key: crate::AwaitEventKey,
545        pending: crate::PendingCompletion,
546        duration_ms: u64,
547    },
548}
549
550#[derive(Clone, Debug, Serialize, Deserialize)]
551#[serde(tag = "status", rename_all = "snake_case")]
552pub enum ToolAttemptLaunch {
553    Done {
554        record: crate::ToolCallRecord,
555    },
556    Pending {
557        key: crate::AwaitEventKey,
558        pending: crate::PendingCompletion,
559        duration_ms: u64,
560    },
561}
562
563/// Serializable result of a runtime effect command.
564#[derive(Clone, Debug, Serialize, Deserialize)]
565#[serde(tag = "type", rename_all = "snake_case")]
566#[allow(clippy::large_enum_variant)]
567pub enum RuntimeEffectOutcome {
568    LlmCall {
569        result: Result<LlmResponse, LlmCallError>,
570        text_streamed: bool,
571    },
572    Direct {
573        result: Result<LlmResponse, LlmCallError>,
574    },
575    ToolAttempt {
576        launch: ToolAttemptLaunch,
577        #[serde(default, skip_serializing_if = "Vec::is_empty")]
578        triggers: Vec<ToolTriggerEffectOutcome>,
579    },
580    ToolBatch {
581        launches: Vec<ToolCallLaunch>,
582        #[serde(default, skip_serializing_if = "Vec::is_empty")]
583        triggers: Vec<ToolTriggerEffectOutcome>,
584    },
585    Process {
586        result: ProcessEffectOutcome,
587    },
588    ExecCode {
589        result: Result<ExecResponse, String>,
590    },
591    Checkpoint {
592        result: CheckpointOutcome,
593    },
594    SyncExecutionEnvironment {
595        result: Result<Option<ExecutionEnvironmentSync>, String>,
596    },
597    Sleep,
598    AwaitEvent {
599        resolution: crate::Resolution,
600    },
601    DurableStep {
602        value: serde_json::Value,
603    },
604}
605
606// =============================================================================
607// Request specs (serializable forms of LLM/Direct requests)
608// =============================================================================
609
610/// Serializable attachment data for runtime effect envelopes.
611///
612/// Effect envelopes carry attachment references only. Local executors resolve
613/// bytes from the configured attachment store when a provider request is
614/// actually executed.
615#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
616pub struct LlmAttachmentSpec {
617    pub reference: AttachmentRef,
618}
619
620impl LlmAttachmentSpec {
621    fn into_attachment(self) -> LlmAttachment {
622        LlmAttachment::reference(self.reference)
623    }
624}
625
626/// Serializable LLM request data. Live stream and provider-trace callbacks are
627/// attached by the local executor, and attachment bytes are resolved locally
628/// from refs rather than persisted in the effect envelope.
629#[derive(Clone, Debug, Serialize, Deserialize)]
630pub struct LlmRequestSpec {
631    pub model: String,
632    pub messages: Vec<LlmMessage>,
633    pub attachments: Vec<LlmAttachmentSpec>,
634    pub tools: Arc<Vec<LlmToolSpec>>,
635    pub tool_choice: LlmToolChoice,
636    pub model_variant: Option<String>,
637    #[serde(default)]
638    pub generation: crate::GenerationOptions,
639    pub session_id: Option<String>,
640    pub output_spec: Option<LlmOutputSpec>,
641}
642
643impl LlmRequestSpec {
644    pub async fn from_request(
645        request: &CoreLlmRequest,
646        attachment_store: &dyn AttachmentStore,
647    ) -> Result<Self, RuntimeEffectControllerError> {
648        Ok(Self {
649            model: request.model.clone(),
650            messages: request.messages.clone(),
651            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
652                .await?,
653            tools: Arc::clone(&request.tools),
654            tool_choice: request.tool_choice.clone(),
655            model_variant: request.model_variant.clone(),
656            generation: request.generation.clone(),
657            session_id: request.session_id.clone(),
658            output_spec: request.output_spec.clone(),
659        })
660    }
661
662    pub fn into_request(
663        self,
664        stream_events: Option<LlmEventSender>,
665        provider_trace: Option<LlmProviderTraceSender>,
666    ) -> CoreLlmRequest {
667        CoreLlmRequest {
668            model: self.model,
669            messages: self.messages,
670            attachments: self
671                .attachments
672                .into_iter()
673                .map(LlmAttachmentSpec::into_attachment)
674                .collect(),
675            tools: self.tools,
676            tool_choice: self.tool_choice,
677            model_variant: self.model_variant,
678            generation: self.generation,
679            session_id: self.session_id,
680            output_spec: self.output_spec,
681            stream_events,
682            provider_trace,
683        }
684    }
685}
686
687async fn attachment_specs_from_attachments(
688    attachments: &[LlmAttachment],
689    attachment_store: &dyn AttachmentStore,
690) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
691    let mut specs = Vec::with_capacity(attachments.len());
692    for attachment in attachments {
693        specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
694    }
695    Ok(specs)
696}
697
698async fn attachment_spec_from_attachment(
699    attachment: &LlmAttachment,
700    attachment_store: &dyn AttachmentStore,
701) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
702    if let Some(reference) = attachment.reference.as_ref() {
703        return Ok(LlmAttachmentSpec {
704            reference: reference.clone(),
705        });
706    }
707    if attachment.data.is_empty() {
708        return Err(RuntimeEffectControllerError::new(
709            "runtime_effect_attachment_missing_reference",
710            "runtime effect attachment has neither a durable reference nor inline bytes",
711        ));
712    }
713    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
714        RuntimeEffectControllerError::new(
715            "runtime_effect_attachment_media_type",
716            format!(
717                "attachment media type `{}` cannot be represented durably",
718                attachment.mime
719            ),
720        )
721    })?;
722    let reference = attachment_store
723        .put(
724            attachment.data.clone(),
725            AttachmentCreateMeta::new(media_type, None, None, None),
726        )
727        .await
728        .map_err(|err| {
729            RuntimeEffectControllerError::new(
730                "runtime_effect_attachment_store",
731                format!("failed to store attachment before runtime effect invocation: {err}"),
732            )
733        })?;
734    Ok(LlmAttachmentSpec { reference })
735}
736
737impl RuntimeEffectOutcome {
738    pub fn into_llm_call(
739        self,
740    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
741        match self {
742            Self::LlmCall {
743                result,
744                text_streamed,
745            } => Ok((result, text_streamed)),
746            other => Err(RuntimeEffectControllerError::wrong_outcome(
747                RuntimeEffectKind::LlmCall,
748                other.kind(),
749            )),
750        }
751    }
752
753    pub fn into_direct_response(
754        self,
755    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
756        match self {
757            Self::Direct { result } => Ok(result),
758            other => Err(RuntimeEffectControllerError::wrong_outcome(
759                RuntimeEffectKind::Direct,
760                other.kind(),
761            )),
762        }
763    }
764
765    pub fn into_tool_attempt_effect(
766        self,
767    ) -> Result<ToolAttemptEffectOutcome, RuntimeEffectControllerError> {
768        match self {
769            Self::ToolAttempt { launch, triggers } => {
770                Ok(ToolAttemptEffectOutcome { launch, triggers })
771            }
772            other => Err(RuntimeEffectControllerError::wrong_outcome(
773                RuntimeEffectKind::ToolAttempt,
774                other.kind(),
775            )),
776        }
777    }
778
779    pub fn into_tool_batch_effect(
780        self,
781    ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
782        match self {
783            Self::ToolBatch { launches, triggers } => {
784                Ok(ToolBatchEffectOutcome { launches, triggers })
785            }
786            other => Err(RuntimeEffectControllerError::wrong_outcome(
787                RuntimeEffectKind::ToolBatch,
788                other.kind(),
789            )),
790        }
791    }
792
793    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
794        match self {
795            Self::Process { result } => Ok(result),
796            other => Err(RuntimeEffectControllerError::wrong_outcome(
797                RuntimeEffectKind::Process,
798                other.kind(),
799            )),
800        }
801    }
802
803    pub fn into_exec_code(
804        self,
805    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
806        match self {
807            Self::ExecCode { result } => Ok(result),
808            other => Err(RuntimeEffectControllerError::wrong_outcome(
809                RuntimeEffectKind::ExecCode,
810                other.kind(),
811            )),
812        }
813    }
814
815    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
816        match self {
817            Self::Checkpoint { result } => Ok(result),
818            other => Err(RuntimeEffectControllerError::wrong_outcome(
819                RuntimeEffectKind::Checkpoint,
820                other.kind(),
821            )),
822        }
823    }
824
825    pub fn into_sync_execution_environment(
826        self,
827    ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
828    {
829        match self {
830            Self::SyncExecutionEnvironment { result } => Ok(result),
831            other => Err(RuntimeEffectControllerError::wrong_outcome(
832                RuntimeEffectKind::SyncExecutionEnvironment,
833                other.kind(),
834            )),
835        }
836    }
837
838    pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
839        match self {
840            Self::AwaitEvent { resolution } => Ok(resolution),
841            other => Err(RuntimeEffectControllerError::wrong_outcome(
842                RuntimeEffectKind::AwaitEvent,
843                other.kind(),
844            )),
845        }
846    }
847
848    pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
849        match self {
850            Self::DurableStep { value } => Ok(value),
851            other => Err(RuntimeEffectControllerError::wrong_outcome(
852                RuntimeEffectKind::DurableStep,
853                other.kind(),
854            )),
855        }
856    }
857
858    pub fn kind(&self) -> RuntimeEffectKind {
859        match self {
860            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
861            Self::Direct { .. } => RuntimeEffectKind::Direct,
862            Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
863            Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
864            Self::Process { .. } => RuntimeEffectKind::Process,
865            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
866            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
867            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
868            Self::Sleep => RuntimeEffectKind::Sleep,
869            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
870            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
871        }
872    }
873}