Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::HashMap;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Instant;
5
6use serde::{Deserialize, Serialize};
7use tokio::sync::mpsc;
8use tokio_util::sync::CancellationToken;
9
10use crate::AttachmentStore;
11use crate::LlmRequest as CoreLlmRequest;
12use crate::LlmResponse;
13use crate::ProcessRecord;
14use crate::ProcessRegistry;
15use crate::provider::ProviderHandle;
16use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
17use crate::sansio::LlmCallError;
18use crate::{PluginError, RuntimeError, RuntimeErrorCode};
19
20use super::envelope::{
21    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
22    RuntimeEffectKind, RuntimeEffectOutcome,
23};
24use super::outcome::llm_call_error_from_transport;
25
26type AwaitEventOptions = (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>);
27
28use super::await_events::inline_await_events;
29
30// =============================================================================
31// Effect host + controller trait + scope + error
32// =============================================================================
33
/// Stable semantic identity for one effectful runtime operation.
///
/// The scope is chosen by the host boundary before any nondeterministic work is
/// planned. It is intentionally generic: Restate, an inline test host, or a
/// future durable effect host all receive the same Lash scope vocabulary.
///
/// With serde the scope is an internally tagged object. A `type` field holds
/// the snake_case variant name (e.g. `"queue_drain"`) alongside the variant's
/// id fields. `ExecutionScope::validate` rejects scopes whose ids are empty or
/// whitespace-only.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionScope {
    /// A turn of a session; `turn_id` is the scope id.
    Turn {
        session_id: String,
        turn_id: String,
    },
    /// A process; `process_id` is the scope id and no session id is attached.
    Process {
        process_id: String,
    },
    /// Queue-drain work for a session; `drain_id` is the scope id.
    QueueDrain {
        session_id: String,
        drain_id: String,
    },
    /// Deletion of a session; `session_id` is also the scope id.
    SessionDelete {
        session_id: String,
    },
    /// A runtime operation with no session attached; `operation_id` is the scope id.
    RuntimeOperation {
        operation_id: String,
    },
}
60
61impl ExecutionScope {
62    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
63        Self::Turn {
64            session_id: session_id.into(),
65            turn_id: turn_id.into(),
66        }
67    }
68
69    pub fn process(process_id: impl Into<String>) -> Self {
70        Self::Process {
71            process_id: process_id.into(),
72        }
73    }
74
75    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
76        Self::QueueDrain {
77            session_id: session_id.into(),
78            drain_id: drain_id.into(),
79        }
80    }
81
82    pub fn session_delete(session_id: impl Into<String>) -> Self {
83        Self::SessionDelete {
84            session_id: session_id.into(),
85        }
86    }
87
88    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
89        Self::RuntimeOperation {
90            operation_id: operation_id.into(),
91        }
92    }
93
94    pub fn id(&self) -> &str {
95        match self {
96            Self::Turn { turn_id, .. } => turn_id,
97            Self::Process { process_id } => process_id,
98            Self::QueueDrain { drain_id, .. } => drain_id,
99            Self::SessionDelete { session_id } => session_id,
100            Self::RuntimeOperation { operation_id } => operation_id,
101        }
102    }
103
104    pub fn session_id(&self) -> Option<&str> {
105        match self {
106            Self::Turn { session_id, .. }
107            | Self::QueueDrain { session_id, .. }
108            | Self::SessionDelete { session_id } => Some(session_id),
109            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
110        }
111    }
112
113    pub fn turn_id(&self) -> Option<&str> {
114        match self {
115            Self::Turn { turn_id, .. } => Some(turn_id),
116            _ => None,
117        }
118    }
119
120    pub fn validates_turn_trace_id(&self) -> bool {
121        matches!(self, Self::Turn { .. })
122    }
123
124    pub(super) fn validate(&self) -> Result<(), RuntimeError> {
125        let missing = match self {
126            Self::Turn {
127                session_id,
128                turn_id,
129            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
130            Self::Process { process_id } => process_id.trim().is_empty(),
131            Self::QueueDrain {
132                session_id,
133                drain_id,
134            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
135            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
136            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
137        };
138        if missing {
139            return Err(RuntimeError::new(
140                RuntimeErrorCode::MissingExecutionScopeId,
141                "execution scopes require non-empty stable ids",
142            ));
143        }
144        Ok(())
145    }
146}
147
/// What a Durable Wait waits for, independent of the execution scope it belongs to.
///
/// With serde this is an internally tagged object whose `type` field holds the
/// snake_case variant name. `AwaitEventWaitIdentity::validate` rejects blank
/// ids and a zero `ordinal`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AwaitEventWaitIdentity {
    /// Completion of the tool call `tool_call_id`.
    ToolCompletion {
        tool_call_id: String,
    },
    /// The `ordinal`-th `signal_name` signal of `process_id`.
    /// `ordinal` must be at least 1 to pass validation.
    ProcessSignal {
        process_id: String,
        signal_name: String,
        ordinal: u64,
    },
    /// A caller-defined wait identified by a non-blank `key`.
    Custom {
        key: String,
    },
}
163
164impl AwaitEventWaitIdentity {
165    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
166        Self::ToolCompletion {
167            tool_call_id: tool_call_id.into(),
168        }
169    }
170
171    pub fn process_signal(
172        process_id: impl Into<String>,
173        signal_name: impl Into<String>,
174        ordinal: u64,
175    ) -> Self {
176        Self::ProcessSignal {
177            process_id: process_id.into(),
178            signal_name: signal_name.into(),
179            ordinal,
180        }
181    }
182
183    pub(super) fn validate(&self) -> Result<(), RuntimeError> {
184        let invalid = match self {
185            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
186            Self::ProcessSignal {
187                process_id,
188                signal_name,
189                ordinal,
190            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
191            Self::Custom { key } => key.trim().is_empty(),
192        };
193        if invalid {
194            return Err(RuntimeError::new(
195                "invalid_await_event_wait_identity",
196                "await-event wait identity requires non-empty stable ids",
197            ));
198        }
199        Ok(())
200    }
201}
202
/// Key for one Durable Wait.
///
/// Holds the execution scope and wait identity the key was derived from (see
/// `AwaitEventResolver::await_event_key`), plus the `key_id`/`signature` pair
/// issued for them.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AwaitEventKey {
    /// Execution scope the wait belongs to.
    pub scope: ExecutionScope,
    /// What the wait is waiting for.
    pub wait: AwaitEventWaitIdentity,
    /// Identifier embedded in the promise key (`lash-await-event:{key_id}`).
    pub key_id: String,
    /// Signature issued with the key. How it is computed is up to the issuing
    /// effect boundary and is not visible here.
    pub signature: String,
}
210
211impl AwaitEventKey {
212    pub fn promise_key(&self) -> String {
213        format!("lash-await-event:{}", self.key_id)
214    }
215}
216
/// Error payload carried by [`Resolution::Err`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalCompletionError {
    /// Error code string.
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional raw JSON detail. It is omitted from serialized output when
    /// `None` and defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<serde_json::Value>,
}
224
225impl ExternalCompletionError {
226    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
227        Self {
228            code: code.into(),
229            message: message.into(),
230            raw: None,
231        }
232    }
233}
234
/// Terminal outcome of a Durable Wait.
///
/// With serde this is adjacently tagged. `status` holds the snake_case variant
/// name, and `payload` holds the variant's data; only `ok` and `err` carry a payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
pub enum Resolution {
    /// Completed with a JSON value.
    Ok(serde_json::Value),
    /// Completed with an error payload.
    Err(ExternalCompletionError),
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled, for example by
    /// `AwaitEventResolver::cancel_await_events_for_session`.
    Cancelled,
}
243
/// Result of attempting to resolve a Durable Wait.
///
/// With serde this is an internally tagged object with a snake_case `status` field.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ResolveOutcome {
    /// The resolution was recorded.
    Accepted,
    /// The wait was already resolved; `terminal` is the resolution it holds.
    AlreadyResolved { terminal: Resolution },
    /// The wait is unknown or has been revoked. This is also the default
    /// answer of `AwaitEventResolver::resolve_await_event`.
    UnknownOrRevoked,
}
251
/// Storage for the controller inside a [`ScopedEffectController`]: either a
/// reference borrowed for `'run` or a shared, reference-counted handle.
enum ScopedEffectControllerInner<'run> {
    Borrowed(&'run dyn RuntimeEffectController),
    Shared(Arc<dyn RuntimeEffectController>),
}
256
257impl Clone for ScopedEffectControllerInner<'_> {
258    fn clone(&self) -> Self {
259        match self {
260            Self::Borrowed(controller) => Self::Borrowed(*controller),
261            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
262        }
263    }
264}
265
/// Scoped low-level controller plus the semantic execution scope it is serving.
///
/// The public constructors (`borrowed`, `shared`) validate the scope before
/// building the value. Cloning copies the controller reference or bumps its
/// `Arc` count, and clones the scope.
#[derive(Clone)]
pub struct ScopedEffectController<'run> {
    controller: ScopedEffectControllerInner<'run>,
    scope: ExecutionScope,
}
272
273impl<'run> ScopedEffectController<'run> {
274    pub fn borrowed(
275        controller: &'run dyn RuntimeEffectController,
276        scope: ExecutionScope,
277    ) -> Result<Self, RuntimeError> {
278        scope.validate()?;
279        Ok(Self {
280            controller: ScopedEffectControllerInner::Borrowed(controller),
281            scope,
282        })
283    }
284
285    pub fn shared(
286        controller: Arc<dyn RuntimeEffectController>,
287        scope: ExecutionScope,
288    ) -> Result<Self, RuntimeError> {
289        scope.validate()?;
290        Ok(Self {
291            controller: ScopedEffectControllerInner::Shared(controller),
292            scope,
293        })
294    }
295
296    pub fn controller(&self) -> &dyn RuntimeEffectController {
297        match &self.controller {
298            ScopedEffectControllerInner::Borrowed(controller) => *controller,
299            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
300        }
301    }
302
303    pub fn execution_scope(&self) -> &ExecutionScope {
304        &self.scope
305    }
306
307    pub fn scope_id(&self) -> &str {
308        self.scope.id()
309    }
310
311    pub fn turn_id(&self) -> Option<&str> {
312        self.scope.turn_id()
313    }
314}
315
/// Shared durability and Durable Wait contract for effect boundaries.
///
/// Both the deployment-level [`EffectHost`] factory and the per-run
/// [`RuntimeEffectController`] resolve Durable Waits and describe their
/// durability; this supertrait is the single declaration of that contract.
///
/// Every method has a default that describes an inline boundary without
/// Durable Wait support, so implementors override only what they provide.
#[async_trait::async_trait]
pub trait AwaitEventResolver: Send + Sync {
    /// Durability tier of this boundary. Default: `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this boundary requires a durable attachment store.
    /// Default: `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this boundary supports durable effects. Default: `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Derives the [`AwaitEventKey`] for a wait identity within a scope.
    ///
    /// Default: fails with code `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect boundary does not support await-event keys",
        ))
    }

    /// Delivers a [`Resolution`] to the wait identified by the key.
    ///
    /// Default: returns `Ok(ResolveOutcome::UnknownOrRevoked)` rather than an error.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits until the keyed wait resolves. The method receives a cancellation
    /// token and an optional deadline; how they are honored is
    /// implementation-defined.
    ///
    /// Default: fails with code `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect boundary does not support await-event waits",
        ))
    }

    /// Revokes the Durable Waits of a session. Unlike the cancel lever below,
    /// revocation tombstones the session's waits permanently.
    ///
    /// Default: a no-op that succeeds.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }

    /// Cancel every *outstanding* durable wait for `session_id` without
    /// deleting the session: each waiter receives a terminal
    /// [`Resolution::Cancelled`] instead of hanging, late resolves observe
    /// that terminal, and waits registered afterwards behave normally — in
    /// contrast to [`revoke_await_events_for_session`](Self::revoke_await_events_for_session),
    /// which tombstones the session's waits forever.
    ///
    /// The default errors loudly: an effect boundary that tracks durable waits
    /// must implement this to honor the host lever, and one that cannot must
    /// not silently claim success.
    async fn cancel_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Err(RuntimeError::new(
            "await_event_cancel_unsupported",
            "this effect boundary does not support cancelling durable waits",
        ))
    }
}
387
/// Deployment-level factory for scoped effect controllers.
#[async_trait::async_trait]
pub trait EffectHost: AwaitEventResolver {
    /// Returns a controller scoped to `scope` that may borrow from `self` for `'run`.
    ///
    /// The [`ScopedEffectController`] constructors validate `scope`, so an
    /// implementation built on them returns an error for blank scope ids.
    fn scoped<'run>(
        &'run self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, RuntimeError>;

