1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17 ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
/// Discriminant naming the kind of a runtime effect.
///
/// Each variant has a same-named variant in [`RuntimeEffectCommand`] and
/// [`RuntimeEffectOutcome`]; their `kind()` methods map onto this enum.
/// Variants are (de)serialized using their `snake_case` names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeEffectKind {
    LlmCall,
    Direct,
    ToolAttempt,
    ToolBatch,
    Process,
    ExecCode,
    Checkpoint,
    SyncExecutionEnvironment,
    Sleep,
    AwaitEvent,
    DurableStep,
}
38
39impl RuntimeEffectKind {
40 pub fn as_str(self) -> &'static str {
41 match self {
42 Self::LlmCall => "llm_call",
43 Self::Direct => "direct",
44 Self::ToolAttempt => "tool_attempt",
45 Self::ToolBatch => "tool_batch",
46 Self::Process => "process",
47 Self::ExecCode => "exec_code",
48 Self::Checkpoint => "checkpoint",
49 Self::SyncExecutionEnvironment => "sync_execution_environment",
50 Self::Sleep => "sleep",
51 Self::AwaitEvent => "await_event",
52 Self::DurableStep => "durable_step",
53 }
54 }
55}
56
/// Describes one runtime invocation: where it runs, what it is about, and
/// optional causality / replay information.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeInvocation {
    /// Session (and optionally turn) the invocation belongs to.
    pub scope: RuntimeScope,
    /// The entity this invocation refers to.
    pub subject: RuntimeSubject,
    /// Optional reference to what caused this invocation; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub caused_by: Option<CausalRef>,
    /// Optional replay information; omitted from the serialized form when
    /// `None`. Effect envelopes require it (see `validate_effect_invocation`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<RuntimeReplay>,
}
67
68impl RuntimeInvocation {
69 pub fn effect(
70 scope: RuntimeScope,
71 effect_id: impl Into<String>,
72 kind: RuntimeEffectKind,
73 replay_key: impl Into<String>,
74 ) -> Self {
75 Self {
76 scope,
77 subject: RuntimeSubject::Effect {
78 effect_id: effect_id.into(),
79 kind,
80 },
81 caused_by: None,
82 replay: Some(RuntimeReplay {
83 key: replay_key.into(),
84 }),
85 }
86 }
87
88 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89 self.caused_by = caused_by;
90 self
91 }
92
93 pub fn effect_id(&self) -> Option<&str> {
94 match &self.subject {
95 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96 _ => None,
97 }
98 }
99
100 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101 match &self.subject {
102 RuntimeSubject::Effect { kind, .. } => Some(*kind),
103 _ => None,
104 }
105 }
106
107 pub fn replay_key(&self) -> Option<&str> {
108 self.replay.as_ref().map(|replay| replay.key.as_str())
109 }
110
111 pub fn causal_ref(&self) -> Option<CausalRef> {
112 match &self.subject {
113 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114 session_id: self.scope.session_id.clone(),
115 turn_id: self.scope.turn_id.clone(),
116 effect_id: effect_id.clone(),
117 }),
118 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119 process_id: process_id.clone(),
120 }),
121 RuntimeSubject::ProcessEvent {
122 process_id,
123 sequence,
124 ..
125 } => Some(CausalRef::ProcessEvent {
126 process_id: process_id.clone(),
127 sequence: *sequence,
128 }),
129 RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130 Some(CausalRef::TriggerOccurrence {
131 occurrence_id: occurrence_id.clone(),
132 })
133 }
134 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135 session_id: self.scope.session_id.clone(),
136 node_id: node_id.clone(),
137 }),
138 }
139 }
140}
141
/// Session-level scope of an invocation, with optional turn-level detail.
///
/// The optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeScope {
    /// Identifier of the owning session.
    pub session_id: String,
    /// Identifier of the turn, when the scope is turn-bound.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_id: Option<String>,
    /// Index of the turn, when the scope is turn-bound.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub turn_index: Option<usize>,
    /// Protocol iteration, when the scope is turn-bound.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub protocol_iteration: Option<usize>,
}
152
153impl RuntimeScope {
154 pub fn new(session_id: impl Into<String>) -> Self {
155 Self {
156 session_id: session_id.into(),
157 turn_id: None,
158 turn_index: None,
159 protocol_iteration: None,
160 }
161 }
162
163 pub fn for_turn(
164 session_id: impl Into<String>,
165 turn_id: impl Into<String>,
166 turn_index: usize,
167 protocol_iteration: usize,
168 ) -> Self {
169 Self {
170 session_id: session_id.into(),
171 turn_id: Some(turn_id.into()),
172 turn_index: Some(turn_index),
173 protocol_iteration: Some(protocol_iteration),
174 }
175 }
176}
177
/// Replay information attached to a [`RuntimeInvocation`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeReplay {
    /// Replay key; must be non-empty for effect envelopes
    /// (enforced by `validate_effect_invocation`).
    pub key: String,
}
182
/// The entity a [`RuntimeInvocation`] is about.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field using `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RuntimeSubject {
    /// A runtime effect, identified by id and kind.
    Effect {
        effect_id: String,
        kind: RuntimeEffectKind,
    },
    /// A process, identified by id.
    Process {
        process_id: String,
    },
    /// A single event of a process, identified by process id and sequence number.
    ProcessEvent {
        process_id: String,
        sequence: u64,
        event_type: String,
    },
    /// A trigger occurrence, identified by id.
    TriggerOccurrence {
        occurrence_id: String,
    },
    /// A session node, identified by id.
    SessionNode {
        node_id: String,
    },
}
205
/// Pairs a [`RuntimeInvocation`] with the [`RuntimeEffectCommand`] to run.
///
/// `try_new` validates the pair; note that the fields are public and the type
/// derives `Deserialize`, so values built by other means are not validated here.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RuntimeEffectEnvelope {
    /// Invocation metadata; its subject must be an effect matching `command`.
    pub invocation: RuntimeInvocation,
    /// The effect command itself.
    pub command: RuntimeEffectCommand,
}
212
213impl RuntimeEffectEnvelope {
214 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215 Self::try_new(invocation, command).expect("valid runtime effect invocation")
216 }
217
218 pub fn try_new(
219 invocation: RuntimeInvocation,
220 command: RuntimeEffectCommand,
221 ) -> Result<Self, RuntimeEffectControllerError> {
222 validate_effect_invocation(&invocation, command.kind())?;
223 validate_effect_command(&command)?;
224 Ok(Self {
225 invocation,
226 command,
227 })
228 }
229
230 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232 RuntimeEffectControllerError::new(
233 "runtime_effect_envelope_hash",
234 format!("failed to serialize runtime effect envelope: {err}"),
235 )
236 })
237 }
238}
239
240fn validate_effect_invocation(
241 invocation: &RuntimeInvocation,
242 command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245 return Err(RuntimeEffectControllerError::new(
246 "runtime_effect_invocation_subject",
247 "runtime effect envelope subject must be an effect",
248 ));
249 };
250 if effect_id.trim().is_empty() {
251 return Err(RuntimeEffectControllerError::new(
252 "runtime_effect_invocation_subject",
253 "runtime effect envelope effect id must be non-empty",
254 ));
255 }
256 if *kind != command_kind {
257 return Err(RuntimeEffectControllerError::new(
258 "runtime_effect_invocation_kind",
259 format!(
260 "runtime effect invocation kind {} does not match command kind {}",
261 kind.as_str(),
262 command_kind.as_str()
263 ),
264 ));
265 }
266 if invocation
267 .replay
268 .as_ref()
269 .is_none_or(|replay| replay.key.is_empty())
270 {
271 return Err(RuntimeEffectControllerError::new(
272 "runtime_effect_replay_required",
273 "runtime effect envelope requires replay.key",
274 ));
275 }
276 Ok(())
277}
278
279fn validate_effect_command(
280 command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282 if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283 && step_id.trim().is_empty()
284 {
285 return Err(RuntimeEffectControllerError::new(
286 "runtime_effect_durable_step_id",
287 "runtime effect durable step id must be non-empty",
288 ));
289 }
290 if let RuntimeEffectCommand::ToolAttempt {
291 call,
292 execution_grant: _,
293 attempt,
294 max_attempts,
295 } = command
296 {
297 if call.call_id.trim().is_empty() {
298 return Err(RuntimeEffectControllerError::new(
299 "runtime_effect_tool_attempt_call_id",
300 "runtime effect tool attempt requires a non-empty call id",
301 ));
302 }
303 if *attempt == 0 || *max_attempts == 0 || *attempt > *max_attempts {
304 return Err(RuntimeEffectControllerError::new(
305 "runtime_effect_tool_attempt_index",
306 format!(
307 "runtime effect tool attempt must satisfy 1 <= attempt <= max_attempts, got {attempt}/{max_attempts}"
308 ),
309 ));
310 }
311 }
312 if let RuntimeEffectCommand::ToolBatch { batch } = command {
313 if batch.batch_id.trim().is_empty() {
314 return Err(RuntimeEffectControllerError::new(
315 "runtime_effect_tool_batch_id",
316 "runtime effect tool batch id must be non-empty",
317 ));
318 }
319 if batch.calls.is_empty() {
320 return Err(RuntimeEffectControllerError::new(
321 "runtime_effect_tool_batch_empty",
322 "runtime effect tool batch must contain at least one prepared call",
323 ));
324 }
325 for (index, call) in batch.calls.iter().enumerate() {
326 if call.call.call_id.trim().is_empty() {
327 return Err(RuntimeEffectControllerError::new(
328 "runtime_effect_tool_batch_call_id",
329 format!("runtime effect tool batch call {index} has an empty call id"),
330 ));
331 }
332 if call.replay_suffix.trim().is_empty() {
333 return Err(RuntimeEffectControllerError::new(
334 "runtime_effect_tool_batch_call_replay",
335 format!("runtime effect tool batch call {index} has an empty replay suffix"),
336 ));
337 }
338 }
339 }
340 Ok(())
341}
342
/// The command payload of a runtime effect.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field using `snake_case`. [`RuntimeEffectCommand::kind`] maps each
/// variant to its [`RuntimeEffectKind`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RuntimeEffectCommand {
    /// An LLM call described by a serializable request spec.
    LlmCall {
        request: Box<LlmRequestSpec>,
    },
    /// A direct request, also described by an LLM request spec.
    Direct {
        request: Box<LlmRequestSpec>,
        usage_source: String,
    },
    /// One attempt of a prepared tool call.
    ///
    /// Validation requires a non-blank call id and
    /// `1 <= attempt <= max_attempts`.
    ToolAttempt {
        call: crate::PreparedToolCall,
        /// Omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_grant: Option<Box<crate::ToolExecutionGrant>>,
        attempt: u32,
        max_attempts: u32,
    },
    /// A batch of prepared tool calls; validation requires a non-blank batch
    /// id and at least one call.
    ToolBatch {
        batch: crate::PreparedToolBatch,
    },
    /// A process operation.
    Process {
        command: Box<ProcessCommand>,
    },
    /// Code to execute, together with its language.
    ExecCode {
        language: String,
        code: String,
    },
    /// A checkpoint of the given kind.
    Checkpoint {
        checkpoint: CheckpointKind,
    },
    /// An execution-environment sync request.
    SyncExecutionEnvironment {
        update_machine_config: bool,
    },
    /// A sleep; the field name suggests milliseconds.
    Sleep {
        duration_ms: u64,
    },
    /// Await the event identified by `key`.
    AwaitEvent {
        key: crate::AwaitEventKey,
    },
    /// A durable step; validation requires a non-blank `step_id`.
    DurableStep {
        step_id: String,
        input: serde_json::Value,
    },
}
388
389impl RuntimeEffectCommand {
390 pub fn process(command: ProcessCommand) -> Self {
391 Self::Process {
392 command: Box::new(command),
393 }
394 }
395
396 pub fn kind(&self) -> RuntimeEffectKind {
397 match self {
398 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
399 Self::Direct { .. } => RuntimeEffectKind::Direct,
400 Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
401 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
402 Self::Process { .. } => RuntimeEffectKind::Process,
403 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
404 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
405 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
406 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
407 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
408 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
409 }
410 }
411}
412
/// A process operation carried by [`RuntimeEffectCommand::Process`].
///
/// Serialized as an internally tagged enum: the variant name is stored in an
/// `op` field using `snake_case`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
pub enum ProcessCommand {
    /// Start a process from a registration.
    Start {
        registration: ProcessRegistration,
        /// Omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        grant: Option<ProcessStartGrant>,
        /// Defaulted when absent; omitted from the serialized form when
        /// `boxed_process_execution_context_is_empty` returns `true`.
        #[serde(
            default,
            skip_serializing_if = "boxed_process_execution_context_is_empty"
        )]
        execution_context: Box<ProcessExecutionContext>,
    },
    /// List processes for a session scope.
    List {
        session_scope: SessionScope,
        /// Falls back to the type's default when absent from the input.
        #[serde(default)]
        mode: ProcessListMode,
    },
    /// Transfer the listed processes from one session scope to another.
    Transfer {
        from_scope: SessionScope,
        to_scope: SessionScope,
        process_ids: Vec<String>,
    },
    /// Session deletion, addressed by session id.
    DeleteSession {
        session_id: String,
    },
    /// Await a process.
    Await {
        process_id: String,
    },
    /// Cancel a process, optionally with a reason.
    Cancel {
        process_id: String,
        reason: Option<String>,
    },
    /// Send a signal to a process.
    Signal {
        process_id: String,
        signal_name: String,
        signal_id: String,
        request: crate::ProcessEventAppendRequest,
    },
}
455
/// Serde `skip_serializing_if` predicate for the boxed `execution_context`
/// field of `ProcessCommand::Start`: returns `context.is_empty()`.
fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
    context.is_empty()
}
459
/// Result carried by `RuntimeEffectOutcome::Checkpoint`: a checkpoint
/// delivery on success, or a controller error.
type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
461
462impl ProcessCommand {
463 pub fn effect_id(&self) -> String {
464 match self {
465 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
466 Self::List {
467 session_scope,
468 mode,
469 } => {
470 format!("process:list:{}:{}", session_scope.id(), mode.as_str())
471 }
472 Self::Transfer {
473 from_scope,
474 to_scope,
475 process_ids,
476 } => {
477 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
478 .unwrap_or_else(|_| "unhashable".to_string());
479 format!(
480 "process:transfer:{}:{}:{digest}",
481 from_scope.id(),
482 to_scope.id()
483 )
484 }
485 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
486 Self::Await { process_id } => format!("process:await:{process_id}"),
487 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
488 Self::Signal {
489 process_id,
490 signal_name,
491 signal_id,
492 ..
493 } => {
494 format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
495 }
496 }
497 }
498}
499
/// Outcome of a process operation, with one variant per [`ProcessCommand`]
/// variant.
///
/// Serialized as an internally tagged enum: the variant name is stored in an
/// `op` field using `snake_case`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum ProcessEffectOutcome {
    /// Carries a process record.
    Start {
        record: ProcessRecord,
    },
    /// Carries the listed entries.
    List {
        entries: Vec<ProcessHandleGrantEntry>,
    },
    /// Carries no payload.
    Transfer,
    /// Carries a session delete report.
    DeleteSession {
        report: crate::ProcessSessionDeleteReport,
    },
    /// Carries the await output.
    Await {
        output: ProcessAwaitOutput,
    },
    /// Carries a process record.
    Cancel {
        record: ProcessRecord,
    },
    /// Carries a process event.
    Signal {
        event: crate::ProcessEvent,
    },
}
524
/// Payload of `RuntimeEffectOutcome::ToolAttempt`, as returned by
/// `RuntimeEffectOutcome::into_tool_attempt_effect`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolAttemptEffectOutcome {
    /// Launch state of the attempt (done or pending).
    pub launch: ToolAttemptLaunch,
    /// Trigger outcomes; defaulted when absent and omitted from the
    /// serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<ToolTriggerEffectOutcome>,
}
531
/// Payload of `RuntimeEffectOutcome::ToolBatch`, as returned by
/// `RuntimeEffectOutcome::into_tool_batch_effect`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolBatchEffectOutcome {
    /// Launch states of the batch's calls.
    pub launches: Vec<ToolCallLaunch>,
    /// Trigger outcomes; defaulted when absent and omitted from the
    /// serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<ToolTriggerEffectOutcome>,
}
538
/// Launch state of a tool call inside a batch.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `status` field using `snake_case`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
pub enum ToolCallLaunch {
    /// The call has a completed result.
    Done {
        result: CompletedToolCall,
    },
    /// The call is pending, with an await-event key and a pending completion.
    Pending {
        key: crate::AwaitEventKey,
        pending: crate::PendingCompletion,
        duration_ms: u64,
    },
}
552
/// Launch state of a single tool attempt.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `status` field using `snake_case`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ToolAttemptLaunch {
    /// The attempt has a tool call record.
    Done {
        record: crate::ToolCallRecord,
    },
    /// The attempt is pending, with an await-event key and a pending completion.
    Pending {
        key: crate::AwaitEventKey,
        pending: crate::PendingCompletion,
        duration_ms: u64,
    },
}
565
/// Outcome of a runtime effect, with one variant per [`RuntimeEffectKind`].
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field using `snake_case`. The `into_*` methods extract the payload
/// of a specific variant and report a kind mismatch otherwise.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[allow(clippy::large_enum_variant)]
pub enum RuntimeEffectOutcome {
    /// Result of an LLM call, plus a `text_streamed` flag.
    LlmCall {
        result: Result<LlmResponse, LlmCallError>,
        text_streamed: bool,
    },
    /// Result of a direct request.
    Direct {
        result: Result<LlmResponse, LlmCallError>,
    },
    /// Launch state and trigger outcomes of a tool attempt.
    ToolAttempt {
        launch: ToolAttemptLaunch,
        /// Defaulted when absent; omitted from the serialized form when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        triggers: Vec<ToolTriggerEffectOutcome>,
    },
    /// Launch states and trigger outcomes of a tool batch.
    ToolBatch {
        launches: Vec<ToolCallLaunch>,
        /// Defaulted when absent; omitted from the serialized form when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        triggers: Vec<ToolTriggerEffectOutcome>,
    },
    /// Outcome of a process operation.
    Process {
        result: ProcessEffectOutcome,
    },
    /// Result of code execution; the error side is a plain string.
    ExecCode {
        result: Result<ExecResponse, String>,
    },
    /// Result of a checkpoint.
    Checkpoint {
        result: CheckpointOutcome,
    },
    /// Result of an execution-environment sync; the error side is a plain string.
    SyncExecutionEnvironment {
        result: Result<Option<ExecutionEnvironmentSync>, String>,
    },
    /// Sleep outcome; carries no payload.
    Sleep,
    /// Resolution of an awaited event.
    AwaitEvent {
        resolution: crate::Resolution,
    },
    /// JSON value carried by a durable step outcome.
    DurableStep {
        value: serde_json::Value,
    },
}
608
/// Serializable form of an LLM attachment: it holds only an
/// [`AttachmentRef`], never inline bytes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LlmAttachmentSpec {
    /// Reference to the attachment.
    pub reference: AttachmentRef,
}
622
623impl LlmAttachmentSpec {
624 fn into_attachment(self) -> LlmAttachment {
625 LlmAttachment::reference(self.reference)
626 }
627}
628
/// Serializable description of an LLM request.
///
/// It mirrors the core LLM request without the `stream_events` and
/// `provider_trace` senders, and stores attachments as references
/// ([`LlmAttachmentSpec`]). See `from_request` / `into_request` for the
/// conversions.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LlmRequestSpec {
    pub model: String,
    pub messages: Vec<LlmMessage>,
    /// Attachments, each represented by a reference.
    pub attachments: Vec<LlmAttachmentSpec>,
    /// Tool specs behind an `Arc`.
    pub tools: Arc<Vec<LlmToolSpec>>,
    pub tool_choice: LlmToolChoice,
    pub model_variant: Option<String>,
    /// Falls back to the type's default when absent from the input.
    #[serde(default)]
    pub generation: crate::GenerationOptions,
    pub session_id: Option<String>,
    pub output_spec: Option<LlmOutputSpec>,
}
645
646impl LlmRequestSpec {
647 pub async fn from_request(
648 request: &CoreLlmRequest,
649 attachment_store: &dyn AttachmentStore,
650 ) -> Result<Self, RuntimeEffectControllerError> {
651 Ok(Self {
652 model: request.model.clone(),
653 messages: request.messages.clone(),
654 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
655 .await?,
656 tools: Arc::clone(&request.tools),
657 tool_choice: request.tool_choice.clone(),
658 model_variant: request.model_variant.clone(),
659 generation: request.generation.clone(),
660 session_id: request.session_id.clone(),
661 output_spec: request.output_spec.clone(),
662 })
663 }
664
665 pub fn into_request(
666 self,
667 stream_events: Option<LlmEventSender>,
668 provider_trace: Option<LlmProviderTraceSender>,
669 ) -> CoreLlmRequest {
670 CoreLlmRequest {
671 model: self.model,
672 messages: self.messages,
673 attachments: self
674 .attachments
675 .into_iter()
676 .map(LlmAttachmentSpec::into_attachment)
677 .collect(),
678 tools: self.tools,
679 tool_choice: self.tool_choice,
680 model_variant: self.model_variant,
681 generation: self.generation,
682 session_id: self.session_id,
683 output_spec: self.output_spec,
684 stream_events,
685 provider_trace,
686 }
687 }
688}
689
690async fn attachment_specs_from_attachments(
691 attachments: &[LlmAttachment],
692 attachment_store: &dyn AttachmentStore,
693) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
694 let mut specs = Vec::with_capacity(attachments.len());
695 for attachment in attachments {
696 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
697 }
698 Ok(specs)
699}
700
701async fn attachment_spec_from_attachment(
702 attachment: &LlmAttachment,
703 attachment_store: &dyn AttachmentStore,
704) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
705 if let Some(reference) = attachment.reference.as_ref() {
706 return Ok(LlmAttachmentSpec {
707 reference: reference.clone(),
708 });
709 }
710 if attachment.data.is_empty() {
711 return Err(RuntimeEffectControllerError::new(
712 "runtime_effect_attachment_missing_reference",
713 "runtime effect attachment has neither a durable reference nor inline bytes",
714 ));
715 }
716 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
717 RuntimeEffectControllerError::new(
718 "runtime_effect_attachment_media_type",
719 format!(
720 "attachment media type `{}` cannot be represented durably",
721 attachment.mime
722 ),
723 )
724 })?;
725 let reference = attachment_store
726 .put(
727 attachment.data.clone(),
728 AttachmentCreateMeta::new(media_type, None, None, None),
729 )
730 .await
731 .map_err(|err| {
732 RuntimeEffectControllerError::new(
733 "runtime_effect_attachment_store",
734 format!("failed to store attachment before runtime effect invocation: {err}"),
735 )
736 })?;
737 Ok(LlmAttachmentSpec { reference })
738}
739
740impl RuntimeEffectOutcome {
741 pub fn into_llm_call(
742 self,
743 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
744 match self {
745 Self::LlmCall {
746 result,
747 text_streamed,
748 } => Ok((result, text_streamed)),
749 other => Err(RuntimeEffectControllerError::wrong_outcome(
750 RuntimeEffectKind::LlmCall,
751 other.kind(),
752 )),
753 }
754 }
755
756 pub fn into_direct_response(
757 self,
758 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
759 match self {
760 Self::Direct { result } => Ok(result),
761 other => Err(RuntimeEffectControllerError::wrong_outcome(
762 RuntimeEffectKind::Direct,
763 other.kind(),
764 )),
765 }
766 }
767
768 pub fn into_tool_attempt_effect(
769 self,
770 ) -> Result<ToolAttemptEffectOutcome, RuntimeEffectControllerError> {
771 match self {
772 Self::ToolAttempt { launch, triggers } => {
773 Ok(ToolAttemptEffectOutcome { launch, triggers })
774 }
775 other => Err(RuntimeEffectControllerError::wrong_outcome(
776 RuntimeEffectKind::ToolAttempt,
777 other.kind(),
778 )),
779 }
780 }
781
782 pub fn into_tool_batch_effect(
783 self,
784 ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
785 match self {
786 Self::ToolBatch { launches, triggers } => {
787 Ok(ToolBatchEffectOutcome { launches, triggers })
788 }
789 other => Err(RuntimeEffectControllerError::wrong_outcome(
790 RuntimeEffectKind::ToolBatch,
791 other.kind(),
792 )),
793 }
794 }
795
796 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
797 match self {
798 Self::Process { result } => Ok(result),
799 other => Err(RuntimeEffectControllerError::wrong_outcome(
800 RuntimeEffectKind::Process,
801 other.kind(),
802 )),
803 }
804 }
805
806 pub fn into_exec_code(
807 self,
808 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
809 match self {
810 Self::ExecCode { result } => Ok(result),
811 other => Err(RuntimeEffectControllerError::wrong_outcome(
812 RuntimeEffectKind::ExecCode,
813 other.kind(),
814 )),
815 }
816 }
817
818 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
819 match self {
820 Self::Checkpoint { result } => Ok(result),
821 other => Err(RuntimeEffectControllerError::wrong_outcome(
822 RuntimeEffectKind::Checkpoint,
823 other.kind(),
824 )),
825 }
826 }
827
828 pub fn into_sync_execution_environment(
829 self,
830 ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
831 {
832 match self {
833 Self::SyncExecutionEnvironment { result } => Ok(result),
834 other => Err(RuntimeEffectControllerError::wrong_outcome(
835 RuntimeEffectKind::SyncExecutionEnvironment,
836 other.kind(),
837 )),
838 }
839 }
840
841 pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
842 match self {
843 Self::AwaitEvent { resolution } => Ok(resolution),
844 other => Err(RuntimeEffectControllerError::wrong_outcome(
845 RuntimeEffectKind::AwaitEvent,
846 other.kind(),
847 )),
848 }
849 }
850
851 pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
852 match self {
853 Self::DurableStep { value } => Ok(value),
854 other => Err(RuntimeEffectControllerError::wrong_outcome(
855 RuntimeEffectKind::DurableStep,
856 other.kind(),
857 )),
858 }
859 }
860
861 pub fn kind(&self) -> RuntimeEffectKind {
862 match self {
863 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
864 Self::Direct { .. } => RuntimeEffectKind::Direct,
865 Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
866 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
867 Self::Process { .. } => RuntimeEffectKind::Process,
868 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
869 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
870 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
871 Self::Sleep => RuntimeEffectKind::Sleep,
872 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
873 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
874 }
875 }
876}