1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17 ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolAttempt,
29 ToolBatch,
30 Process,
31 ExecCode,
32 Checkpoint,
33 SyncExecutionEnvironment,
34 Sleep,
35 AwaitEvent,
36 DurableStep,
37}
38
39impl RuntimeEffectKind {
40 pub fn as_str(self) -> &'static str {
41 match self {
42 Self::LlmCall => "llm_call",
43 Self::Direct => "direct",
44 Self::ToolAttempt => "tool_attempt",
45 Self::ToolBatch => "tool_batch",
46 Self::Process => "process",
47 Self::ExecCode => "exec_code",
48 Self::Checkpoint => "checkpoint",
49 Self::SyncExecutionEnvironment => "sync_execution_environment",
50 Self::Sleep => "sleep",
51 Self::AwaitEvent => "await_event",
52 Self::DurableStep => "durable_step",
53 }
54 }
55}
56
57#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
59pub struct RuntimeInvocation {
60 pub scope: RuntimeScope,
61 pub subject: RuntimeSubject,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub caused_by: Option<CausalRef>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
65 pub replay: Option<RuntimeReplay>,
66}
67
68impl RuntimeInvocation {
69 pub fn effect(
70 scope: RuntimeScope,
71 effect_id: impl Into<String>,
72 kind: RuntimeEffectKind,
73 replay_key: impl Into<String>,
74 ) -> Self {
75 Self {
76 scope,
77 subject: RuntimeSubject::Effect {
78 effect_id: effect_id.into(),
79 kind,
80 },
81 caused_by: None,
82 replay: Some(RuntimeReplay {
83 key: replay_key.into(),
84 }),
85 }
86 }
87
88 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89 self.caused_by = caused_by;
90 self
91 }
92
93 pub fn effect_id(&self) -> Option<&str> {
94 match &self.subject {
95 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96 _ => None,
97 }
98 }
99
100 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101 match &self.subject {
102 RuntimeSubject::Effect { kind, .. } => Some(*kind),
103 _ => None,
104 }
105 }
106
107 pub fn replay_key(&self) -> Option<&str> {
108 self.replay.as_ref().map(|replay| replay.key.as_str())
109 }
110
111 pub fn causal_ref(&self) -> Option<CausalRef> {
112 match &self.subject {
113 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114 session_id: self.scope.session_id.clone(),
115 turn_id: self.scope.turn_id.clone(),
116 effect_id: effect_id.clone(),
117 }),
118 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119 process_id: process_id.clone(),
120 }),
121 RuntimeSubject::ProcessEvent {
122 process_id,
123 sequence,
124 ..
125 } => Some(CausalRef::ProcessEvent {
126 process_id: process_id.clone(),
127 sequence: *sequence,
128 }),
129 RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130 Some(CausalRef::TriggerOccurrence {
131 occurrence_id: occurrence_id.clone(),
132 })
133 }
134 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135 session_id: self.scope.session_id.clone(),
136 node_id: node_id.clone(),
137 }),
138 }
139 }
140}
141
142#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
143pub struct RuntimeScope {
144 pub session_id: String,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub turn_id: Option<String>,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub turn_index: Option<usize>,
149 #[serde(default, skip_serializing_if = "Option::is_none")]
150 pub protocol_iteration: Option<usize>,
151}
152
153impl RuntimeScope {
154 pub fn new(session_id: impl Into<String>) -> Self {
155 Self {
156 session_id: session_id.into(),
157 turn_id: None,
158 turn_index: None,
159 protocol_iteration: None,
160 }
161 }
162
163 pub fn for_turn(
164 session_id: impl Into<String>,
165 turn_id: impl Into<String>,
166 turn_index: usize,
167 protocol_iteration: usize,
168 ) -> Self {
169 Self {
170 session_id: session_id.into(),
171 turn_id: Some(turn_id.into()),
172 turn_index: Some(turn_index),
173 protocol_iteration: Some(protocol_iteration),
174 }
175 }
176}
177
178#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
179pub struct RuntimeReplay {
180 pub key: String,
181}
182
183#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(tag = "type", rename_all = "snake_case")]
185pub enum RuntimeSubject {
186 Effect {
187 effect_id: String,
188 kind: RuntimeEffectKind,
189 },
190 Process {
191 process_id: String,
192 },
193 ProcessEvent {
194 process_id: String,
195 sequence: u64,
196 event_type: String,
197 },
198 TriggerOccurrence {
199 occurrence_id: String,
200 },
201 SessionNode {
202 node_id: String,
203 },
204}
205
206#[derive(Clone, Debug, Serialize, Deserialize)]
208pub struct RuntimeEffectEnvelope {
209 pub invocation: RuntimeInvocation,
210 pub command: RuntimeEffectCommand,
211}
212
213impl RuntimeEffectEnvelope {
214 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215 Self::try_new(invocation, command).expect("valid runtime effect invocation")
216 }
217
218 pub fn try_new(
219 invocation: RuntimeInvocation,
220 command: RuntimeEffectCommand,
221 ) -> Result<Self, RuntimeEffectControllerError> {
222 validate_effect_invocation(&invocation, command.kind())?;
223 validate_effect_command(&command)?;
224 Ok(Self {
225 invocation,
226 command,
227 })
228 }
229
230 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232 RuntimeEffectControllerError::new(
233 "runtime_effect_envelope_hash",
234 format!("failed to serialize runtime effect envelope: {err}"),
235 )
236 })
237 }
238}
239
240fn validate_effect_invocation(
241 invocation: &RuntimeInvocation,
242 command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245 return Err(RuntimeEffectControllerError::new(
246 "runtime_effect_invocation_subject",
247 "runtime effect envelope subject must be an effect",
248 ));
249 };
250 if effect_id.trim().is_empty() {
251 return Err(RuntimeEffectControllerError::new(
252 "runtime_effect_invocation_subject",
253 "runtime effect envelope effect id must be non-empty",
254 ));
255 }
256 if *kind != command_kind {
257 return Err(RuntimeEffectControllerError::new(
258 "runtime_effect_invocation_kind",
259 format!(
260 "runtime effect invocation kind {} does not match command kind {}",
261 kind.as_str(),
262 command_kind.as_str()
263 ),
264 ));
265 }
266 if invocation
267 .replay
268 .as_ref()
269 .is_none_or(|replay| replay.key.is_empty())
270 {
271 return Err(RuntimeEffectControllerError::new(
272 "runtime_effect_replay_required",
273 "runtime effect envelope requires replay.key",
274 ));
275 }
276 Ok(())
277}
278
279fn validate_effect_command(
280 command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282 if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283 && step_id.trim().is_empty()
284 {
285 return Err(RuntimeEffectControllerError::new(
286 "runtime_effect_durable_step_id",
287 "runtime effect durable step id must be non-empty",
288 ));
289 }
290 if let RuntimeEffectCommand::ToolAttempt {
291 call,
292 attempt,
293 max_attempts,
294 } = command
295 {
296 if call.call_id.trim().is_empty() {
297 return Err(RuntimeEffectControllerError::new(
298 "runtime_effect_tool_attempt_call_id",
299 "runtime effect tool attempt requires a non-empty call id",
300 ));
301 }
302 if *attempt == 0 || *max_attempts == 0 || *attempt > *max_attempts {
303 return Err(RuntimeEffectControllerError::new(
304 "runtime_effect_tool_attempt_index",
305 format!(
306 "runtime effect tool attempt must satisfy 1 <= attempt <= max_attempts, got {attempt}/{max_attempts}"
307 ),
308 ));
309 }
310 }
311 if let RuntimeEffectCommand::ToolBatch { batch } = command {
312 if batch.batch_id.trim().is_empty() {
313 return Err(RuntimeEffectControllerError::new(
314 "runtime_effect_tool_batch_id",
315 "runtime effect tool batch id must be non-empty",
316 ));
317 }
318 if batch.calls.is_empty() {
319 return Err(RuntimeEffectControllerError::new(
320 "runtime_effect_tool_batch_empty",
321 "runtime effect tool batch must contain at least one prepared call",
322 ));
323 }
324 for (index, call) in batch.calls.iter().enumerate() {
325 if call.call.call_id.trim().is_empty() {
326 return Err(RuntimeEffectControllerError::new(
327 "runtime_effect_tool_batch_call_id",
328 format!("runtime effect tool batch call {index} has an empty call id"),
329 ));
330 }
331 if call.replay_suffix.trim().is_empty() {
332 return Err(RuntimeEffectControllerError::new(
333 "runtime_effect_tool_batch_call_replay",
334 format!("runtime effect tool batch call {index} has an empty replay suffix"),
335 ));
336 }
337 }
338 }
339 Ok(())
340}
341
342#[derive(Clone, Debug, Serialize, Deserialize)]
344#[serde(tag = "type", rename_all = "snake_case")]
345pub enum RuntimeEffectCommand {
346 LlmCall {
347 request: Box<LlmRequestSpec>,
348 },
349 Direct {
350 request: Box<LlmRequestSpec>,
351 usage_source: String,
352 },
353 ToolAttempt {
354 call: crate::PreparedToolCall,
355 attempt: u32,
356 max_attempts: u32,
357 },
358 ToolBatch {
359 batch: crate::PreparedToolBatch,
360 },
361 Process {
362 command: Box<ProcessCommand>,
363 },
364 ExecCode {
365 language: String,
366 code: String,
367 },
368 Checkpoint {
369 checkpoint: CheckpointKind,
370 },
371 SyncExecutionEnvironment {
372 update_machine_config: bool,
373 },
374 Sleep {
375 duration_ms: u64,
376 },
377 AwaitEvent {
378 key: crate::AwaitEventKey,
379 },
380 DurableStep {
381 step_id: String,
382 input: serde_json::Value,
383 },
384}
385
386impl RuntimeEffectCommand {
387 pub fn process(command: ProcessCommand) -> Self {
388 Self::Process {
389 command: Box::new(command),
390 }
391 }
392
393 pub fn kind(&self) -> RuntimeEffectKind {
394 match self {
395 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
396 Self::Direct { .. } => RuntimeEffectKind::Direct,
397 Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
398 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
399 Self::Process { .. } => RuntimeEffectKind::Process,
400 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
401 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
402 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
403 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
404 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
405 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
406 }
407 }
408}
409
410#[derive(Clone, Debug, Serialize, Deserialize)]
412#[serde(tag = "op", rename_all = "snake_case")]
413#[allow(clippy::large_enum_variant)]
414pub enum ProcessCommand {
415 Start {
416 registration: ProcessRegistration,
417 #[serde(default, skip_serializing_if = "Option::is_none")]
418 grant: Option<ProcessStartGrant>,
419 #[serde(
420 default,
421 skip_serializing_if = "boxed_process_execution_context_is_empty"
422 )]
423 execution_context: Box<ProcessExecutionContext>,
424 },
425 List {
426 session_scope: SessionScope,
427 #[serde(default)]
428 mode: ProcessListMode,
429 },
430 Transfer {
431 from_scope: SessionScope,
432 to_scope: SessionScope,
433 process_ids: Vec<String>,
434 },
435 DeleteSession {
436 session_id: String,
437 },
438 Await {
439 process_id: String,
440 },
441 Cancel {
442 process_id: String,
443 reason: Option<String>,
444 },
445 Signal {
446 process_id: String,
447 signal_name: String,
448 signal_id: String,
449 request: crate::ProcessEventAppendRequest,
450 },
451}
452
453fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
454 context.is_empty()
455}
456
457type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
458
459impl ProcessCommand {
460 pub fn effect_id(&self) -> String {
461 match self {
462 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
463 Self::List {
464 session_scope,
465 mode,
466 } => {
467 format!("process:list:{}:{}", session_scope.id(), mode.as_str())
468 }
469 Self::Transfer {
470 from_scope,
471 to_scope,
472 process_ids,
473 } => {
474 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
475 .unwrap_or_else(|_| "unhashable".to_string());
476 format!(
477 "process:transfer:{}:{}:{digest}",
478 from_scope.id(),
479 to_scope.id()
480 )
481 }
482 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
483 Self::Await { process_id } => format!("process:await:{process_id}"),
484 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
485 Self::Signal {
486 process_id,
487 signal_name,
488 signal_id,
489 ..
490 } => {
491 format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
492 }
493 }
494 }
495}
496
497#[derive(Clone, Debug, Serialize, Deserialize)]
499#[serde(tag = "op", rename_all = "snake_case")]
500pub enum ProcessEffectOutcome {
501 Start {
502 record: ProcessRecord,
503 },
504 List {
505 entries: Vec<ProcessHandleGrantEntry>,
506 },
507 Transfer,
508 DeleteSession {
509 report: crate::ProcessSessionDeleteReport,
510 },
511 Await {
512 output: ProcessAwaitOutput,
513 },
514 Cancel {
515 record: ProcessRecord,
516 },
517 Signal {
518 event: crate::ProcessEvent,
519 },
520}
521
522#[derive(Clone, Debug, Serialize, Deserialize)]
523pub struct ToolAttemptEffectOutcome {
524 pub launch: ToolAttemptLaunch,
525 #[serde(default, skip_serializing_if = "Vec::is_empty")]
526 pub triggers: Vec<ToolTriggerEffectOutcome>,
527}
528
529#[derive(Clone, Debug, Serialize, Deserialize)]
530pub struct ToolBatchEffectOutcome {
531 pub launches: Vec<ToolCallLaunch>,
532 #[serde(default, skip_serializing_if = "Vec::is_empty")]
533 pub triggers: Vec<ToolTriggerEffectOutcome>,
534}
535
536#[derive(Clone, Debug, Serialize, Deserialize)]
537#[serde(tag = "status", rename_all = "snake_case")]
538#[allow(clippy::large_enum_variant)]
539pub enum ToolCallLaunch {
540 Done {
541 result: CompletedToolCall,
542 },
543 Pending {
544 key: crate::AwaitEventKey,
545 pending: crate::PendingCompletion,
546 duration_ms: u64,
547 },
548}
549
550#[derive(Clone, Debug, Serialize, Deserialize)]
551#[serde(tag = "status", rename_all = "snake_case")]
552pub enum ToolAttemptLaunch {
553 Done {
554 record: crate::ToolCallRecord,
555 },
556 Pending {
557 key: crate::AwaitEventKey,
558 pending: crate::PendingCompletion,
559 duration_ms: u64,
560 },
561}
562
563#[derive(Clone, Debug, Serialize, Deserialize)]
565#[serde(tag = "type", rename_all = "snake_case")]
566#[allow(clippy::large_enum_variant)]
567pub enum RuntimeEffectOutcome {
568 LlmCall {
569 result: Result<LlmResponse, LlmCallError>,
570 text_streamed: bool,
571 },
572 Direct {
573 result: Result<LlmResponse, LlmCallError>,
574 },
575 ToolAttempt {
576 launch: ToolAttemptLaunch,
577 #[serde(default, skip_serializing_if = "Vec::is_empty")]
578 triggers: Vec<ToolTriggerEffectOutcome>,
579 },
580 ToolBatch {
581 launches: Vec<ToolCallLaunch>,
582 #[serde(default, skip_serializing_if = "Vec::is_empty")]
583 triggers: Vec<ToolTriggerEffectOutcome>,
584 },
585 Process {
586 result: ProcessEffectOutcome,
587 },
588 ExecCode {
589 result: Result<ExecResponse, String>,
590 },
591 Checkpoint {
592 result: CheckpointOutcome,
593 },
594 SyncExecutionEnvironment {
595 result: Result<Option<ExecutionEnvironmentSync>, String>,
596 },
597 Sleep,
598 AwaitEvent {
599 resolution: crate::Resolution,
600 },
601 DurableStep {
602 value: serde_json::Value,
603 },
604}
605
606#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
616pub struct LlmAttachmentSpec {
617 pub reference: AttachmentRef,
618}
619
620impl LlmAttachmentSpec {
621 fn into_attachment(self) -> LlmAttachment {
622 LlmAttachment::reference(self.reference)
623 }
624}
625
626#[derive(Clone, Debug, Serialize, Deserialize)]
630pub struct LlmRequestSpec {
631 pub model: String,
632 pub messages: Vec<LlmMessage>,
633 pub attachments: Vec<LlmAttachmentSpec>,
634 pub tools: Arc<Vec<LlmToolSpec>>,
635 pub tool_choice: LlmToolChoice,
636 pub model_variant: Option<String>,
637 #[serde(default)]
638 pub generation: crate::GenerationOptions,
639 pub session_id: Option<String>,
640 pub output_spec: Option<LlmOutputSpec>,
641}
642
643impl LlmRequestSpec {
644 pub async fn from_request(
645 request: &CoreLlmRequest,
646 attachment_store: &dyn AttachmentStore,
647 ) -> Result<Self, RuntimeEffectControllerError> {
648 Ok(Self {
649 model: request.model.clone(),
650 messages: request.messages.clone(),
651 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
652 .await?,
653 tools: Arc::clone(&request.tools),
654 tool_choice: request.tool_choice.clone(),
655 model_variant: request.model_variant.clone(),
656 generation: request.generation.clone(),
657 session_id: request.session_id.clone(),
658 output_spec: request.output_spec.clone(),
659 })
660 }
661
662 pub fn into_request(
663 self,
664 stream_events: Option<LlmEventSender>,
665 provider_trace: Option<LlmProviderTraceSender>,
666 ) -> CoreLlmRequest {
667 CoreLlmRequest {
668 model: self.model,
669 messages: self.messages,
670 attachments: self
671 .attachments
672 .into_iter()
673 .map(LlmAttachmentSpec::into_attachment)
674 .collect(),
675 tools: self.tools,
676 tool_choice: self.tool_choice,
677 model_variant: self.model_variant,
678 generation: self.generation,
679 session_id: self.session_id,
680 output_spec: self.output_spec,
681 stream_events,
682 provider_trace,
683 }
684 }
685}
686
687async fn attachment_specs_from_attachments(
688 attachments: &[LlmAttachment],
689 attachment_store: &dyn AttachmentStore,
690) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
691 let mut specs = Vec::with_capacity(attachments.len());
692 for attachment in attachments {
693 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
694 }
695 Ok(specs)
696}
697
698async fn attachment_spec_from_attachment(
699 attachment: &LlmAttachment,
700 attachment_store: &dyn AttachmentStore,
701) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
702 if let Some(reference) = attachment.reference.as_ref() {
703 return Ok(LlmAttachmentSpec {
704 reference: reference.clone(),
705 });
706 }
707 if attachment.data.is_empty() {
708 return Err(RuntimeEffectControllerError::new(
709 "runtime_effect_attachment_missing_reference",
710 "runtime effect attachment has neither a durable reference nor inline bytes",
711 ));
712 }
713 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
714 RuntimeEffectControllerError::new(
715 "runtime_effect_attachment_media_type",
716 format!(
717 "attachment media type `{}` cannot be represented durably",
718 attachment.mime
719 ),
720 )
721 })?;
722 let reference = attachment_store
723 .put(
724 attachment.data.clone(),
725 AttachmentCreateMeta::new(media_type, None, None, None),
726 )
727 .await
728 .map_err(|err| {
729 RuntimeEffectControllerError::new(
730 "runtime_effect_attachment_store",
731 format!("failed to store attachment before runtime effect invocation: {err}"),
732 )
733 })?;
734 Ok(LlmAttachmentSpec { reference })
735}
736
737impl RuntimeEffectOutcome {
738 pub fn into_llm_call(
739 self,
740 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
741 match self {
742 Self::LlmCall {
743 result,
744 text_streamed,
745 } => Ok((result, text_streamed)),
746 other => Err(RuntimeEffectControllerError::wrong_outcome(
747 RuntimeEffectKind::LlmCall,
748 other.kind(),
749 )),
750 }
751 }
752
753 pub fn into_direct_response(
754 self,
755 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
756 match self {
757 Self::Direct { result } => Ok(result),
758 other => Err(RuntimeEffectControllerError::wrong_outcome(
759 RuntimeEffectKind::Direct,
760 other.kind(),
761 )),
762 }
763 }
764
765 pub fn into_tool_attempt_effect(
766 self,
767 ) -> Result<ToolAttemptEffectOutcome, RuntimeEffectControllerError> {
768 match self {
769 Self::ToolAttempt { launch, triggers } => {
770 Ok(ToolAttemptEffectOutcome { launch, triggers })
771 }
772 other => Err(RuntimeEffectControllerError::wrong_outcome(
773 RuntimeEffectKind::ToolAttempt,
774 other.kind(),
775 )),
776 }
777 }
778
779 pub fn into_tool_batch_effect(
780 self,
781 ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
782 match self {
783 Self::ToolBatch { launches, triggers } => {
784 Ok(ToolBatchEffectOutcome { launches, triggers })
785 }
786 other => Err(RuntimeEffectControllerError::wrong_outcome(
787 RuntimeEffectKind::ToolBatch,
788 other.kind(),
789 )),
790 }
791 }
792
793 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
794 match self {
795 Self::Process { result } => Ok(result),
796 other => Err(RuntimeEffectControllerError::wrong_outcome(
797 RuntimeEffectKind::Process,
798 other.kind(),
799 )),
800 }
801 }
802
803 pub fn into_exec_code(
804 self,
805 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
806 match self {
807 Self::ExecCode { result } => Ok(result),
808 other => Err(RuntimeEffectControllerError::wrong_outcome(
809 RuntimeEffectKind::ExecCode,
810 other.kind(),
811 )),
812 }
813 }
814
815 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
816 match self {
817 Self::Checkpoint { result } => Ok(result),
818 other => Err(RuntimeEffectControllerError::wrong_outcome(
819 RuntimeEffectKind::Checkpoint,
820 other.kind(),
821 )),
822 }
823 }
824
825 pub fn into_sync_execution_environment(
826 self,
827 ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
828 {
829 match self {
830 Self::SyncExecutionEnvironment { result } => Ok(result),
831 other => Err(RuntimeEffectControllerError::wrong_outcome(
832 RuntimeEffectKind::SyncExecutionEnvironment,
833 other.kind(),
834 )),
835 }
836 }
837
838 pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
839 match self {
840 Self::AwaitEvent { resolution } => Ok(resolution),
841 other => Err(RuntimeEffectControllerError::wrong_outcome(
842 RuntimeEffectKind::AwaitEvent,
843 other.kind(),
844 )),
845 }
846 }
847
848 pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
849 match self {
850 Self::DurableStep { value } => Ok(value),
851 other => Err(RuntimeEffectControllerError::wrong_outcome(
852 RuntimeEffectKind::DurableStep,
853 other.kind(),
854 )),
855 }
856 }
857
858 pub fn kind(&self) -> RuntimeEffectKind {
859 match self {
860 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
861 Self::Direct { .. } => RuntimeEffectKind::Direct,
862 Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
863 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
864 Self::Process { .. } => RuntimeEffectKind::Process,
865 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
866 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
867 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
868 Self::Sleep => RuntimeEffectKind::Sleep,
869 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
870 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
871 }
872 }
873}