1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionSurfaceSync, LlmCallError};
12use crate::tool_dispatch::ToolHostEventEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration, ProcessScope,
17 ProcessStartGrant,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolCall,
29 Process,
30 ExecCode,
31 Checkpoint,
32 SyncExecutionSurface,
33 Sleep,
34}
35
36impl RuntimeEffectKind {
37 pub fn as_str(self) -> &'static str {
38 match self {
39 Self::LlmCall => "llm_call",
40 Self::Direct => "direct",
41 Self::ToolCall => "tool_call",
42 Self::Process => "process",
43 Self::ExecCode => "exec_code",
44 Self::Checkpoint => "checkpoint",
45 Self::SyncExecutionSurface => "sync_execution_surface",
46 Self::Sleep => "sleep",
47 }
48 }
49}
50
51#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
53pub struct RuntimeInvocation {
54 pub scope: RuntimeScope,
55 pub subject: RuntimeSubject,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub caused_by: Option<CausalRef>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub replay: Option<RuntimeReplay>,
60}
61
62impl RuntimeInvocation {
63 pub fn effect(
64 scope: RuntimeScope,
65 effect_id: impl Into<String>,
66 kind: RuntimeEffectKind,
67 replay_key: impl Into<String>,
68 ) -> Self {
69 Self {
70 scope,
71 subject: RuntimeSubject::Effect {
72 effect_id: effect_id.into(),
73 kind,
74 },
75 caused_by: None,
76 replay: Some(RuntimeReplay {
77 key: replay_key.into(),
78 }),
79 }
80 }
81
82 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
83 self.caused_by = caused_by;
84 self
85 }
86
87 pub fn effect_id(&self) -> Option<&str> {
88 match &self.subject {
89 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
90 _ => None,
91 }
92 }
93
94 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
95 match &self.subject {
96 RuntimeSubject::Effect { kind, .. } => Some(*kind),
97 _ => None,
98 }
99 }
100
101 pub fn replay_key(&self) -> Option<&str> {
102 self.replay.as_ref().map(|replay| replay.key.as_str())
103 }
104
105 pub fn causal_ref(&self) -> Option<CausalRef> {
106 match &self.subject {
107 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
108 session_id: self.scope.session_id.clone(),
109 turn_id: self.scope.turn_id.clone(),
110 effect_id: effect_id.clone(),
111 }),
112 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
113 process_id: process_id.clone(),
114 }),
115 RuntimeSubject::ProcessEvent {
116 process_id,
117 sequence,
118 ..
119 } => Some(CausalRef::ProcessEvent {
120 process_id: process_id.clone(),
121 sequence: *sequence,
122 }),
123 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
124 session_id: self.scope.session_id.clone(),
125 node_id: node_id.clone(),
126 }),
127 }
128 }
129}
130
131#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
132pub struct RuntimeScope {
133 pub session_id: String,
134 #[serde(default, skip_serializing_if = "Option::is_none")]
135 pub turn_id: Option<String>,
136 #[serde(default, skip_serializing_if = "Option::is_none")]
137 pub turn_index: Option<usize>,
138 #[serde(default, skip_serializing_if = "Option::is_none")]
139 pub protocol_iteration: Option<usize>,
140}
141
142impl RuntimeScope {
143 pub fn new(session_id: impl Into<String>) -> Self {
144 Self {
145 session_id: session_id.into(),
146 turn_id: None,
147 turn_index: None,
148 protocol_iteration: None,
149 }
150 }
151
152 pub fn for_turn(
153 session_id: impl Into<String>,
154 turn_id: impl Into<String>,
155 turn_index: usize,
156 protocol_iteration: usize,
157 ) -> Self {
158 Self {
159 session_id: session_id.into(),
160 turn_id: Some(turn_id.into()),
161 turn_index: Some(turn_index),
162 protocol_iteration: Some(protocol_iteration),
163 }
164 }
165}
166
167#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
168pub struct RuntimeReplay {
169 pub key: String,
170}
171
172#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
173#[serde(tag = "type", rename_all = "snake_case")]
174pub enum RuntimeSubject {
175 Effect {
176 effect_id: String,
177 kind: RuntimeEffectKind,
178 },
179 Process {
180 process_id: String,
181 },
182 ProcessEvent {
183 process_id: String,
184 sequence: u64,
185 event_type: String,
186 },
187 SessionNode {
188 node_id: String,
189 },
190}
191
192#[derive(Clone, Debug, Serialize, Deserialize)]
194pub struct RuntimeEffectEnvelope {
195 pub invocation: RuntimeInvocation,
196 pub command: RuntimeEffectCommand,
197}
198
199impl RuntimeEffectEnvelope {
200 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
201 Self::try_new(invocation, command).expect("valid runtime effect invocation")
202 }
203
204 pub fn try_new(
205 invocation: RuntimeInvocation,
206 command: RuntimeEffectCommand,
207 ) -> Result<Self, RuntimeEffectControllerError> {
208 validate_effect_invocation(&invocation, command.kind())?;
209 Ok(Self {
210 invocation,
211 command,
212 })
213 }
214
215 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
216 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
217 RuntimeEffectControllerError::new(
218 "runtime_effect_envelope_hash",
219 format!("failed to serialize runtime effect envelope: {err}"),
220 )
221 })
222 }
223}
224
225fn validate_effect_invocation(
226 invocation: &RuntimeInvocation,
227 command_kind: RuntimeEffectKind,
228) -> Result<(), RuntimeEffectControllerError> {
229 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
230 return Err(RuntimeEffectControllerError::new(
231 "runtime_effect_invocation_subject",
232 "runtime effect envelope subject must be an effect",
233 ));
234 };
235 if effect_id.trim().is_empty() {
236 return Err(RuntimeEffectControllerError::new(
237 "runtime_effect_invocation_subject",
238 "runtime effect envelope effect id must be non-empty",
239 ));
240 }
241 if *kind != command_kind {
242 return Err(RuntimeEffectControllerError::new(
243 "runtime_effect_invocation_kind",
244 format!(
245 "runtime effect invocation kind {} does not match command kind {}",
246 kind.as_str(),
247 command_kind.as_str()
248 ),
249 ));
250 }
251 if invocation
252 .replay
253 .as_ref()
254 .is_none_or(|replay| replay.key.is_empty())
255 {
256 return Err(RuntimeEffectControllerError::new(
257 "runtime_effect_replay_required",
258 "runtime effect envelope requires replay.key",
259 ));
260 }
261 Ok(())
262}
263
264#[derive(Clone, Debug, Serialize, Deserialize)]
266#[serde(tag = "type", rename_all = "snake_case")]
267pub enum RuntimeEffectCommand {
268 LlmCall {
269 request: Box<LlmRequestSpec>,
270 },
271 Direct {
272 request: Box<LlmRequestSpec>,
273 usage_source: String,
274 },
275 ToolCall {
276 call: crate::PreparedToolCall,
277 },
278 Process {
279 command: ProcessCommand,
280 },
281 ExecCode {
282 code: String,
283 },
284 Checkpoint {
285 checkpoint: CheckpointKind,
286 },
287 SyncExecutionSurface {
288 update_machine_config: bool,
289 },
290 Sleep {
291 duration_ms: u64,
292 },
293}
294
295impl RuntimeEffectCommand {
296 pub fn kind(&self) -> RuntimeEffectKind {
297 match self {
298 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
299 Self::Direct { .. } => RuntimeEffectKind::Direct,
300 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
301 Self::Process { .. } => RuntimeEffectKind::Process,
302 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
303 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
304 Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
305 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
306 }
307 }
308}
309
310#[derive(Clone, Debug, Serialize, Deserialize)]
312#[serde(tag = "op", rename_all = "snake_case")]
313pub enum ProcessCommand {
314 Start {
315 registration: ProcessRegistration,
316 #[serde(default, skip_serializing_if = "Option::is_none")]
317 grant: Option<ProcessStartGrant>,
318 #[serde(
319 default,
320 skip_serializing_if = "boxed_process_execution_context_is_empty"
321 )]
322 execution_context: Box<ProcessExecutionContext>,
323 },
324 List {
325 owner_scope: ProcessScope,
326 #[serde(default)]
327 mode: ProcessListMode,
328 },
329 Transfer {
330 from_scope: ProcessScope,
331 to_scope: ProcessScope,
332 process_ids: Vec<String>,
333 },
334 DeleteSession {
335 session_id: String,
336 },
337 Await {
338 process_id: String,
339 },
340 Cancel {
341 process_id: String,
342 reason: Option<String>,
343 },
344 Signal {
345 process_id: String,
346 signal_id: String,
347 request: crate::ProcessEventAppendRequest,
348 },
349}
350
351fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
352 context.is_empty()
353}
354
355type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
356
357impl ProcessCommand {
358 pub fn effect_id(&self) -> String {
359 match self {
360 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
361 Self::List { owner_scope, mode } => {
362 format!("process:list:{}:{}", owner_scope.id(), mode.as_str())
363 }
364 Self::Transfer {
365 from_scope,
366 to_scope,
367 process_ids,
368 } => {
369 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
370 .unwrap_or_else(|_| "unhashable".to_string());
371 format!(
372 "process:transfer:{}:{}:{digest}",
373 from_scope.id(),
374 to_scope.id()
375 )
376 }
377 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
378 Self::Await { process_id } => format!("process:await:{process_id}"),
379 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
380 Self::Signal {
381 process_id,
382 signal_id,
383 ..
384 } => {
385 format!("process:signal:{process_id}:{signal_id}")
386 }
387 }
388 }
389}
390
391#[derive(Clone, Debug, Serialize, Deserialize)]
393#[serde(tag = "op", rename_all = "snake_case")]
394pub enum ProcessEffectOutcome {
395 Start {
396 record: ProcessRecord,
397 },
398 List {
399 entries: Vec<ProcessHandleGrantEntry>,
400 },
401 Transfer,
402 DeleteSession {
403 report: crate::ProcessSessionDeleteReport,
404 },
405 Await {
406 output: ProcessAwaitOutput,
407 },
408 Cancel {
409 record: ProcessRecord,
410 },
411 Signal {
412 event: crate::ProcessEvent,
413 },
414}
415
416#[derive(Clone, Debug, Serialize, Deserialize)]
417pub struct ToolCallEffectOutcome {
418 pub result: CompletedToolCall,
419 #[serde(default, skip_serializing_if = "Vec::is_empty")]
420 pub host_events: Vec<ToolHostEventEffectOutcome>,
421}
422
423#[derive(Clone, Debug, Serialize, Deserialize)]
425#[serde(tag = "type", rename_all = "snake_case")]
426pub enum RuntimeEffectOutcome {
427 LlmCall {
428 result: Result<LlmResponse, LlmCallError>,
429 text_streamed: bool,
430 },
431 Direct {
432 result: Result<LlmResponse, LlmCallError>,
433 },
434 ToolCall {
435 result: CompletedToolCall,
436 #[serde(default, skip_serializing_if = "Vec::is_empty")]
437 host_events: Vec<ToolHostEventEffectOutcome>,
438 },
439 Process {
440 result: ProcessEffectOutcome,
441 },
442 ExecCode {
443 result: Result<ExecResponse, String>,
444 },
445 Checkpoint {
446 result: CheckpointOutcome,
447 },
448 SyncExecutionSurface {
449 result: Result<Option<ExecutionSurfaceSync>, String>,
450 },
451 Sleep,
452}
453
454#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
464pub struct LlmAttachmentSpec {
465 pub reference: AttachmentRef,
466}
467
468impl LlmAttachmentSpec {
469 fn into_attachment(self) -> LlmAttachment {
470 LlmAttachment::reference(self.reference)
471 }
472}
473
474#[derive(Clone, Debug, Serialize, Deserialize)]
478pub struct LlmRequestSpec {
479 pub model: String,
480 pub messages: Vec<LlmMessage>,
481 pub attachments: Vec<LlmAttachmentSpec>,
482 pub tools: Arc<Vec<LlmToolSpec>>,
483 pub tool_choice: LlmToolChoice,
484 pub model_variant: Option<String>,
485 #[serde(default)]
486 pub generation: crate::GenerationOptions,
487 pub session_id: Option<String>,
488 pub output_spec: Option<LlmOutputSpec>,
489}
490
491impl LlmRequestSpec {
492 pub fn from_request(
493 request: &CoreLlmRequest,
494 attachment_store: &dyn AttachmentStore,
495 ) -> Result<Self, RuntimeEffectControllerError> {
496 Ok(Self {
497 model: request.model.clone(),
498 messages: request.messages.clone(),
499 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)?,
500 tools: Arc::clone(&request.tools),
501 tool_choice: request.tool_choice.clone(),
502 model_variant: request.model_variant.clone(),
503 generation: request.generation.clone(),
504 session_id: request.session_id.clone(),
505 output_spec: request.output_spec.clone(),
506 })
507 }
508
509 pub fn into_request(
510 self,
511 stream_events: Option<LlmEventSender>,
512 provider_trace: Option<LlmProviderTraceSender>,
513 ) -> CoreLlmRequest {
514 CoreLlmRequest {
515 model: self.model,
516 messages: self.messages,
517 attachments: self
518 .attachments
519 .into_iter()
520 .map(LlmAttachmentSpec::into_attachment)
521 .collect(),
522 tools: self.tools,
523 tool_choice: self.tool_choice,
524 model_variant: self.model_variant,
525 generation: self.generation,
526 session_id: self.session_id,
527 output_spec: self.output_spec,
528 stream_events,
529 provider_trace,
530 }
531 }
532}
533
534fn attachment_specs_from_attachments(
535 attachments: &[LlmAttachment],
536 attachment_store: &dyn AttachmentStore,
537) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
538 attachments
539 .iter()
540 .map(|attachment| attachment_spec_from_attachment(attachment, attachment_store))
541 .collect()
542}
543
544fn attachment_spec_from_attachment(
545 attachment: &LlmAttachment,
546 attachment_store: &dyn AttachmentStore,
547) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
548 if let Some(reference) = attachment.reference.as_ref() {
549 return Ok(LlmAttachmentSpec {
550 reference: reference.clone(),
551 });
552 }
553 if attachment.data.is_empty() {
554 return Err(RuntimeEffectControllerError::new(
555 "runtime_effect_attachment_missing_reference",
556 "runtime effect attachment has neither a durable reference nor inline bytes",
557 ));
558 }
559 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
560 RuntimeEffectControllerError::new(
561 "runtime_effect_attachment_media_type",
562 format!(
563 "attachment media type `{}` cannot be represented durably",
564 attachment.mime
565 ),
566 )
567 })?;
568 let reference = attachment_store
569 .put(
570 attachment.data.clone(),
571 AttachmentCreateMeta::new(media_type, None, None, None),
572 )
573 .map_err(|err| {
574 RuntimeEffectControllerError::new(
575 "runtime_effect_attachment_store",
576 format!("failed to store attachment before runtime effect invocation: {err}"),
577 )
578 })?;
579 Ok(LlmAttachmentSpec { reference })
580}
581
582impl RuntimeEffectOutcome {
583 pub fn into_llm_call(
584 self,
585 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
586 match self {
587 Self::LlmCall {
588 result,
589 text_streamed,
590 } => Ok((result, text_streamed)),
591 other => Err(RuntimeEffectControllerError::wrong_outcome(
592 RuntimeEffectKind::LlmCall,
593 other.kind(),
594 )),
595 }
596 }
597
598 pub fn into_direct_response(
599 self,
600 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
601 match self {
602 Self::Direct { result } => Ok(result),
603 other => Err(RuntimeEffectControllerError::wrong_outcome(
604 RuntimeEffectKind::Direct,
605 other.kind(),
606 )),
607 }
608 }
609
610 pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
611 match self {
612 Self::ToolCall { result, .. } => Ok(result),
613 other => Err(RuntimeEffectControllerError::wrong_outcome(
614 RuntimeEffectKind::ToolCall,
615 other.kind(),
616 )),
617 }
618 }
619
620 pub fn into_tool_call_effect(
621 self,
622 ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
623 match self {
624 Self::ToolCall {
625 result,
626 host_events,
627 } => Ok(ToolCallEffectOutcome {
628 result,
629 host_events,
630 }),
631 other => Err(RuntimeEffectControllerError::wrong_outcome(
632 RuntimeEffectKind::ToolCall,
633 other.kind(),
634 )),
635 }
636 }
637
638 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
639 match self {
640 Self::Process { result } => Ok(result),
641 other => Err(RuntimeEffectControllerError::wrong_outcome(
642 RuntimeEffectKind::Process,
643 other.kind(),
644 )),
645 }
646 }
647
648 pub fn into_exec_code(
649 self,
650 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
651 match self {
652 Self::ExecCode { result } => Ok(result),
653 other => Err(RuntimeEffectControllerError::wrong_outcome(
654 RuntimeEffectKind::ExecCode,
655 other.kind(),
656 )),
657 }
658 }
659
660 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
661 match self {
662 Self::Checkpoint { result } => Ok(result),
663 other => Err(RuntimeEffectControllerError::wrong_outcome(
664 RuntimeEffectKind::Checkpoint,
665 other.kind(),
666 )),
667 }
668 }
669
670 pub fn into_sync_execution_surface(
671 self,
672 ) -> Result<Result<Option<ExecutionSurfaceSync>, String>, RuntimeEffectControllerError> {
673 match self {
674 Self::SyncExecutionSurface { result } => Ok(result),
675 other => Err(RuntimeEffectControllerError::wrong_outcome(
676 RuntimeEffectKind::SyncExecutionSurface,
677 other.kind(),
678 )),
679 }
680 }
681
682 pub fn kind(&self) -> RuntimeEffectKind {
683 match self {
684 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
685 Self::Direct { .. } => RuntimeEffectKind::Direct,
686 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
687 Self::Process { .. } => RuntimeEffectKind::Process,
688 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
689 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
690 Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
691 Self::Sleep => RuntimeEffectKind::Sleep,
692 }
693 }
694}