1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionSurfaceSync, LlmCallError};
12use crate::tool_dispatch::ToolHostEventEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration, ProcessScope,
17 ProcessStartGrant,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolCall,
29 Process,
30 ExecCode,
31 Checkpoint,
32 SyncExecutionSurface,
33 Sleep,
34}
35
36impl RuntimeEffectKind {
37 pub fn as_str(self) -> &'static str {
38 match self {
39 Self::LlmCall => "llm_call",
40 Self::Direct => "direct",
41 Self::ToolCall => "tool_call",
42 Self::Process => "process",
43 Self::ExecCode => "exec_code",
44 Self::Checkpoint => "checkpoint",
45 Self::SyncExecutionSurface => "sync_execution_surface",
46 Self::Sleep => "sleep",
47 }
48 }
49}
50
51#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
53pub struct RuntimeInvocation {
54 pub scope: RuntimeScope,
55 pub subject: RuntimeSubject,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub caused_by: Option<CausalRef>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub replay: Option<RuntimeReplay>,
60}
61
62impl RuntimeInvocation {
63 pub fn effect(
64 scope: RuntimeScope,
65 effect_id: impl Into<String>,
66 kind: RuntimeEffectKind,
67 replay_key: impl Into<String>,
68 ) -> Self {
69 Self {
70 scope,
71 subject: RuntimeSubject::Effect {
72 effect_id: effect_id.into(),
73 kind,
74 },
75 caused_by: None,
76 replay: Some(RuntimeReplay {
77 key: replay_key.into(),
78 }),
79 }
80 }
81
82 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
83 self.caused_by = caused_by;
84 self
85 }
86
87 pub fn effect_id(&self) -> Option<&str> {
88 match &self.subject {
89 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
90 _ => None,
91 }
92 }
93
94 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
95 match &self.subject {
96 RuntimeSubject::Effect { kind, .. } => Some(*kind),
97 _ => None,
98 }
99 }
100
101 pub fn replay_key(&self) -> Option<&str> {
102 self.replay.as_ref().map(|replay| replay.key.as_str())
103 }
104
105 pub fn causal_ref(&self) -> Option<CausalRef> {
106 match &self.subject {
107 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
108 session_id: self.scope.session_id.clone(),
109 turn_id: self.scope.turn_id.clone(),
110 effect_id: effect_id.clone(),
111 }),
112 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
113 process_id: process_id.clone(),
114 }),
115 RuntimeSubject::ProcessEvent {
116 process_id,
117 sequence,
118 ..
119 } => Some(CausalRef::ProcessEvent {
120 process_id: process_id.clone(),
121 sequence: *sequence,
122 }),
123 RuntimeSubject::HostEvent { occurrence_id } => Some(CausalRef::HostEvent {
124 occurrence_id: occurrence_id.clone(),
125 }),
126 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
127 session_id: self.scope.session_id.clone(),
128 node_id: node_id.clone(),
129 }),
130 }
131 }
132}
133
134#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
135pub struct RuntimeScope {
136 pub session_id: String,
137 #[serde(default, skip_serializing_if = "Option::is_none")]
138 pub turn_id: Option<String>,
139 #[serde(default, skip_serializing_if = "Option::is_none")]
140 pub turn_index: Option<usize>,
141 #[serde(default, skip_serializing_if = "Option::is_none")]
142 pub protocol_iteration: Option<usize>,
143}
144
145impl RuntimeScope {
146 pub fn new(session_id: impl Into<String>) -> Self {
147 Self {
148 session_id: session_id.into(),
149 turn_id: None,
150 turn_index: None,
151 protocol_iteration: None,
152 }
153 }
154
155 pub fn for_turn(
156 session_id: impl Into<String>,
157 turn_id: impl Into<String>,
158 turn_index: usize,
159 protocol_iteration: usize,
160 ) -> Self {
161 Self {
162 session_id: session_id.into(),
163 turn_id: Some(turn_id.into()),
164 turn_index: Some(turn_index),
165 protocol_iteration: Some(protocol_iteration),
166 }
167 }
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
171pub struct RuntimeReplay {
172 pub key: String,
173}
174
175#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
176#[serde(tag = "type", rename_all = "snake_case")]
177pub enum RuntimeSubject {
178 Effect {
179 effect_id: String,
180 kind: RuntimeEffectKind,
181 },
182 Process {
183 process_id: String,
184 },
185 ProcessEvent {
186 process_id: String,
187 sequence: u64,
188 event_type: String,
189 },
190 HostEvent {
191 occurrence_id: String,
192 },
193 SessionNode {
194 node_id: String,
195 },
196}
197
198#[derive(Clone, Debug, Serialize, Deserialize)]
200pub struct RuntimeEffectEnvelope {
201 pub invocation: RuntimeInvocation,
202 pub command: RuntimeEffectCommand,
203}
204
205impl RuntimeEffectEnvelope {
206 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
207 Self::try_new(invocation, command).expect("valid runtime effect invocation")
208 }
209
210 pub fn try_new(
211 invocation: RuntimeInvocation,
212 command: RuntimeEffectCommand,
213 ) -> Result<Self, RuntimeEffectControllerError> {
214 validate_effect_invocation(&invocation, command.kind())?;
215 Ok(Self {
216 invocation,
217 command,
218 })
219 }
220
221 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
222 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
223 RuntimeEffectControllerError::new(
224 "runtime_effect_envelope_hash",
225 format!("failed to serialize runtime effect envelope: {err}"),
226 )
227 })
228 }
229}
230
231fn validate_effect_invocation(
232 invocation: &RuntimeInvocation,
233 command_kind: RuntimeEffectKind,
234) -> Result<(), RuntimeEffectControllerError> {
235 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
236 return Err(RuntimeEffectControllerError::new(
237 "runtime_effect_invocation_subject",
238 "runtime effect envelope subject must be an effect",
239 ));
240 };
241 if effect_id.trim().is_empty() {
242 return Err(RuntimeEffectControllerError::new(
243 "runtime_effect_invocation_subject",
244 "runtime effect envelope effect id must be non-empty",
245 ));
246 }
247 if *kind != command_kind {
248 return Err(RuntimeEffectControllerError::new(
249 "runtime_effect_invocation_kind",
250 format!(
251 "runtime effect invocation kind {} does not match command kind {}",
252 kind.as_str(),
253 command_kind.as_str()
254 ),
255 ));
256 }
257 if invocation
258 .replay
259 .as_ref()
260 .is_none_or(|replay| replay.key.is_empty())
261 {
262 return Err(RuntimeEffectControllerError::new(
263 "runtime_effect_replay_required",
264 "runtime effect envelope requires replay.key",
265 ));
266 }
267 Ok(())
268}
269
270#[derive(Clone, Debug, Serialize, Deserialize)]
272#[serde(tag = "type", rename_all = "snake_case")]
273pub enum RuntimeEffectCommand {
274 LlmCall {
275 request: Box<LlmRequestSpec>,
276 },
277 Direct {
278 request: Box<LlmRequestSpec>,
279 usage_source: String,
280 },
281 ToolCall {
282 call: crate::PreparedToolCall,
283 },
284 Process {
285 command: ProcessCommand,
286 },
287 ExecCode {
288 code: String,
289 },
290 Checkpoint {
291 checkpoint: CheckpointKind,
292 },
293 SyncExecutionSurface {
294 update_machine_config: bool,
295 },
296 Sleep {
297 duration_ms: u64,
298 },
299}
300
301impl RuntimeEffectCommand {
302 pub fn kind(&self) -> RuntimeEffectKind {
303 match self {
304 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
305 Self::Direct { .. } => RuntimeEffectKind::Direct,
306 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
307 Self::Process { .. } => RuntimeEffectKind::Process,
308 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
309 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
310 Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
311 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
312 }
313 }
314}
315
316#[derive(Clone, Debug, Serialize, Deserialize)]
318#[serde(tag = "op", rename_all = "snake_case")]
319pub enum ProcessCommand {
320 Start {
321 registration: ProcessRegistration,
322 #[serde(default, skip_serializing_if = "Option::is_none")]
323 grant: Option<ProcessStartGrant>,
324 #[serde(
325 default,
326 skip_serializing_if = "boxed_process_execution_context_is_empty"
327 )]
328 execution_context: Box<ProcessExecutionContext>,
329 },
330 List {
331 owner_scope: ProcessScope,
332 #[serde(default)]
333 mode: ProcessListMode,
334 },
335 Transfer {
336 from_scope: ProcessScope,
337 to_scope: ProcessScope,
338 process_ids: Vec<String>,
339 },
340 DeleteSession {
341 session_id: String,
342 },
343 Await {
344 process_id: String,
345 },
346 Cancel {
347 process_id: String,
348 reason: Option<String>,
349 },
350 Signal {
351 process_id: String,
352 signal_id: String,
353 request: crate::ProcessEventAppendRequest,
354 },
355}
356
357fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
358 context.is_empty()
359}
360
361type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
362
363impl ProcessCommand {
364 pub fn effect_id(&self) -> String {
365 match self {
366 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
367 Self::List { owner_scope, mode } => {
368 format!("process:list:{}:{}", owner_scope.id(), mode.as_str())
369 }
370 Self::Transfer {
371 from_scope,
372 to_scope,
373 process_ids,
374 } => {
375 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
376 .unwrap_or_else(|_| "unhashable".to_string());
377 format!(
378 "process:transfer:{}:{}:{digest}",
379 from_scope.id(),
380 to_scope.id()
381 )
382 }
383 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
384 Self::Await { process_id } => format!("process:await:{process_id}"),
385 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
386 Self::Signal {
387 process_id,
388 signal_id,
389 ..
390 } => {
391 format!("process:signal:{process_id}:{signal_id}")
392 }
393 }
394 }
395}
396
397#[derive(Clone, Debug, Serialize, Deserialize)]
399#[serde(tag = "op", rename_all = "snake_case")]
400pub enum ProcessEffectOutcome {
401 Start {
402 record: ProcessRecord,
403 },
404 List {
405 entries: Vec<ProcessHandleGrantEntry>,
406 },
407 Transfer,
408 DeleteSession {
409 report: crate::ProcessSessionDeleteReport,
410 },
411 Await {
412 output: ProcessAwaitOutput,
413 },
414 Cancel {
415 record: ProcessRecord,
416 },
417 Signal {
418 event: crate::ProcessEvent,
419 },
420}
421
422#[derive(Clone, Debug, Serialize, Deserialize)]
423pub struct ToolCallEffectOutcome {
424 pub result: CompletedToolCall,
425 #[serde(default, skip_serializing_if = "Vec::is_empty")]
426 pub host_events: Vec<ToolHostEventEffectOutcome>,
427}
428
429#[derive(Clone, Debug, Serialize, Deserialize)]
431#[serde(tag = "type", rename_all = "snake_case")]
432pub enum RuntimeEffectOutcome {
433 LlmCall {
434 result: Result<LlmResponse, LlmCallError>,
435 text_streamed: bool,
436 },
437 Direct {
438 result: Result<LlmResponse, LlmCallError>,
439 },
440 ToolCall {
441 result: CompletedToolCall,
442 #[serde(default, skip_serializing_if = "Vec::is_empty")]
443 host_events: Vec<ToolHostEventEffectOutcome>,
444 },
445 Process {
446 result: ProcessEffectOutcome,
447 },
448 ExecCode {
449 result: Result<ExecResponse, String>,
450 },
451 Checkpoint {
452 result: CheckpointOutcome,
453 },
454 SyncExecutionSurface {
455 result: Result<Option<ExecutionSurfaceSync>, String>,
456 },
457 Sleep,
458}
459
460#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
470pub struct LlmAttachmentSpec {
471 pub reference: AttachmentRef,
472}
473
474impl LlmAttachmentSpec {
475 fn into_attachment(self) -> LlmAttachment {
476 LlmAttachment::reference(self.reference)
477 }
478}
479
480#[derive(Clone, Debug, Serialize, Deserialize)]
484pub struct LlmRequestSpec {
485 pub model: String,
486 pub messages: Vec<LlmMessage>,
487 pub attachments: Vec<LlmAttachmentSpec>,
488 pub tools: Arc<Vec<LlmToolSpec>>,
489 pub tool_choice: LlmToolChoice,
490 pub model_variant: Option<String>,
491 #[serde(default)]
492 pub generation: crate::GenerationOptions,
493 pub session_id: Option<String>,
494 pub output_spec: Option<LlmOutputSpec>,
495}
496
497impl LlmRequestSpec {
498 pub async fn from_request(
499 request: &CoreLlmRequest,
500 attachment_store: &dyn AttachmentStore,
501 ) -> Result<Self, RuntimeEffectControllerError> {
502 Ok(Self {
503 model: request.model.clone(),
504 messages: request.messages.clone(),
505 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
506 .await?,
507 tools: Arc::clone(&request.tools),
508 tool_choice: request.tool_choice.clone(),
509 model_variant: request.model_variant.clone(),
510 generation: request.generation.clone(),
511 session_id: request.session_id.clone(),
512 output_spec: request.output_spec.clone(),
513 })
514 }
515
516 pub fn into_request(
517 self,
518 stream_events: Option<LlmEventSender>,
519 provider_trace: Option<LlmProviderTraceSender>,
520 ) -> CoreLlmRequest {
521 CoreLlmRequest {
522 model: self.model,
523 messages: self.messages,
524 attachments: self
525 .attachments
526 .into_iter()
527 .map(LlmAttachmentSpec::into_attachment)
528 .collect(),
529 tools: self.tools,
530 tool_choice: self.tool_choice,
531 model_variant: self.model_variant,
532 generation: self.generation,
533 session_id: self.session_id,
534 output_spec: self.output_spec,
535 stream_events,
536 provider_trace,
537 }
538 }
539}
540
541async fn attachment_specs_from_attachments(
542 attachments: &[LlmAttachment],
543 attachment_store: &dyn AttachmentStore,
544) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
545 let mut specs = Vec::with_capacity(attachments.len());
546 for attachment in attachments {
547 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
548 }
549 Ok(specs)
550}
551
552async fn attachment_spec_from_attachment(
553 attachment: &LlmAttachment,
554 attachment_store: &dyn AttachmentStore,
555) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
556 if let Some(reference) = attachment.reference.as_ref() {
557 return Ok(LlmAttachmentSpec {
558 reference: reference.clone(),
559 });
560 }
561 if attachment.data.is_empty() {
562 return Err(RuntimeEffectControllerError::new(
563 "runtime_effect_attachment_missing_reference",
564 "runtime effect attachment has neither a durable reference nor inline bytes",
565 ));
566 }
567 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
568 RuntimeEffectControllerError::new(
569 "runtime_effect_attachment_media_type",
570 format!(
571 "attachment media type `{}` cannot be represented durably",
572 attachment.mime
573 ),
574 )
575 })?;
576 let reference = attachment_store
577 .put(
578 attachment.data.clone(),
579 AttachmentCreateMeta::new(media_type, None, None, None),
580 )
581 .await
582 .map_err(|err| {
583 RuntimeEffectControllerError::new(
584 "runtime_effect_attachment_store",
585 format!("failed to store attachment before runtime effect invocation: {err}"),
586 )
587 })?;
588 Ok(LlmAttachmentSpec { reference })
589}
590
591impl RuntimeEffectOutcome {
592 pub fn into_llm_call(
593 self,
594 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
595 match self {
596 Self::LlmCall {
597 result,
598 text_streamed,
599 } => Ok((result, text_streamed)),
600 other => Err(RuntimeEffectControllerError::wrong_outcome(
601 RuntimeEffectKind::LlmCall,
602 other.kind(),
603 )),
604 }
605 }
606
607 pub fn into_direct_response(
608 self,
609 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
610 match self {
611 Self::Direct { result } => Ok(result),
612 other => Err(RuntimeEffectControllerError::wrong_outcome(
613 RuntimeEffectKind::Direct,
614 other.kind(),
615 )),
616 }
617 }
618
619 pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
620 match self {
621 Self::ToolCall { result, .. } => Ok(result),
622 other => Err(RuntimeEffectControllerError::wrong_outcome(
623 RuntimeEffectKind::ToolCall,
624 other.kind(),
625 )),
626 }
627 }
628
629 pub fn into_tool_call_effect(
630 self,
631 ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
632 match self {
633 Self::ToolCall {
634 result,
635 host_events,
636 } => Ok(ToolCallEffectOutcome {
637 result,
638 host_events,
639 }),
640 other => Err(RuntimeEffectControllerError::wrong_outcome(
641 RuntimeEffectKind::ToolCall,
642 other.kind(),
643 )),
644 }
645 }
646
647 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
648 match self {
649 Self::Process { result } => Ok(result),
650 other => Err(RuntimeEffectControllerError::wrong_outcome(
651 RuntimeEffectKind::Process,
652 other.kind(),
653 )),
654 }
655 }
656
657 pub fn into_exec_code(
658 self,
659 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
660 match self {
661 Self::ExecCode { result } => Ok(result),
662 other => Err(RuntimeEffectControllerError::wrong_outcome(
663 RuntimeEffectKind::ExecCode,
664 other.kind(),
665 )),
666 }
667 }
668
669 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
670 match self {
671 Self::Checkpoint { result } => Ok(result),
672 other => Err(RuntimeEffectControllerError::wrong_outcome(
673 RuntimeEffectKind::Checkpoint,
674 other.kind(),
675 )),
676 }
677 }
678
679 pub fn into_sync_execution_surface(
680 self,
681 ) -> Result<Result<Option<ExecutionSurfaceSync>, String>, RuntimeEffectControllerError> {
682 match self {
683 Self::SyncExecutionSurface { result } => Ok(result),
684 other => Err(RuntimeEffectControllerError::wrong_outcome(
685 RuntimeEffectKind::SyncExecutionSurface,
686 other.kind(),
687 )),
688 }
689 }
690
691 pub fn kind(&self) -> RuntimeEffectKind {
692 match self {
693 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
694 Self::Direct { .. } => RuntimeEffectKind::Direct,
695 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
696 Self::Process { .. } => RuntimeEffectKind::Process,
697 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
698 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
699 Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
700 Self::Sleep => RuntimeEffectKind::Sleep,
701 }
702 }
703}