1use std::collections::HashMap;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Instant;
5
6use serde::{Deserialize, Serialize};
7use tokio::sync::mpsc;
8use tokio_util::sync::CancellationToken;
9
10use crate::AttachmentStore;
11use crate::LlmRequest as CoreLlmRequest;
12use crate::LlmResponse;
13use crate::ProcessRecord;
14use crate::ProcessRegistry;
15use crate::provider::ProviderHandle;
16use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
17use crate::sansio::LlmCallError;
18use crate::{PluginError, RuntimeError, RuntimeErrorCode};
19
20use super::envelope::{
21 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
22 RuntimeEffectKind, RuntimeEffectOutcome,
23};
24use super::outcome::llm_call_error_from_transport;
25
26type AwaitEventOptions = (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>);
27
28use super::await_events::inline_await_events;
29
30#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
40#[serde(tag = "type", rename_all = "snake_case")]
41pub enum ExecutionScope {
42 Turn {
43 session_id: String,
44 turn_id: String,
45 },
46 Process {
47 process_id: String,
48 },
49 QueueDrain {
50 session_id: String,
51 drain_id: String,
52 },
53 SessionDelete {
54 session_id: String,
55 },
56 RuntimeOperation {
57 operation_id: String,
58 },
59}
60
61impl ExecutionScope {
62 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
63 Self::Turn {
64 session_id: session_id.into(),
65 turn_id: turn_id.into(),
66 }
67 }
68
69 pub fn process(process_id: impl Into<String>) -> Self {
70 Self::Process {
71 process_id: process_id.into(),
72 }
73 }
74
75 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
76 Self::QueueDrain {
77 session_id: session_id.into(),
78 drain_id: drain_id.into(),
79 }
80 }
81
82 pub fn session_delete(session_id: impl Into<String>) -> Self {
83 Self::SessionDelete {
84 session_id: session_id.into(),
85 }
86 }
87
88 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
89 Self::RuntimeOperation {
90 operation_id: operation_id.into(),
91 }
92 }
93
94 pub fn id(&self) -> &str {
95 match self {
96 Self::Turn { turn_id, .. } => turn_id,
97 Self::Process { process_id } => process_id,
98 Self::QueueDrain { drain_id, .. } => drain_id,
99 Self::SessionDelete { session_id } => session_id,
100 Self::RuntimeOperation { operation_id } => operation_id,
101 }
102 }
103
104 pub fn session_id(&self) -> Option<&str> {
105 match self {
106 Self::Turn { session_id, .. }
107 | Self::QueueDrain { session_id, .. }
108 | Self::SessionDelete { session_id } => Some(session_id),
109 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
110 }
111 }
112
113 pub fn turn_id(&self) -> Option<&str> {
114 match self {
115 Self::Turn { turn_id, .. } => Some(turn_id),
116 _ => None,
117 }
118 }
119
120 pub fn validates_turn_trace_id(&self) -> bool {
121 matches!(self, Self::Turn { .. })
122 }
123
124 pub(super) fn validate(&self) -> Result<(), RuntimeError> {
125 let missing = match self {
126 Self::Turn {
127 session_id,
128 turn_id,
129 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
130 Self::Process { process_id } => process_id.trim().is_empty(),
131 Self::QueueDrain {
132 session_id,
133 drain_id,
134 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
135 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
136 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
137 };
138 if missing {
139 return Err(RuntimeError::new(
140 RuntimeErrorCode::MissingExecutionScopeId,
141 "execution scopes require non-empty stable ids",
142 ));
143 }
144 Ok(())
145 }
146}
147
148#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
149#[serde(tag = "type", rename_all = "snake_case")]
150pub enum AwaitEventWaitIdentity {
151 ToolCompletion {
152 tool_call_id: String,
153 },
154 ProcessSignal {
155 process_id: String,
156 signal_name: String,
157 ordinal: u64,
158 },
159 Custom {
160 key: String,
161 },
162}
163
164impl AwaitEventWaitIdentity {
165 pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
166 Self::ToolCompletion {
167 tool_call_id: tool_call_id.into(),
168 }
169 }
170
171 pub fn process_signal(
172 process_id: impl Into<String>,
173 signal_name: impl Into<String>,
174 ordinal: u64,
175 ) -> Self {
176 Self::ProcessSignal {
177 process_id: process_id.into(),
178 signal_name: signal_name.into(),
179 ordinal,
180 }
181 }
182
183 pub(super) fn validate(&self) -> Result<(), RuntimeError> {
184 let invalid = match self {
185 Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
186 Self::ProcessSignal {
187 process_id,
188 signal_name,
189 ordinal,
190 } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
191 Self::Custom { key } => key.trim().is_empty(),
192 };
193 if invalid {
194 return Err(RuntimeError::new(
195 "invalid_await_event_wait_identity",
196 "await-event wait identity requires non-empty stable ids",
197 ));
198 }
199 Ok(())
200 }
201}
202
203#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
204pub struct AwaitEventKey {
205 pub scope: ExecutionScope,
206 pub wait: AwaitEventWaitIdentity,
207 pub key_id: String,
208 pub signature: String,
209}
210
211impl AwaitEventKey {
212 pub fn promise_key(&self) -> String {
213 format!("lash-await-event:{}", self.key_id)
214 }
215}
216
217#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
218pub struct ExternalCompletionError {
219 pub code: String,
220 pub message: String,
221 #[serde(default, skip_serializing_if = "Option::is_none")]
222 pub raw: Option<serde_json::Value>,
223}
224
225impl ExternalCompletionError {
226 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
227 Self {
228 code: code.into(),
229 message: message.into(),
230 raw: None,
231 }
232 }
233}
234
235#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
236#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
237pub enum Resolution {
238 Ok(serde_json::Value),
239 Err(ExternalCompletionError),
240 Timeout,
241 Cancelled,
242}
243
244#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
245#[serde(tag = "status", rename_all = "snake_case")]
246pub enum ResolveOutcome {
247 Accepted,
248 AlreadyResolved { terminal: Resolution },
249 UnknownOrRevoked,
250}
251
252enum ScopedEffectControllerInner<'run> {
253 Borrowed(&'run dyn RuntimeEffectController),
254 Shared(Arc<dyn RuntimeEffectController>),
255}
256
257impl Clone for ScopedEffectControllerInner<'_> {
258 fn clone(&self) -> Self {
259 match self {
260 Self::Borrowed(controller) => Self::Borrowed(*controller),
261 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
262 }
263 }
264}
265
266#[derive(Clone)]
268pub struct ScopedEffectController<'run> {
269 controller: ScopedEffectControllerInner<'run>,
270 scope: ExecutionScope,
271}
272
273impl<'run> ScopedEffectController<'run> {
274 pub fn borrowed(
275 controller: &'run dyn RuntimeEffectController,
276 scope: ExecutionScope,
277 ) -> Result<Self, RuntimeError> {
278 scope.validate()?;
279 Ok(Self {
280 controller: ScopedEffectControllerInner::Borrowed(controller),
281 scope,
282 })
283 }
284
285 pub fn shared(
286 controller: Arc<dyn RuntimeEffectController>,
287 scope: ExecutionScope,
288 ) -> Result<Self, RuntimeError> {
289 scope.validate()?;
290 Ok(Self {
291 controller: ScopedEffectControllerInner::Shared(controller),
292 scope,
293 })
294 }
295
296 pub fn controller(&self) -> &dyn RuntimeEffectController {
297 match &self.controller {
298 ScopedEffectControllerInner::Borrowed(controller) => *controller,
299 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
300 }
301 }
302
303 pub fn execution_scope(&self) -> &ExecutionScope {
304 &self.scope
305 }
306
307 pub fn scope_id(&self) -> &str {
308 self.scope.id()
309 }
310
311 pub fn turn_id(&self) -> Option<&str> {
312 self.scope.turn_id()
313 }
314}
315
316#[async_trait::async_trait]
322pub trait AwaitEventResolver: Send + Sync {
323 fn durability_tier(&self) -> crate::DurabilityTier {
324 crate::DurabilityTier::Inline
325 }
326
327 fn requires_durable_attachment_store(&self) -> bool {
328 false
329 }
330
331 fn supports_durable_effects(&self) -> bool {
332 false
333 }
334
335 async fn await_event_key(
336 &self,
337 _scope: &ExecutionScope,
338 _wait: AwaitEventWaitIdentity,
339 ) -> Result<AwaitEventKey, RuntimeError> {
340 Err(RuntimeError::new(
341 "await_event_unsupported",
342 "this effect boundary does not support await-event keys",
343 ))
344 }
345
346 async fn resolve_await_event(
347 &self,
348 _key: &AwaitEventKey,
349 _resolution: Resolution,
350 ) -> Result<ResolveOutcome, RuntimeError> {
351 Ok(ResolveOutcome::UnknownOrRevoked)
352 }
353
354 async fn await_await_event(
355 &self,
356 _key: &AwaitEventKey,
357 _cancel: CancellationToken,
358 _deadline: Option<Instant>,
359 ) -> Result<Resolution, RuntimeError> {
360 Err(RuntimeError::new(
361 "await_event_unsupported",
362 "this effect boundary does not support await-event waits",
363 ))
364 }
365
366 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
367 Ok(())
368 }
369
370 async fn cancel_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
381 Err(RuntimeError::new(
382 "await_event_cancel_unsupported",
383 "this effect boundary does not support cancelling durable waits",
384 ))
385 }
386}
387
388#[async_trait::async_trait]
390pub trait EffectHost: AwaitEventResolver {
391 fn scoped<'run>(
392 &'run self,
393 scope: ExecutionScope,
394 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
395
396 fn scoped_static(
397 &self,
398 _scope: ExecutionScope,
399 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
400 Ok(None)
401 }
402}
403
404#[async_trait::async_trait]
406pub trait RuntimeEffectController: AwaitEventResolver {
407 fn supports_concurrent_effects(&self) -> bool {
417 true
418 }
419
420 async fn execute_effect(
421 &self,
422 envelope: RuntimeEffectEnvelope,
423 local_executor: RuntimeEffectLocalExecutor<'_>,
424 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
425}
426
427#[derive(Clone)]
430pub(crate) enum RuntimeEffectControllerHandle<'run> {
431 Borrowed(ScopedEffectController<'run>),
432 #[cfg(any(test, feature = "testing"))]
433 Shared {
434 controller: Arc<dyn RuntimeEffectController>,
435 scope: ExecutionScope,
436 },
437}
438
439impl<'run> RuntimeEffectControllerHandle<'run> {
440 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
441 Self::Borrowed(scoped)
442 }
443
444 #[cfg(any(test, feature = "testing"))]
445 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
446 Self::Shared {
447 controller,
448 scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
449 }
450 }
451
452 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
453 match self {
454 Self::Borrowed(scoped) => scoped.controller(),
455 #[cfg(any(test, feature = "testing"))]
456 Self::Shared { controller, .. } => controller.as_ref(),
457 }
458 }
459
460 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
461 match self {
462 Self::Borrowed(scoped) => scoped.clone(),
463 #[cfg(any(test, feature = "testing"))]
464 Self::Shared { controller, scope } => {
465 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
466 .expect("runtime effect controller handle carries a valid scope")
467 }
468 }
469 }
470
471 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
472 self.clone()
473 }
474}
475
476#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
477#[error("{code}: {message}")]
478pub struct RuntimeEffectControllerError {
479 pub code: String,
480 pub message: String,
481}
482
483impl RuntimeEffectControllerError {
484 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
485 Self {
486 code: code.into(),
487 message: message.into(),
488 }
489 }
490
491 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
492 Self::new(
493 "runtime_effect_wrong_outcome",
494 format!(
495 "expected {} outcome, got {}",
496 expected.as_str(),
497 actual.as_str()
498 ),
499 )
500 }
501
502 pub(crate) fn into_runtime_error(self) -> RuntimeError {
503 RuntimeError::new(self.code, self.message)
504 }
505}
506
507impl From<RuntimeError> for RuntimeEffectControllerError {
508 fn from(err: RuntimeError) -> Self {
509 Self::new(err.code.as_str(), err.message)
510 }
511}
512
513impl From<PluginError> for RuntimeEffectControllerError {
514 fn from(err: PluginError) -> Self {
515 Self::new("plugin", err.to_string())
516 }
517}
518
519impl From<crate::StoreError> for RuntimeEffectControllerError {
520 fn from(err: crate::StoreError) -> Self {
521 Self::new("runtime_store", err.to_string())
522 }
523}
524
525#[async_trait::async_trait]
530pub(crate) trait ProcessRunner: Send + Sync {
531 async fn run_process(
532 &self,
533 registration: crate::ProcessRegistration,
534 execution_context: crate::ProcessExecutionContext,
535 registry: Arc<dyn ProcessRegistry>,
536 scoped_effect_controller: crate::ScopedEffectController<'_>,
537 cancellation: CancellationToken,
538 ) -> crate::ProcessAwaitOutput;
539}
540
541pub struct ProcessLocalExecution {
542 pub registry: Arc<dyn ProcessRegistry>,
543 pub process_work_driver: Option<crate::ProcessWorkDriver>,
544}
545
546impl ProcessLocalExecution {
547 pub async fn execute(
548 self,
549 command: ProcessCommand,
550 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
551 let Self {
552 registry,
553 process_work_driver,
554 } = self;
555 match command {
556 ProcessCommand::Start {
557 registration,
558 grant,
559 execution_context: _,
560 } => {
561 let record =
562 InlineRuntimeEffectController::start_process(registry, registration, grant)
563 .await?;
564 if let Some(driver) = process_work_driver.as_ref() {
565 driver.claim_and_run_pending("process_start").await?;
566 }
567 Ok(ProcessEffectOutcome::Start {
568 record: Box::new(record),
569 })
570 }
571 ProcessCommand::List {
572 session_scope,
573 mode,
574 } => {
575 let entries = match mode {
576 crate::ProcessListMode::Live => {
577 registry.list_live_handle_grants(&session_scope).await?
578 }
579 crate::ProcessListMode::All => {
580 registry.list_handle_grants(&session_scope).await?
581 }
582 };
583 Ok(ProcessEffectOutcome::List { entries })
584 }
585 ProcessCommand::Transfer {
586 from_scope,
587 to_scope,
588 process_ids,
589 } => {
590 registry
591 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
592 .await?;
593 Ok(ProcessEffectOutcome::Transfer)
594 }
595 ProcessCommand::DeleteSession { session_id } => {
596 let report = registry.delete_session_process_state(&session_id).await?;
597 Ok(ProcessEffectOutcome::DeleteSession { report })
598 }
599 ProcessCommand::Await { process_id } => {
600 let output = if let Some(driver) = process_work_driver.as_ref() {
601 driver.await_terminal(&process_id).await?
602 } else {
603 crate::ProcessAwaiter::polling(registry)
604 .await_terminal(&process_id)
605 .await?
606 };
607 Ok(ProcessEffectOutcome::Await { output })
608 }
609 ProcessCommand::Cancel { process_id, reason } => {
610 let record = InlineRuntimeEffectController
611 .request_process_cancel(registry, &process_id, reason)
612 .await?;
613 Ok(ProcessEffectOutcome::Cancel {
614 record: Box::new(record),
615 })
616 }
617 ProcessCommand::Signal {
618 process_id,
619 request,
620 ..
621 } => {
622 let result = registry.append_event(&process_id, request).await?;
623 Ok(ProcessEffectOutcome::Signal {
624 event: Box::new(result.event),
625 })
626 }
627 }
628 }
629}
630
631pub(super) struct LocalTurnEffectRunner<'a, 'run> {
632 driver: &'a mut RuntimeTurnDriver<'run>,
633 machine: &'a mut crate::TurnMachine,
634 event_tx: mpsc::Sender<RuntimeStreamEvent>,
635 cancellation: CancellationToken,
636}
637
638pub(super) struct LocalDirectEffectRunner {
639 provider: ProviderHandle,
640 attachment_store: Arc<dyn AttachmentStore>,
641}
642
643struct LocalToolBatchEffectRunner<'run> {
644 context: crate::RuntimeExecutionContext<'run>,
645 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
646}
647
648#[async_trait::async_trait]
649trait RuntimeEffectLocalRunner: Send {
650 async fn execute(
651 self: Box<Self>,
652 envelope: RuntimeEffectEnvelope,
653 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
654}
655
/// One-shot closure type that maps an envelope to a boxed, `Send` future
/// yielding the effect outcome; only built for tests or with the `testing`
/// feature.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
667
/// Local runner backed by a caller-supplied closure; only built for tests or
/// with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked once with the envelope.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
672
673type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
674 serde_json::Value,
675 ) -> Pin<
676 Box<
677 dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
678 + Send
679 + 'run,
680 >,
681 > + Send
682 + 'run;
683
684struct DurableStepLocalRunner<'run> {
685 run: Box<DurableStepLocalRunnerFn<'run>>,
686}
687
688enum RuntimeEffectLocalExecutorState<'run> {
689 Unavailable,
690 SleepOnly {
691 cancellation: CancellationToken,
692 clock: Arc<dyn crate::Clock>,
693 },
694 ExternalWaitOptions {
695 cancellation: CancellationToken,
696 deadline: Option<Instant>,
697 clock: Arc<dyn crate::Clock>,
698 },
699 Process(ProcessLocalExecution),
700 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
701}
702
703pub struct RuntimeEffectLocalExecutor<'run> {
709 state: RuntimeEffectLocalExecutorState<'run>,
710}
711
712impl<'run> RuntimeEffectLocalExecutor<'run> {
713 pub fn unavailable() -> Self {
714 Self {
715 state: RuntimeEffectLocalExecutorState::Unavailable,
716 }
717 }
718
719 pub fn sleep(cancellation: CancellationToken) -> Self {
720 Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
721 }
722
723 pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
724 Self {
725 state: RuntimeEffectLocalExecutorState::SleepOnly {
726 cancellation,
727 clock,
728 },
729 }
730 }
731
732 pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
733 Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
734 }
735
736 pub fn await_event_with_clock(
737 cancellation: CancellationToken,
738 deadline: Option<Instant>,
739 clock: Arc<dyn crate::Clock>,
740 ) -> Self {
741 Self {
742 state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
743 cancellation,
744 deadline,
745 clock,
746 },
747 }
748 }
749
750 pub fn processes(
751 registry: Arc<dyn ProcessRegistry>,
752 process_work_driver: Option<crate::ProcessWorkDriver>,
753 ) -> Self {
754 Self {
755 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
756 registry,
757 process_work_driver,
758 }),
759 }
760 }
761
762 pub fn durable_step<F, Fut>(run: F) -> Self
763 where
764 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
765 Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
766 {
767 Self {
768 state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
769 run: Box::new(move |input| {
770 Box::pin(async move { run(input).await.map_err(Into::into) })
771 }),
772 })),
773 }
774 }
775
776 #[cfg(any(test, feature = "testing"))]
777 pub fn testing<F, Fut>(run: F) -> Self
778 where
779 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
780 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
781 + Send
782 + 'run,
783 {
784 Self {
785 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
786 TestingRuntimeEffectLocalRunner {
787 run: Box::new(move |envelope| Box::pin(run(envelope))),
788 },
789 )),
790 }
791 }
792
793 pub(in crate::runtime) fn turn<'scope>(
794 driver: &'run mut RuntimeTurnDriver<'scope>,
795 machine: &'run mut crate::TurnMachine,
796 event_tx: mpsc::Sender<RuntimeStreamEvent>,
797 cancellation: CancellationToken,
798 ) -> Self
799 where
800 'scope: 'run,
801 {
802 Self {
803 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
804 driver,
805 machine,
806 event_tx,
807 cancellation,
808 })),
809 }
810 }
811
812 pub(in crate::runtime) fn direct(
813 provider: ProviderHandle,
814 attachment_store: Arc<dyn AttachmentStore>,
815 ) -> Self {
816 Self {
817 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
818 provider,
819 attachment_store,
820 })),
821 }
822 }
823
824 pub(crate) fn tool_batch(
825 context: crate::RuntimeExecutionContext<'run>,
826 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
827 ) -> Self {
828 Self {
829 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
830 context,
831 child_trace_hooks,
832 })),
833 }
834 }
835
836 pub async fn execute(
837 self,
838 envelope: RuntimeEffectEnvelope,
839 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
840 match self.state {
841 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
842 RuntimeEffectLocalExecutorState::SleepOnly {
843 cancellation,
844 clock,
845 } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
846 RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
847 Err(RuntimeEffectControllerError::new(
848 "runtime_effect_local_executor_mismatch",
849 format!(
850 "local await-event options cannot execute {} command directly",
851 envelope.command.kind().as_str()
852 ),
853 ))
854 }
855 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
856 "runtime_effect_local_executor_unavailable",
857 format!(
858 "no local executor is available for {}",
859 envelope.command.kind().as_str()
860 ),
861 )),
862 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
863 "runtime_effect_local_executor_mismatch",
864 format!(
865 "process executor cannot execute {} command directly",
866 envelope.command.kind().as_str()
867 ),
868 )),
869 }
870 }
871
872 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
873 match self.state {
874 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
875 _ => Err(RuntimeEffectControllerError::new(
876 "runtime_effect_local_executor_unavailable",
877 "no process executor is available for process command",
878 )),
879 }
880 }
881
882 fn into_await_event_options(self) -> Result<AwaitEventOptions, RuntimeEffectControllerError> {
883 match self.state {
884 RuntimeEffectLocalExecutorState::ExternalWaitOptions {
885 cancellation,
886 deadline,
887 clock,
888 } => Ok((cancellation, deadline, clock)),
889 _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
890 }
891 }
892}
893
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Hands the envelope to the stored closure and awaits its future.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let pending = (self.run)(envelope);
        pending.await
    }
}
904
905#[async_trait::async_trait]
906impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
907 async fn execute(
908 self: Box<Self>,
909 envelope: RuntimeEffectEnvelope,
910 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
911 match envelope.command {
912 RuntimeEffectCommand::DurableStep { input, .. } => {
913 let value = (self.run)(input).await?;
914 Ok(RuntimeEffectOutcome::DurableStep { value })
915 }
916 command => Err(RuntimeEffectControllerError::new(
917 "runtime_effect_local_executor_mismatch",
918 format!(
919 "local durable step executor cannot execute {} command",
920 command.kind().as_str()
921 ),
922 )),
923 }
924 }
925}
926
927#[async_trait::async_trait]
928impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
929 async fn execute(
930 self: Box<Self>,
931 envelope: RuntimeEffectEnvelope,
932 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
933 match envelope.command {
934 RuntimeEffectCommand::ToolBatch { batch } => {
935 let outcome = self
936 .context
937 .execute_prepared_tool_batch_launches(
938 batch,
939 envelope.invocation,
940 self.child_trace_hooks,
941 )
942 .await?;
943 Ok(RuntimeEffectOutcome::ToolBatch {
944 launches: outcome.launches,
945 triggers: outcome.triggers,
946 })
947 }
948 RuntimeEffectCommand::ToolAttempt {
949 call,
950 execution_grant,
951 attempt,
952 max_attempts,
953 } => {
954 let child_execution_trace_hook = self.child_trace_hooks.get(&call.call_id).cloned();
955 let outcome = self
956 .context
957 .execute_prepared_tool_attempt_effect(
958 call,
959 execution_grant,
960 attempt,
961 max_attempts,
962 envelope.invocation,
963 child_execution_trace_hook,
964 )
965 .await?;
966 Ok(RuntimeEffectOutcome::ToolAttempt {
967 launch: outcome.launch,
968 triggers: outcome.triggers,
969 })
970 }
971 command => Err(RuntimeEffectControllerError::new(
972 "runtime_effect_local_executor_mismatch",
973 format!(
974 "local tool executor cannot execute {} command",
975 command.kind().as_str()
976 ),
977 )),
978 }
979 }
980}
981
982#[async_trait::async_trait]
983impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
984 async fn execute(
985 self: Box<Self>,
986 envelope: RuntimeEffectEnvelope,
987 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
988 let runner = *self;
989 match envelope.command {
990 RuntimeEffectCommand::LlmCall { request } => {
991 let protocol_iteration = runner.machine.protocol_iteration();
992 let (result, text_streamed) = runner
993 .driver
994 .run_llm_call(
995 Arc::new((*request).into_request(None, None)),
996 protocol_iteration,
997 envelope.invocation,
998 &runner.event_tx,
999 &runner.cancellation,
1000 )
1001 .await;
1002 Ok(RuntimeEffectOutcome::LlmCall {
1003 result,
1004 text_streamed,
1005 })
1006 }
1007 RuntimeEffectCommand::ToolBatch { batch } => {
1008 let outcome = runner
1009 .driver
1010 .run_tool_batch(
1011 batch,
1012 envelope.invocation,
1013 &runner.event_tx,
1014 &runner.cancellation,
1015 )
1016 .await?;
1017 Ok(RuntimeEffectOutcome::ToolBatch {
1018 launches: outcome.launches,
1019 triggers: outcome.triggers,
1020 })
1021 }
1022 RuntimeEffectCommand::ExecCode { language, code } => {
1023 let protocol_iteration = runner.machine.protocol_iteration();
1024 let messages = runner.machine.message_sequence();
1025 Ok(RuntimeEffectOutcome::ExecCode {
1026 result: runner
1027 .driver
1028 .run_exec_code(
1029 language,
1030 &code,
1031 messages,
1032 protocol_iteration,
1033 envelope.invocation,
1034 &runner.event_tx,
1035 )
1036 .await,
1037 })
1038 }
1039 RuntimeEffectCommand::Checkpoint { checkpoint } => {
1040 Ok(RuntimeEffectOutcome::Checkpoint {
1041 result: runner
1042 .driver
1043 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1044 .await
1045 .map_err(RuntimeEffectControllerError::from),
1046 })
1047 }
1048 RuntimeEffectCommand::SyncExecutionEnvironment {
1049 update_machine_config,
1050 } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1051 result: runner
1052 .driver
1053 .refresh_execution_environment(runner.machine, update_machine_config)
1054 .await
1055 .map_err(|err| err.to_string()),
1056 }),
1057 RuntimeEffectCommand::Sleep { duration_ms } => {
1058 sleep_with_cancellation(
1059 duration_ms,
1060 &runner.cancellation,
1061 runner.driver.host.core.clock.as_ref(),
1062 )
1063 .await?;
1064 Ok(RuntimeEffectOutcome::Sleep)
1065 }
1066 command => Err(RuntimeEffectControllerError::new(
1067 "runtime_effect_local_executor_mismatch",
1068 format!(
1069 "local turn executor cannot execute {} command",
1070 command.kind().as_str()
1071 ),
1072 )),
1073 }
1074 }
1075}
1076
1077#[async_trait::async_trait]
1078impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1079 async fn execute(
1080 mut self: Box<Self>,
1081 envelope: RuntimeEffectEnvelope,
1082 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1083 match envelope.command {
1084 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1085 result: self
1086 .run_direct_llm_request((*request).into_request(
1087 crate::session_model::transport_stream_events(&self.provider, None),
1088 None,
1089 ))
1090 .await,
1091 }),
1092 RuntimeEffectCommand::Sleep { duration_ms } => {
1093 sleep_with_cancellation(
1094 duration_ms,
1095 &CancellationToken::new(),
1096 &crate::SystemClock,
1097 )
1098 .await?;
1099 Ok(RuntimeEffectOutcome::Sleep)
1100 }
1101 command => Err(RuntimeEffectControllerError::new(
1102 "runtime_effect_local_executor_mismatch",
1103 format!(
1104 "local direct executor cannot execute {} command",
1105 command.kind().as_str()
1106 ),
1107 )),
1108 }
1109 }
1110}
1111
1112impl LocalDirectEffectRunner {
1113 async fn run_direct_llm_request(
1114 &mut self,
1115 request: CoreLlmRequest,
1116 ) -> Result<LlmResponse, LlmCallError> {
1117 let request = crate::attachments::resolve_llm_request_attachments(
1118 request,
1119 self.attachment_store.as_ref(),
1120 )
1121 .await
1122 .map_err(|err| LlmCallError {
1123 message: err.to_string(),
1124 retryable: false,
1125 kind: crate::ProviderFailureKind::Unknown,
1126 raw: None,
1127 code: Some("attachment_resolution_failed".to_string()),
1128 terminal_reason: crate::LlmTerminalReason::ProviderError,
1129 request_body: None,
1130 })?;
1131 self.provider
1132 .complete(request)
1133 .await
1134 .map_err(llm_call_error_from_transport)
1135 }
1136}
1137
1138async fn execute_local_sleep(
1139 envelope: RuntimeEffectEnvelope,
1140 cancellation: CancellationToken,
1141 clock: &dyn crate::Clock,
1142) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1143 match envelope.command {
1144 RuntimeEffectCommand::Sleep { duration_ms } => {
1145 sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1146 Ok(RuntimeEffectOutcome::Sleep)
1147 }
1148 command => Err(RuntimeEffectControllerError::new(
1149 "runtime_effect_local_executor_mismatch",
1150 format!(
1151 "local sleep executor cannot execute {} command",
1152 command.kind().as_str()
1153 ),
1154 )),
1155 }
1156}
1157
1158async fn sleep_with_cancellation(
1159 duration_ms: u64,
1160 cancellation: &CancellationToken,
1161 clock: &dyn crate::Clock,
1162) -> Result<(), RuntimeEffectControllerError> {
1163 let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1164 tokio::pin!(sleep);
1165 tokio::select! {
1166 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1167 "runtime_effect_sleep_cancelled",
1168 "runtime effect sleep was cancelled",
1169 )),
1170 _ = &mut sleep => Ok(()),
1171 }
1172}
1173
/// Stateless controller that handles await-event waits through the inline
/// await-event registry, runs process commands on a spawned task, and
/// delegates every other command to the local executor.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
1186
1187#[async_trait::async_trait]
1188impl AwaitEventResolver for InlineRuntimeEffectController {
1189 fn supports_durable_effects(&self) -> bool {
1190 true
1191 }
1192
1193 async fn await_event_key(
1194 &self,
1195 scope: &ExecutionScope,
1196 wait: AwaitEventWaitIdentity,
1197 ) -> Result<AwaitEventKey, RuntimeError> {
1198 inline_await_events().key_for(scope, wait)
1199 }
1200
1201 async fn resolve_await_event(
1202 &self,
1203 key: &AwaitEventKey,
1204 resolution: Resolution,
1205 ) -> Result<ResolveOutcome, RuntimeError> {
1206 inline_await_events().resolve(key, resolution)
1207 }
1208
1209 async fn await_await_event(
1210 &self,
1211 key: &AwaitEventKey,
1212 cancel: CancellationToken,
1213 deadline: Option<Instant>,
1214 ) -> Result<Resolution, RuntimeError> {
1215 inline_await_events()
1216 .await_resolution(key, cancel, deadline, &crate::SystemClock)
1217 .await
1218 }
1219
1220 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1221 inline_await_events().revoke_session(session_id)
1222 }
1223
1224 async fn cancel_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1225 inline_await_events().cancel_session(session_id)
1226 }
1227}
1228
1229#[async_trait::async_trait]
1230impl RuntimeEffectController for InlineRuntimeEffectController {
1231 async fn execute_effect(
1232 &self,
1233 envelope: RuntimeEffectEnvelope,
1234 local_executor: RuntimeEffectLocalExecutor<'_>,
1235 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1236 match envelope.command {
1237 RuntimeEffectCommand::AwaitEvent { key } => {
1238 let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1239 let resolution = inline_await_events()
1240 .await_resolution(&key, cancellation, deadline, clock.as_ref())
1241 .await
1242 .map_err(RuntimeEffectControllerError::from)?;
1243 Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1244 }
1245 RuntimeEffectCommand::Process { command } => {
1246 let execution = local_executor.into_process()?;
1247 let result = tokio::task::spawn(async move { execution.execute(*command).await })
1248 .await
1249 .map_err(|err| {
1250 RuntimeEffectControllerError::new(
1251 "runtime_effect_process_task_join",
1252 format!("inline process effect task failed: {err}"),
1253 )
1254 })??;
1255 Ok(RuntimeEffectOutcome::Process { result })
1256 }
1257 _ => local_executor.execute(envelope).await,
1258 }
1259 }
1260}
1261
1262#[derive(Clone)]
1264pub struct InlineEffectHost {
1265 controller: Arc<dyn RuntimeEffectController>,
1266}
1267
1268impl InlineEffectHost {
1269 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1270 Self { controller }
1271 }
1272}
1273
1274impl Default for InlineEffectHost {
1275 fn default() -> Self {
1276 Self::new(Arc::new(InlineRuntimeEffectController))
1277 }
1278}
1279
1280#[async_trait::async_trait]
1281impl AwaitEventResolver for InlineEffectHost {
1282 fn durability_tier(&self) -> crate::DurabilityTier {
1283 self.controller.durability_tier()
1284 }
1285
1286 fn requires_durable_attachment_store(&self) -> bool {
1287 self.controller.requires_durable_attachment_store()
1288 }
1289
1290 fn supports_durable_effects(&self) -> bool {
1291 self.controller.supports_durable_effects()
1292 }
1293
1294 async fn await_event_key(
1295 &self,
1296 scope: &ExecutionScope,
1297 wait: AwaitEventWaitIdentity,
1298 ) -> Result<AwaitEventKey, RuntimeError> {
1299 self.controller.await_event_key(scope, wait).await
1300 }
1301
1302 async fn resolve_await_event(
1303 &self,
1304 key: &AwaitEventKey,
1305 resolution: Resolution,
1306 ) -> Result<ResolveOutcome, RuntimeError> {
1307 self.controller.resolve_await_event(key, resolution).await
1308 }
1309
1310 async fn await_await_event(
1311 &self,
1312 key: &AwaitEventKey,
1313 cancel: CancellationToken,
1314 deadline: Option<Instant>,
1315 ) -> Result<Resolution, RuntimeError> {
1316 self.controller
1317 .await_await_event(key, cancel, deadline)
1318 .await
1319 }
1320
1321 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1322 self.controller
1323 .revoke_await_events_for_session(session_id)
1324 .await
1325 }
1326
1327 async fn cancel_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1328 self.controller
1329 .cancel_await_events_for_session(session_id)
1330 .await
1331 }
1332}
1333
1334#[async_trait::async_trait]
1335impl EffectHost for InlineEffectHost {
1336 fn scoped<'run>(
1337 &'run self,
1338 scope: ExecutionScope,
1339 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1340 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1341 }
1342
1343 fn scoped_static(
1344 &self,
1345 scope: ExecutionScope,
1346 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1347 Ok(Some(ScopedEffectController::shared(
1348 Arc::clone(&self.controller),
1349 scope,
1350 )?))
1351 }
1352}
1353
1354impl InlineRuntimeEffectController {
1355 pub(crate) async fn start_process(
1363 registry: Arc<dyn crate::ProcessRegistry>,
1364 registration: crate::ProcessRegistration,
1365 grant: Option<crate::ProcessStartGrant>,
1366 ) -> Result<ProcessRecord, PluginError> {
1367 let registration_for_record = registration.clone();
1368 let record = registry.register_process(registration_for_record).await?;
1369 if let Some(grant) = grant {
1370 registry
1371 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
1372 .await?;
1373 }
1374 Ok(record)
1375 }
1376
1377 pub(crate) async fn request_process_cancel(
1378 &self,
1379 registry: Arc<dyn crate::ProcessRegistry>,
1380 process_id: &str,
1381 reason: Option<String>,
1382 ) -> Result<ProcessRecord, PluginError> {
1383 registry
1387 .append_event(
1388 process_id,
1389 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1390 )
1391 .await?;
1392 registry
1393 .get_process(process_id)
1394 .await
1395 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1396 }
1397}
1398
1399impl std::fmt::Debug for InlineRuntimeEffectController {
1400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1401 f.debug_struct("InlineRuntimeEffectController").finish()
1402 }
1403}