Skip to main content

lash_core/runtime/effect/
envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17    ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22/// Durable category for a runtime effect.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26    LlmCall,
27    Direct,
28    ToolAttempt,
29    ToolBatch,
30    Process,
31    ExecCode,
32    Checkpoint,
33    SyncExecutionEnvironment,
34    Sleep,
35    AwaitEvent,
36    DurableStep,
37}
38
39impl RuntimeEffectKind {
40    pub fn as_str(self) -> &'static str {
41        match self {
42            Self::LlmCall => "llm_call",
43            Self::Direct => "direct",
44            Self::ToolAttempt => "tool_attempt",
45            Self::ToolBatch => "tool_batch",
46            Self::Process => "process",
47            Self::ExecCode => "exec_code",
48            Self::Checkpoint => "checkpoint",
49            Self::SyncExecutionEnvironment => "sync_execution_environment",
50            Self::Sleep => "sleep",
51            Self::AwaitEvent => "await_event",
52            Self::DurableStep => "durable_step",
53        }
54    }
55}
56
57/// Canonical lineage for a runtime-side invocation.
58#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
59pub struct RuntimeInvocation {
60    pub scope: RuntimeScope,
61    pub subject: RuntimeSubject,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub caused_by: Option<CausalRef>,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub replay: Option<RuntimeReplay>,
66}
67
68impl RuntimeInvocation {
69    pub fn effect(
70        scope: RuntimeScope,
71        effect_id: impl Into<String>,
72        kind: RuntimeEffectKind,
73        replay_key: impl Into<String>,
74    ) -> Self {
75        Self {
76            scope,
77            subject: RuntimeSubject::Effect {
78                effect_id: effect_id.into(),
79                kind,
80            },
81            caused_by: None,
82            replay: Some(RuntimeReplay {
83                key: replay_key.into(),
84            }),
85        }
86    }
87
88    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89        self.caused_by = caused_by;
90        self
91    }
92
93    pub fn effect_id(&self) -> Option<&str> {
94        match &self.subject {
95            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96            _ => None,
97        }
98    }
99
100    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101        match &self.subject {
102            RuntimeSubject::Effect { kind, .. } => Some(*kind),
103            _ => None,
104        }
105    }
106
107    pub fn replay_key(&self) -> Option<&str> {
108        self.replay.as_ref().map(|replay| replay.key.as_str())
109    }
110
111    pub fn causal_ref(&self) -> Option<CausalRef> {
112        match &self.subject {
113            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114                session_id: self.scope.session_id.clone(),
115                turn_id: self.scope.turn_id.clone(),
116                effect_id: effect_id.clone(),
117            }),
118            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119                process_id: process_id.clone(),
120            }),
121            RuntimeSubject::ProcessEvent {
122                process_id,
123                sequence,
124                ..
125            } => Some(CausalRef::ProcessEvent {
126                process_id: process_id.clone(),
127                sequence: *sequence,
128            }),
129            RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130                Some(CausalRef::TriggerOccurrence {
131                    occurrence_id: occurrence_id.clone(),
132                })
133            }
134            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135                session_id: self.scope.session_id.clone(),
136                node_id: node_id.clone(),
137            }),
138        }
139    }
140}
141
142#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
143pub struct RuntimeScope {
144    pub session_id: String,
145    #[serde(default, skip_serializing_if = "Option::is_none")]
146    pub turn_id: Option<String>,
147    #[serde(default, skip_serializing_if = "Option::is_none")]
148    pub turn_index: Option<usize>,
149    #[serde(default, skip_serializing_if = "Option::is_none")]
150    pub protocol_iteration: Option<usize>,
151}
152
153impl RuntimeScope {
154    pub fn new(session_id: impl Into<String>) -> Self {
155        Self {
156            session_id: session_id.into(),
157            turn_id: None,
158            turn_index: None,
159            protocol_iteration: None,
160        }
161    }
162
163    pub fn for_turn(
164        session_id: impl Into<String>,
165        turn_id: impl Into<String>,
166        turn_index: usize,
167        protocol_iteration: usize,
168    ) -> Self {
169        Self {
170            session_id: session_id.into(),
171            turn_id: Some(turn_id.into()),
172            turn_index: Some(turn_index),
173            protocol_iteration: Some(protocol_iteration),
174        }
175    }
176}
177
178#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
179pub struct RuntimeReplay {
180    pub key: String,
181}
182
183#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(tag = "type", rename_all = "snake_case")]
185pub enum RuntimeSubject {
186    Effect {
187        effect_id: String,
188        kind: RuntimeEffectKind,
189    },
190    Process {
191        process_id: String,
192    },
193    ProcessEvent {
194        process_id: String,
195        sequence: u64,
196        event_type: String,
197    },
198    TriggerOccurrence {
199        occurrence_id: String,
200    },
201    SessionNode {
202        node_id: String,
203    },
204}
205
206/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
207#[derive(Clone, Debug, Serialize, Deserialize)]
208pub struct RuntimeEffectEnvelope {
209    pub invocation: RuntimeInvocation,
210    pub command: RuntimeEffectCommand,
211}
212
213impl RuntimeEffectEnvelope {
214    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215        Self::try_new(invocation, command).expect("valid runtime effect invocation")
216    }
217
218    pub fn try_new(
219        invocation: RuntimeInvocation,
220        command: RuntimeEffectCommand,
221    ) -> Result<Self, RuntimeEffectControllerError> {
222        validate_effect_invocation(&invocation, command.kind())?;
223        validate_effect_command(&command)?;
224        Ok(Self {
225            invocation,
226            command,
227        })
228    }
229
230    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232            RuntimeEffectControllerError::new(
233                "runtime_effect_envelope_hash",
234                format!("failed to serialize runtime effect envelope: {err}"),
235            )
236        })
237    }
238}
239
240fn validate_effect_invocation(
241    invocation: &RuntimeInvocation,
242    command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245        return Err(RuntimeEffectControllerError::new(
246            "runtime_effect_invocation_subject",
247            "runtime effect envelope subject must be an effect",
248        ));
249    };
250    if effect_id.trim().is_empty() {
251        return Err(RuntimeEffectControllerError::new(
252            "runtime_effect_invocation_subject",
253            "runtime effect envelope effect id must be non-empty",
254        ));
255    }
256    if *kind != command_kind {
257        return Err(RuntimeEffectControllerError::new(
258            "runtime_effect_invocation_kind",
259            format!(
260                "runtime effect invocation kind {} does not match command kind {}",
261                kind.as_str(),
262                command_kind.as_str()
263            ),
264        ));
265    }
266    if invocation
267        .replay
268        .as_ref()
269        .is_none_or(|replay| replay.key.is_empty())
270    {
271        return Err(RuntimeEffectControllerError::new(
272            "runtime_effect_replay_required",
273            "runtime effect envelope requires replay.key",
274        ));
275    }
276    Ok(())
277}
278
279fn validate_effect_command(
280    command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282    if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283        && step_id.trim().is_empty()
284    {
285        return Err(RuntimeEffectControllerError::new(
286            "runtime_effect_durable_step_id",
287            "runtime effect durable step id must be non-empty",
288        ));
289    }
290    if let RuntimeEffectCommand::ToolAttempt {
291        call,
292        execution_grant: _,
293        attempt,
294        max_attempts,
295    } = command
296    {
297        if call.call_id.trim().is_empty() {
298            return Err(RuntimeEffectControllerError::new(
299                "runtime_effect_tool_attempt_call_id",
300                "runtime effect tool attempt requires a non-empty call id",
301            ));
302        }
303        if *attempt == 0 || *max_attempts == 0 || *attempt > *max_attempts {
304            return Err(RuntimeEffectControllerError::new(
305                "runtime_effect_tool_attempt_index",
306                format!(
307                    "runtime effect tool attempt must satisfy 1 <= attempt <= max_attempts, got {attempt}/{max_attempts}"
308                ),
309            ));
310        }
311    }
312    if let RuntimeEffectCommand::ToolBatch { batch } = command {
313        if batch.batch_id.trim().is_empty() {
314            return Err(RuntimeEffectControllerError::new(
315                "runtime_effect_tool_batch_id",
316                "runtime effect tool batch id must be non-empty",
317            ));
318        }
319        if batch.calls.is_empty() {
320            return Err(RuntimeEffectControllerError::new(
321                "runtime_effect_tool_batch_empty",
322                "runtime effect tool batch must contain at least one prepared call",
323            ));
324        }
325        for (index, call) in batch.calls.iter().enumerate() {
326            if call.call.call_id.trim().is_empty() {
327                return Err(RuntimeEffectControllerError::new(
328                    "runtime_effect_tool_batch_call_id",
329                    format!("runtime effect tool batch call {index} has an empty call id"),
330                ));
331            }
332            if call.replay_suffix.trim().is_empty() {
333                return Err(RuntimeEffectControllerError::new(
334                    "runtime_effect_tool_batch_call_replay",
335                    format!("runtime effect tool batch call {index} has an empty replay suffix"),
336                ));
337            }
338        }
339    }
340    Ok(())
341}
342
343/// Serializable command emitted at Lash's nondeterministic runtime boundary.
344#[derive(Clone, Debug, Serialize, Deserialize)]
345#[serde(tag = "type", rename_all = "snake_case")]
346pub enum RuntimeEffectCommand {
347    LlmCall {
348        request: Box<LlmRequestSpec>,
349    },
350    Direct {
351        request: Box<LlmRequestSpec>,
352        usage_source: String,
353    },
354    ToolAttempt {
355        call: crate::PreparedToolCall,
356        #[serde(default, skip_serializing_if = "Option::is_none")]
357        execution_grant: Option<Box<crate::ToolExecutionGrant>>,
358        attempt: u32,
359        max_attempts: u32,
360    },
361    ToolBatch {
362        batch: crate::PreparedToolBatch,
363    },
364    Process {
365        command: Box<ProcessCommand>,
366    },
367    ExecCode {
368        language: String,
369        code: String,
370    },
371    Checkpoint {
372        checkpoint: CheckpointKind,
373    },
374    SyncExecutionEnvironment {
375        update_machine_config: bool,
376    },
377    Sleep {
378        duration_ms: u64,
379    },
380    AwaitEvent {
381        key: crate::AwaitEventKey,
382    },
383    DurableStep {
384        step_id: String,
385        input: serde_json::Value,
386    },
387}
388
389impl RuntimeEffectCommand {
390    pub fn process(command: ProcessCommand) -> Self {
391        Self::Process {
392            command: Box::new(command),
393        }
394    }
395
396    pub fn kind(&self) -> RuntimeEffectKind {
397        match self {
398            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
399            Self::Direct { .. } => RuntimeEffectKind::Direct,
400            Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
401            Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
402            Self::Process { .. } => RuntimeEffectKind::Process,
403            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
404            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
405            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
406            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
407            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
408            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
409        }
410    }
411}
412
413/// Serializable operation against the process admin plane.
414#[derive(Clone, Debug, Serialize, Deserialize)]
415#[serde(tag = "op", rename_all = "snake_case")]
416#[allow(clippy::large_enum_variant)]
417pub enum ProcessCommand {
418    Start {
419        registration: ProcessRegistration,
420        #[serde(default, skip_serializing_if = "Option::is_none")]
421        grant: Option<ProcessStartGrant>,
422        #[serde(
423            default,
424            skip_serializing_if = "boxed_process_execution_context_is_empty"
425        )]
426        execution_context: Box<ProcessExecutionContext>,
427    },
428    List {
429        session_scope: SessionScope,
430        #[serde(default)]
431        mode: ProcessListMode,
432    },
433    Transfer {
434        from_scope: SessionScope,
435        to_scope: SessionScope,
436        process_ids: Vec<String>,
437    },
438    DeleteSession {
439        session_id: String,
440    },
441    Await {
442        process_id: String,
443    },
444    Cancel {
445        process_id: String,
446        reason: Option<String>,
447    },
448    Signal {
449        process_id: String,
450        signal_name: String,
451        signal_id: String,
452        request: crate::ProcessEventAppendRequest,
453    },
454}
455
456fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
457    context.is_empty()
458}
459
460type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
461
462impl ProcessCommand {
463    pub fn effect_id(&self) -> String {
464        match self {
465            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
466            Self::List {
467                session_scope,
468                mode,
469            } => {
470                format!("process:list:{}:{}", session_scope.id(), mode.as_str())
471            }
472            Self::Transfer {
473                from_scope,
474                to_scope,
475                process_ids,
476            } => {
477                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
478                    .unwrap_or_else(|_| "unhashable".to_string());
479                format!(
480                    "process:transfer:{}:{}:{digest}",
481                    from_scope.id(),
482                    to_scope.id()
483                )
484            }
485            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
486            Self::Await { process_id } => format!("process:await:{process_id}"),
487            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
488            Self::Signal {
489                process_id,
490                signal_name,
491                signal_id,
492                ..
493            } => {
494                format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
495            }
496        }
497    }
498}
499
500/// Serializable result of a process operation.
501#[derive(Clone, Debug, Serialize, Deserialize)]
502#[serde(tag = "op", rename_all = "snake_case")]
503pub enum ProcessEffectOutcome {
504    Start {
505        // Boxed so the fat durable record does not size the whole outcome enum
506        // (and the runtime effect enum wrapping it) inline through the recursive
507        // effect executor.
508        record: Box<ProcessRecord>,
509    },
510    List {
511        entries: Vec<ProcessHandleGrantEntry>,
512    },
513    Transfer,
514    DeleteSession {
515        report: crate::ProcessSessionDeleteReport,
516    },
517    Await {
518        output: ProcessAwaitOutput,
519    },
520    Cancel {
521        record: Box<ProcessRecord>,
522    },
523    Signal {
524        // Boxed for the same reason as the record variants: a fat event should
525        // not size the outcome enum inline through the recursive executor.
526        event: Box<crate::ProcessEvent>,
527    },
528}
529
530#[derive(Clone, Debug, Serialize, Deserialize)]
531pub struct ToolAttemptEffectOutcome {
532    pub launch: ToolAttemptLaunch,
533    #[serde(default, skip_serializing_if = "Vec::is_empty")]
534    pub triggers: Vec<ToolTriggerEffectOutcome>,
535}
536
537#[derive(Clone, Debug, Serialize, Deserialize)]
538pub struct ToolBatchEffectOutcome {
539    pub launches: Vec<ToolCallLaunch>,
540    #[serde(default, skip_serializing_if = "Vec::is_empty")]
541    pub triggers: Vec<ToolTriggerEffectOutcome>,
542}
543
544#[derive(Clone, Debug, Serialize, Deserialize)]
545#[serde(tag = "status", rename_all = "snake_case")]
546#[allow(clippy::large_enum_variant)]
547pub enum ToolCallLaunch {
548    Done {
549        result: CompletedToolCall,
550    },
551    Pending {
552        key: crate::AwaitEventKey,
553        pending: crate::PendingCompletion,
554        duration_ms: u64,
555    },
556}
557
558#[derive(Clone, Debug, Serialize, Deserialize)]
559#[serde(tag = "status", rename_all = "snake_case")]
560pub enum ToolAttemptLaunch {
561    Done {
562        record: crate::ToolCallRecord,
563    },
564    Pending {
565        key: crate::AwaitEventKey,
566        pending: crate::PendingCompletion,
567        duration_ms: u64,
568    },
569}
570
571/// Serializable result of a runtime effect command.
572#[derive(Clone, Debug, Serialize, Deserialize)]
573#[serde(tag = "type", rename_all = "snake_case")]
574#[allow(clippy::large_enum_variant)]
575pub enum RuntimeEffectOutcome {
576    LlmCall {
577        result: Result<LlmResponse, LlmCallError>,
578        text_streamed: bool,
579    },
580    Direct {
581        result: Result<LlmResponse, LlmCallError>,
582    },
583    ToolAttempt {
584        launch: ToolAttemptLaunch,
585        #[serde(default, skip_serializing_if = "Vec::is_empty")]
586        triggers: Vec<ToolTriggerEffectOutcome>,
587    },
588    ToolBatch {
589        launches: Vec<ToolCallLaunch>,
590        #[serde(default, skip_serializing_if = "Vec::is_empty")]
591        triggers: Vec<ToolTriggerEffectOutcome>,
592    },
593    Process {
594        result: ProcessEffectOutcome,
595    },
596    ExecCode {
597        result: Result<ExecResponse, String>,
598    },
599    Checkpoint {
600        result: CheckpointOutcome,
601    },
602    SyncExecutionEnvironment {
603        result: Result<Option<ExecutionEnvironmentSync>, String>,
604    },
605    Sleep,
606    AwaitEvent {
607        resolution: crate::Resolution,
608    },
609    DurableStep {
610        value: serde_json::Value,
611    },
612}
613
614// =============================================================================
615// Request specs (serializable forms of LLM/Direct requests)
616// =============================================================================
617
618/// Serializable attachment data for runtime effect envelopes.
619///
620/// Effect envelopes carry attachment references only. Local executors resolve
621/// bytes from the configured attachment store when a provider request is
622/// actually executed.
623#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
624pub struct LlmAttachmentSpec {
625    pub reference: AttachmentRef,
626}
627
628impl LlmAttachmentSpec {
629    fn into_attachment(self) -> LlmAttachment {
630        LlmAttachment::reference(self.reference)
631    }
632}
633
634/// Serializable LLM request data. Live stream and provider-trace callbacks are
635/// attached by the local executor, and attachment bytes are resolved locally
636/// from refs rather than persisted in the effect envelope.
637#[derive(Clone, Debug, Serialize, Deserialize)]
638pub struct LlmRequestSpec {
639    pub model: String,
640    pub messages: Vec<LlmMessage>,
641    pub attachments: Vec<LlmAttachmentSpec>,
642    pub tools: Arc<Vec<LlmToolSpec>>,
643    pub tool_choice: LlmToolChoice,
644    pub model_variant: Option<String>,
645    #[serde(default)]
646    pub generation: crate::GenerationOptions,
647    pub scope: crate::LlmRequestScope,
648    pub output_spec: Option<LlmOutputSpec>,
649}
650
651impl LlmRequestSpec {
652    pub async fn from_request(
653        request: &CoreLlmRequest,
654        attachment_store: &dyn AttachmentStore,
655    ) -> Result<Self, RuntimeEffectControllerError> {
656        Ok(Self {
657            model: request.model.clone(),
658            messages: request.messages.clone(),
659            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
660                .await?,
661            tools: Arc::clone(&request.tools),
662            tool_choice: request.tool_choice.clone(),
663            model_variant: request.model_variant.clone(),
664            generation: request.generation.clone(),
665            scope: request.scope.clone(),
666            output_spec: request.output_spec.clone(),
667        })
668    }
669
670    pub fn into_request(
671        self,
672        stream_events: Option<LlmEventSender>,
673        provider_trace: Option<LlmProviderTraceSender>,
674    ) -> CoreLlmRequest {
675        CoreLlmRequest {
676            model: self.model,
677            messages: self.messages,
678            attachments: self
679                .attachments
680                .into_iter()
681                .map(LlmAttachmentSpec::into_attachment)
682                .collect(),
683            tools: self.tools,
684            tool_choice: self.tool_choice,
685            model_variant: self.model_variant,
686            generation: self.generation,
687            scope: self.scope,
688            output_spec: self.output_spec,
689            stream_events,
690            provider_trace,
691        }
692    }
693}
694
695async fn attachment_specs_from_attachments(
696    attachments: &[LlmAttachment],
697    attachment_store: &dyn AttachmentStore,
698) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
699    let mut specs = Vec::with_capacity(attachments.len());
700    for attachment in attachments {
701        specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
702    }
703    Ok(specs)
704}
705
706async fn attachment_spec_from_attachment(
707    attachment: &LlmAttachment,
708    attachment_store: &dyn AttachmentStore,
709) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
710    if let Some(reference) = attachment.reference.as_ref() {
711        return Ok(LlmAttachmentSpec {
712            reference: reference.clone(),
713        });
714    }
715    if attachment.data.is_empty() {
716        return Err(RuntimeEffectControllerError::new(
717            "runtime_effect_attachment_missing_reference",
718            "runtime effect attachment has neither a durable reference nor inline bytes",
719        ));
720    }
721    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
722        RuntimeEffectControllerError::new(
723            "runtime_effect_attachment_media_type",
724            format!(
725                "attachment media type `{}` cannot be represented durably",
726                attachment.mime
727            ),
728        )
729    })?;
730    let reference = attachment_store
731        .put(
732            attachment.data.clone(),
733            AttachmentCreateMeta::new(media_type, None, None, None),
734        )
735        .await
736        .map_err(|err| {
737            RuntimeEffectControllerError::new(
738                "runtime_effect_attachment_store",
739                format!("failed to store attachment before runtime effect invocation: {err}"),
740            )
741        })?;
742    Ok(LlmAttachmentSpec { reference })
743}
744
745impl RuntimeEffectOutcome {
746    pub fn into_llm_call(
747        self,
748    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
749        match self {
750            Self::LlmCall {
751                result,
752                text_streamed,
753            } => Ok((result, text_streamed)),
754            other => Err(RuntimeEffectControllerError::wrong_outcome(
755                RuntimeEffectKind::LlmCall,
756                other.kind(),
757            )),
758        }
759    }
760
761    pub fn into_direct_response(
762        self,
763    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
764        match self {
765            Self::Direct { result } => Ok(result),
766            other => Err(RuntimeEffectControllerError::wrong_outcome(
767                RuntimeEffectKind::Direct,
768                other.kind(),
769            )),
770        }
771    }
772
773    pub fn into_tool_attempt_effect(
774        self,
775    ) -> Result<ToolAttemptEffectOutcome, RuntimeEffectControllerError> {
776        match self {
777            Self::ToolAttempt { launch, triggers } => {
778                Ok(ToolAttemptEffectOutcome { launch, triggers })
779            }
780            other => Err(RuntimeEffectControllerError::wrong_outcome(
781                RuntimeEffectKind::ToolAttempt,
782                other.kind(),
783            )),
784        }
785    }
786
787    pub fn into_tool_batch_effect(
788        self,
789    ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
790        match self {
791            Self::ToolBatch { launches, triggers } => {
792                Ok(ToolBatchEffectOutcome { launches, triggers })
793            }
794            other => Err(RuntimeEffectControllerError::wrong_outcome(
795                RuntimeEffectKind::ToolBatch,
796                other.kind(),
797            )),
798        }
799    }
800
801    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
802        match self {
803            Self::Process { result } => Ok(result),
804            other => Err(RuntimeEffectControllerError::wrong_outcome(
805                RuntimeEffectKind::Process,
806                other.kind(),
807            )),
808        }
809    }
810
811    pub fn into_exec_code(
812        self,
813    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
814        match self {
815            Self::ExecCode { result } => Ok(result),
816            other => Err(RuntimeEffectControllerError::wrong_outcome(
817                RuntimeEffectKind::ExecCode,
818                other.kind(),
819            )),
820        }
821    }
822
823    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
824        match self {
825            Self::Checkpoint { result } => Ok(result),
826            other => Err(RuntimeEffectControllerError::wrong_outcome(
827                RuntimeEffectKind::Checkpoint,
828                other.kind(),
829            )),
830        }
831    }
832
833    pub fn into_sync_execution_environment(
834        self,
835    ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
836    {
837        match self {
838            Self::SyncExecutionEnvironment { result } => Ok(result),
839            other => Err(RuntimeEffectControllerError::wrong_outcome(
840                RuntimeEffectKind::SyncExecutionEnvironment,
841                other.kind(),
842            )),
843        }
844    }
845
846    pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
847        match self {
848            Self::AwaitEvent { resolution } => Ok(resolution),
849            other => Err(RuntimeEffectControllerError::wrong_outcome(
850                RuntimeEffectKind::AwaitEvent,
851                other.kind(),
852            )),
853        }
854    }
855
856    pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
857        match self {
858            Self::DurableStep { value } => Ok(value),
859            other => Err(RuntimeEffectControllerError::wrong_outcome(
860                RuntimeEffectKind::DurableStep,
861                other.kind(),
862            )),
863        }
864    }
865
866    pub fn kind(&self) -> RuntimeEffectKind {
867        match self {
868            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
869            Self::Direct { .. } => RuntimeEffectKind::Direct,
870            Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
871            Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
872            Self::Process { .. } => RuntimeEffectKind::Process,
873            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
874            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
875            Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
876            Self::Sleep => RuntimeEffectKind::Sleep,
877            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
878            Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
879        }
880    }
881}