Skip to main content

lash_core/runtime/effect/
envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionSurfaceSync, LlmCallError};
12use crate::tool_dispatch::ToolHostEventEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration, ProcessScope,
17    ProcessStartGrant,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22/// Durable category for a runtime effect.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26    LlmCall,
27    Direct,
28    ToolCall,
29    Process,
30    ExecCode,
31    Checkpoint,
32    SyncExecutionSurface,
33    Sleep,
34}
35
36impl RuntimeEffectKind {
37    pub fn as_str(self) -> &'static str {
38        match self {
39            Self::LlmCall => "llm_call",
40            Self::Direct => "direct",
41            Self::ToolCall => "tool_call",
42            Self::Process => "process",
43            Self::ExecCode => "exec_code",
44            Self::Checkpoint => "checkpoint",
45            Self::SyncExecutionSurface => "sync_execution_surface",
46            Self::Sleep => "sleep",
47        }
48    }
49}
50
51/// Canonical lineage for a runtime-side invocation.
52#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
53pub struct RuntimeInvocation {
54    pub scope: RuntimeScope,
55    pub subject: RuntimeSubject,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub caused_by: Option<CausalRef>,
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub replay: Option<RuntimeReplay>,
60}
61
62impl RuntimeInvocation {
63    pub fn effect(
64        scope: RuntimeScope,
65        effect_id: impl Into<String>,
66        kind: RuntimeEffectKind,
67        replay_key: impl Into<String>,
68    ) -> Self {
69        Self {
70            scope,
71            subject: RuntimeSubject::Effect {
72                effect_id: effect_id.into(),
73                kind,
74            },
75            caused_by: None,
76            replay: Some(RuntimeReplay {
77                key: replay_key.into(),
78            }),
79        }
80    }
81
82    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
83        self.caused_by = caused_by;
84        self
85    }
86
87    pub fn effect_id(&self) -> Option<&str> {
88        match &self.subject {
89            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
90            _ => None,
91        }
92    }
93
94    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
95        match &self.subject {
96            RuntimeSubject::Effect { kind, .. } => Some(*kind),
97            _ => None,
98        }
99    }
100
101    pub fn replay_key(&self) -> Option<&str> {
102        self.replay.as_ref().map(|replay| replay.key.as_str())
103    }
104
105    pub fn causal_ref(&self) -> Option<CausalRef> {
106        match &self.subject {
107            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
108                session_id: self.scope.session_id.clone(),
109                turn_id: self.scope.turn_id.clone(),
110                effect_id: effect_id.clone(),
111            }),
112            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
113                process_id: process_id.clone(),
114            }),
115            RuntimeSubject::ProcessEvent {
116                process_id,
117                sequence,
118                ..
119            } => Some(CausalRef::ProcessEvent {
120                process_id: process_id.clone(),
121                sequence: *sequence,
122            }),
123            RuntimeSubject::HostEvent { occurrence_id } => Some(CausalRef::HostEvent {
124                occurrence_id: occurrence_id.clone(),
125            }),
126            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
127                session_id: self.scope.session_id.clone(),
128                node_id: node_id.clone(),
129            }),
130        }
131    }
132}
133
134#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
135pub struct RuntimeScope {
136    pub session_id: String,
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    pub turn_id: Option<String>,
139    #[serde(default, skip_serializing_if = "Option::is_none")]
140    pub turn_index: Option<usize>,
141    #[serde(default, skip_serializing_if = "Option::is_none")]
142    pub protocol_iteration: Option<usize>,
143}
144
145impl RuntimeScope {
146    pub fn new(session_id: impl Into<String>) -> Self {
147        Self {
148            session_id: session_id.into(),
149            turn_id: None,
150            turn_index: None,
151            protocol_iteration: None,
152        }
153    }
154
155    pub fn for_turn(
156        session_id: impl Into<String>,
157        turn_id: impl Into<String>,
158        turn_index: usize,
159        protocol_iteration: usize,
160    ) -> Self {
161        Self {
162            session_id: session_id.into(),
163            turn_id: Some(turn_id.into()),
164            turn_index: Some(turn_index),
165            protocol_iteration: Some(protocol_iteration),
166        }
167    }
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
171pub struct RuntimeReplay {
172    pub key: String,
173}
174
175#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
176#[serde(tag = "type", rename_all = "snake_case")]
177pub enum RuntimeSubject {
178    Effect {
179        effect_id: String,
180        kind: RuntimeEffectKind,
181    },
182    Process {
183        process_id: String,
184    },
185    ProcessEvent {
186        process_id: String,
187        sequence: u64,
188        event_type: String,
189    },
190    HostEvent {
191        occurrence_id: String,
192    },
193    SessionNode {
194        node_id: String,
195    },
196}
197
198/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
199#[derive(Clone, Debug, Serialize, Deserialize)]
200pub struct RuntimeEffectEnvelope {
201    pub invocation: RuntimeInvocation,
202    pub command: RuntimeEffectCommand,
203}
204
205impl RuntimeEffectEnvelope {
206    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
207        Self::try_new(invocation, command).expect("valid runtime effect invocation")
208    }
209
210    pub fn try_new(
211        invocation: RuntimeInvocation,
212        command: RuntimeEffectCommand,
213    ) -> Result<Self, RuntimeEffectControllerError> {
214        validate_effect_invocation(&invocation, command.kind())?;
215        Ok(Self {
216            invocation,
217            command,
218        })
219    }
220
221    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
222        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
223            RuntimeEffectControllerError::new(
224                "runtime_effect_envelope_hash",
225                format!("failed to serialize runtime effect envelope: {err}"),
226            )
227        })
228    }
229}
230
231fn validate_effect_invocation(
232    invocation: &RuntimeInvocation,
233    command_kind: RuntimeEffectKind,
234) -> Result<(), RuntimeEffectControllerError> {
235    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
236        return Err(RuntimeEffectControllerError::new(
237            "runtime_effect_invocation_subject",
238            "runtime effect envelope subject must be an effect",
239        ));
240    };
241    if effect_id.trim().is_empty() {
242        return Err(RuntimeEffectControllerError::new(
243            "runtime_effect_invocation_subject",
244            "runtime effect envelope effect id must be non-empty",
245        ));
246    }
247    if *kind != command_kind {
248        return Err(RuntimeEffectControllerError::new(
249            "runtime_effect_invocation_kind",
250            format!(
251                "runtime effect invocation kind {} does not match command kind {}",
252                kind.as_str(),
253                command_kind.as_str()
254            ),
255        ));
256    }
257    if invocation
258        .replay
259        .as_ref()
260        .is_none_or(|replay| replay.key.is_empty())
261    {
262        return Err(RuntimeEffectControllerError::new(
263            "runtime_effect_replay_required",
264            "runtime effect envelope requires replay.key",
265        ));
266    }
267    Ok(())
268}
269
270/// Serializable command emitted at Lash's nondeterministic runtime boundary.
271#[derive(Clone, Debug, Serialize, Deserialize)]
272#[serde(tag = "type", rename_all = "snake_case")]
273pub enum RuntimeEffectCommand {
274    LlmCall {
275        request: Box<LlmRequestSpec>,
276    },
277    Direct {
278        request: Box<LlmRequestSpec>,
279        usage_source: String,
280    },
281    ToolCall {
282        call: crate::PreparedToolCall,
283    },
284    Process {
285        command: ProcessCommand,
286    },
287    ExecCode {
288        code: String,
289    },
290    Checkpoint {
291        checkpoint: CheckpointKind,
292    },
293    SyncExecutionSurface {
294        update_machine_config: bool,
295    },
296    Sleep {
297        duration_ms: u64,
298    },
299}
300
301impl RuntimeEffectCommand {
302    pub fn kind(&self) -> RuntimeEffectKind {
303        match self {
304            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
305            Self::Direct { .. } => RuntimeEffectKind::Direct,
306            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
307            Self::Process { .. } => RuntimeEffectKind::Process,
308            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
309            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
310            Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
311            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
312        }
313    }
314}
315
316/// Serializable operation against the process control plane.
317#[derive(Clone, Debug, Serialize, Deserialize)]
318#[serde(tag = "op", rename_all = "snake_case")]
319pub enum ProcessCommand {
320    Start {
321        registration: ProcessRegistration,
322        #[serde(default, skip_serializing_if = "Option::is_none")]
323        grant: Option<ProcessStartGrant>,
324        #[serde(
325            default,
326            skip_serializing_if = "boxed_process_execution_context_is_empty"
327        )]
328        execution_context: Box<ProcessExecutionContext>,
329    },
330    List {
331        owner_scope: ProcessScope,
332        #[serde(default)]
333        mode: ProcessListMode,
334    },
335    Transfer {
336        from_scope: ProcessScope,
337        to_scope: ProcessScope,
338        process_ids: Vec<String>,
339    },
340    DeleteSession {
341        session_id: String,
342    },
343    Await {
344        process_id: String,
345    },
346    Cancel {
347        process_id: String,
348        reason: Option<String>,
349    },
350    Signal {
351        process_id: String,
352        signal_id: String,
353        request: crate::ProcessEventAppendRequest,
354    },
355}
356
357fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
358    context.is_empty()
359}
360
361type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
362
363impl ProcessCommand {
364    pub fn effect_id(&self) -> String {
365        match self {
366            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
367            Self::List { owner_scope, mode } => {
368                format!("process:list:{}:{}", owner_scope.id(), mode.as_str())
369            }
370            Self::Transfer {
371                from_scope,
372                to_scope,
373                process_ids,
374            } => {
375                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
376                    .unwrap_or_else(|_| "unhashable".to_string());
377                format!(
378                    "process:transfer:{}:{}:{digest}",
379                    from_scope.id(),
380                    to_scope.id()
381                )
382            }
383            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
384            Self::Await { process_id } => format!("process:await:{process_id}"),
385            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
386            Self::Signal {
387                process_id,
388                signal_id,
389                ..
390            } => {
391                format!("process:signal:{process_id}:{signal_id}")
392            }
393        }
394    }
395}
396
397/// Serializable result of a process operation.
398#[derive(Clone, Debug, Serialize, Deserialize)]
399#[serde(tag = "op", rename_all = "snake_case")]
400pub enum ProcessEffectOutcome {
401    Start {
402        record: ProcessRecord,
403    },
404    List {
405        entries: Vec<ProcessHandleGrantEntry>,
406    },
407    Transfer,
408    DeleteSession {
409        report: crate::ProcessSessionDeleteReport,
410    },
411    Await {
412        output: ProcessAwaitOutput,
413    },
414    Cancel {
415        record: ProcessRecord,
416    },
417    Signal {
418        event: crate::ProcessEvent,
419    },
420}
421
422#[derive(Clone, Debug, Serialize, Deserialize)]
423pub struct ToolCallEffectOutcome {
424    pub result: CompletedToolCall,
425    #[serde(default, skip_serializing_if = "Vec::is_empty")]
426    pub host_events: Vec<ToolHostEventEffectOutcome>,
427}
428
429/// Serializable result of a runtime effect command.
430#[derive(Clone, Debug, Serialize, Deserialize)]
431#[serde(tag = "type", rename_all = "snake_case")]
432pub enum RuntimeEffectOutcome {
433    LlmCall {
434        result: Result<LlmResponse, LlmCallError>,
435        text_streamed: bool,
436    },
437    Direct {
438        result: Result<LlmResponse, LlmCallError>,
439    },
440    ToolCall {
441        result: CompletedToolCall,
442        #[serde(default, skip_serializing_if = "Vec::is_empty")]
443        host_events: Vec<ToolHostEventEffectOutcome>,
444    },
445    Process {
446        result: ProcessEffectOutcome,
447    },
448    ExecCode {
449        result: Result<ExecResponse, String>,
450    },
451    Checkpoint {
452        result: CheckpointOutcome,
453    },
454    SyncExecutionSurface {
455        result: Result<Option<ExecutionSurfaceSync>, String>,
456    },
457    Sleep,
458}
459
// =============================================================================
// Request specs (serializable forms of LLM/Direct requests)
// =============================================================================
463
464/// Serializable attachment data for runtime effect envelopes.
465///
466/// Effect envelopes carry attachment references only. Local executors resolve
467/// bytes from the configured attachment store when a provider request is
468/// actually executed.
469#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
470pub struct LlmAttachmentSpec {
471    pub reference: AttachmentRef,
472}
473
474impl LlmAttachmentSpec {
475    fn into_attachment(self) -> LlmAttachment {
476        LlmAttachment::reference(self.reference)
477    }
478}
479
480/// Serializable LLM request data. Live stream and provider-trace callbacks are
481/// attached by the local executor, and attachment bytes are resolved locally
482/// from refs rather than persisted in the effect envelope.
483#[derive(Clone, Debug, Serialize, Deserialize)]
484pub struct LlmRequestSpec {
485    pub model: String,
486    pub messages: Vec<LlmMessage>,
487    pub attachments: Vec<LlmAttachmentSpec>,
488    pub tools: Arc<Vec<LlmToolSpec>>,
489    pub tool_choice: LlmToolChoice,
490    pub model_variant: Option<String>,
491    #[serde(default)]
492    pub generation: crate::GenerationOptions,
493    pub session_id: Option<String>,
494    pub output_spec: Option<LlmOutputSpec>,
495}
496
497impl LlmRequestSpec {
498    pub async fn from_request(
499        request: &CoreLlmRequest,
500        attachment_store: &dyn AttachmentStore,
501    ) -> Result<Self, RuntimeEffectControllerError> {
502        Ok(Self {
503            model: request.model.clone(),
504            messages: request.messages.clone(),
505            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
506                .await?,
507            tools: Arc::clone(&request.tools),
508            tool_choice: request.tool_choice.clone(),
509            model_variant: request.model_variant.clone(),
510            generation: request.generation.clone(),
511            session_id: request.session_id.clone(),
512            output_spec: request.output_spec.clone(),
513        })
514    }
515
516    pub fn into_request(
517        self,
518        stream_events: Option<LlmEventSender>,
519        provider_trace: Option<LlmProviderTraceSender>,
520    ) -> CoreLlmRequest {
521        CoreLlmRequest {
522            model: self.model,
523            messages: self.messages,
524            attachments: self
525                .attachments
526                .into_iter()
527                .map(LlmAttachmentSpec::into_attachment)
528                .collect(),
529            tools: self.tools,
530            tool_choice: self.tool_choice,
531            model_variant: self.model_variant,
532            generation: self.generation,
533            session_id: self.session_id,
534            output_spec: self.output_spec,
535            stream_events,
536            provider_trace,
537        }
538    }
539}
540
541async fn attachment_specs_from_attachments(
542    attachments: &[LlmAttachment],
543    attachment_store: &dyn AttachmentStore,
544) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
545    let mut specs = Vec::with_capacity(attachments.len());
546    for attachment in attachments {
547        specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
548    }
549    Ok(specs)
550}
551
552async fn attachment_spec_from_attachment(
553    attachment: &LlmAttachment,
554    attachment_store: &dyn AttachmentStore,
555) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
556    if let Some(reference) = attachment.reference.as_ref() {
557        return Ok(LlmAttachmentSpec {
558            reference: reference.clone(),
559        });
560    }
561    if attachment.data.is_empty() {
562        return Err(RuntimeEffectControllerError::new(
563            "runtime_effect_attachment_missing_reference",
564            "runtime effect attachment has neither a durable reference nor inline bytes",
565        ));
566    }
567    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
568        RuntimeEffectControllerError::new(
569            "runtime_effect_attachment_media_type",
570            format!(
571                "attachment media type `{}` cannot be represented durably",
572                attachment.mime
573            ),
574        )
575    })?;
576    let reference = attachment_store
577        .put(
578            attachment.data.clone(),
579            AttachmentCreateMeta::new(media_type, None, None, None),
580        )
581        .await
582        .map_err(|err| {
583            RuntimeEffectControllerError::new(
584                "runtime_effect_attachment_store",
585                format!("failed to store attachment before runtime effect invocation: {err}"),
586            )
587        })?;
588    Ok(LlmAttachmentSpec { reference })
589}
590
591impl RuntimeEffectOutcome {
592    pub fn into_llm_call(
593        self,
594    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
595        match self {
596            Self::LlmCall {
597                result,
598                text_streamed,
599            } => Ok((result, text_streamed)),
600            other => Err(RuntimeEffectControllerError::wrong_outcome(
601                RuntimeEffectKind::LlmCall,
602                other.kind(),
603            )),
604        }
605    }
606
607    pub fn into_direct_response(
608        self,
609    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
610        match self {
611            Self::Direct { result } => Ok(result),
612            other => Err(RuntimeEffectControllerError::wrong_outcome(
613                RuntimeEffectKind::Direct,
614                other.kind(),
615            )),
616        }
617    }
618
619    pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
620        match self {
621            Self::ToolCall { result, .. } => Ok(result),
622            other => Err(RuntimeEffectControllerError::wrong_outcome(
623                RuntimeEffectKind::ToolCall,
624                other.kind(),
625            )),
626        }
627    }
628
629    pub fn into_tool_call_effect(
630        self,
631    ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
632        match self {
633            Self::ToolCall {
634                result,
635                host_events,
636            } => Ok(ToolCallEffectOutcome {
637                result,
638                host_events,
639            }),
640            other => Err(RuntimeEffectControllerError::wrong_outcome(
641                RuntimeEffectKind::ToolCall,
642                other.kind(),
643            )),
644        }
645    }
646
647    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
648        match self {
649            Self::Process { result } => Ok(result),
650            other => Err(RuntimeEffectControllerError::wrong_outcome(
651                RuntimeEffectKind::Process,
652                other.kind(),
653            )),
654        }
655    }
656
657    pub fn into_exec_code(
658        self,
659    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
660        match self {
661            Self::ExecCode { result } => Ok(result),
662            other => Err(RuntimeEffectControllerError::wrong_outcome(
663                RuntimeEffectKind::ExecCode,
664                other.kind(),
665            )),
666        }
667    }
668
669    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
670        match self {
671            Self::Checkpoint { result } => Ok(result),
672            other => Err(RuntimeEffectControllerError::wrong_outcome(
673                RuntimeEffectKind::Checkpoint,
674                other.kind(),
675            )),
676        }
677    }
678
679    pub fn into_sync_execution_surface(
680        self,
681    ) -> Result<Result<Option<ExecutionSurfaceSync>, String>, RuntimeEffectControllerError> {
682        match self {
683            Self::SyncExecutionSurface { result } => Ok(result),
684            other => Err(RuntimeEffectControllerError::wrong_outcome(
685                RuntimeEffectKind::SyncExecutionSurface,
686                other.kind(),
687            )),
688        }
689    }
690
691    pub fn kind(&self) -> RuntimeEffectKind {
692        match self {
693            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
694            Self::Direct { .. } => RuntimeEffectKind::Direct,
695            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
696            Self::Process { .. } => RuntimeEffectKind::Process,
697            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
698            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
699            Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
700            Self::Sleep => RuntimeEffectKind::Sleep,
701        }
702    }
703}