Skip to main content

lash_core/runtime/effect/
envelope.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7    LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8    LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionSurfaceSync, LlmCallError};
12use crate::tool_dispatch::ToolHostEventEffectOutcome;
13use crate::{
14    AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15    ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16    ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17    ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
/// Durable category for a runtime effect.
///
/// Variants serialize under their `snake_case` names (for example `LlmCall`
/// is written as `"llm_call"`). Because the names are part of the serialized
/// form, renaming a variant changes the wire format.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeEffectKind {
    LlmCall,
    Direct,
    ToolCall,
    Process,
    ExecCode,
    Checkpoint,
    SyncExecutionSurface,
    Sleep,
    AwaitEvent,
}
36
37impl RuntimeEffectKind {
38    pub fn as_str(self) -> &'static str {
39        match self {
40            Self::LlmCall => "llm_call",
41            Self::Direct => "direct",
42            Self::ToolCall => "tool_call",
43            Self::Process => "process",
44            Self::ExecCode => "exec_code",
45            Self::Checkpoint => "checkpoint",
46            Self::SyncExecutionSurface => "sync_execution_surface",
47            Self::Sleep => "sleep",
48            Self::AwaitEvent => "await_event",
49        }
50    }
51}
52
/// Canonical lineage for a runtime-side invocation.
///
/// Optional fields are left out of the serialized form when they are `None`
/// and default to `None` when absent on deserialization.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeInvocation {
    /// Session/turn coordinates the invocation belongs to.
    pub scope: RuntimeScope,
    /// What the invocation is about (an effect, process, event, or node).
    pub subject: RuntimeSubject,
    /// Optional reference to whatever caused this invocation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub caused_by: Option<CausalRef>,
    /// Optional replay information (currently just a key).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<RuntimeReplay>,
}
63
64impl RuntimeInvocation {
65    pub fn effect(
66        scope: RuntimeScope,
67        effect_id: impl Into<String>,
68        kind: RuntimeEffectKind,
69        replay_key: impl Into<String>,
70    ) -> Self {
71        Self {
72            scope,
73            subject: RuntimeSubject::Effect {
74                effect_id: effect_id.into(),
75                kind,
76            },
77            caused_by: None,
78            replay: Some(RuntimeReplay {
79                key: replay_key.into(),
80            }),
81        }
82    }
83
84    pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
85        self.caused_by = caused_by;
86        self
87    }
88
89    pub fn effect_id(&self) -> Option<&str> {
90        match &self.subject {
91            RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
92            _ => None,
93        }
94    }
95
96    pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
97        match &self.subject {
98            RuntimeSubject::Effect { kind, .. } => Some(*kind),
99            _ => None,
100        }
101    }
102
103    pub fn replay_key(&self) -> Option<&str> {
104        self.replay.as_ref().map(|replay| replay.key.as_str())
105    }
106
107    pub fn causal_ref(&self) -> Option<CausalRef> {
108        match &self.subject {
109            RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
110                session_id: self.scope.session_id.clone(),
111                turn_id: self.scope.turn_id.clone(),
112                effect_id: effect_id.clone(),
113            }),
114            RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
115                process_id: process_id.clone(),
116            }),
117            RuntimeSubject::ProcessEvent {
118                process_id,
119                sequence,
120                ..
121            } => Some(CausalRef::ProcessEvent {
122                process_id: process_id.clone(),
123                sequence: *sequence,
124            }),
125            RuntimeSubject::HostEvent { occurrence_id } => Some(CausalRef::HostEvent {
126                occurrence_id: occurrence_id.clone(),
127            }),
128            RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
129                session_id: self.scope.session_id.clone(),
130                node_id: node_id.clone(),
131            }),
132        }
133    }
134}
135
/// Session-level coordinates of a runtime invocation, optionally narrowed to
/// a turn.
///
/// The three optional fields are omitted from the serialized form when
/// `None` and default to `None` when missing on input.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeScope {
    /// Identifier of the owning session; always present.
    pub session_id: String,
    /// Identifier of the turn, when the scope is turn-bound.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_id: Option<String>,
    /// Index of the turn, when the scope is turn-bound.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_index: Option<usize>,
    /// Protocol iteration, when the scope is turn-bound.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub protocol_iteration: Option<usize>,
}
146
147impl RuntimeScope {
148    pub fn new(session_id: impl Into<String>) -> Self {
149        Self {
150            session_id: session_id.into(),
151            turn_id: None,
152            turn_index: None,
153            protocol_iteration: None,
154        }
155    }
156
157    pub fn for_turn(
158        session_id: impl Into<String>,
159        turn_id: impl Into<String>,
160        turn_index: usize,
161        protocol_iteration: usize,
162    ) -> Self {
163        Self {
164            session_id: session_id.into(),
165            turn_id: Some(turn_id.into()),
166            turn_index: Some(turn_index),
167            protocol_iteration: Some(protocol_iteration),
168        }
169    }
170}
171
/// Replay information attached to a [`RuntimeInvocation`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeReplay {
    /// Replay key; effect envelopes require it to be present and non-empty.
    pub key: String,
}
176
/// The thing a [`RuntimeInvocation`] refers to.
///
/// Serialized as an internally tagged enum: the variant is stored in a
/// `type` field using its `snake_case` name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RuntimeSubject {
    /// A runtime effect, identified by id and category.
    Effect {
        effect_id: String,
        kind: RuntimeEffectKind,
    },
    /// A process, identified by id.
    Process {
        process_id: String,
    },
    /// A single event of a process, identified by process id and sequence.
    ProcessEvent {
        process_id: String,
        sequence: u64,
        event_type: String,
    },
    /// A host event occurrence.
    HostEvent {
        occurrence_id: String,
    },
    /// A node within the session identified by the invocation scope.
    SessionNode {
        node_id: String,
    },
}
199
/// Fully serializable envelope emitted at Lash's nondeterministic boundary.
///
/// Pairs the lineage of an effect with the command to run. Deserialization
/// is derived and does not re-run the checks performed by the constructors.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RuntimeEffectEnvelope {
    /// Lineage of the effect; constructors require an effect subject.
    pub invocation: RuntimeInvocation,
    /// The command to execute for this effect.
    pub command: RuntimeEffectCommand,
}
206
207impl RuntimeEffectEnvelope {
208    pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
209        Self::try_new(invocation, command).expect("valid runtime effect invocation")
210    }
211
212    pub fn try_new(
213        invocation: RuntimeInvocation,
214        command: RuntimeEffectCommand,
215    ) -> Result<Self, RuntimeEffectControllerError> {
216        validate_effect_invocation(&invocation, command.kind())?;
217        Ok(Self {
218            invocation,
219            command,
220        })
221    }
222
223    pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
224        crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
225            RuntimeEffectControllerError::new(
226                "runtime_effect_envelope_hash",
227                format!("failed to serialize runtime effect envelope: {err}"),
228            )
229        })
230    }
231}
232
233fn validate_effect_invocation(
234    invocation: &RuntimeInvocation,
235    command_kind: RuntimeEffectKind,
236) -> Result<(), RuntimeEffectControllerError> {
237    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
238        return Err(RuntimeEffectControllerError::new(
239            "runtime_effect_invocation_subject",
240            "runtime effect envelope subject must be an effect",
241        ));
242    };
243    if effect_id.trim().is_empty() {
244        return Err(RuntimeEffectControllerError::new(
245            "runtime_effect_invocation_subject",
246            "runtime effect envelope effect id must be non-empty",
247        ));
248    }
249    if *kind != command_kind {
250        return Err(RuntimeEffectControllerError::new(
251            "runtime_effect_invocation_kind",
252            format!(
253                "runtime effect invocation kind {} does not match command kind {}",
254                kind.as_str(),
255                command_kind.as_str()
256            ),
257        ));
258    }
259    if invocation
260        .replay
261        .as_ref()
262        .is_none_or(|replay| replay.key.is_empty())
263    {
264        return Err(RuntimeEffectControllerError::new(
265            "runtime_effect_replay_required",
266            "runtime effect envelope requires replay.key",
267        ));
268    }
269    Ok(())
270}
271
/// Serializable command emitted at Lash's nondeterministic runtime boundary.
///
/// Serialized as an internally tagged enum: the variant is stored in a
/// `type` field using its `snake_case` name. Each variant corresponds to the
/// same-named [`RuntimeEffectKind`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RuntimeEffectCommand {
    /// An LLM request; the spec is boxed.
    LlmCall {
        request: Box<LlmRequestSpec>,
    },
    /// An LLM request tagged with a usage source string.
    Direct {
        request: Box<LlmRequestSpec>,
        usage_source: String,
    },
    /// A prepared tool call.
    ToolCall {
        call: crate::PreparedToolCall,
    },
    /// An operation against the process control plane; the command is boxed.
    Process {
        command: Box<ProcessCommand>,
    },
    /// Code to execute.
    ExecCode {
        code: String,
    },
    /// A checkpoint of the given kind.
    Checkpoint {
        checkpoint: CheckpointKind,
    },
    /// A request to sync the execution surface.
    SyncExecutionSurface {
        update_machine_config: bool,
    },
    /// A sleep; presumably `duration_ms` is in milliseconds, going by the
    /// field name.
    Sleep {
        duration_ms: u64,
    },
    /// A wait for an event identified by `key`.
    AwaitEvent {
        key: String,
    },
}
305
306impl RuntimeEffectCommand {
307    pub fn process(command: ProcessCommand) -> Self {
308        Self::Process {
309            command: Box::new(command),
310        }
311    }
312
313    pub fn kind(&self) -> RuntimeEffectKind {
314        match self {
315            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
316            Self::Direct { .. } => RuntimeEffectKind::Direct,
317            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
318            Self::Process { .. } => RuntimeEffectKind::Process,
319            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
320            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
321            Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
322            Self::Sleep { .. } => RuntimeEffectKind::Sleep,
323            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
324        }
325    }
326}
327
/// Serializable operation against the process control plane.
///
/// Serialized as an internally tagged enum: the variant is stored in an `op`
/// field using its `snake_case` name.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum ProcessCommand {
    /// Start a process from a registration.
    Start {
        registration: ProcessRegistration,
        /// Optional start grant; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        grant: Option<ProcessStartGrant>,
        /// Execution context; omitted from the serialized form when it
        /// reports itself empty, and defaulted when absent on input.
        #[serde(
            default,
            skip_serializing_if = "boxed_process_execution_context_is_empty"
        )]
        execution_context: Box<ProcessExecutionContext>,
    },
    /// List processes for a session scope.
    List {
        session_scope: SessionScope,
        /// Listing mode; falls back to the type's default when absent.
        #[serde(default)]
        mode: ProcessListMode,
    },
    /// Transfer the listed processes from one session scope to another.
    Transfer {
        from_scope: SessionScope,
        to_scope: SessionScope,
        process_ids: Vec<String>,
    },
    /// Delete process state for a session.
    DeleteSession {
        session_id: String,
    },
    /// Await a process.
    Await {
        process_id: String,
    },
    /// Cancel a process, optionally recording a reason.
    Cancel {
        process_id: String,
        reason: Option<String>,
    },
    /// Send a named signal to a process.
    Signal {
        process_id: String,
        signal_name: String,
        signal_id: String,
        request: crate::ProcessEventAppendRequest,
    },
}
369
/// `skip_serializing_if` predicate for the `execution_context` field of
/// `ProcessCommand::Start`.
///
/// Serde needs a function path for the attribute, so this simply forwards to
/// `context.is_empty()`.
fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
    context.is_empty()
}
373
/// Result carried by `RuntimeEffectOutcome::Checkpoint`: the checkpoint
/// delivery on success, or a controller error on failure.
type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
375
376impl ProcessCommand {
377    pub fn effect_id(&self) -> String {
378        match self {
379            Self::Start { registration, .. } => format!("process:start:{}", registration.id),
380            Self::List {
381                session_scope,
382                mode,
383            } => {
384                format!("process:list:{}:{}", session_scope.id(), mode.as_str())
385            }
386            Self::Transfer {
387                from_scope,
388                to_scope,
389                process_ids,
390            } => {
391                let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
392                    .unwrap_or_else(|_| "unhashable".to_string());
393                format!(
394                    "process:transfer:{}:{}:{digest}",
395                    from_scope.id(),
396                    to_scope.id()
397                )
398            }
399            Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
400            Self::Await { process_id } => format!("process:await:{process_id}"),
401            Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
402            Self::Signal {
403                process_id,
404                signal_name,
405                signal_id,
406                ..
407            } => {
408                format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
409            }
410        }
411    }
412}
413
/// Serializable result of a process operation.
///
/// Variants mirror those of [`ProcessCommand`] and use the same tagging: the
/// variant is stored in an `op` field using its `snake_case` name.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum ProcessEffectOutcome {
    /// Result of a start: a process record.
    Start {
        record: ProcessRecord,
    },
    /// Result of a list: the grant entries.
    List {
        entries: Vec<ProcessHandleGrantEntry>,
    },
    /// Result of a transfer; carries no payload.
    Transfer,
    /// Result of a session delete: the delete report.
    DeleteSession {
        report: crate::ProcessSessionDeleteReport,
    },
    /// Result of an await: the await output.
    Await {
        output: ProcessAwaitOutput,
    },
    /// Result of a cancel: a process record.
    Cancel {
        record: ProcessRecord,
    },
    /// Result of a signal: a process event.
    Signal {
        event: crate::ProcessEvent,
    },
}
438
/// Tool-call result together with the host events reported alongside it.
///
/// Holds the same two fields as `RuntimeEffectOutcome::ToolCall`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolCallEffectOutcome {
    /// The completed tool call.
    pub result: CompletedToolCall,
    /// Host event outcomes; omitted from the serialized form when empty and
    /// defaulted to empty when absent on input.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub host_events: Vec<ToolHostEventEffectOutcome>,
}
445
/// Serializable result of a runtime effect command.
///
/// Variants mirror those of [`RuntimeEffectCommand`]. Serialized as an
/// internally tagged enum: the variant is stored in a `type` field using its
/// `snake_case` name.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RuntimeEffectOutcome {
    /// Result of an LLM call plus a `text_streamed` flag.
    LlmCall {
        result: Result<LlmResponse, LlmCallError>,
        text_streamed: bool,
    },
    /// Result of a direct LLM request.
    Direct {
        result: Result<LlmResponse, LlmCallError>,
    },
    /// Completed tool call plus any host event outcomes.
    ToolCall {
        result: CompletedToolCall,
        /// Omitted from the serialized form when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        host_events: Vec<ToolHostEventEffectOutcome>,
    },
    /// Result of a process operation.
    Process {
        result: ProcessEffectOutcome,
    },
    /// Result of code execution; failures are carried as strings.
    ExecCode {
        result: Result<ExecResponse, String>,
    },
    /// Result of a checkpoint.
    Checkpoint {
        result: CheckpointOutcome,
    },
    /// Result of an execution surface sync; failures are carried as strings.
    SyncExecutionSurface {
        result: Result<Option<ExecutionSurfaceSync>, String>,
    },
    /// A completed sleep; carries no payload.
    Sleep,
    /// Payload delivered for an awaited event.
    AwaitEvent {
        payload: serde_json::Value,
    },
}
479
480// =============================================================================
481// Request specs (serializable forms of LLM/Direct requests)
482// =============================================================================
483
/// Serializable attachment data for runtime effect envelopes.
///
/// Effect envelopes carry attachment references only. Local executors resolve
/// bytes from the configured attachment store when a provider request is
/// actually executed.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LlmAttachmentSpec {
    /// Reference to the attachment; no bytes are stored here.
    pub reference: AttachmentRef,
}
493
494impl LlmAttachmentSpec {
495    fn into_attachment(self) -> LlmAttachment {
496        LlmAttachment::reference(self.reference)
497    }
498}
499
/// Serializable LLM request data. Live stream and provider-trace callbacks are
/// attached by the local executor, and attachment bytes are resolved locally
/// from refs rather than persisted in the effect envelope.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LlmRequestSpec {
    pub model: String,
    pub messages: Vec<LlmMessage>,
    /// Attachments as references only (see [`LlmAttachmentSpec`]).
    pub attachments: Vec<LlmAttachmentSpec>,
    /// Tool specs, held behind an `Arc` so they can be shared with the
    /// request they are built from.
    pub tools: Arc<Vec<LlmToolSpec>>,
    pub tool_choice: LlmToolChoice,
    pub model_variant: Option<String>,
    /// Generation options; defaulted when absent on input.
    #[serde(default)]
    pub generation: crate::GenerationOptions,
    pub session_id: Option<String>,
    pub output_spec: Option<LlmOutputSpec>,
}
516
517impl LlmRequestSpec {
518    pub async fn from_request(
519        request: &CoreLlmRequest,
520        attachment_store: &dyn AttachmentStore,
521    ) -> Result<Self, RuntimeEffectControllerError> {
522        Ok(Self {
523            model: request.model.clone(),
524            messages: request.messages.clone(),
525            attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
526                .await?,
527            tools: Arc::clone(&request.tools),
528            tool_choice: request.tool_choice.clone(),
529            model_variant: request.model_variant.clone(),
530            generation: request.generation.clone(),
531            session_id: request.session_id.clone(),
532            output_spec: request.output_spec.clone(),
533        })
534    }
535
536    pub fn into_request(
537        self,
538        stream_events: Option<LlmEventSender>,
539        provider_trace: Option<LlmProviderTraceSender>,
540    ) -> CoreLlmRequest {
541        CoreLlmRequest {
542            model: self.model,
543            messages: self.messages,
544            attachments: self
545                .attachments
546                .into_iter()
547                .map(LlmAttachmentSpec::into_attachment)
548                .collect(),
549            tools: self.tools,
550            tool_choice: self.tool_choice,
551            model_variant: self.model_variant,
552            generation: self.generation,
553            session_id: self.session_id,
554            output_spec: self.output_spec,
555            stream_events,
556            provider_trace,
557        }
558    }
559}
560
561async fn attachment_specs_from_attachments(
562    attachments: &[LlmAttachment],
563    attachment_store: &dyn AttachmentStore,
564) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
565    let mut specs = Vec::with_capacity(attachments.len());
566    for attachment in attachments {
567        specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
568    }
569    Ok(specs)
570}
571
572async fn attachment_spec_from_attachment(
573    attachment: &LlmAttachment,
574    attachment_store: &dyn AttachmentStore,
575) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
576    if let Some(reference) = attachment.reference.as_ref() {
577        return Ok(LlmAttachmentSpec {
578            reference: reference.clone(),
579        });
580    }
581    if attachment.data.is_empty() {
582        return Err(RuntimeEffectControllerError::new(
583            "runtime_effect_attachment_missing_reference",
584            "runtime effect attachment has neither a durable reference nor inline bytes",
585        ));
586    }
587    let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
588        RuntimeEffectControllerError::new(
589            "runtime_effect_attachment_media_type",
590            format!(
591                "attachment media type `{}` cannot be represented durably",
592                attachment.mime
593            ),
594        )
595    })?;
596    let reference = attachment_store
597        .put(
598            attachment.data.clone(),
599            AttachmentCreateMeta::new(media_type, None, None, None),
600        )
601        .await
602        .map_err(|err| {
603            RuntimeEffectControllerError::new(
604                "runtime_effect_attachment_store",
605                format!("failed to store attachment before runtime effect invocation: {err}"),
606            )
607        })?;
608    Ok(LlmAttachmentSpec { reference })
609}
610
611impl RuntimeEffectOutcome {
612    pub fn into_llm_call(
613        self,
614    ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
615        match self {
616            Self::LlmCall {
617                result,
618                text_streamed,
619            } => Ok((result, text_streamed)),
620            other => Err(RuntimeEffectControllerError::wrong_outcome(
621                RuntimeEffectKind::LlmCall,
622                other.kind(),
623            )),
624        }
625    }
626
627    pub fn into_direct_response(
628        self,
629    ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
630        match self {
631            Self::Direct { result } => Ok(result),
632            other => Err(RuntimeEffectControllerError::wrong_outcome(
633                RuntimeEffectKind::Direct,
634                other.kind(),
635            )),
636        }
637    }
638
639    pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
640        match self {
641            Self::ToolCall { result, .. } => Ok(result),
642            other => Err(RuntimeEffectControllerError::wrong_outcome(
643                RuntimeEffectKind::ToolCall,
644                other.kind(),
645            )),
646        }
647    }
648
649    pub fn into_tool_call_effect(
650        self,
651    ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
652        match self {
653            Self::ToolCall {
654                result,
655                host_events,
656            } => Ok(ToolCallEffectOutcome {
657                result,
658                host_events,
659            }),
660            other => Err(RuntimeEffectControllerError::wrong_outcome(
661                RuntimeEffectKind::ToolCall,
662                other.kind(),
663            )),
664        }
665    }
666
667    pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
668        match self {
669            Self::Process { result } => Ok(result),
670            other => Err(RuntimeEffectControllerError::wrong_outcome(
671                RuntimeEffectKind::Process,
672                other.kind(),
673            )),
674        }
675    }
676
677    pub fn into_exec_code(
678        self,
679    ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
680        match self {
681            Self::ExecCode { result } => Ok(result),
682            other => Err(RuntimeEffectControllerError::wrong_outcome(
683                RuntimeEffectKind::ExecCode,
684                other.kind(),
685            )),
686        }
687    }
688
689    pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
690        match self {
691            Self::Checkpoint { result } => Ok(result),
692            other => Err(RuntimeEffectControllerError::wrong_outcome(
693                RuntimeEffectKind::Checkpoint,
694                other.kind(),
695            )),
696        }
697    }
698
699    pub fn into_sync_execution_surface(
700        self,
701    ) -> Result<Result<Option<ExecutionSurfaceSync>, String>, RuntimeEffectControllerError> {
702        match self {
703            Self::SyncExecutionSurface { result } => Ok(result),
704            other => Err(RuntimeEffectControllerError::wrong_outcome(
705                RuntimeEffectKind::SyncExecutionSurface,
706                other.kind(),
707            )),
708        }
709    }
710
711    pub fn into_await_event(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
712        match self {
713            Self::AwaitEvent { payload } => Ok(payload),
714            other => Err(RuntimeEffectControllerError::wrong_outcome(
715                RuntimeEffectKind::AwaitEvent,
716                other.kind(),
717            )),
718        }
719    }
720
721    pub fn kind(&self) -> RuntimeEffectKind {
722        match self {
723            Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
724            Self::Direct { .. } => RuntimeEffectKind::Direct,
725            Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
726            Self::Process { .. } => RuntimeEffectKind::Process,
727            Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
728            Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
729            Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
730            Self::Sleep => RuntimeEffectKind::Sleep,
731            Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
732        }
733    }
734}