1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4
5use crate::CheckpointKind;
6use crate::llm::types::{
7 LlmAttachment, LlmEventSender, LlmMessage, LlmOutputSpec, LlmProviderTraceSender,
8 LlmToolChoice, LlmToolSpec,
9};
10use crate::runtime::ProcessHandleGrantEntry;
11use crate::sansio::{CompletedToolCall, ExecutionEnvironmentSync, LlmCallError};
12use crate::tool_dispatch::ToolTriggerEffectOutcome;
13use crate::{
14 AttachmentCreateMeta, AttachmentRef, AttachmentStore, CausalRef, CheckpointDelivery,
15 ExecResponse, LlmRequest as CoreLlmRequest, LlmResponse, MediaType, ProcessAwaitOutput,
16 ProcessExecutionContext, ProcessListMode, ProcessRecord, ProcessRegistration,
17 ProcessStartGrant, SessionScope,
18};
19
20use super::executor::RuntimeEffectControllerError;
21
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum RuntimeEffectKind {
26 LlmCall,
27 Direct,
28 ToolAttempt,
29 ToolBatch,
30 Process,
31 ExecCode,
32 Checkpoint,
33 SyncExecutionEnvironment,
34 Sleep,
35 AwaitEvent,
36 DurableStep,
37}
38
39impl RuntimeEffectKind {
40 pub fn as_str(self) -> &'static str {
41 match self {
42 Self::LlmCall => "llm_call",
43 Self::Direct => "direct",
44 Self::ToolAttempt => "tool_attempt",
45 Self::ToolBatch => "tool_batch",
46 Self::Process => "process",
47 Self::ExecCode => "exec_code",
48 Self::Checkpoint => "checkpoint",
49 Self::SyncExecutionEnvironment => "sync_execution_environment",
50 Self::Sleep => "sleep",
51 Self::AwaitEvent => "await_event",
52 Self::DurableStep => "durable_step",
53 }
54 }
55}
56
57#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
59pub struct RuntimeInvocation {
60 pub scope: RuntimeScope,
61 pub subject: RuntimeSubject,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub caused_by: Option<CausalRef>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
65 pub replay: Option<RuntimeReplay>,
66}
67
68impl RuntimeInvocation {
69 pub fn effect(
70 scope: RuntimeScope,
71 effect_id: impl Into<String>,
72 kind: RuntimeEffectKind,
73 replay_key: impl Into<String>,
74 ) -> Self {
75 Self {
76 scope,
77 subject: RuntimeSubject::Effect {
78 effect_id: effect_id.into(),
79 kind,
80 },
81 caused_by: None,
82 replay: Some(RuntimeReplay {
83 key: replay_key.into(),
84 }),
85 }
86 }
87
88 pub fn with_caused_by(mut self, caused_by: Option<CausalRef>) -> Self {
89 self.caused_by = caused_by;
90 self
91 }
92
93 pub fn effect_id(&self) -> Option<&str> {
94 match &self.subject {
95 RuntimeSubject::Effect { effect_id, .. } => Some(effect_id),
96 _ => None,
97 }
98 }
99
100 pub fn effect_kind(&self) -> Option<RuntimeEffectKind> {
101 match &self.subject {
102 RuntimeSubject::Effect { kind, .. } => Some(*kind),
103 _ => None,
104 }
105 }
106
107 pub fn replay_key(&self) -> Option<&str> {
108 self.replay.as_ref().map(|replay| replay.key.as_str())
109 }
110
111 pub fn causal_ref(&self) -> Option<CausalRef> {
112 match &self.subject {
113 RuntimeSubject::Effect { effect_id, .. } => Some(CausalRef::Effect {
114 session_id: self.scope.session_id.clone(),
115 turn_id: self.scope.turn_id.clone(),
116 effect_id: effect_id.clone(),
117 }),
118 RuntimeSubject::Process { process_id } => Some(CausalRef::Process {
119 process_id: process_id.clone(),
120 }),
121 RuntimeSubject::ProcessEvent {
122 process_id,
123 sequence,
124 ..
125 } => Some(CausalRef::ProcessEvent {
126 process_id: process_id.clone(),
127 sequence: *sequence,
128 }),
129 RuntimeSubject::TriggerOccurrence { occurrence_id } => {
130 Some(CausalRef::TriggerOccurrence {
131 occurrence_id: occurrence_id.clone(),
132 })
133 }
134 RuntimeSubject::SessionNode { node_id } => Some(CausalRef::SessionNode {
135 session_id: self.scope.session_id.clone(),
136 node_id: node_id.clone(),
137 }),
138 }
139 }
140}
141
142#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
143pub struct RuntimeScope {
144 pub session_id: String,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub turn_id: Option<String>,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub turn_index: Option<usize>,
149 #[serde(default, skip_serializing_if = "Option::is_none")]
150 pub protocol_iteration: Option<usize>,
151}
152
153impl RuntimeScope {
154 pub fn new(session_id: impl Into<String>) -> Self {
155 Self {
156 session_id: session_id.into(),
157 turn_id: None,
158 turn_index: None,
159 protocol_iteration: None,
160 }
161 }
162
163 pub fn for_turn(
164 session_id: impl Into<String>,
165 turn_id: impl Into<String>,
166 turn_index: usize,
167 protocol_iteration: usize,
168 ) -> Self {
169 Self {
170 session_id: session_id.into(),
171 turn_id: Some(turn_id.into()),
172 turn_index: Some(turn_index),
173 protocol_iteration: Some(protocol_iteration),
174 }
175 }
176}
177
178#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
179pub struct RuntimeReplay {
180 pub key: String,
181}
182
183#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(tag = "type", rename_all = "snake_case")]
185pub enum RuntimeSubject {
186 Effect {
187 effect_id: String,
188 kind: RuntimeEffectKind,
189 },
190 Process {
191 process_id: String,
192 },
193 ProcessEvent {
194 process_id: String,
195 sequence: u64,
196 event_type: String,
197 },
198 TriggerOccurrence {
199 occurrence_id: String,
200 },
201 SessionNode {
202 node_id: String,
203 },
204}
205
206#[derive(Clone, Debug, Serialize, Deserialize)]
208pub struct RuntimeEffectEnvelope {
209 pub invocation: RuntimeInvocation,
210 pub command: RuntimeEffectCommand,
211}
212
213impl RuntimeEffectEnvelope {
214 pub fn new(invocation: RuntimeInvocation, command: RuntimeEffectCommand) -> Self {
215 Self::try_new(invocation, command).expect("valid runtime effect invocation")
216 }
217
218 pub fn try_new(
219 invocation: RuntimeInvocation,
220 command: RuntimeEffectCommand,
221 ) -> Result<Self, RuntimeEffectControllerError> {
222 validate_effect_invocation(&invocation, command.kind())?;
223 validate_effect_command(&command)?;
224 Ok(Self {
225 invocation,
226 command,
227 })
228 }
229
230 pub fn stable_hash(&self) -> Result<String, RuntimeEffectControllerError> {
231 crate::stable_hash::stable_json_sha256_hex(self).map_err(|err| {
232 RuntimeEffectControllerError::new(
233 "runtime_effect_envelope_hash",
234 format!("failed to serialize runtime effect envelope: {err}"),
235 )
236 })
237 }
238}
239
240fn validate_effect_invocation(
241 invocation: &RuntimeInvocation,
242 command_kind: RuntimeEffectKind,
243) -> Result<(), RuntimeEffectControllerError> {
244 let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
245 return Err(RuntimeEffectControllerError::new(
246 "runtime_effect_invocation_subject",
247 "runtime effect envelope subject must be an effect",
248 ));
249 };
250 if effect_id.trim().is_empty() {
251 return Err(RuntimeEffectControllerError::new(
252 "runtime_effect_invocation_subject",
253 "runtime effect envelope effect id must be non-empty",
254 ));
255 }
256 if *kind != command_kind {
257 return Err(RuntimeEffectControllerError::new(
258 "runtime_effect_invocation_kind",
259 format!(
260 "runtime effect invocation kind {} does not match command kind {}",
261 kind.as_str(),
262 command_kind.as_str()
263 ),
264 ));
265 }
266 if invocation
267 .replay
268 .as_ref()
269 .is_none_or(|replay| replay.key.is_empty())
270 {
271 return Err(RuntimeEffectControllerError::new(
272 "runtime_effect_replay_required",
273 "runtime effect envelope requires replay.key",
274 ));
275 }
276 Ok(())
277}
278
279fn validate_effect_command(
280 command: &RuntimeEffectCommand,
281) -> Result<(), RuntimeEffectControllerError> {
282 if let RuntimeEffectCommand::DurableStep { step_id, .. } = command
283 && step_id.trim().is_empty()
284 {
285 return Err(RuntimeEffectControllerError::new(
286 "runtime_effect_durable_step_id",
287 "runtime effect durable step id must be non-empty",
288 ));
289 }
290 if let RuntimeEffectCommand::ToolAttempt {
291 call,
292 execution_grant: _,
293 attempt,
294 max_attempts,
295 } = command
296 {
297 if call.call_id.trim().is_empty() {
298 return Err(RuntimeEffectControllerError::new(
299 "runtime_effect_tool_attempt_call_id",
300 "runtime effect tool attempt requires a non-empty call id",
301 ));
302 }
303 if *attempt == 0 || *max_attempts == 0 || *attempt > *max_attempts {
304 return Err(RuntimeEffectControllerError::new(
305 "runtime_effect_tool_attempt_index",
306 format!(
307 "runtime effect tool attempt must satisfy 1 <= attempt <= max_attempts, got {attempt}/{max_attempts}"
308 ),
309 ));
310 }
311 }
312 if let RuntimeEffectCommand::ToolBatch { batch } = command {
313 if batch.batch_id.trim().is_empty() {
314 return Err(RuntimeEffectControllerError::new(
315 "runtime_effect_tool_batch_id",
316 "runtime effect tool batch id must be non-empty",
317 ));
318 }
319 if batch.calls.is_empty() {
320 return Err(RuntimeEffectControllerError::new(
321 "runtime_effect_tool_batch_empty",
322 "runtime effect tool batch must contain at least one prepared call",
323 ));
324 }
325 for (index, call) in batch.calls.iter().enumerate() {
326 if call.call.call_id.trim().is_empty() {
327 return Err(RuntimeEffectControllerError::new(
328 "runtime_effect_tool_batch_call_id",
329 format!("runtime effect tool batch call {index} has an empty call id"),
330 ));
331 }
332 if call.replay_suffix.trim().is_empty() {
333 return Err(RuntimeEffectControllerError::new(
334 "runtime_effect_tool_batch_call_replay",
335 format!("runtime effect tool batch call {index} has an empty replay suffix"),
336 ));
337 }
338 }
339 }
340 Ok(())
341}
342
343#[derive(Clone, Debug, Serialize, Deserialize)]
345#[serde(tag = "type", rename_all = "snake_case")]
346pub enum RuntimeEffectCommand {
347 LlmCall {
348 request: Box<LlmRequestSpec>,
349 },
350 Direct {
351 request: Box<LlmRequestSpec>,
352 usage_source: String,
353 },
354 ToolAttempt {
355 call: crate::PreparedToolCall,
356 #[serde(default, skip_serializing_if = "Option::is_none")]
357 execution_grant: Option<Box<crate::ToolExecutionGrant>>,
358 attempt: u32,
359 max_attempts: u32,
360 },
361 ToolBatch {
362 batch: crate::PreparedToolBatch,
363 },
364 Process {
365 command: Box<ProcessCommand>,
366 },
367 ExecCode {
368 language: String,
369 code: String,
370 },
371 Checkpoint {
372 checkpoint: CheckpointKind,
373 },
374 SyncExecutionEnvironment {
375 update_machine_config: bool,
376 },
377 Sleep {
378 duration_ms: u64,
379 },
380 AwaitEvent {
381 key: crate::AwaitEventKey,
382 },
383 DurableStep {
384 step_id: String,
385 input: serde_json::Value,
386 },
387}
388
389impl RuntimeEffectCommand {
390 pub fn process(command: ProcessCommand) -> Self {
391 Self::Process {
392 command: Box::new(command),
393 }
394 }
395
396 pub fn kind(&self) -> RuntimeEffectKind {
397 match self {
398 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
399 Self::Direct { .. } => RuntimeEffectKind::Direct,
400 Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
401 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
402 Self::Process { .. } => RuntimeEffectKind::Process,
403 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
404 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
405 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
406 Self::Sleep { .. } => RuntimeEffectKind::Sleep,
407 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
408 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
409 }
410 }
411}
412
413#[derive(Clone, Debug, Serialize, Deserialize)]
415#[serde(tag = "op", rename_all = "snake_case")]
416#[allow(clippy::large_enum_variant)]
417pub enum ProcessCommand {
418 Start {
419 registration: ProcessRegistration,
420 #[serde(default, skip_serializing_if = "Option::is_none")]
421 grant: Option<ProcessStartGrant>,
422 #[serde(
423 default,
424 skip_serializing_if = "boxed_process_execution_context_is_empty"
425 )]
426 execution_context: Box<ProcessExecutionContext>,
427 },
428 List {
429 session_scope: SessionScope,
430 #[serde(default)]
431 mode: ProcessListMode,
432 },
433 Transfer {
434 from_scope: SessionScope,
435 to_scope: SessionScope,
436 process_ids: Vec<String>,
437 },
438 DeleteSession {
439 session_id: String,
440 },
441 Await {
442 process_id: String,
443 },
444 Cancel {
445 process_id: String,
446 reason: Option<String>,
447 },
448 Signal {
449 process_id: String,
450 signal_name: String,
451 signal_id: String,
452 request: crate::ProcessEventAppendRequest,
453 },
454}
455
456fn boxed_process_execution_context_is_empty(context: &ProcessExecutionContext) -> bool {
457 context.is_empty()
458}
459
460type CheckpointOutcome = Result<CheckpointDelivery, RuntimeEffectControllerError>;
461
462impl ProcessCommand {
463 pub fn effect_id(&self) -> String {
464 match self {
465 Self::Start { registration, .. } => format!("process:start:{}", registration.id),
466 Self::List {
467 session_scope,
468 mode,
469 } => {
470 format!("process:list:{}:{}", session_scope.id(), mode.as_str())
471 }
472 Self::Transfer {
473 from_scope,
474 to_scope,
475 process_ids,
476 } => {
477 let digest = crate::stable_hash::stable_json_sha256_hex(process_ids)
478 .unwrap_or_else(|_| "unhashable".to_string());
479 format!(
480 "process:transfer:{}:{}:{digest}",
481 from_scope.id(),
482 to_scope.id()
483 )
484 }
485 Self::DeleteSession { session_id } => format!("process:delete-session:{session_id}"),
486 Self::Await { process_id } => format!("process:await:{process_id}"),
487 Self::Cancel { process_id, .. } => format!("process:cancel:{process_id}"),
488 Self::Signal {
489 process_id,
490 signal_name,
491 signal_id,
492 ..
493 } => {
494 format!("process:signal:{process_id}:signal.{signal_name}:{signal_id}")
495 }
496 }
497 }
498}
499
500#[derive(Clone, Debug, Serialize, Deserialize)]
502#[serde(tag = "op", rename_all = "snake_case")]
503pub enum ProcessEffectOutcome {
504 Start {
505 record: Box<ProcessRecord>,
509 },
510 List {
511 entries: Vec<ProcessHandleGrantEntry>,
512 },
513 Transfer,
514 DeleteSession {
515 report: crate::ProcessSessionDeleteReport,
516 },
517 Await {
518 output: ProcessAwaitOutput,
519 },
520 Cancel {
521 record: Box<ProcessRecord>,
522 },
523 Signal {
524 event: Box<crate::ProcessEvent>,
527 },
528}
529
530#[derive(Clone, Debug, Serialize, Deserialize)]
531pub struct ToolAttemptEffectOutcome {
532 pub launch: ToolAttemptLaunch,
533 #[serde(default, skip_serializing_if = "Vec::is_empty")]
534 pub triggers: Vec<ToolTriggerEffectOutcome>,
535}
536
537#[derive(Clone, Debug, Serialize, Deserialize)]
538pub struct ToolBatchEffectOutcome {
539 pub launches: Vec<ToolCallLaunch>,
540 #[serde(default, skip_serializing_if = "Vec::is_empty")]
541 pub triggers: Vec<ToolTriggerEffectOutcome>,
542}
543
544#[derive(Clone, Debug, Serialize, Deserialize)]
545#[serde(tag = "status", rename_all = "snake_case")]
546#[allow(clippy::large_enum_variant)]
547pub enum ToolCallLaunch {
548 Done {
549 result: CompletedToolCall,
550 },
551 Pending {
552 key: crate::AwaitEventKey,
553 pending: crate::PendingCompletion,
554 duration_ms: u64,
555 },
556}
557
558#[derive(Clone, Debug, Serialize, Deserialize)]
559#[serde(tag = "status", rename_all = "snake_case")]
560pub enum ToolAttemptLaunch {
561 Done {
562 record: crate::ToolCallRecord,
563 },
564 Pending {
565 key: crate::AwaitEventKey,
566 pending: crate::PendingCompletion,
567 duration_ms: u64,
568 },
569}
570
571#[derive(Clone, Debug, Serialize, Deserialize)]
573#[serde(tag = "type", rename_all = "snake_case")]
574#[allow(clippy::large_enum_variant)]
575pub enum RuntimeEffectOutcome {
576 LlmCall {
577 result: Result<LlmResponse, LlmCallError>,
578 text_streamed: bool,
579 },
580 Direct {
581 result: Result<LlmResponse, LlmCallError>,
582 },
583 ToolAttempt {
584 launch: ToolAttemptLaunch,
585 #[serde(default, skip_serializing_if = "Vec::is_empty")]
586 triggers: Vec<ToolTriggerEffectOutcome>,
587 },
588 ToolBatch {
589 launches: Vec<ToolCallLaunch>,
590 #[serde(default, skip_serializing_if = "Vec::is_empty")]
591 triggers: Vec<ToolTriggerEffectOutcome>,
592 },
593 Process {
594 result: ProcessEffectOutcome,
595 },
596 ExecCode {
597 result: Result<ExecResponse, String>,
598 },
599 Checkpoint {
600 result: CheckpointOutcome,
601 },
602 SyncExecutionEnvironment {
603 result: Result<Option<ExecutionEnvironmentSync>, String>,
604 },
605 Sleep,
606 AwaitEvent {
607 resolution: crate::Resolution,
608 },
609 DurableStep {
610 value: serde_json::Value,
611 },
612}
613
614#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
624pub struct LlmAttachmentSpec {
625 pub reference: AttachmentRef,
626}
627
628impl LlmAttachmentSpec {
629 fn into_attachment(self) -> LlmAttachment {
630 LlmAttachment::reference(self.reference)
631 }
632}
633
634#[derive(Clone, Debug, Serialize, Deserialize)]
638pub struct LlmRequestSpec {
639 pub model: String,
640 pub messages: Vec<LlmMessage>,
641 pub attachments: Vec<LlmAttachmentSpec>,
642 pub tools: Arc<Vec<LlmToolSpec>>,
643 pub tool_choice: LlmToolChoice,
644 pub model_variant: Option<String>,
645 #[serde(default)]
646 pub generation: crate::GenerationOptions,
647 pub scope: crate::LlmRequestScope,
648 pub output_spec: Option<LlmOutputSpec>,
649}
650
651impl LlmRequestSpec {
652 pub async fn from_request(
653 request: &CoreLlmRequest,
654 attachment_store: &dyn AttachmentStore,
655 ) -> Result<Self, RuntimeEffectControllerError> {
656 Ok(Self {
657 model: request.model.clone(),
658 messages: request.messages.clone(),
659 attachments: attachment_specs_from_attachments(&request.attachments, attachment_store)
660 .await?,
661 tools: Arc::clone(&request.tools),
662 tool_choice: request.tool_choice.clone(),
663 model_variant: request.model_variant.clone(),
664 generation: request.generation.clone(),
665 scope: request.scope.clone(),
666 output_spec: request.output_spec.clone(),
667 })
668 }
669
670 pub fn into_request(
671 self,
672 stream_events: Option<LlmEventSender>,
673 provider_trace: Option<LlmProviderTraceSender>,
674 ) -> CoreLlmRequest {
675 CoreLlmRequest {
676 model: self.model,
677 messages: self.messages,
678 attachments: self
679 .attachments
680 .into_iter()
681 .map(LlmAttachmentSpec::into_attachment)
682 .collect(),
683 tools: self.tools,
684 tool_choice: self.tool_choice,
685 model_variant: self.model_variant,
686 generation: self.generation,
687 scope: self.scope,
688 output_spec: self.output_spec,
689 stream_events,
690 provider_trace,
691 }
692 }
693}
694
695async fn attachment_specs_from_attachments(
696 attachments: &[LlmAttachment],
697 attachment_store: &dyn AttachmentStore,
698) -> Result<Vec<LlmAttachmentSpec>, RuntimeEffectControllerError> {
699 let mut specs = Vec::with_capacity(attachments.len());
700 for attachment in attachments {
701 specs.push(attachment_spec_from_attachment(attachment, attachment_store).await?);
702 }
703 Ok(specs)
704}
705
706async fn attachment_spec_from_attachment(
707 attachment: &LlmAttachment,
708 attachment_store: &dyn AttachmentStore,
709) -> Result<LlmAttachmentSpec, RuntimeEffectControllerError> {
710 if let Some(reference) = attachment.reference.as_ref() {
711 return Ok(LlmAttachmentSpec {
712 reference: reference.clone(),
713 });
714 }
715 if attachment.data.is_empty() {
716 return Err(RuntimeEffectControllerError::new(
717 "runtime_effect_attachment_missing_reference",
718 "runtime effect attachment has neither a durable reference nor inline bytes",
719 ));
720 }
721 let media_type = MediaType::from_mime(&attachment.mime).ok_or_else(|| {
722 RuntimeEffectControllerError::new(
723 "runtime_effect_attachment_media_type",
724 format!(
725 "attachment media type `{}` cannot be represented durably",
726 attachment.mime
727 ),
728 )
729 })?;
730 let reference = attachment_store
731 .put(
732 attachment.data.clone(),
733 AttachmentCreateMeta::new(media_type, None, None, None),
734 )
735 .await
736 .map_err(|err| {
737 RuntimeEffectControllerError::new(
738 "runtime_effect_attachment_store",
739 format!("failed to store attachment before runtime effect invocation: {err}"),
740 )
741 })?;
742 Ok(LlmAttachmentSpec { reference })
743}
744
745impl RuntimeEffectOutcome {
746 pub fn into_llm_call(
747 self,
748 ) -> Result<(Result<LlmResponse, LlmCallError>, bool), RuntimeEffectControllerError> {
749 match self {
750 Self::LlmCall {
751 result,
752 text_streamed,
753 } => Ok((result, text_streamed)),
754 other => Err(RuntimeEffectControllerError::wrong_outcome(
755 RuntimeEffectKind::LlmCall,
756 other.kind(),
757 )),
758 }
759 }
760
761 pub fn into_direct_response(
762 self,
763 ) -> Result<Result<LlmResponse, LlmCallError>, RuntimeEffectControllerError> {
764 match self {
765 Self::Direct { result } => Ok(result),
766 other => Err(RuntimeEffectControllerError::wrong_outcome(
767 RuntimeEffectKind::Direct,
768 other.kind(),
769 )),
770 }
771 }
772
773 pub fn into_tool_attempt_effect(
774 self,
775 ) -> Result<ToolAttemptEffectOutcome, RuntimeEffectControllerError> {
776 match self {
777 Self::ToolAttempt { launch, triggers } => {
778 Ok(ToolAttemptEffectOutcome { launch, triggers })
779 }
780 other => Err(RuntimeEffectControllerError::wrong_outcome(
781 RuntimeEffectKind::ToolAttempt,
782 other.kind(),
783 )),
784 }
785 }
786
787 pub fn into_tool_batch_effect(
788 self,
789 ) -> Result<ToolBatchEffectOutcome, RuntimeEffectControllerError> {
790 match self {
791 Self::ToolBatch { launches, triggers } => {
792 Ok(ToolBatchEffectOutcome { launches, triggers })
793 }
794 other => Err(RuntimeEffectControllerError::wrong_outcome(
795 RuntimeEffectKind::ToolBatch,
796 other.kind(),
797 )),
798 }
799 }
800
801 pub fn into_process(self) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
802 match self {
803 Self::Process { result } => Ok(result),
804 other => Err(RuntimeEffectControllerError::wrong_outcome(
805 RuntimeEffectKind::Process,
806 other.kind(),
807 )),
808 }
809 }
810
811 pub fn into_exec_code(
812 self,
813 ) -> Result<Result<ExecResponse, String>, RuntimeEffectControllerError> {
814 match self {
815 Self::ExecCode { result } => Ok(result),
816 other => Err(RuntimeEffectControllerError::wrong_outcome(
817 RuntimeEffectKind::ExecCode,
818 other.kind(),
819 )),
820 }
821 }
822
823 pub(crate) fn into_checkpoint(self) -> Result<CheckpointOutcome, RuntimeEffectControllerError> {
824 match self {
825 Self::Checkpoint { result } => Ok(result),
826 other => Err(RuntimeEffectControllerError::wrong_outcome(
827 RuntimeEffectKind::Checkpoint,
828 other.kind(),
829 )),
830 }
831 }
832
833 pub fn into_sync_execution_environment(
834 self,
835 ) -> Result<Result<Option<ExecutionEnvironmentSync>, String>, RuntimeEffectControllerError>
836 {
837 match self {
838 Self::SyncExecutionEnvironment { result } => Ok(result),
839 other => Err(RuntimeEffectControllerError::wrong_outcome(
840 RuntimeEffectKind::SyncExecutionEnvironment,
841 other.kind(),
842 )),
843 }
844 }
845
846 pub fn into_await_event(self) -> Result<crate::Resolution, RuntimeEffectControllerError> {
847 match self {
848 Self::AwaitEvent { resolution } => Ok(resolution),
849 other => Err(RuntimeEffectControllerError::wrong_outcome(
850 RuntimeEffectKind::AwaitEvent,
851 other.kind(),
852 )),
853 }
854 }
855
856 pub fn into_durable_step(self) -> Result<serde_json::Value, RuntimeEffectControllerError> {
857 match self {
858 Self::DurableStep { value } => Ok(value),
859 other => Err(RuntimeEffectControllerError::wrong_outcome(
860 RuntimeEffectKind::DurableStep,
861 other.kind(),
862 )),
863 }
864 }
865
866 pub fn kind(&self) -> RuntimeEffectKind {
867 match self {
868 Self::LlmCall { .. } => RuntimeEffectKind::LlmCall,
869 Self::Direct { .. } => RuntimeEffectKind::Direct,
870 Self::ToolAttempt { .. } => RuntimeEffectKind::ToolAttempt,
871 Self::ToolBatch { .. } => RuntimeEffectKind::ToolBatch,
872 Self::Process { .. } => RuntimeEffectKind::Process,
873 Self::ExecCode { .. } => RuntimeEffectKind::ExecCode,
874 Self::Checkpoint { .. } => RuntimeEffectKind::Checkpoint,
875 Self::SyncExecutionEnvironment { .. } => RuntimeEffectKind::SyncExecutionEnvironment,
876 Self::Sleep => RuntimeEffectKind::Sleep,
877 Self::AwaitEvent { .. } => RuntimeEffectKind::AwaitEvent,
878 Self::DurableStep { .. } => RuntimeEffectKind::DurableStep,
879 }
880 }
881}