1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17 ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolCall,
29 ToolBatch,
30 Process,
31 ExecCode,
32 Checkpoint,
33 SyncExecutionEnvironment,
34 Sleep,
35 AwaitEvent,
36 DurableStep,
37}
38
39impl RuntimeEffectKind {
40 pub fn as_str(self) -> &'static str {
41 match self {
42 Self::LlmCall => "llm_call",
43 Self::Direct => "direct",
44 Self::ToolCall => "tool_call",
45 Self::ToolBatch => "tool_batch",
46 Self::Process => "process",
47 Self::ExecCode => "exec_code",
48 Self::Checkpoint => "checkpoint",
49 Self::SyncExecutionEnvironment => "sync_execution_environment",
50 Self::Sleep => "sleep",
51 Self::AwaitEvent => "await_event",
52 Self::DurableStep => "durable_step",
53 }
54 }
55}
56
57#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
59pub struct RuntimeInvocation {
60 pub scope: RuntimeScope,
61 pub subject: RuntimeSubject,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub caused_by: Option<CausalRef>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
65 pub replay: Option<RuntimeReplay>,
66}
67
68impl RuntimeInvocation {
69 pub fn effect(
70 scope: RuntimeScope,
71 effect_id: impl Into<String>,
72 kind: RuntimeEffectKind,
73 replay_key: impl Into<String>,
74 ) -> Self {
75 Self {
76 scope,
77 subject: RuntimeSubject::Effect {
78 effect_id: effect_id.into(),
79 kind,
80 },
81 caused_by: None,
82 replay: Some(RuntimeReplay {
83 key: replay_key.into(),
84 }),
85 }
86 }
87
88 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89 self.caused_by = caused_by;
90 self
91 }
92
93 pub fn effect_id(&self) -> Option<&str> {
94 match &self.subject {
95 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96 _ => None,
97 }
98 }
99
100 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101 match &self.subject {
102 RuntimeSubject::Effect { kind, .. } => Some(*kind),
103 _ => None,
104 }
105 }
106
107 pub fn replay_key(&self) -> Option<&str> {
108 self.replay.as_ref().map(|replay| replay.key.as_str())
109 }
110
111 pub fn causal_ref(&self) -> Option<CausalRef> {
112 match &self.subject {
113 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114 session_id: self.scope.session_id.clone(),
115 turn_id: self.scope.turn_id.clone(),
116 effect_id: effect_id.clone(),
117 }),
118 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119 process_id: process_id.clone(),
120 }),
121 RuntimeSubject::ProcessEvent {
122 process_id,
123 sequence,
124 ..
125 } => Some(CausalRef::ProcessEvent {
126 process_id: process_id.clone(),
127 sequence: *sequence,
128 }),
129 RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130 Some(CausalRef::TriggerOccurrence {
131 occurrence_id: occurrence_id.clone(),
132 })
133 }
134 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135 session_id: self.scope.session_id.clone(),
136 node_id: node_id.clone(),
137 }),
138 }
139 }
140}
141
142#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
143pub struct RuntimeScope {
144 pub session_id: String,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub turn_id: Option<String>,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub turn_index: Option<usize>,
149 #[serde(default, skip_serializing_if = "Option::is_none")]
150 pub protocol_iteration: Option<usize>,
151}
152
153impl RuntimeScope {
154 pub fn new(session_id: impl Into<String>) -> Self {
155 Self {
156 session_id: session_id.into(),
157 turn_id: None,
158 turn_index: None,
159 protocol_iteration: None,
160 }
161 }
162
163 pub fn for_turn(
164 session_id: impl Into<String>,
165 turn_id: impl Into<String>,
166 turn_index: usize,
167 protocol_iteration: usize,
168 ) -> Self {
169 Self {
170 session_id: session_id.into(),
171 turn_id: Some(turn_id.into()),
172 turn_index: Some(turn_index),
173 protocol_iteration: Some(protocol_iteration),
174 }
175 }
176}
177
178#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
179pub struct RuntimeReplay {
180 pub key: String,
181}
182
183#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(tag = "type", rename_all = "snake_case")]
185pub enum RuntimeSubject {
186 Effect {
187 effect_id: String,
188 kind: RuntimeEffectKind,
189 },
190 Process {
191 process_id: String,
192 },
193 ProcessEvent {
194 process_id: String,
195 sequence: u64,
196 event_type: String,
197 },
198 TriggerOccurrence {
199 occurrence_id: String,
200 },
201 SessionNode {
202 node_id: String,
203 },
204}
205
206#[derive(Clone, Debug, Serialize, Deserialize)]
208pub struct RuntimeEffectEnvelope {
209 pub invocation: RuntimeInvocation,
210 pub command: RuntimeEffectCommand,
211}
212
213impl RuntimeEffectEnvelope {
214 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215 Self::try_new(invocation, command).expect("valid runtime effect invocation")
216 }
217
218 pub fn try_new(
219 invocation: RuntimeInvocation,
220 command: RuntimeEffectCommand,
221 ) -> Result<Self, RuntimeEffectControllerError> {
222 validate_effect_invocation(&invocation, command.kind())?;
223 validate_effect_command(&command)?;
224 Ok(Self {
225 invocation,
226 command,
227 })
228 }
229
230 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232 RuntimeEffectControllerError::new(
233 "runtime_effect_envelope_hash",
234 format!("failed to serialize runtime effect envelope: {err}"),
235 )
236 })
237 }
238}
239
240fn validate_effect_invocation(
241 invocation: &RuntimeInvocation,
242 command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245 return Err(RuntimeEffectControllerError::new(
246 "runtime_effect_invocation_subject",
247 "runtime effect envelope subject must be an effect",
248 ));
249 };
250 if effect_id.trim().is_empty() {
251 return Err(RuntimeEffectControllerError::new(
252 "runtime_effect_invocation_subject",
253 "runtime effect envelope effect id must be non-empty",
254 ));
255 }
256 if *kind != command_kind {
257 return Err(RuntimeEffectControllerError::new(
258 "runtime_effect_invocation_kind",
259 format!(
260 "runtime effect invocation kind {} does not match command kind {}",
261 kind.as_str(),
262 command_kind.as_str()
263 ),
264 ));
265 }
266 if invocation
267 .replay
268 .as_ref()
269 .is_none_or(|replay| replay.key.is_empty())
270 {
271 return Err(RuntimeEffectControllerError::new(
272 "runtime_effect_replay_required",
273 "runtime effect envelope requires replay.key",
274 ));
275 }
276 Ok(())
277}
278
279fn validate_effect_command(
280 command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282 if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283 && step_id.trim().is_empty()
284 {
285 return Err(RuntimeEffectControllerError::new(
286 "runtime_effect_durable_step_id",
287 "runtime effect durable step id must be non-empty",
288 ));
289 }
290 if let RuntimeEffectCommand::ToolBatch { batch } = command {
291 if batch.batch_id.trim().is_empty() {
292 return Err(RuntimeEffectControllerError::new(
293 "runtime_effect_tool_batch_id",
294 "runtime effect tool batch id must be non-empty",
295 ));
296 }
297 if batch.calls.is_empty() {
298 return Err(RuntimeEffectControllerError::new(
299 "runtime_effect_tool_batch_empty",
300 "runtime effect tool batch must contain at least one prepared call",
301 ));
302 }
303 for (index, child) in batch.calls.iter().enumerate() {
304 if child.call.call_id.trim().is_empty() {
305 return Err(RuntimeEffectControllerError::new(
306 "runtime_effect_tool_batch_child_call_id",
307 format!("runtime effect tool batch child {index} has an empty call id"),
308 ));
309 }
310 if child.replay_suffix.trim().is_empty() {
311 return Err(RuntimeEffectControllerError::new(
312 "runtime_effect_tool_batch_child_replay",
313 format!("runtime effect tool batch child {index} has an empty replay suffix"),
314 ));
315 }
316 }
317 }
318 Ok(())
319}
320
321#[derive(Clone, Debug, Serialize, Deserialize)]
323#[serde(tag = "type", rename_all = "snake_case")]
324pub enum RuntimeEffectCommand {
325 LlmCall {
326 request: Box<LlmRequestSpec>,
327 },
328 Direct {
329 request: Box<LlmRequestSpec>,
330 usage_source: String,
331 },
332 ToolCall {
333 call: crate::PreparedToolCall,
334 },
335 ToolBatch {
336 batch: crate::PreparedToolBatch,
337 },
338 Process {
339 command: Box<ProcessCommand>,
340 },
341 ExecCode {
342 language: String,
343 code: String,
344 },
345 Checkpoint {
346 checkpoint: CheckpointKind,
347 },
348 SyncExecutionEnvironment {
349 update_machine_config: bool,
350 },
351 Sleep {
352 duration_ms: u64,
353 },
354 AwaitEvent {
355 key: crate::AwaitEventKey,
356 },
357 DurableStep {
358 step_id: String,
359 input: serde_json::Value,
360 },
361}
362
363impl RuntimeEffectCommand {
364 pub fn process(command: ProcessCommand) -> Self {
365 Self::Process {
366 command: Box::new(command),
367 }
368 }
369
370 pub fn kind(&self) -> RuntimeEffectKind {
371 match self {
372 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
373 Self::Direct { .. } => RuntimeEffectKind::Direct,
374 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
375 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
376 Self::Process { .. } => RuntimeEffectKind::Process,
377 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
378 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
379 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
380 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
381 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
382 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
383 }
384 }
385}
386
387#[derive(Clone, Debug, Serialize, Deserialize)]
389#[serde(tag = "op", rename_all = "snake_case")]
390pub enum ProcessCommand {
391 Start {
392 registration: ProcessRegistration,
393 #[serde(default, skip_serializing_if = "Option::is_none")]
394 grant: Option<ProcessStartGrant>,
395 #[serde(
396 default,
397 skip_serializing_if = "boxed_process_execution_context_is_empty"
398 )]
399 execution_context: Box<ProcessExecutionContext>,
400 },
401 List {
402 session_scope: SessionScope,
403 #[serde(default)]
404 mode: ProcessListMode,
405 },
406 Transfer {
407 from_scope: SessionScope,
408 to_scope: SessionScope,
409 process_ids: Vec<String>,
410 },
411 DeleteSession {
412 session_id: String,
413 },
414 Await {
415 process_id: String,
416 },
417 Cancel {
418 process_id: String,
419 reason: Option<String>,
420 },
421 Signal {
422 process_id: String,
423 signal_name: String,
424 signal_id: String,
425 request: crate::ProcessEventAppendRequest,
426 },
427}
428
429fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
430 context.is_empty()
431}
432
433type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
434
435impl ProcessCommand {
436 pub fn effect_id(&self) -> String {
437 match self {
438 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
439 Self::List {
440 session_scope,
441 mode,
442 } => {
443 format!("process:list:{}:{}", session_scope.id(), mode.as_str())
444 }
445 Self::Transfer {
446 from_scope,
447 to_scope,
448 process_ids,
449 } => {
450 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
451 .unwrap_or_else(|_| "unhashable".to_string());
452 format!(
453 "process:transfer:{}:{}:{digest}",
454 from_scope.id(),
455 to_scope.id()
456 )
457 }
458 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
459 Self::Await { process_id } => format!("process:await:{process_id}"),
460 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
461 Self::Signal {
462 process_id,
463 signal_name,
464 signal_id,
465 ..
466 } => {
467 format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
468 }
469 }
470 }
471}
472
473#[derive(Clone, Debug, Serialize, Deserialize)]
475#[serde(tag = "op", rename_all = "snake_case")]
476pub enum ProcessEffectOutcome {
477 Start {
478 record: ProcessRecord,
479 },
480 List {
481 entries: Vec<ProcessHandleGrantEntry>,
482 },
483 Transfer,
484 DeleteSession {
485 report: crate::ProcessSessionDeleteReport,
486 },
487 Await {
488 output: ProcessAwaitOutput,
489 },
490 Cancel {
491 record: ProcessRecord,
492 },
493 Signal {
494 event: crate::ProcessEvent,
495 },
496}
497
498#[derive(Clone, Debug, Serialize, Deserialize)]
499pub struct ToolCallEffectOutcome {
500 pub launch: ToolCallLaunch,
501 #[serde(default, skip_serializing_if = "Vec::is_empty")]
502 pub triggers: Vec<ToolTriggerEffectOutcome>,
503}
504
505#[derive(Clone, Debug, Serialize, Deserialize)]
506pub struct ToolBatchEffectOutcome {
507 pub launches: Vec<ToolCallLaunch>,
508 #[serde(default, skip_serializing_if = "Vec::is_empty")]
509 pub triggers: Vec<ToolTriggerEffectOutcome>,
510}
511
512#[derive(Clone, Debug, Serialize, Deserialize)]
513#[serde(tag = "status", rename_all = "snake_case")]
514pub enum ToolCallLaunch {
515 Done {
516 result: CompletedToolCall,
517 },
518 Pending {
519 key: crate::AwaitEventKey,
520 pending: crate::PendingCompletion,
521 duration_ms: u64,
522 },
523}
524
525#[derive(Clone, Debug, Serialize, Deserialize)]
527#[serde(tag = "type", rename_all = "snake_case")]
528pub enum RuntimeEffectOutcome {
529 LlmCall {
530 result: Result<LlmResponse, LlmCallError>,
531 text_streamed: bool,
532 },
533 Direct {
534 result: Result<LlmResponse, LlmCallError>,
535 },
536 ToolCall {
537 launch: ToolCallLaunch,
538 #[serde(default, skip_serializing_if = "Vec::is_empty")]
539 triggers: Vec<ToolTriggerEffectOutcome>,
540 },
541 ToolBatch {
542 launches: Vec<ToolCallLaunch>,
543 #[serde(default, skip_serializing_if = "Vec::is_empty")]
544 triggers: Vec<ToolTriggerEffectOutcome>,
545 },
546 Process {
547 result: ProcessEffectOutcome,
548 },
549 ExecCode {
550 result: Result<ExecResponse, String>,
551 },
552 Checkpoint {
553 result: CheckpointOutcome,
554 },
555 SyncExecutionEnvironment {
556 result: Result<Option<ExecutionEnvironmentSync>, String>,
557 },
558 Sleep,
559 AwaitEvent {
560 resolution: crate::Resolution,
561 },
562 DurableStep {
563 value: serde_json::Value,
564 },
565}
566
567#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
577pub struct LlmAttachmentSpec {
578 pub reference: AttachmentRef,
579}
580
581impl LlmAttachmentSpec {
582 fn into_attachment(self) -> LlmAttachment {
583 LlmAttachment::reference(self.reference)
584 }
585}
586
587#[derive(Clone, Debug, Serialize, Deserialize)]
591pub struct LlmRequestSpec {
592 pub model: String,
593 pub messages: Vec<LlmMessage>,
594 pub attachments: Vec<LlmAttachmentSpec>,
595 pub tools: Arc<Vec<LlmToolSpec>>,
596 pub tool_choice: LlmToolChoice,
597 pub model_variant: Option<String>,
598 #[serde(default)]
599 pub generation: crate::GenerationOptions,
600 pub session_id: Option<String>,
601 pub output_spec: Option<LlmOutputSpec>,
602}
603
604impl LlmRequestSpec {
605 pub async fn from_request(
606 request: &CoreLlmRequest,
607 attachment_store: &dyn AttachmentStore,
608 ) -> Result<Self, RuntimeEffectControllerError> {
609 Ok(Self {
610 model: request.model.clone(),
611 messages: request.messages.clone(),
612 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
613 .await?,
614 tools: Arc::clone(&request.tools),
615 tool_choice: request.tool_choice.clone(),
616 model_variant: request.model_variant.clone(),
617 generation: request.generation.clone(),
618 session_id: request.session_id.clone(),
619 output_spec: request.output_spec.clone(),
620 })
621 }
622
623 pub fn into_request(
624 self,
625 stream_events: Option<LlmEventSender>,
626 provider_trace: Option<LlmProviderTraceSender>,
627 ) -> CoreLlmRequest {
628 CoreLlmRequest {
629 model: self.model,
630 messages: self.messages,
631 attachments: self
632 .attachments
633 .into_iter()
634 .map(LlmAttachmentSpec::into_attachment)
635 .collect(),
636 tools: self.tools,
637 tool_choice: self.tool_choice,
638 model_variant: self.model_variant,
639 generation: self.generation,
640 session_id: self.session_id,
641 output_spec: self.output_spec,
642 stream_events,
643 provider_trace,
644 }
645 }
646}
647
648async fn attachment_specs_from_attachments(
649 attachments: &[LlmAttachment],
650 attachment_store: &dyn AttachmentStore,
651) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
652 let mut specs = Vec::with_capacity(attachments.len());
653 for attachment in attachments {
654 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
655 }
656 Ok(specs)
657}
658
659async fn attachment_spec_from_attachment(
660 attachment: &LlmAttachment,
661 attachment_store: &dyn AttachmentStore,
662) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
663 if let Some(reference) = attachment.reference.as_ref() {
664 return Ok(LlmAttachmentSpec {
665 reference: reference.clone(),
666 });
667 }
668 if attachment.data.is_empty() {
669 return Err(RuntimeEffectControllerError::new(
670 "runtime_effect_attachment_missing_reference",
671 "runtime effect attachment has neither a durable reference nor inline bytes",
672 ));
673 }
674 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
675 RuntimeEffectControllerError::new(
676 "runtime_effect_attachment_media_type",
677 format!(
678 "attachment media type `{}` cannot be represented durably",
679 attachment.mime
680 ),
681 )
682 })?;
683 let reference = attachment_store
684 .put(
685 attachment.data.clone(),
686 AttachmentCreateMeta::new(media_type, None, None, None),
687 )
688 .await
689 .map_err(|err| {
690 RuntimeEffectControllerError::new(
691 "runtime_effect_attachment_store",
692 format!("failed to store attachment before runtime effect invocation: {err}"),
693 )
694 })?;
695 Ok(LlmAttachmentSpec { reference })
696}
697
698impl RuntimeEffectOutcome {
699 pub fn into_llm_call(
700 self,
701 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
702 match self {
703 Self::LlmCall {
704 result,
705 text_streamed,
706 } => Ok((result, text_streamed)),
707 other => Err(RuntimeEffectControllerError::wrong_outcome(
708 RuntimeEffectKind::LlmCall,
709 other.kind(),
710 )),
711 }
712 }
713
714 pub fn into_direct_response(
715 self,
716 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
717 match self {
718 Self::Direct { result } => Ok(result),
719 other => Err(RuntimeEffectControllerError::wrong_outcome(
720 RuntimeEffectKind::Direct,
721 other.kind(),
722 )),
723 }
724 }
725
726 pub fn into_tool_call(self) -> Result<CompletedToolCall, RuntimeEffectControllerError> {
727 match self {
728 Self::ToolCall {
729 launch: ToolCallLaunch::Done { result },
730 ..
731 } => Ok(result),
732 Self::ToolCall {
733 launch: ToolCallLaunch::Pending { .. },
734 ..
735 } => Err(RuntimeEffectControllerError::new(
736 "runtime_effect_tool_call_pending",
737 "tool call launch is pending and has no completed output yet",
738 )),
739 other => Err(RuntimeEffectControllerError::wrong_outcome(
740 RuntimeEffectKind::ToolCall,
741 other.kind(),
742 )),
743 }
744 }
745
746 pub fn into_tool_call_effect(
747 self,
748 ) -> Result<ToolCallEffectOutcome, RuntimeEffectControllerError> {
749 match self {
750 Self::ToolCall { launch, triggers } => Ok(ToolCallEffectOutcome { launch, triggers }),
751 other => Err(RuntimeEffectControllerError::wrong_outcome(
752 RuntimeEffectKind::ToolCall,
753 other.kind(),
754 )),
755 }
756 }
757
758 pub fn into_tool_batch_effect(
759 self,
760 ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
761 match self {
762 Self::ToolBatch { launches, triggers } => {
763 Ok(ToolBatchEffectOutcome { launches, triggers })
764 }
765 other => Err(RuntimeEffectControllerError::wrong_outcome(
766 RuntimeEffectKind::ToolBatch,
767 other.kind(),
768 )),
769 }
770 }
771
772 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
773 match self {
774 Self::Process { result } => Ok(result),
775 other => Err(RuntimeEffectControllerError::wrong_outcome(
776 RuntimeEffectKind::Process,
777 other.kind(),
778 )),
779 }
780 }
781
782 pub fn into_exec_code(
783 self,
784 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
785 match self {
786 Self::ExecCode { result } => Ok(result),
787 other => Err(RuntimeEffectControllerError::wrong_outcome(
788 RuntimeEffectKind::ExecCode,
789 other.kind(),
790 )),
791 }
792 }
793
794 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
795 match self {
796 Self::Checkpoint { result } => Ok(result),
797 other => Err(RuntimeEffectControllerError::wrong_outcome(
798 RuntimeEffectKind::Checkpoint,
799 other.kind(),
800 )),
801 }
802 }
803
804 pub fn into_sync_execution_environment(
805 self,
806 ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
807 {
808 match self {
809 Self::SyncExecutionEnvironment { result } => Ok(result),
810 other => Err(RuntimeEffectControllerError::wrong_outcome(
811 RuntimeEffectKind::SyncExecutionEnvironment,
812 other.kind(),
813 )),
814 }
815 }
816
817 pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
818 match self {
819 Self::AwaitEvent { resolution } => Ok(resolution),
820 other => Err(RuntimeEffectControllerError::wrong_outcome(
821 RuntimeEffectKind::AwaitEvent,
822 other.kind(),
823 )),
824 }
825 }
826
827 pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
828 match self {
829 Self::DurableStep { value } => Ok(value),
830 other => Err(RuntimeEffectControllerError::wrong_outcome(
831 RuntimeEffectKind::DurableStep,
832 other.kind(),
833 )),
834 }
835 }
836
837 pub fn kind(&self) -> RuntimeEffectKind {
838 match self {
839 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
840 Self::Direct { .. } => RuntimeEffectKind::Direct,
841 Self::ToolCall { .. } => RuntimeEffectKind::ToolCall,
842 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
843 Self::Process { .. } => RuntimeEffectKind::Process,
844 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
845 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
846 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
847 Self::Sleep => RuntimeEffectKind::Sleep,
848 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
849 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
850 }
851 }
852}