    /// Returns a `'static` scoped controller, which therefore cannot borrow
    /// from `self`, when the host can provide one.
    ///
    /// Default: `Ok(None)`.
    fn scoped_static(
        &self,
        _scope: ExecutionScope,
    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
        Ok(None)
    }
}
403
/// Boundary for nondeterministic runtime work.
///
/// Every effect goes through [`RuntimeEffectController::execute_effect`], which
/// receives the effect envelope and a single-use [`RuntimeEffectLocalExecutor`].
#[async_trait::async_trait]
pub trait RuntimeEffectController: AwaitEventResolver {
    /// Whether this controller can safely accept overlapping `execute_effect`
    /// calls from one runtime coordinator.
    ///
    /// Local and store-backed controllers can usually fan out independent
    /// effects. Some workflow substrates expose a single ordered journal
    /// context where native operations must be awaited immediately before the
    /// next context call is issued. Those controllers should return `false` so
    /// coordinators serialize child effects while still replaying each child by
    /// its own stable key.
    fn supports_concurrent_effects(&self) -> bool {
        true
    }

    /// Executes, or replays, the effect described by `envelope`.
    ///
    /// `local_executor` can run the effect in-process. Durable controllers may
    /// ignore it and return a recorded outcome instead.
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
426
/// Runtime-internal handle for effect-controller references carried through
/// per-turn execution contexts.
///
/// Outside `cfg(test)` and the `testing` feature, only the `Borrowed` variant exists.
#[derive(Clone)]
pub(crate) enum RuntimeEffectControllerHandle<'run> {
    /// A scoped controller built through the validating constructors.
    Borrowed(ScopedEffectController<'run>),
    /// Test-only: a shared controller paired with its own scope.
    #[cfg(any(test, feature = "testing"))]
    Shared {
        controller: Arc<dyn RuntimeEffectController>,
        scope: ExecutionScope,
    },
}
438
439impl<'run> RuntimeEffectControllerHandle<'run> {
440    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
441        Self::Borrowed(scoped)
442    }
443
444    #[cfg(any(test, feature = "testing"))]
445    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
446        Self::Shared {
447            controller,
448            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
449        }
450    }
451
452    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
453        match self {
454            Self::Borrowed(scoped) => scoped.controller(),
455            #[cfg(any(test, feature = "testing"))]
456            Self::Shared { controller, .. } => controller.as_ref(),
457        }
458    }
459
460    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
461        match self {
462            Self::Borrowed(scoped) => scoped.clone(),
463            #[cfg(any(test, feature = "testing"))]
464            Self::Shared { controller, scope } => {
465                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
466                    .expect("runtime effect controller handle carries a valid scope")
467            }
468        }
469    }
470
471    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
472        self.clone()
473    }
474}
475
/// Serializable error returned across the effect-controller boundary.
///
/// Displays as `"{code}: {message}"`. It converts from `RuntimeError`,
/// `PluginError`, and `crate::StoreError`, and back into a `RuntimeError` via
/// `into_runtime_error`.
#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[error("{code}: {message}")]
pub struct RuntimeEffectControllerError {
    /// Error code string.
    pub code: String,
    /// Human-readable description.
    pub message: String,
}
482
483impl RuntimeEffectControllerError {
484    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
485        Self {
486            code: code.into(),
487            message: message.into(),
488        }
489    }
490
491    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
492        Self::new(
493            "runtime_effect_wrong_outcome",
494            format!(
495                "expected {} outcome, got {}",
496                expected.as_str(),
497                actual.as_str()
498            ),
499        )
500    }
501
502    pub(crate) fn into_runtime_error(self) -> RuntimeError {
503        RuntimeError::new(self.code, self.message)
504    }
505}
506
507impl From<RuntimeError> for RuntimeEffectControllerError {
508    fn from(err: RuntimeError) -> Self {
509        Self::new(err.code.as_str(), err.message)
510    }
511}
512
513impl From<PluginError> for RuntimeEffectControllerError {
514    fn from(err: PluginError) -> Self {
515        Self::new("plugin", err.to_string())
516    }
517}
518
519impl From<crate::StoreError> for RuntimeEffectControllerError {
520    fn from(err: crate::StoreError) -> Self {
521        Self::new("runtime_store", err.to_string())
522    }
523}
524
525// =============================================================================
526// Local executor (per-effect borrowed runner state)
527// =============================================================================
528
/// Crate-internal hook that runs one registered process.
///
/// It receives the registration and its execution context, the process
/// registry, a scoped effect controller, and a cancellation token. It produces
/// the process's `crate::ProcessAwaitOutput`.
#[async_trait::async_trait]
pub(crate) trait ProcessRunner: Send + Sync {
    async fn run_process(
        &self,
        registration: crate::ProcessRegistration,
        execution_context: crate::ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        scoped_effect_controller: crate::ScopedEffectController<'_>,
        cancellation: CancellationToken,
    ) -> crate::ProcessAwaitOutput;
}
540
/// In-process executor for [`ProcessCommand`]s, obtained from
/// `RuntimeEffectLocalExecutor::into_process`.
pub struct ProcessLocalExecution {
    /// Process registry the commands operate on.
    pub registry: Arc<dyn ProcessRegistry>,
    /// Optional work driver. When present, `Start` also claims and runs pending
    /// work, and `Await` waits through the driver instead of polling the registry.
    pub process_work_driver: Option<crate::ProcessWorkDriver>,
}
545
546impl ProcessLocalExecution {
547    pub async fn execute(
548        self,
549        command: ProcessCommand,
550    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
551        let Self {
552            registry,
553            process_work_driver,
554        } = self;
555        match command {
556            ProcessCommand::Start {
557                registration,
558                grant,
559                execution_context: _,
560            } => {
561                let record =
562                    InlineRuntimeEffectController::start_process(registry, registration, grant)
563                        .await?;
564                if let Some(driver) = process_work_driver.as_ref() {
565                    driver.claim_and_run_pending("process_start").await?;
566                }
567                Ok(ProcessEffectOutcome::Start {
568                    record: Box::new(record),
569                })
570            }
571            ProcessCommand::List {
572                session_scope,
573                mode,
574            } => {
575                let entries = match mode {
576                    crate::ProcessListMode::Live => {
577                        registry.list_live_handle_grants(&session_scope).await?
578                    }
579                    crate::ProcessListMode::All => {
580                        registry.list_handle_grants(&session_scope).await?
581                    }
582                };
583                Ok(ProcessEffectOutcome::List { entries })
584            }
585            ProcessCommand::Transfer {
586                from_scope,
587                to_scope,
588                process_ids,
589            } => {
590                registry
591                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
592                    .await?;
593                Ok(ProcessEffectOutcome::Transfer)
594            }
595            ProcessCommand::DeleteSession { session_id } => {
596                let report = registry.delete_session_process_state(&session_id).await?;
597                Ok(ProcessEffectOutcome::DeleteSession { report })
598            }
599            ProcessCommand::Await { process_id } => {
600                let output = if let Some(driver) = process_work_driver.as_ref() {
601                    driver.await_terminal(&process_id).await?
602                } else {
603                    crate::ProcessAwaiter::polling(registry)
604                        .await_terminal(&process_id)
605                        .await?
606                };
607                Ok(ProcessEffectOutcome::Await { output })
608            }
609            ProcessCommand::Cancel { process_id, reason } => {
610                let record = InlineRuntimeEffectController
611                    .request_process_cancel(registry, &process_id, reason)
612                    .await?;
613                Ok(ProcessEffectOutcome::Cancel {
614                    record: Box::new(record),
615                })
616            }
617            ProcessCommand::Signal {
618                process_id,
619                request,
620                ..
621            } => {
622                let result = registry.append_event(&process_id, request).await?;
623                Ok(ProcessEffectOutcome::Signal {
624                    event: Box::new(result.event),
625                })
626            }
627        }
628    }
629}
630
/// Local runner built by `RuntimeEffectLocalExecutor::turn`.
///
/// It holds mutable borrows of the turn driver and turn machine, plus a
/// runtime stream-event sender and a cancellation token.
pub(super) struct LocalTurnEffectRunner<'a, 'run> {
    driver: &'a mut RuntimeTurnDriver<'run>,
    machine: &'a mut crate::TurnMachine,
    event_tx: mpsc::Sender<RuntimeStreamEvent>,
    cancellation: CancellationToken,
}
637
/// Local runner built by `RuntimeEffectLocalExecutor::direct` from a provider
/// handle and an attachment store.
pub(super) struct LocalDirectEffectRunner {
    provider: ProviderHandle,
    attachment_store: Arc<dyn AttachmentStore>,
}
642
/// Local runner built by `RuntimeEffectLocalExecutor::tool_batch`.
///
/// It executes tool-batch and tool-attempt commands through `context`.
/// `child_trace_hooks` is looked up by tool call id.
struct LocalToolBatchEffectRunner<'run> {
    context: crate::RuntimeExecutionContext<'run>,
    child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
}
647
/// Boxed, single-use local runner stored in
/// `RuntimeEffectLocalExecutorState::Runner`.
///
/// `execute` takes `self: Box<Self>`, so each runner handles at most one envelope.
#[async_trait::async_trait]
trait RuntimeEffectLocalRunner: Send {
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
655
/// One-shot closure type behind `TestingRuntimeEffectLocalRunner`: it maps an
/// envelope to a pinned, boxed, `Send` future of the effect outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;

/// Local runner created by `RuntimeEffectLocalExecutor::testing` that forwards
/// the whole envelope to an arbitrary closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
672
/// One-shot closure type behind `DurableStepLocalRunner`: it maps a durable
/// step's JSON input to a pinned, boxed, `Send` future of its JSON output.
type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
        serde_json::Value,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;

/// Local runner created by `RuntimeEffectLocalExecutor::durable_step`. It only
/// accepts `DurableStep` commands.
struct DurableStepLocalRunner<'run> {
    run: Box<DurableStepLocalRunnerFn<'run>>,
}
687
/// Capability carried by a [`RuntimeEffectLocalExecutor`].
enum RuntimeEffectLocalExecutorState<'run> {
    /// No local execution; `execute` fails with
    /// `runtime_effect_local_executor_unavailable`.
    Unavailable,
    /// `execute` hands the envelope to `execute_local_sleep` with this token and clock.
    SleepOnly {
        cancellation: CancellationToken,
        clock: Arc<dyn crate::Clock>,
    },
    /// Options for an inline await-event wait. They are read by
    /// `into_await_event_options`, and `execute` rejects every command in this state.
    ExternalWaitOptions {
        cancellation: CancellationToken,
        deadline: Option<Instant>,
        clock: Arc<dyn crate::Clock>,
    },
    /// Process-command execution, taken out with `into_process`; `execute`
    /// rejects commands in this state.
    Process(ProcessLocalExecution),
    /// A single-use runner that `execute` delegates the envelope to.
    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
}
702
/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
///
/// Durable controllers may ignore it and replay their own recorded result. The
/// default inline controller delegates to it, so local provider/tool/checkpoint
/// work still crosses the same `execute_effect` boundary as durable controllers.
///
/// The executor is single-use: `execute`, `into_process`, and the internal
/// await-event accessor all take it by value.
pub struct RuntimeEffectLocalExecutor<'run> {
    state: RuntimeEffectLocalExecutorState<'run>,
}
711
712impl<'run> RuntimeEffectLocalExecutor<'run> {
713    pub fn unavailable() -> Self {
714        Self {
715            state: RuntimeEffectLocalExecutorState::Unavailable,
716        }
717    }
718
719    pub fn sleep(cancellation: CancellationToken) -> Self {
720        Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
721    }
722
723    pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
724        Self {
725            state: RuntimeEffectLocalExecutorState::SleepOnly {
726                cancellation,
727                clock,
728            },
729        }
730    }
731
732    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
733        Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
734    }
735
736    pub fn await_event_with_clock(
737        cancellation: CancellationToken,
738        deadline: Option<Instant>,
739        clock: Arc<dyn crate::Clock>,
740    ) -> Self {
741        Self {
742            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
743                cancellation,
744                deadline,
745                clock,
746            },
747        }
748    }
749
750    pub fn processes(
751        registry: Arc<dyn ProcessRegistry>,
752        process_work_driver: Option<crate::ProcessWorkDriver>,
753    ) -> Self {
754        Self {
755            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
756                registry,
757                process_work_driver,
758            }),
759        }
760    }
761
762    pub fn durable_step<F, Fut>(run: F) -> Self
763    where
764        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
765        Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
766    {
767        Self {
768            state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
769                run: Box::new(move |input| {
770                    Box::pin(async move { run(input).await.map_err(Into::into) })
771                }),
772            })),
773        }
774    }
775
776    #[cfg(any(test, feature = "testing"))]
777    pub fn testing<F, Fut>(run: F) -> Self
778    where
779        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
780        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
781            + Send
782            + 'run,
783    {
784        Self {
785            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
786                TestingRuntimeEffectLocalRunner {
787                    run: Box::new(move |envelope| Box::pin(run(envelope))),
788                },
789            )),
790        }
791    }
792
793    pub(in crate::runtime) fn turn<'scope>(
794        driver: &'run mut RuntimeTurnDriver<'scope>,
795        machine: &'run mut crate::TurnMachine,
796        event_tx: mpsc::Sender<RuntimeStreamEvent>,
797        cancellation: CancellationToken,
798    ) -> Self
799    where
800        'scope: 'run,
801    {
802        Self {
803            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
804                driver,
805                machine,
806                event_tx,
807                cancellation,
808            })),
809        }
810    }
811
812    pub(in crate::runtime) fn direct(
813        provider: ProviderHandle,
814        attachment_store: Arc<dyn AttachmentStore>,
815    ) -> Self {
816        Self {
817            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
818                provider,
819                attachment_store,
820            })),
821        }
822    }
823
824    pub(crate) fn tool_batch(
825        context: crate::RuntimeExecutionContext<'run>,
826        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
827    ) -> Self {
828        Self {
829            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
830                context,
831                child_trace_hooks,
832            })),
833        }
834    }
835
836    pub async fn execute(
837        self,
838        envelope: RuntimeEffectEnvelope,
839    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
840        match self.state {
841            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
842            RuntimeEffectLocalExecutorState::SleepOnly {
843                cancellation,
844                clock,
845            } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
846            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
847                Err(RuntimeEffectControllerError::new(
848                    "runtime_effect_local_executor_mismatch",
849                    format!(
850                        "local await-event options cannot execute {} command directly",
851                        envelope.command.kind().as_str()
852                    ),
853                ))
854            }
855            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
856                "runtime_effect_local_executor_unavailable",
857                format!(
858                    "no local executor is available for {}",
859                    envelope.command.kind().as_str()
860                ),
861            )),
862            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
863                "runtime_effect_local_executor_mismatch",
864                format!(
865                    "process executor cannot execute {} command directly",
866                    envelope.command.kind().as_str()
867                ),
868            )),
869        }
870    }
871
872    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
873        match self.state {
874            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
875            _ => Err(RuntimeEffectControllerError::new(
876                "runtime_effect_local_executor_unavailable",
877                "no process executor is available for process command",
878            )),
879        }
880    }
881
882    fn into_await_event_options(self) -> Result<AwaitEventOptions, RuntimeEffectControllerError> {
883        match self.state {
884            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
885                cancellation,
886                deadline,
887                clock,
888            } => Ok((cancellation, deadline, clock)),
889            _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
890        }
891    }
892}
893
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Hands the envelope to the wrapped closure and awaits the future it returns.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let TestingRuntimeEffectLocalRunner { run } = *self;
        let pending = run(envelope);
        pending.await
    }
}
904
905#[async_trait::async_trait]
906impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
907    async fn execute(
908        self: Box<Self>,
909        envelope: RuntimeEffectEnvelope,
910    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
911        match envelope.command {
912            RuntimeEffectCommand::DurableStep { input, .. } => {
913                let value = (self.run)(input).await?;
914                Ok(RuntimeEffectOutcome::DurableStep { value })
915            }
916            command => Err(RuntimeEffectControllerError::new(
917                "runtime_effect_local_executor_mismatch",
918                format!(
919                    "local durable step executor cannot execute {} command",
920                    command.kind().as_str()
921                ),
922            )),
923        }
924    }
925}
926
927#[async_trait::async_trait]
928impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
929    async fn execute(
930        self: Box<Self>,
931        envelope: RuntimeEffectEnvelope,
932    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
933        match envelope.command {
934            RuntimeEffectCommand::ToolBatch { batch } => {
935                let outcome = self
936                    .context
937                    .execute_prepared_tool_batch_launches(
938                        batch,
939                        envelope.invocation,
940                        self.child_trace_hooks,
941                    )
942                    .await?;
943                Ok(RuntimeEffectOutcome::ToolBatch {
944                    launches: outcome.launches,
945                    triggers: outcome.triggers,
946                })
947            }
948            RuntimeEffectCommand::ToolAttempt {
949                call,
950                execution_grant,
951                attempt,
952                max_attempts,
953            } => {
954                let child_execution_trace_hook = self.child_trace_hooks.get(&call.call_id).cloned();
955                let outcome = self
956                    .context
957                    .execute_prepared_tool_attempt_effect(
958                        call,
959                        execution_grant,
960                        attempt,
961                        max_attempts,
962                        envelope.invocation,
963                        child_execution_trace_hook,
964                    )
965                    .await?;
966                Ok(RuntimeEffectOutcome::ToolAttempt {
967                    launch: outcome.launch,
968                    triggers: outcome.triggers,
969                })
970            }
971            command => Err(RuntimeEffectControllerError::new(
972                "runtime_effect_local_executor_mismatch",
973                format!(
974                    "local tool executor cannot execute {} command",
975                    command.kind().as_str()
976                ),
977            )),
978        }
979    }
980}
981
982#[async_trait::async_trait]
983impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
984    async fn execute(
985        self: Box<Self>,
986        envelope: RuntimeEffectEnvelope,
987    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
988        let runner = *self;
989        match envelope.command {
990            RuntimeEffectCommand::LlmCall { request } => {
991                let protocol_iteration = runner.machine.protocol_iteration();
992                let (result, text_streamed) = runner
993                    .driver
994                    .run_llm_call(
995                        Arc::new((*request).into_request(None, None)),
996                        protocol_iteration,
997                        envelope.invocation,
998                        &runner.event_tx,
999                        &runner.cancellation,
1000                    )
1001                    .await;
1002                Ok(RuntimeEffectOutcome::LlmCall {
1003                    result,
1004                    text_streamed,
1005                })
1006            }
1007            RuntimeEffectCommand::ToolBatch { batch } => {
1008                let outcome = runner
1009                    .driver
1010                    .run_tool_batch(
1011                        batch,
1012                        envelope.invocation,
1013                        &runner.event_tx,
1014                        &runner.cancellation,
1015                    )
1016                    .await?;
1017                Ok(RuntimeEffectOutcome::ToolBatch {
1018                    launches: outcome.launches,
1019                    triggers: outcome.triggers,
1020                })
1021            }
1022            RuntimeEffectCommand::ExecCode { language, code } => {
1023                let protocol_iteration = runner.machine.protocol_iteration();
1024                let messages = runner.machine.message_sequence();
1025                Ok(RuntimeEffectOutcome::ExecCode {
1026                    result: runner
1027                        .driver
1028                        .run_exec_code(
1029                            language,
1030                            &code,
1031                            messages,
1032                            protocol_iteration,
1033                            envelope.invocation,
1034                            &runner.event_tx,
1035                        )
1036                        .await,
1037                })
1038            }
1039            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1040                Ok(RuntimeEffectOutcome::Checkpoint {
1041                    result: runner
1042                        .driver
1043                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1044                        .await
1045                        .map_err(RuntimeEffectControllerError::from),
1046                })
1047            }
1048            RuntimeEffectCommand::SyncExecutionEnvironment {
1049                update_machine_config,
1050            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1051                result: runner
1052                    .driver
1053                    .refresh_execution_environment(runner.machine, update_machine_config)
1054                    .await
1055                    .map_err(|err| err.to_string()),
1056            }),
1057            RuntimeEffectCommand::Sleep { duration_ms } => {
1058                sleep_with_cancellation(
1059                    duration_ms,
1060                    &runner.cancellation,
1061                    runner.driver.host.core.clock.as_ref(),
1062                )
1063                .await?;
1064                Ok(RuntimeEffectOutcome::Sleep)
1065            }
1066            command => Err(RuntimeEffectControllerError::new(
1067                "runtime_effect_local_executor_mismatch",
1068                format!(
1069                    "local turn executor cannot execute {} command",
1070                    command.kind().as_str()
1071                ),
1072            )),
1073        }
1074    }
1075}
1076
1077#[async_trait::async_trait]
1078impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1079    async fn execute(
1080        mut self: Box<Self>,
1081        envelope: RuntimeEffectEnvelope,
1082    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1083        match envelope.command {
1084            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1085                result: self
1086                    .run_direct_llm_request((*request).into_request(
1087                        crate::session_model::transport_stream_events(&self.provider, None),
1088                        None,
1089                    ))
1090                    .await,
1091            }),
1092            RuntimeEffectCommand::Sleep { duration_ms } => {
1093                sleep_with_cancellation(
1094                    duration_ms,
1095                    &CancellationToken::new(),
1096                    &crate::SystemClock,
1097                )
1098                .await?;
1099                Ok(RuntimeEffectOutcome::Sleep)
1100            }
1101            command => Err(RuntimeEffectControllerError::new(
1102                "runtime_effect_local_executor_mismatch",
1103                format!(
1104                    "local direct executor cannot execute {} command",
1105                    command.kind().as_str()
1106                ),
1107            )),
1108        }
1109    }
1110}
1111
1112impl LocalDirectEffectRunner {
1113    async fn run_direct_llm_request(
1114        &mut self,
1115        request: CoreLlmRequest,
1116    ) -> Result<LlmResponse, LlmCallError> {
1117        let request = crate::attachments::resolve_llm_request_attachments(
1118            request,
1119            self.attachment_store.as_ref(),
1120        )
1121        .await
1122        .map_err(|err| LlmCallError {
1123            message: err.to_string(),
1124            retryable: false,
1125            kind: crate::ProviderFailureKind::Unknown,
1126            raw: None,
1127            code: Some("attachment_resolution_failed".to_string()),
1128            terminal_reason: crate::LlmTerminalReason::ProviderError,
1129            request_body: None,
1130        })?;
1131        self.provider
1132            .complete(request)
1133            .await
1134            .map_err(llm_call_error_from_transport)
1135    }
1136}
1137
1138async fn execute_local_sleep(
1139    envelope: RuntimeEffectEnvelope,
1140    cancellation: CancellationToken,
1141    clock: &dyn crate::Clock,
1142) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1143    match envelope.command {
1144        RuntimeEffectCommand::Sleep { duration_ms } => {
1145            sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1146            Ok(RuntimeEffectOutcome::Sleep)
1147        }
1148        command => Err(RuntimeEffectControllerError::new(
1149            "runtime_effect_local_executor_mismatch",
1150            format!(
1151                "local sleep executor cannot execute {} command",
1152                command.kind().as_str()
1153            ),
1154        )),
1155    }
1156}
1157
1158async fn sleep_with_cancellation(
1159    duration_ms: u64,
1160    cancellation: &CancellationToken,
1161    clock: &dyn crate::Clock,
1162) -> Result<(), RuntimeEffectControllerError> {
1163    let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1164    tokio::pin!(sleep);
1165    tokio::select! {
1166        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1167            "runtime_effect_sleep_cancelled",
1168            "runtime effect sleep was cancelled",
1169        )),
1170        _ = &mut sleep => Ok(()),
1171    }
1172}
1173
1174// =============================================================================
1175// Default in-process effect controller
1176// =============================================================================
1177
/// Default in-process effect controller.
///
/// Local runners are executed inside the current process, and await events
/// are resolved in memory. The controller advertises durable-tool-effect
/// semantics for local runs.
///
/// It does *not* make in-flight effects crash durable. Workflow adapters that
/// need that guarantee provide it by recording outcomes in their own history.
#[derive(Default, Clone)]
pub struct InlineRuntimeEffectController;
1186
1187#[async_trait::async_trait]
1188impl AwaitEventResolver for InlineRuntimeEffectController {
1189    fn supports_durable_effects(&self) -> bool {
1190        true
1191    }
1192
1193    async fn await_event_key(
1194        &self,
1195        scope: &ExecutionScope,
1196        wait: AwaitEventWaitIdentity,
1197    ) -> Result<AwaitEventKey, RuntimeError> {
1198        inline_await_events().key_for(scope, wait)
1199    }
1200
1201    async fn resolve_await_event(
1202        &self,
1203        key: &AwaitEventKey,
1204        resolution: Resolution,
1205    ) -> Result<ResolveOutcome, RuntimeError> {
1206        inline_await_events().resolve(key, resolution)
1207    }
1208
1209    async fn await_await_event(
1210        &self,
1211        key: &AwaitEventKey,
1212        cancel: CancellationToken,
1213        deadline: Option<Instant>,
1214    ) -> Result<Resolution, RuntimeError> {
1215        inline_await_events()
1216            .await_resolution(key, cancel, deadline, &crate::SystemClock)
1217            .await
1218    }
1219
1220    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1221        inline_await_events().revoke_session(session_id)
1222    }
1223
1224    async fn cancel_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1225        inline_await_events().cancel_session(session_id)
1226    }
1227}
1228
1229#[async_trait::async_trait]
1230impl RuntimeEffectController for InlineRuntimeEffectController {
1231    async fn execute_effect(
1232        &self,
1233        envelope: RuntimeEffectEnvelope,
1234        local_executor: RuntimeEffectLocalExecutor<'_>,
1235    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1236        match envelope.command {
1237            RuntimeEffectCommand::AwaitEvent { key } => {
1238                let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1239                let resolution = inline_await_events()
1240                    .await_resolution(&key, cancellation, deadline, clock.as_ref())
1241                    .await
1242                    .map_err(RuntimeEffectControllerError::from)?;
1243                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1244            }
1245            RuntimeEffectCommand::Process { command } => {
1246                let execution = local_executor.into_process()?;
1247                let result = tokio::task::spawn(async move { execution.execute(*command).await })
1248                    .await
1249                    .map_err(|err| {
1250                        RuntimeEffectControllerError::new(
1251                            "runtime_effect_process_task_join",
1252                            format!("inline process effect task failed: {err}"),
1253                        )
1254                    })??;
1255                Ok(RuntimeEffectOutcome::Process { result })
1256            }
1257            _ => local_executor.execute(envelope).await,
1258        }
1259    }
1260}
1261
1262/// In-process deployment effect host.
1263#[derive(Clone)]
1264pub struct InlineEffectHost {
1265    controller: Arc<dyn RuntimeEffectController>,
1266}
1267
1268impl InlineEffectHost {
1269    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1270        Self { controller }
1271    }
1272}
1273
1274impl Default for InlineEffectHost {
1275    fn default() -> Self {
1276        Self::new(Arc::new(InlineRuntimeEffectController))
1277    }
1278}
1279
1280#[async_trait::async_trait]
1281impl AwaitEventResolver for InlineEffectHost {
1282    fn durability_tier(&self) -> crate::DurabilityTier {
1283        self.controller.durability_tier()
1284    }
1285
1286    fn requires_durable_attachment_store(&self) -> bool {
1287        self.controller.requires_durable_attachment_store()
1288    }
1289
1290    fn supports_durable_effects(&self) -> bool {
1291        self.controller.supports_durable_effects()
1292    }
1293
1294    async fn await_event_key(
1295        &self,
1296        scope: &ExecutionScope,
1297        wait: AwaitEventWaitIdentity,
1298    ) -> Result<AwaitEventKey, RuntimeError> {
1299        self.controller.await_event_key(scope, wait).await
1300    }
1301
1302    async fn resolve_await_event(
1303        &self,
1304        key: &AwaitEventKey,
1305        resolution: Resolution,
1306    ) -> Result<ResolveOutcome, RuntimeError> {
1307        self.controller.resolve_await_event(key, resolution).await
1308    }
1309
1310    async fn await_await_event(
1311        &self,
1312        key: &AwaitEventKey,
1313        cancel: CancellationToken,
1314        deadline: Option<Instant>,
1315    ) -> Result<Resolution, RuntimeError> {
1316        self.controller
1317            .await_await_event(key, cancel, deadline)
1318            .await
1319    }
1320
1321    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1322        self.controller
1323            .revoke_await_events_for_session(session_id)
1324            .await
1325    }
1326
1327    async fn cancel_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1328        self.controller
1329            .cancel_await_events_for_session(session_id)
1330            .await
1331    }
1332}
1333
1334#[async_trait::async_trait]
1335impl EffectHost for InlineEffectHost {
1336    fn scoped<'run>(
1337        &'run self,
1338        scope: ExecutionScope,
1339    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1340        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1341    }
1342
1343    fn scoped_static(
1344        &self,
1345        scope: ExecutionScope,
1346    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1347        Ok(Some(ScopedEffectController::shared(
1348            Arc::clone(&self.controller),
1349            scope,
1350        )?))
1351    }
1352}
1353
1354impl InlineRuntimeEffectController {
1355    /// Register the process (and any handle grant) into the durable registry.
1356    ///
1357    /// The inline controller no longer runs the process here: the registry's
1358    /// non-terminal row *is* the durable work queue, and the host-owned
1359    /// [`ProcessWorkDriver`](crate::ProcessWorkDriver) is the sole executor.
1360    /// Registering the row is all this path does; the control seam drives the
1361    /// host driver after a successful start.
1362    pub(crate) async fn start_process(
1363        registry: Arc<dyn crate::ProcessRegistry>,
1364        registration: crate::ProcessRegistration,
1365        grant: Option<crate::ProcessStartGrant>,
1366    ) -> Result<ProcessRecord, PluginError> {
1367        let registration_for_record = registration.clone();
1368        let record = registry.register_process(registration_for_record).await?;
1369        if let Some(grant) = grant {
1370            registry
1371                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1372                .await?;
1373        }
1374        Ok(record)
1375    }
1376
1377    pub(crate) async fn request_process_cancel(
1378        &self,
1379        registry: Arc<dyn crate::ProcessRegistry>,
1380        process_id: &str,
1381        reason: Option<String>,
1382    ) -> Result<ProcessRecord, PluginError> {
1383        // Cancellation is a durable signal: the cancel event is what the
1384        // runner-run process observes, so the inline controller appends it and
1385        // no longer tracks an in-process cancellation token.
1386        registry
1387            .append_event(
1388                process_id,
1389                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1390            )
1391            .await?;
1392        registry
1393            .get_process(process_id)
1394            .await
1395            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1396    }
1397}
1398
1399impl std::fmt::Debug for InlineRuntimeEffectController {
1400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1401        f.debug_struct("InlineRuntimeEffectController").finish()
1402    }
1403}