1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionSurfaceSync, LlmCallError};
12use crate::tool_dispatch::ToolHostEventEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17 ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolCall,
29 Process,
30 ExecCode,
31 Checkpoint,
32 SyncExecutionSurface,
33 Sleep,
34 AwaitEvent,
35}
36
37impl RuntimeEffectKind {
38 pub fn as_str(self) -> &'static str {
39 match self {
40 Self::LlmCall => "llm_call",
41 Self::Direct => "direct",
42 Self::ToolCall => "tool_call",
43 Self::Process => "process",
44 Self::ExecCode => "exec_code",
45 Self::Checkpoint => "checkpoint",
46 Self::SyncExecutionSurface => "sync_execution_surface",
47 Self::Sleep => "sleep",
48 Self::AwaitEvent => "await_event",
49 }
50 }
51}
52
53#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
55pub struct RuntimeInvocation {
56 pub scope: RuntimeScope,
57 pub subject: RuntimeSubject,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub caused_by: Option<CausalRef>,
60 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub replay: Option<RuntimeReplay>,
62}
63
64impl RuntimeInvocation {
65 pub fn effect(
66 scope: RuntimeScope,
67 effect_id: impl Into<String>,
68 kind: RuntimeEffectKind,
69 replay_key: impl Into<String>,
70 ) -> Self {
71 Self {
72 scope,
73 subject: RuntimeSubject::Effect {
74 effect_id: effect_id.into(),
75 kind,
76 },
77 caused_by: None,
78 replay: Some(RuntimeReplay {
79 key: replay_key.into(),
80 }),
81 }
82 }
83
84 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
85 self.caused_by = caused_by;
86 self
87 }
88
89 pub fn effect_id(&self) -> Option<&str> {
90 match &self.subject {
91 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
92 _ => None,
93 }
94 }
95
96 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
97 match &self.subject {
98 RuntimeSubject::Effect { kind, .. } => Some(*kind),
99 _ => None,
100 }
101 }
102
103 pub fn replay_key(&self) -> Option<&str> {
104 self.replay.as_ref().map(|replay| replay.key.as_str())
105 }
106
107 pub fn causal_ref(&self) -> Option<CausalRef> {
108 match &self.subject {
109 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
110 session_id: self.scope.session_id.clone(),
111 turn_id: self.scope.turn_id.clone(),
112 effect_id: effect_id.clone(),
113 }),
114 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
115 process_id: process_id.clone(),
116 }),
117 RuntimeSubject::ProcessEvent {
118 process_id,
119 sequence,
120 ..
121 } => Some(CausalRef::ProcessEvent {
122 process_id: process_id.clone(),
123 sequence: *sequence,
124 }),
125 RuntimeSubject::HostEvent { occurrence_id } => Some(CausalRef::HostEvent {
126 occurrence_id: occurrence_id.clone(),
127 }),
128 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
129 session_id: self.scope.session_id.clone(),
130 node_id: node_id.clone(),
131 }),
132 }
133 }
134}
135
136#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
137pub struct RuntimeScope {
138 pub session_id: String,
139 #[serde(default, skip_serializing_if = "Option::is_none")]
140 pub turn_id: Option<String>,
141 #[serde(default, skip_serializing_if = "Option::is_none")]
142 pub turn_index: Option<usize>,
143 #[serde(default, skip_serializing_if = "Option::is_none")]
144 pub protocol_iteration: Option<usize>,
145}
146
147impl RuntimeScope {
148 pub fn new(session_id: impl Into<String>) -> Self {
149 Self {
150 session_id: session_id.into(),
151 turn_id: None,
152 turn_index: None,
153 protocol_iteration: None,
154 }
155 }
156
157 pub fn for_turn(
158 session_id: impl Into<String>,
159 turn_id: impl Into<String>,
160 turn_index: usize,
161 protocol_iteration: usize,
162 ) -> Self {
163 Self {
164 session_id: session_id.into(),
165 turn_id: Some(turn_id.into()),
166 turn_index: Some(turn_index),
167 protocol_iteration: Some(protocol_iteration),
168 }
169 }
170}
171
172#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
173pub struct RuntimeReplay {
174 pub key: String,
175}
176
177#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
178#[serde(tag = "type", rename_all = "snake_case")]
179pub enum RuntimeSubject {
180 Effect {
181 effect_id: String,
182 kind: RuntimeEffectKind,
183 },
184 Process {
185 process_id: String,
186 },
187 ProcessEvent {
188 process_id: String,
189 sequence: u64,
190 event_type: String,
191 },
192 HostEvent {
193 occurrence_id: String,
194 },
195 SessionNode {
196 node_id: String,
197 },
198}
199
200#[derive(Clone, Debug, Serialize, Deserialize)]
202pub struct RuntimeEffectEnvelope {
203 pub invocation: RuntimeInvocation,
204 pub command: RuntimeEffectCommand,
205}
206
207impl RuntimeEffectEnvelope {
208 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
209 Self::try_new(invocation, command).expect("valid runtime effect invocation")
210 }
211
212 pub fn try_new(
213 invocation: RuntimeInvocation,
214 command: RuntimeEffectCommand,
215 ) -> Result<Self, RuntimeEffectControllerError> {
216 validate_effect_invocation(&invocation, command.kind())?;
217 Ok(Self {
218 invocation,
219 command,
220 })
221 }
222
223 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
224 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
225 RuntimeEffectControllerError::new(
226 "runtime_effect_envelope_hash",
227 format!("failed to serialize runtime effect envelope: {err}"),
228 )
229 })
230 }
231}
232
233fn validate_effect_invocation(
234 invocation: &RuntimeInvocation,
235 command_kind: RuntimeEffectKind,
236) -> Result<(), RuntimeEffectControllerError> {
237 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
238 return Err(RuntimeEffectControllerError::new(
239 "runtime_effect_invocation_subject",
240 "runtime effect envelope subject must be an effect",
241 ));
242 };
243 if effect_id.trim().is_empty() {
244 return Err(RuntimeEffectControllerError::new(
245 "runtime_effect_invocation_subject",
246 "runtime effect envelope effect id must be non-empty",
247 ));
248 }
249 if *kind != command_kind {
250 return Err(RuntimeEffectControllerError::new(
251 "runtime_effect_invocation_kind",
252 format!(
253 "runtime effect invocation kind {} does not match command kind {}",
254 kind.as_str(),
255 command_kind.as_str()
256 ),
257 ));
258 }
259 if invocation
260 .replay
261 .as_ref()
262 .is_none_or(|replay| replay.key.is_empty())
263 {
264 return Err(RuntimeEffectControllerError::new(
265 "runtime_effect_replay_required",
266 "runtime effect envelope requires replay.key",
267 ));
268 }
269 Ok(())
270}
271
272#[derive(Clone, Debug, Serialize, Deserialize)]
274#[serde(tag = "type", rename_all = "snake_case")]
275pub enum RuntimeEffectCommand {
276 LlmCall {
277 request: Box<LlmRequestSpec>,
278 },
279 Direct {
280 request: Box<LlmRequestSpec>,
281 usage_source: String,
282 },
283 ToolCall {
284 call: crate::PreparedToolCall,
285 },
286 Process {
287 command: Box<ProcessCommand>,
288 },
289 ExecCode {
290 code: String,
291 },
292 Checkpoint {
293 checkpoint: CheckpointKind,
294 },
295 SyncExecutionSurface {
296 update_machine_config: bool,
297 },
298 Sleep {
299 duration_ms: u64,
300 },
301 AwaitEvent {
302 key: String,
303 },
304}
305
306impl RuntimeEffectCommand {
307 pub fn process(command: ProcessCommand) -> Self {
308 Self::Process {
309 command: Box::new(command),
310 }
311 }
312
313 pub fn kind(&self) -> RuntimeEffectKind {
314 match self {
315 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
316 Self::Direct { .. } => RuntimeEffectKind::Direct,
317 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
318 Self::Process { .. } => RuntimeEffectKind::Process,
319 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
320 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
321 Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
322 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
323 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
324 }
325 }
326}
327
328#[derive(Clone, Debug, Serialize, Deserialize)]
330#[serde(tag = "op", rename_all = "snake_case")]
331pub enum ProcessCommand {
332 Start {
333 registration: ProcessRegistration,
334 #[serde(default, skip_serializing_if = "Option::is_none")]
335 grant: Option<ProcessStartGrant>,
336 #[serde(
337 default,
338 skip_serializing_if = "boxed_process_execution_context_is_empty"
339 )]
340 execution_context: Box<ProcessExecutionContext>,
341 },
342 List {
343 session_scope: SessionScope,
344 #[serde(default)]
345 mode: ProcessListMode,
346 },
347 Transfer {
348 from_scope: SessionScope,
349 to_scope: SessionScope,
350 process_ids: Vec<String>,
351 },
352 DeleteSession {
353 session_id: String,
354 },
355 Await {
356 process_id: String,
357 },
358 Cancel {
359 process_id: String,
360 reason: Option<String>,
361 },
362 Signal {
363 process_id: String,
364 signal_name: String,
365 signal_id: String,
366 request: crate::ProcessEventAppendRequest,
367 },
368}
369
370fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
371 context.is_empty()
372}
373
374type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
375
376impl ProcessCommand {
377 pub fn effect_id(&self) -> String {
378 match self {
379 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
380 Self::List {
381 session_scope,
382 mode,
383 } => {
384 format!("process:list:{}:{}", session_scope.id(), mode.as_str())
385 }
386 Self::Transfer {
387 from_scope,
388 to_scope,
389 process_ids,
390 } => {
391 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
392 .unwrap_or_else(|_| "unhashable".to_string());
393 format!(
394 "process:transfer:{}:{}:{digest}",
395 from_scope.id(),
396 to_scope.id()
397 )
398 }
399 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
400 Self::Await { process_id } => format!("process:await:{process_id}"),
401 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
402 Self::Signal {
403 process_id,
404 signal_name,
405 signal_id,
406 ..
407 } => {
408 format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
409 }
410 }
411 }
412}
413
414#[derive(Clone, Debug, Serialize, Deserialize)]
416#[serde(tag = "op", rename_all = "snake_case")]
417pub enum ProcessEffectOutcome {
418 Start {
419 record: ProcessRecord,
420 },
421 List {
422 entries: Vec<ProcessHandleGrantEntry>,
423 },
424 Transfer,
425 DeleteSession {
426 report: crate::ProcessSessionDeleteReport,
427 },
428 Await {
429 output: ProcessAwaitOutput,
430 },
431 Cancel {
432 record: ProcessRecord,
433 },
434 Signal {
435 event: crate::ProcessEvent,
436 },
437}
438
439#[derive(Clone, Debug, Serialize, Deserialize)]
440pub struct ToolCallEffectOutcome {
441 pub result: CompletedToolCall,
442 #[serde(default, skip_serializing_if = "Vec::is_empty")]
443 pub host_events: Vec<ToolHostEventEffectOutcome>,
444}
445
446#[derive(Clone, Debug, Serialize, Deserialize)]
448#[serde(tag = "type", rename_all = "snake_case")]
449pub enum RuntimeEffectOutcome {
450 LlmCall {
451 result: Result<LlmResponse, LlmCallError>,
452 text_streamed: bool,
453 },
454 Direct {
455 result: Result<LlmResponse, LlmCallError>,
456 },
457 ToolCall {
458 result: CompletedToolCall,
459 #[serde(default, skip_serializing_if = "Vec::is_empty")]
460 host_events: Vec<ToolHostEventEffectOutcome>,
461 },
462 Process {
463 result: ProcessEffectOutcome,
464 },
465 ExecCode {
466 result: Result<ExecResponse, String>,
467 },
468 Checkpoint {
469 result: CheckpointOutcome,
470 },
471 SyncExecutionSurface {
472 result: Result<Option<ExecutionSurfaceSync>, String>,
473 },
474 Sleep,
475 AwaitEvent {
476 payload: serde_json::Value,
477 },
478}
479
480#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
490pub struct LlmAttachmentSpec {
491 pub reference: AttachmentRef,
492}
493
494impl LlmAttachmentSpec {
495 fn into_attachment(self) -> LlmAttachment {
496 LlmAttachment::reference(self.reference)
497 }
498}
499
500#[derive(Clone, Debug, Serialize, Deserialize)]
504pub struct LlmRequestSpec {
505 pub model: String,
506 pub messages: Vec<LlmMessage>,
507 pub attachments: Vec<LlmAttachmentSpec>,
508 pub tools: Arc<Vec<LlmToolSpec>>,
509 pub tool_choice: LlmToolChoice,
510 pub model_variant: Option<String>,
511 #[serde(default)]
512 pub generation: crate::GenerationOptions,
513 pub session_id: Option<String>,
514 pub output_spec: Option<LlmOutputSpec>,
515}
516
517impl LlmRequestSpec {
518 pub async fn from_request(
519 request: &CoreLlmRequest,
520 attachment_store: &dyn AttachmentStore,
521 ) -> Result<Self, RuntimeEffectControllerError> {
522 Ok(Self {
523 model: request.model.clone(),
524 messages: request.messages.clone(),
525 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
526 .await?,
527 tools: Arc::clone(&request.tools),
528 tool_choice: request.tool_choice.clone(),
529 model_variant: request.model_variant.clone(),
530 generation: request.generation.clone(),
531 session_id: request.session_id.clone(),
532 output_spec: request.output_spec.clone(),
533 })
534 }
535
536 pub fn into_request(
537 self,
538 stream_events: Option<LlmEventSender>,
539 provider_trace: Option<LlmProviderTraceSender>,
540 ) -> CoreLlmRequest {
541 CoreLlmRequest {
542 model: self.model,
543 messages: self.messages,
544 attachments: self
545 .attachments
546 .into_iter()
547 .map(LlmAttachmentSpec::into_attachment)
548 .collect(),
549 tools: self.tools,
550 tool_choice: self.tool_choice,
551 model_variant: self.model_variant,
552 generation: self.generation,
553 session_id: self.session_id,
554 output_spec: self.output_spec,
555 stream_events,
556 provider_trace,
557 }
558 }
559}
560
561async fn attachment_specs_from_attachments(
562 attachments: &[LlmAttachment],
563 attachment_store: &dyn AttachmentStore,
564) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
565 let mut specs = Vec::with_capacity(attachments.len());
566 for attachment in attachments {
567 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
568 }
569 Ok(specs)
570}
571
572async fn attachment_spec_from_attachment(
573 attachment: &LlmAttachment,
574 attachment_store: &dyn AttachmentStore,
575) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
576 if let Some(reference) = attachment.reference.as_ref() {
577 return Ok(LlmAttachmentSpec {
578 reference: reference.clone(),
579 });
580 }
581 if attachment.data.is_empty() {
582 return Err(RuntimeEffectControllerError::new(
583 "runtime_effect_attachment_missing_reference",
584 "runtime effect attachment has neither a durable reference nor inline bytes",
585 ));
586 }
587 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
588 RuntimeEffectControllerError::new(
589 "runtime_effect_attachment_media_type",
590 format!(
591 "attachment media type `{}` cannot be represented durably",
592 attachment.mime
593 ),
594 )
595 })?;
596 let reference = attachment_store
597 .put(
598 attachment.data.clone(),
599 AttachmentCreateMeta::new(media_type, None, None, None),
600 )
601 .await
602 .map_err(|err| {
603 RuntimeEffectControllerError::new(
604 "runtime_effect_attachment_store",
605 format!("failed to store attachment before runtime effect invocation: {err}"),
606 )
607 })?;
608 Ok(LlmAttachmentSpec { reference })
609}
610
611impl RuntimeEffectOutcome {
612 pub fn into_llm_call(
613 self,
614 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
615 match self {
616 Self::LlmCall {
617 result,
618 text_streamed,
619 } => Ok((result, text_streamed)),
620 other => Err(RuntimeEffectControllerError::wrong_outcome(
621 RuntimeEffectKind::LlmCall,
622 other.kind(),
623 )),
624 }
625 }
626
627 pub fn into_direct_response(
628 self,
629 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
630 match self {
631 Self::Direct { result } => Ok(result),
632 other => Err(RuntimeEffectControllerError::wrong_outcome(
633 RuntimeEffectKind::Direct,
634 other.kind(),
635 )),
636 }
637 }
638
639 pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
640 match self {
641 Self::ToolCall { result, .. } => Ok(result),
642 other => Err(RuntimeEffectControllerError::wrong_outcome(
643 RuntimeEffectKind::ToolCall,
644 other.kind(),
645 )),
646 }
647 }
648
649 pub fn into_tool_call_effect(
650 self,
651 ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
652 match self {
653 Self::ToolCall {
654 result,
655 host_events,
656 } => Ok(ToolCallEffectOutcome {
657 result,
658 host_events,
659 }),
660 other => Err(RuntimeEffectControllerError::wrong_outcome(
661 RuntimeEffectKind::ToolCall,
662 other.kind(),
663 )),
664 }
665 }
666
667 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
668 match self {
669 Self::Process { result } => Ok(result),
670 other => Err(RuntimeEffectControllerError::wrong_outcome(
671 RuntimeEffectKind::Process,
672 other.kind(),
673 )),
674 }
675 }
676
677 pub fn into_exec_code(
678 self,
679 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
680 match self {
681 Self::ExecCode { result } => Ok(result),
682 other => Err(RuntimeEffectControllerError::wrong_outcome(
683 RuntimeEffectKind::ExecCode,
684 other.kind(),
685 )),
686 }
687 }
688
689 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
690 match self {
691 Self::Checkpoint { result } => Ok(result),
692 other => Err(RuntimeEffectControllerError::wrong_outcome(
693 RuntimeEffectKind::Checkpoint,
694 other.kind(),
695 )),
696 }
697 }
698
699 pub fn into_sync_execution_surface(
700 self,
701 ) -> Result<Result<Option<ExecutionSurfaceSync>, String>, RuntimeEffectControllerError> {
702 match self {
703 Self::SyncExecutionSurface { result } => Ok(result),
704 other => Err(RuntimeEffectControllerError::wrong_outcome(
705 RuntimeEffectKind::SyncExecutionSurface,
706 other.kind(),
707 )),
708 }
709 }
710
711 pub fn into_await_event(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
712 match self {
713 Self::AwaitEvent { payload } => Ok(payload),
714 other => Err(RuntimeEffectControllerError::wrong_outcome(
715 RuntimeEffectKind::AwaitEvent,
716 other.kind(),
717 )),
718 }
719 }
720
721 pub fn kind(&self) -> RuntimeEffectKind {
722 match self {
723 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
724 Self::Direct { .. } => RuntimeEffectKind::Direct,
725 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
726 Self::Process { .. } => RuntimeEffectKind::Process,
727 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
728 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
729 Self::SyncExecutionSurface { .. } => RuntimeEffectKind::SyncExecutionSurface,
730 Self::Sleep => RuntimeEffectKind::Sleep,
731 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
732 }
733 }
734}