Skip to main content

lash_core/runtime/effect/
envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionSurfaceSync, LlmCallError};
12use crate::tool_dispatch::ToolHostEventEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration, ProcessScope,
17    ProcessStartGrant,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22/// Durable category for a runtime effect.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26    LlmCall,
27    Direct,
28    ToolCall,
29    Process,
30    ExecCode,
31    Checkpoint,
32    SyncExecutionSurface,
33    Sleep,
34}
35
36impl RuntimeEffectKind {
37    pub fn as_str(self) -> &'static str {
38        match self {
39            Self::LlmCall => "llm_call",
40            Self::Direct => "direct",
41            Self::ToolCall => "tool_call",
42            Self::Process => "process",
43            Self::ExecCode => "exec_code",
44            Self::Checkpoint => "checkpoint",
45            Self::SyncExecutionSurface => "sync_execution_surface",
46            Self::Sleep => "sleep",
47        }
48    }
49}
50
51/// Canonical lineage for a runtime-side invocation.
52#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
53pub struct RuntimeInvocation {
54    pub scope: RuntimeScope,
55    pub subject: RuntimeSubject,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub caused_by: Option<CausalRef>,
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub replay: Option<RuntimeReplay>,
60}
61
62impl RuntimeInvocation {
63    pub fn effect(
64        scope: RuntimeScope,
65        effect_id: impl Into<String>,
66        kind: RuntimeEffectKind,
67        replay_key: impl Into<String>,
68    ) -> Self {
69        Self {
70            scope,
71            subject: RuntimeSubject::Effect {
72                effect_id: effect_id.into(),
73                kind,
74            },
75            caused_by: None,
76            replay: Some(RuntimeReplay {
77                key: replay_key.into(),
78            }),
79        }
80    }
81
82    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
83        self.caused_by = caused_by;
84        self
85    }
86
87    pub fn effect_id(&self) -> Option<&str> {
88        match &self.subject {
89            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
90            _ => None,
91        }
92    }
93
94    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
95        match &self.subject {
96            RuntimeSubject::Effect { kind, .. } => Some(*kind),
97            _ => None,
98        }
99    }
100
101    pub fn replay_key(&self) -> Option<&str> {
102        self.replay.as_ref().map(|replay| replay.key.as_str())
103    }
104
105    pub fn causal_ref(&self) -> Option<CausalRef> {
106        match &self.subject {
107            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
108                session_id: self.scope.session_id.clone(),
109                turn_id: self.scope.turn_id.clone(),
110                effect_id: effect_id.clone(),
111            }),
112            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
113                process_id: process_id.clone(),
114            }),
115            RuntimeSubject::ProcessEvent {
116                process_id,
117                sequence,
118                ..
119            } => Some(CausalRef::ProcessEvent {
120                process_id: process_id.clone(),
121                sequence: *sequence,
122            }),
123            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
124                session_id: self.scope.session_id.clone(),
125                node_id: node_id.clone(),
126            }),
127        }
128    }
129}
130
131#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
132pub struct RuntimeScope {
133    pub session_id: String,
134    #[serde(default, skip_serializing_if = "Option::is_none")]
135    pub turn_id: Option<String>,
136    #[serde(default, skip_serializing_if = "Option::is_none")]
137    pub turn_index: Option<usize>,
138    #[serde(default, skip_serializing_if = "Option::is_none")]
139    pub protocol_iteration: Option<usize>,
140}
141
142impl RuntimeScope {
143    pub fn new(session_id: impl Into<String>) -> Self {
144        Self {
145            session_id: session_id.into(),
146            turn_id: None,
147            turn_index: None,
148            protocol_iteration: None,
149        }
150    }
151
152    pub fn for_turn(
153        session_id: impl Into<String>,
154        turn_id: impl Into<String>,
155        turn_index: usize,
156        protocol_iteration: usize,
157    ) -> Self {
158        Self {
159            session_id: session_id.into(),
160            turn_id: Some(turn_id.into()),
161            turn_index: Some(turn_index),
162            protocol_iteration: Some(protocol_iteration),
163        }
164    }
165}
166
167#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
168pub struct RuntimeReplay {
169    pub key: String,
170}
171
172#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
173#[serde(tag = "type", rename_all = "snake_case")]
174pub enum RuntimeSubject {
175    Effect {
176        effect_id: String,
177        kind: RuntimeEffectKind,
178    },
179    Process {
180        process_id: String,
181    },
182    ProcessEvent {
183        process_id: String,
184        sequence: u64,
185        event_type: String,
186    },
187    SessionNode {
188        node_id: String,
189    },
190}
191
192/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
193#[derive(Clone, Debug, Serialize, Deserialize)]
194pub struct RuntimeEffectEnvelope {
195    pub invocation: RuntimeInvocation,
196    pub command: RuntimeEffectCommand,
197}
198
199impl RuntimeEffectEnvelope {
200    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
201        Self::try_new(invocation, command).expect("valid runtime effect invocation")
202    }
203
204    pub fn try_new(
205        invocation: RuntimeInvocation,
206        command: RuntimeEffectCommand,
207    ) -> Result<Self, RuntimeEffectControllerError> {
208        validate_effect_invocation(&invocation, command.kind())?;
209        Ok(Self {
210            invocation,
211            command,
212        })
213    }
214
215    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
216        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
217            RuntimeEffectControllerError::new(
218                "runtime_effect_envelope_hash",
219                format!("failed to serialize runtime effect envelope: {err}"),
220            )
221        })
222    }
223}
224
225fn validate_effect_invocation(
226    invocation: &RuntimeInvocation,
227    command_kind: RuntimeEffectKind,
228) -> Result<(), RuntimeEffectControllerError> {
229    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
230        return Err(RuntimeEffectControllerError::new(
231            "runtime_effect_invocation_subject",
232            "runtime effect envelope subject must be an effect",
233        ));
234    };
235    if effect_id.trim().is_empty() {
236        return Err(RuntimeEffectControllerError::new(
237            "runtime_effect_invocation_subject",
238            "runtime effect envelope effect id must be non-empty",
239        ));
240    }
241    if *kind != command_kind {
242        return Err(RuntimeEffectControllerError::new(
243            "runtime_effect_invocation_kind",
244            format!(
245                "runtime effect invocation kind {} does not match command kind {}",
246                kind.as_str(),
247                command_kind.as_str()
248            ),
249        ));
250    }
251    if invocation
252        .replay
253        .as_ref()
254        .is_none_or(|replay| replay.key.is_empty())
255    {
256        return Err(RuntimeEffectControllerError::new(
257            "runtime_effect_replay_required",
258            "runtime effect envelope requires replay.key",
259        ));
260    }
261    Ok(())
262}
263
264/// Serializable command emitted at Lash's nondeterministic runtime boundary.
265#[derive(Clone, Debug, Serialize, Deserialize)]
266#[serde(tag = "type", rename_all = "snake_case")]
267pub enum RuntimeEffectCommand {
268    LlmCall {
269        request: Box<LlmRequestSpec>,
270    },
271    Direct {
272        request: Box<LlmRequestSpec>,
273        usage_source: String,
274    },
275    ToolCall {
276        call: crate::PreparedToolCall,
277    },
278    Process {
279        command: ProcessCommand,
280    },
281    ExecCode {
282        code: String,
283    },
284    Checkpoint {
285        checkpoint: CheckpointKind,
286    },
287    SyncExecutionSurface {
288        update_machine_config: bool,
289    },
290    Sleep {
291        duration_ms: u64,
292    },
293}
294
295impl RuntimeEffectCommand {
296    pub fn kind(&self) -> RuntimeEffectKind {
297        match self {
298            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
299            Self::Direct { .. } => RuntimeEffectKind::Direct,
300            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
301            Self::Process { .. } => RuntimeEffectKind::Process,
302            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
303            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
304            Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
305            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
306        }
307    }
308}
309
310/// Serializable operation against the process control plane.
311#[derive(Clone, Debug, Serialize, Deserialize)]
312#[serde(tag = "op", rename_all = "snake_case")]
313pub enum ProcessCommand {
314    Start {
315        registration: ProcessRegistration,
316        #[serde(default, skip_serializing_if = "Option::is_none")]
317        grant: Option<ProcessStartGrant>,
318        #[serde(
319            default,
320            skip_serializing_if = "boxed_process_execution_context_is_empty"
321        )]
322        execution_context: Box<ProcessExecutionContext>,
323    },
324    List {
325        owner_scope: ProcessScope,
326        #[serde(default)]
327        mode: ProcessListMode,
328    },
329    Transfer {
330        from_scope: ProcessScope,
331        to_scope: ProcessScope,
332        process_ids: Vec<String>,
333    },
334    DeleteSession {
335        session_id: String,
336    },
337    Await {
338        process_id: String,
339    },
340    Cancel {
341        process_id: String,
342        reason: Option<String>,
343    },
344    Signal {
345        process_id: String,
346        signal_id: String,
347        request: crate::ProcessEventAppendRequest,
348    },
349}
350
351fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
352    context.is_empty()
353}
354
355type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
356
357impl ProcessCommand {
358    pub fn effect_id(&self) -> String {
359        match self {
360            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
361            Self::List { owner_scope, mode } => {
362                format!("process:list:{}:{}", owner_scope.id(), mode.as_str())
363            }
364            Self::Transfer {
365                from_scope,
366                to_scope,
367                process_ids,
368            } => {
369                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
370                    .unwrap_or_else(|_| "unhashable".to_string());
371                format!(
372                    "process:transfer:{}:{}:{digest}",
373                    from_scope.id(),
374                    to_scope.id()
375                )
376            }
377            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
378            Self::Await { process_id } => format!("process:await:{process_id}"),
379            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
380            Self::Signal {
381                process_id,
382                signal_id,
383                ..
384            } => {
385                format!("process:signal:{process_id}:{signal_id}")
386            }
387        }
388    }
389}
390
391/// Serializable result of a process operation.
392#[derive(Clone, Debug, Serialize, Deserialize)]
393#[serde(tag = "op", rename_all = "snake_case")]
394pub enum ProcessEffectOutcome {
395    Start {
396        record: ProcessRecord,
397    },
398    List {
399        entries: Vec<ProcessHandleGrantEntry>,
400    },
401    Transfer,
402    DeleteSession {
403        report: crate::ProcessSessionDeleteReport,
404    },
405    Await {
406        output: ProcessAwaitOutput,
407    },
408    Cancel {
409        record: ProcessRecord,
410    },
411    Signal {
412        event: crate::ProcessEvent,
413    },
414}
415
416#[derive(Clone, Debug, Serialize, Deserialize)]
417pub struct ToolCallEffectOutcome {
418    pub result: CompletedToolCall,
419    #[serde(default, skip_serializing_if = "Vec::is_empty")]
420    pub host_events: Vec<ToolHostEventEffectOutcome>,
421}
422
423/// Serializable result of a runtime effect command.
424#[derive(Clone, Debug, Serialize, Deserialize)]
425#[serde(tag = "type", rename_all = "snake_case")]
426pub enum RuntimeEffectOutcome {
427    LlmCall {
428        result: Result<LlmResponse, LlmCallError>,
429        text_streamed: bool,
430    },
431    Direct {
432        result: Result<LlmResponse, LlmCallError>,
433    },
434    ToolCall {
435        result: CompletedToolCall,
436        #[serde(default, skip_serializing_if = "Vec::is_empty")]
437        host_events: Vec<ToolHostEventEffectOutcome>,
438    },
439    Process {
440        result: ProcessEffectOutcome,
441    },
442    ExecCode {
443        result: Result<ExecResponse, String>,
444    },
445    Checkpoint {
446        result: CheckpointOutcome,
447    },
448    SyncExecutionSurface {
449        result: Result<Option<ExecutionSurfaceSync>, String>,
450    },
451    Sleep,
452}
453
454// =============================================================================
455// Request specs (serializable forms of LLM/Direct requests)
456// =============================================================================
457
458/// Serializable attachment data for runtime effect envelopes.
459///
460/// Effect envelopes carry attachment references only. Local executors resolve
461/// bytes from the configured attachment store when a provider request is
462/// actually executed.
463#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
464pub struct LlmAttachmentSpec {
465    pub reference: AttachmentRef,
466}
467
468impl LlmAttachmentSpec {
469    fn into_attachment(self) -> LlmAttachment {
470        LlmAttachment::reference(self.reference)
471    }
472}
473
474/// Serializable LLM request data. Live stream and provider-trace callbacks are
475/// attached by the local executor, and attachment bytes are resolved locally
476/// from refs rather than persisted in the effect envelope.
477#[derive(Clone, Debug, Serialize, Deserialize)]
478pub struct LlmRequestSpec {
479    pub model: String,
480    pub messages: Vec<LlmMessage>,
481    pub attachments: Vec<LlmAttachmentSpec>,
482    pub tools: Arc<Vec<LlmToolSpec>>,
483    pub tool_choice: LlmToolChoice,
484    pub model_variant: Option<String>,
485    #[serde(default)]
486    pub generation: crate::GenerationOptions,
487    pub session_id: Option<String>,
488    pub output_spec: Option<LlmOutputSpec>,
489}
490
491impl LlmRequestSpec {
492    pub fn from_request(
493        request: &CoreLlmRequest,
494        attachment_store: &dyn AttachmentStore,
495    ) -> Result<Self, RuntimeEffectControllerError> {
496        Ok(Self {
497            model: request.model.clone(),
498            messages: request.messages.clone(),
499            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)?,
500            tools: Arc::clone(&request.tools),
501            tool_choice: request.tool_choice.clone(),
502            model_variant: request.model_variant.clone(),
503            generation: request.generation.clone(),
504            session_id: request.session_id.clone(),
505            output_spec: request.output_spec.clone(),
506        })
507    }
508
509    pub fn into_request(
510        self,
511        stream_events: Option<LlmEventSender>,
512        provider_trace: Option<LlmProviderTraceSender>,
513    ) -> CoreLlmRequest {
514        CoreLlmRequest {
515            model: self.model,
516            messages: self.messages,
517            attachments: self
518                .attachments
519                .into_iter()
520                .map(LlmAttachmentSpec::into_attachment)
521                .collect(),
522            tools: self.tools,
523            tool_choice: self.tool_choice,
524            model_variant: self.model_variant,
525            generation: self.generation,
526            session_id: self.session_id,
527            output_spec: self.output_spec,
528            stream_events,
529            provider_trace,
530        }
531    }
532}
533
534fn attachment_specs_from_attachments(
535    attachments: &[LlmAttachment],
536    attachment_store: &dyn AttachmentStore,
537) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
538    attachments
539        .iter()
540        .map(|attachment| attachment_spec_from_attachment(attachment, attachment_store))
541        .collect()
542}
543
544fn attachment_spec_from_attachment(
545    attachment: &LlmAttachment,
546    attachment_store: &dyn AttachmentStore,
547) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
548    if let Some(reference) = attachment.reference.as_ref() {
549        return Ok(LlmAttachmentSpec {
550            reference: reference.clone(),
551        });
552    }
553    if attachment.data.is_empty() {
554        return Err(RuntimeEffectControllerError::new(
555            "runtime_effect_attachment_missing_reference",
556            "runtime effect attachment has neither a durable reference nor inline bytes",
557        ));
558    }
559    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
560        RuntimeEffectControllerError::new(
561            "runtime_effect_attachment_media_type",
562            format!(
563                "attachment media type `{}` cannot be represented durably",
564                attachment.mime
565            ),
566        )
567    })?;
568    let reference = attachment_store
569        .put(
570            attachment.data.clone(),
571            AttachmentCreateMeta::new(media_type, None, None, None),
572        )
573        .map_err(|err| {
574            RuntimeEffectControllerError::new(
575                "runtime_effect_attachment_store",
576                format!("failed to store attachment before runtime effect invocation: {err}"),
577            )
578        })?;
579    Ok(LlmAttachmentSpec { reference })
580}
581
582impl RuntimeEffectOutcome {
583    pub fn into_llm_call(
584        self,
585    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
586        match self {
587            Self::LlmCall {
588                result,
589                text_streamed,
590            } => Ok((result, text_streamed)),
591            other => Err(RuntimeEffectControllerError::wrong_outcome(
592                RuntimeEffectKind::LlmCall,
593                other.kind(),
594            )),
595        }
596    }
597
598    pub fn into_direct_response(
599        self,
600    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
601        match self {
602            Self::Direct { result } => Ok(result),
603            other => Err(RuntimeEffectControllerError::wrong_outcome(
604                RuntimeEffectKind::Direct,
605                other.kind(),
606            )),
607        }
608    }
609
610    pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
611        match self {
612            Self::ToolCall { result, .. } => Ok(result),
613            other => Err(RuntimeEffectControllerError::wrong_outcome(
614                RuntimeEffectKind::ToolCall,
615                other.kind(),
616            )),
617        }
618    }
619
620    pub fn into_tool_call_effect(
621        self,
622    ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
623        match self {
624            Self::ToolCall {
625                result,
626                host_events,
627            } => Ok(ToolCallEffectOutcome {
628                result,
629                host_events,
630            }),
631            other => Err(RuntimeEffectControllerError::wrong_outcome(
632                RuntimeEffectKind::ToolCall,
633                other.kind(),
634            )),
635        }
636    }
637
638    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
639        match self {
640            Self::Process { result } => Ok(result),
641            other => Err(RuntimeEffectControllerError::wrong_outcome(
642                RuntimeEffectKind::Process,
643                other.kind(),
644            )),
645        }
646    }
647
648    pub fn into_exec_code(
649        self,
650    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
651        match self {
652            Self::ExecCode { result } => Ok(result),
653            other => Err(RuntimeEffectControllerError::wrong_outcome(
654                RuntimeEffectKind::ExecCode,
655                other.kind(),
656            )),
657        }
658    }
659
660    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
661        match self {
662            Self::Checkpoint { result } => Ok(result),
663            other => Err(RuntimeEffectControllerError::wrong_outcome(
664                RuntimeEffectKind::Checkpoint,
665                other.kind(),
666            )),
667        }
668    }
669
670    pub fn into_sync_execution_surface(
671        self,
672    ) -> Result<Result<Option<ExecutionSurfaceSync>, String>, RuntimeEffectControllerError> {
673        match self {
674            Self::SyncExecutionSurface { result } => Ok(result),
675            other => Err(RuntimeEffectControllerError::wrong_outcome(
676                RuntimeEffectKind::SyncExecutionSurface,
677                other.kind(),
678            )),
679        }
680    }
681
682    pub fn kind(&self) -> RuntimeEffectKind {
683        match self {
684            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
685            Self::Direct { .. } => RuntimeEffectKind::Direct,
686            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
687            Self::Process { .. } => RuntimeEffectKind::Process,
688            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
689            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
690            Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
691            Self::Sleep => RuntimeEffectKind::Sleep,
692        }
693    }
694}