1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17 ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolCall,
29 Process,
30 ExecCode,
31 Checkpoint,
32 SyncExecutionEnvironment,
33 Sleep,
34 AwaitEvent,
35}
36
37impl RuntimeEffectKind {
38 pub fn as_str(self) -> &'static str {
39 match self {
40 Self::LlmCall => "llm_call",
41 Self::Direct => "direct",
42 Self::ToolCall => "tool_call",
43 Self::Process => "process",
44 Self::ExecCode => "exec_code",
45 Self::Checkpoint => "checkpoint",
46 Self::SyncExecutionEnvironment => "sync_execution_environment",
47 Self::Sleep => "sleep",
48 Self::AwaitEvent => "await_event",
49 }
50 }
51}
52
53#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
55pub struct RuntimeInvocation {
56 pub scope: RuntimeScope,
57 pub subject: RuntimeSubject,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub caused_by: Option<CausalRef>,
60 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub replay: Option<RuntimeReplay>,
62}
63
64impl RuntimeInvocation {
65 pub fn effect(
66 scope: RuntimeScope,
67 effect_id: impl Into<String>,
68 kind: RuntimeEffectKind,
69 replay_key: impl Into<String>,
70 ) -> Self {
71 Self {
72 scope,
73 subject: RuntimeSubject::Effect {
74 effect_id: effect_id.into(),
75 kind,
76 },
77 caused_by: None,
78 replay: Some(RuntimeReplay {
79 key: replay_key.into(),
80 }),
81 }
82 }
83
84 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
85 self.caused_by = caused_by;
86 self
87 }
88
89 pub fn effect_id(&self) -> Option<&str> {
90 match &self.subject {
91 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
92 _ => None,
93 }
94 }
95
96 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
97 match &self.subject {
98 RuntimeSubject::Effect { kind, .. } => Some(*kind),
99 _ => None,
100 }
101 }
102
103 pub fn replay_key(&self) -> Option<&str> {
104 self.replay.as_ref().map(|replay| replay.key.as_str())
105 }
106
107 pub fn causal_ref(&self) -> Option<CausalRef> {
108 match &self.subject {
109 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
110 session_id: self.scope.session_id.clone(),
111 turn_id: self.scope.turn_id.clone(),
112 effect_id: effect_id.clone(),
113 }),
114 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
115 process_id: process_id.clone(),
116 }),
117 RuntimeSubject::ProcessEvent {
118 process_id,
119 sequence,
120 ..
121 } => Some(CausalRef::ProcessEvent {
122 process_id: process_id.clone(),
123 sequence: *sequence,
124 }),
125 RuntimeSubject::TriggerOccurrence { occurrence_id } => {
126 Some(CausalRef::TriggerOccurrence {
127 occurrence_id: occurrence_id.clone(),
128 })
129 }
130 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
131 session_id: self.scope.session_id.clone(),
132 node_id: node_id.clone(),
133 }),
134 }
135 }
136}
137
138#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
139pub struct RuntimeScope {
140 pub session_id: String,
141 #[serde(default, skip_serializing_if = "Option::is_none")]
142 pub turn_id: Option<String>,
143 #[serde(default, skip_serializing_if = "Option::is_none")]
144 pub turn_index: Option<usize>,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub protocol_iteration: Option<usize>,
147}
148
149impl RuntimeScope {
150 pub fn new(session_id: impl Into<String>) -> Self {
151 Self {
152 session_id: session_id.into(),
153 turn_id: None,
154 turn_index: None,
155 protocol_iteration: None,
156 }
157 }
158
159 pub fn for_turn(
160 session_id: impl Into<String>,
161 turn_id: impl Into<String>,
162 turn_index: usize,
163 protocol_iteration: usize,
164 ) -> Self {
165 Self {
166 session_id: session_id.into(),
167 turn_id: Some(turn_id.into()),
168 turn_index: Some(turn_index),
169 protocol_iteration: Some(protocol_iteration),
170 }
171 }
172}
173
174#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
175pub struct RuntimeReplay {
176 pub key: String,
177}
178
179#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
180#[serde(tag = "type", rename_all = "snake_case")]
181pub enum RuntimeSubject {
182 Effect {
183 effect_id: String,
184 kind: RuntimeEffectKind,
185 },
186 Process {
187 process_id: String,
188 },
189 ProcessEvent {
190 process_id: String,
191 sequence: u64,
192 event_type: String,
193 },
194 TriggerOccurrence {
195 occurrence_id: String,
196 },
197 SessionNode {
198 node_id: String,
199 },
200}
201
202#[derive(Clone, Debug, Serialize, Deserialize)]
204pub struct RuntimeEffectEnvelope {
205 pub invocation: RuntimeInvocation,
206 pub command: RuntimeEffectCommand,
207}
208
209impl RuntimeEffectEnvelope {
210 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
211 Self::try_new(invocation, command).expect("valid runtime effect invocation")
212 }
213
214 pub fn try_new(
215 invocation: RuntimeInvocation,
216 command: RuntimeEffectCommand,
217 ) -> Result<Self, RuntimeEffectControllerError> {
218 validate_effect_invocation(&invocation, command.kind())?;
219 Ok(Self {
220 invocation,
221 command,
222 })
223 }
224
225 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
226 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
227 RuntimeEffectControllerError::new(
228 "runtime_effect_envelope_hash",
229 format!("failed to serialize runtime effect envelope: {err}"),
230 )
231 })
232 }
233}
234
235fn validate_effect_invocation(
236 invocation: &RuntimeInvocation,
237 command_kind: RuntimeEffectKind,
238) -> Result<(), RuntimeEffectControllerError> {
239 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
240 return Err(RuntimeEffectControllerError::new(
241 "runtime_effect_invocation_subject",
242 "runtime effect envelope subject must be an effect",
243 ));
244 };
245 if effect_id.trim().is_empty() {
246 return Err(RuntimeEffectControllerError::new(
247 "runtime_effect_invocation_subject",
248 "runtime effect envelope effect id must be non-empty",
249 ));
250 }
251 if *kind != command_kind {
252 return Err(RuntimeEffectControllerError::new(
253 "runtime_effect_invocation_kind",
254 format!(
255 "runtime effect invocation kind {} does not match command kind {}",
256 kind.as_str(),
257 command_kind.as_str()
258 ),
259 ));
260 }
261 if invocation
262 .replay
263 .as_ref()
264 .is_none_or(|replay| replay.key.is_empty())
265 {
266 return Err(RuntimeEffectControllerError::new(
267 "runtime_effect_replay_required",
268 "runtime effect envelope requires replay.key",
269 ));
270 }
271 Ok(())
272}
273
274#[derive(Clone, Debug, Serialize, Deserialize)]
276#[serde(tag = "type", rename_all = "snake_case")]
277pub enum RuntimeEffectCommand {
278 LlmCall {
279 request: Box<LlmRequestSpec>,
280 },
281 Direct {
282 request: Box<LlmRequestSpec>,
283 usage_source: String,
284 },
285 ToolCall {
286 call: crate::PreparedToolCall,
287 },
288 Process {
289 command: Box<ProcessCommand>,
290 },
291 ExecCode {
292 code: String,
293 },
294 Checkpoint {
295 checkpoint: CheckpointKind,
296 },
297 SyncExecutionEnvironment {
298 update_machine_config: bool,
299 },
300 Sleep {
301 duration_ms: u64,
302 },
303 AwaitEvent {
304 key: crate::AwaitEventKey,
305 },
306}
307
308impl RuntimeEffectCommand {
309 pub fn process(command: ProcessCommand) -> Self {
310 Self::Process {
311 command: Box::new(command),
312 }
313 }
314
315 pub fn kind(&self) -> RuntimeEffectKind {
316 match self {
317 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
318 Self::Direct { .. } => RuntimeEffectKind::Direct,
319 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
320 Self::Process { .. } => RuntimeEffectKind::Process,
321 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
322 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
323 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
324 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
325 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
326 }
327 }
328}
329
330#[derive(Clone, Debug, Serialize, Deserialize)]
332#[serde(tag = "op", rename_all = "snake_case")]
333pub enum ProcessCommand {
334 Start {
335 registration: ProcessRegistration,
336 #[serde(default, skip_serializing_if = "Option::is_none")]
337 grant: Option<ProcessStartGrant>,
338 #[serde(
339 default,
340 skip_serializing_if = "boxed_process_execution_context_is_empty"
341 )]
342 execution_context: Box<ProcessExecutionContext>,
343 },
344 List {
345 session_scope: SessionScope,
346 #[serde(default)]
347 mode: ProcessListMode,
348 },
349 Transfer {
350 from_scope: SessionScope,
351 to_scope: SessionScope,
352 process_ids: Vec<String>,
353 },
354 DeleteSession {
355 session_id: String,
356 },
357 Await {
358 process_id: String,
359 },
360 Cancel {
361 process_id: String,
362 reason: Option<String>,
363 },
364 Signal {
365 process_id: String,
366 signal_name: String,
367 signal_id: String,
368 request: crate::ProcessEventAppendRequest,
369 },
370}
371
372fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
373 context.is_empty()
374}
375
376type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
377
378impl ProcessCommand {
379 pub fn effect_id(&self) -> String {
380 match self {
381 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
382 Self::List {
383 session_scope,
384 mode,
385 } => {
386 format!("process:list:{}:{}", session_scope.id(), mode.as_str())
387 }
388 Self::Transfer {
389 from_scope,
390 to_scope,
391 process_ids,
392 } => {
393 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
394 .unwrap_or_else(|_| "unhashable".to_string());
395 format!(
396 "process:transfer:{}:{}:{digest}",
397 from_scope.id(),
398 to_scope.id()
399 )
400 }
401 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
402 Self::Await { process_id } => format!("process:await:{process_id}"),
403 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
404 Self::Signal {
405 process_id,
406 signal_name,
407 signal_id,
408 ..
409 } => {
410 format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
411 }
412 }
413 }
414}
415
416#[derive(Clone, Debug, Serialize, Deserialize)]
418#[serde(tag = "op", rename_all = "snake_case")]
419pub enum ProcessEffectOutcome {
420 Start {
421 record: ProcessRecord,
422 },
423 List {
424 entries: Vec<ProcessHandleGrantEntry>,
425 },
426 Transfer,
427 DeleteSession {
428 report: crate::ProcessSessionDeleteReport,
429 },
430 Await {
431 output: ProcessAwaitOutput,
432 },
433 Cancel {
434 record: ProcessRecord,
435 },
436 Signal {
437 event: crate::ProcessEvent,
438 },
439}
440
441#[derive(Clone, Debug, Serialize, Deserialize)]
442pub struct ToolCallEffectOutcome {
443 pub launch: ToolCallLaunch,
444 #[serde(default, skip_serializing_if = "Vec::is_empty")]
445 pub triggers: Vec<ToolTriggerEffectOutcome>,
446}
447
448#[derive(Clone, Debug, Serialize, Deserialize)]
449#[serde(tag = "status", rename_all = "snake_case")]
450pub enum ToolCallLaunch {
451 Done {
452 result: CompletedToolCall,
453 },
454 Pending {
455 key: crate::AwaitEventKey,
456 pending: crate::PendingCompletion,
457 duration_ms: u64,
458 },
459}
460
461#[derive(Clone, Debug, Serialize, Deserialize)]
463#[serde(tag = "type", rename_all = "snake_case")]
464pub enum RuntimeEffectOutcome {
465 LlmCall {
466 result: Result<LlmResponse, LlmCallError>,
467 text_streamed: bool,
468 },
469 Direct {
470 result: Result<LlmResponse, LlmCallError>,
471 },
472 ToolCall {
473 launch: ToolCallLaunch,
474 #[serde(default, skip_serializing_if = "Vec::is_empty")]
475 triggers: Vec<ToolTriggerEffectOutcome>,
476 },
477 Process {
478 result: ProcessEffectOutcome,
479 },
480 ExecCode {
481 result: Result<ExecResponse, String>,
482 },
483 Checkpoint {
484 result: CheckpointOutcome,
485 },
486 SyncExecutionEnvironment {
487 result: Result<Option<ExecutionEnvironmentSync>, String>,
488 },
489 Sleep,
490 AwaitEvent {
491 resolution: crate::Resolution,
492 },
493}
494
495#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
505pub struct LlmAttachmentSpec {
506 pub reference: AttachmentRef,
507}
508
509impl LlmAttachmentSpec {
510 fn into_attachment(self) -> LlmAttachment {
511 LlmAttachment::reference(self.reference)
512 }
513}
514
515#[derive(Clone, Debug, Serialize, Deserialize)]
519pub struct LlmRequestSpec {
520 pub model: String,
521 pub messages: Vec<LlmMessage>,
522 pub attachments: Vec<LlmAttachmentSpec>,
523 pub tools: Arc<Vec<LlmToolSpec>>,
524 pub tool_choice: LlmToolChoice,
525 pub model_variant: Option<String>,
526 #[serde(default)]
527 pub generation: crate::GenerationOptions,
528 pub session_id: Option<String>,
529 pub output_spec: Option<LlmOutputSpec>,
530}
531
532impl LlmRequestSpec {
533 pub async fn from_request(
534 request: &CoreLlmRequest,
535 attachment_store: &dyn AttachmentStore,
536 ) -> Result<Self, RuntimeEffectControllerError> {
537 Ok(Self {
538 model: request.model.clone(),
539 messages: request.messages.clone(),
540 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
541 .await?,
542 tools: Arc::clone(&request.tools),
543 tool_choice: request.tool_choice.clone(),
544 model_variant: request.model_variant.clone(),
545 generation: request.generation.clone(),
546 session_id: request.session_id.clone(),
547 output_spec: request.output_spec.clone(),
548 })
549 }
550
551 pub fn into_request(
552 self,
553 stream_events: Option<LlmEventSender>,
554 provider_trace: Option<LlmProviderTraceSender>,
555 ) -> CoreLlmRequest {
556 CoreLlmRequest {
557 model: self.model,
558 messages: self.messages,
559 attachments: self
560 .attachments
561 .into_iter()
562 .map(LlmAttachmentSpec::into_attachment)
563 .collect(),
564 tools: self.tools,
565 tool_choice: self.tool_choice,
566 model_variant: self.model_variant,
567 generation: self.generation,
568 session_id: self.session_id,
569 output_spec: self.output_spec,
570 stream_events,
571 provider_trace,
572 }
573 }
574}
575
576async fn attachment_specs_from_attachments(
577 attachments: &[LlmAttachment],
578 attachment_store: &dyn AttachmentStore,
579) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
580 let mut specs = Vec::with_capacity(attachments.len());
581 for attachment in attachments {
582 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
583 }
584 Ok(specs)
585}
586
587async fn attachment_spec_from_attachment(
588 attachment: &LlmAttachment,
589 attachment_store: &dyn AttachmentStore,
590) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
591 if let Some(reference) = attachment.reference.as_ref() {
592 return Ok(LlmAttachmentSpec {
593 reference: reference.clone(),
594 });
595 }
596 if attachment.data.is_empty() {
597 return Err(RuntimeEffectControllerError::new(
598 "runtime_effect_attachment_missing_reference",
599 "runtime effect attachment has neither a durable reference nor inline bytes",
600 ));
601 }
602 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
603 RuntimeEffectControllerError::new(
604 "runtime_effect_attachment_media_type",
605 format!(
606 "attachment media type `{}` cannot be represented durably",
607 attachment.mime
608 ),
609 )
610 })?;
611 let reference = attachment_store
612 .put(
613 attachment.data.clone(),
614 AttachmentCreateMeta::new(media_type, None, None, None),
615 )
616 .await
617 .map_err(|err| {
618 RuntimeEffectControllerError::new(
619 "runtime_effect_attachment_store",
620 format!("failed to store attachment before runtime effect invocation: {err}"),
621 )
622 })?;
623 Ok(LlmAttachmentSpec { reference })
624}
625
626impl RuntimeEffectOutcome {
627 pub fn into_llm_call(
628 self,
629 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
630 match self {
631 Self::LlmCall {
632 result,
633 text_streamed,
634 } => Ok((result, text_streamed)),
635 other => Err(RuntimeEffectControllerError::wrong_outcome(
636 RuntimeEffectKind::LlmCall,
637 other.kind(),
638 )),
639 }
640 }
641
642 pub fn into_direct_response(
643 self,
644 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
645 match self {
646 Self::Direct { result } => Ok(result),
647 other => Err(RuntimeEffectControllerError::wrong_outcome(
648 RuntimeEffectKind::Direct,
649 other.kind(),
650 )),
651 }
652 }
653
654 pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
655 match self {
656 Self::ToolCall {
657 launch: ToolCallLaunch::Done { result },
658 ..
659 } => Ok(result),
660 Self::ToolCall {
661 launch: ToolCallLaunch::Pending { .. },
662 ..
663 } => Err(RuntimeEffectControllerError::new(
664 "runtime_effect_tool_call_pending",
665 "tool call launch is pending and has no completed output yet",
666 )),
667 other => Err(RuntimeEffectControllerError::wrong_outcome(
668 RuntimeEffectKind::ToolCall,
669 other.kind(),
670 )),
671 }
672 }
673
674 pub fn into_tool_call_effect(
675 self,
676 ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
677 match self {
678 Self::ToolCall { launch, triggers } => Ok(ToolCallEffectOutcome { launch, triggers }),
679 other => Err(RuntimeEffectControllerError::wrong_outcome(
680 RuntimeEffectKind::ToolCall,
681 other.kind(),
682 )),
683 }
684 }
685
686 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
687 match self {
688 Self::Process { result } => Ok(result),
689 other => Err(RuntimeEffectControllerError::wrong_outcome(
690 RuntimeEffectKind::Process,
691 other.kind(),
692 )),
693 }
694 }
695
696 pub fn into_exec_code(
697 self,
698 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
699 match self {
700 Self::ExecCode { result } => Ok(result),
701 other => Err(RuntimeEffectControllerError::wrong_outcome(
702 RuntimeEffectKind::ExecCode,
703 other.kind(),
704 )),
705 }
706 }
707
708 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
709 match self {
710 Self::Checkpoint { result } => Ok(result),
711 other => Err(RuntimeEffectControllerError::wrong_outcome(
712 RuntimeEffectKind::Checkpoint,
713 other.kind(),
714 )),
715 }
716 }
717
718 pub fn into_sync_execution_environment(
719 self,
720 ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
721 {
722 match self {
723 Self::SyncExecutionEnvironment { result } => Ok(result),
724 other => Err(RuntimeEffectControllerError::wrong_outcome(
725 RuntimeEffectKind::SyncExecutionEnvironment,
726 other.kind(),
727 )),
728 }
729 }
730
731 pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
732 match self {
733 Self::AwaitEvent { resolution } => Ok(resolution),
734 other => Err(RuntimeEffectControllerError::wrong_outcome(
735 RuntimeEffectKind::AwaitEvent,
736 other.kind(),
737 )),
738 }
739 }
740
741 pub fn kind(&self) -> RuntimeEffectKind {
742 match self {
743 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
744 Self::Direct { .. } => RuntimeEffectKind::Direct,
745 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
746 Self::Process { .. } => RuntimeEffectKind::Process,
747 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
748 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
749 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
750 Self::Sleep => RuntimeEffectKind::Sleep,
751 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
752 }
753 }
754}