Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23    RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
27type HmacSha256 = Hmac<sha2::Sha256>;
28type AwaitEventOptions = (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>);
29
30fn inline_await_events() -> &'static AwaitEventRegistry {
31    static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
32    REGISTRY.get_or_init(AwaitEventRegistry::new)
33}
34
35// =============================================================================
36// Effect host + controller trait + scope + error
37// =============================================================================
38
39/// Stable semantic identity for one effectful runtime operation.
40///
41/// The scope is chosen by the host boundary before any nondeterministic work is
42/// planned. It is intentionally generic: Restate, an inline test host, or a
43/// future durable effect host all receive the same Lash scope vocabulary.
44#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum ExecutionScope {
47    Turn {
48        session_id: String,
49        turn_id: String,
50    },
51    Process {
52        process_id: String,
53    },
54    QueueDrain {
55        session_id: String,
56        drain_id: String,
57    },
58    SessionDelete {
59        session_id: String,
60    },
61    RuntimeOperation {
62        operation_id: String,
63    },
64}
65
66impl ExecutionScope {
67    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
68        Self::Turn {
69            session_id: session_id.into(),
70            turn_id: turn_id.into(),
71        }
72    }
73
74    pub fn process(process_id: impl Into<String>) -> Self {
75        Self::Process {
76            process_id: process_id.into(),
77        }
78    }
79
80    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
81        Self::QueueDrain {
82            session_id: session_id.into(),
83            drain_id: drain_id.into(),
84        }
85    }
86
87    pub fn session_delete(session_id: impl Into<String>) -> Self {
88        Self::SessionDelete {
89            session_id: session_id.into(),
90        }
91    }
92
93    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
94        Self::RuntimeOperation {
95            operation_id: operation_id.into(),
96        }
97    }
98
99    pub fn id(&self) -> &str {
100        match self {
101            Self::Turn { turn_id, .. } => turn_id,
102            Self::Process { process_id } => process_id,
103            Self::QueueDrain { drain_id, .. } => drain_id,
104            Self::SessionDelete { session_id } => session_id,
105            Self::RuntimeOperation { operation_id } => operation_id,
106        }
107    }
108
109    pub fn session_id(&self) -> Option<&str> {
110        match self {
111            Self::Turn { session_id, .. }
112            | Self::QueueDrain { session_id, .. }
113            | Self::SessionDelete { session_id } => Some(session_id),
114            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
115        }
116    }
117
118    pub fn turn_id(&self) -> Option<&str> {
119        match self {
120            Self::Turn { turn_id, .. } => Some(turn_id),
121            _ => None,
122        }
123    }
124
125    pub fn validates_turn_trace_id(&self) -> bool {
126        matches!(self, Self::Turn { .. })
127    }
128
129    fn validate(&self) -> Result<(), RuntimeError> {
130        let missing = match self {
131            Self::Turn {
132                session_id,
133                turn_id,
134            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
135            Self::Process { process_id } => process_id.trim().is_empty(),
136            Self::QueueDrain {
137                session_id,
138                drain_id,
139            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
140            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
141            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
142        };
143        if missing {
144            return Err(RuntimeError::new(
145                RuntimeErrorCode::MissingExecutionScopeId,
146                "execution scopes require non-empty stable ids",
147            ));
148        }
149        Ok(())
150    }
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
154#[serde(tag = "type", rename_all = "snake_case")]
155pub enum AwaitEventWaitIdentity {
156    ToolCompletion {
157        tool_call_id: String,
158    },
159    ProcessSignal {
160        process_id: String,
161        signal_name: String,
162        ordinal: u64,
163    },
164    Custom {
165        key: String,
166    },
167}
168
169impl AwaitEventWaitIdentity {
170    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
171        Self::ToolCompletion {
172            tool_call_id: tool_call_id.into(),
173        }
174    }
175
176    pub fn process_signal(
177        process_id: impl Into<String>,
178        signal_name: impl Into<String>,
179        ordinal: u64,
180    ) -> Self {
181        Self::ProcessSignal {
182            process_id: process_id.into(),
183            signal_name: signal_name.into(),
184            ordinal,
185        }
186    }
187
188    fn validate(&self) -> Result<(), RuntimeError> {
189        let invalid = match self {
190            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
191            Self::ProcessSignal {
192                process_id,
193                signal_name,
194                ordinal,
195            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
196            Self::Custom { key } => key.trim().is_empty(),
197        };
198        if invalid {
199            return Err(RuntimeError::new(
200                "invalid_await_event_wait_identity",
201                "await-event wait identity requires non-empty stable ids",
202            ));
203        }
204        Ok(())
205    }
206}
207
208#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
209pub struct AwaitEventKey {
210    pub scope: ExecutionScope,
211    pub wait: AwaitEventWaitIdentity,
212    pub key_id: String,
213    pub signature: String,
214}
215
216impl AwaitEventKey {
217    pub fn promise_key(&self) -> String {
218        format!("lash-await-event:{}", self.key_id)
219    }
220}
221
222#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
223pub struct ExternalCompletionError {
224    pub code: String,
225    pub message: String,
226    #[serde(default, skip_serializing_if = "Option::is_none")]
227    pub raw: Option<serde_json::Value>,
228}
229
230impl ExternalCompletionError {
231    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
232        Self {
233            code: code.into(),
234            message: message.into(),
235            raw: None,
236        }
237    }
238}
239
240#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
241#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
242pub enum Resolution {
243    Ok(serde_json::Value),
244    Err(ExternalCompletionError),
245    Timeout,
246    Cancelled,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "status", rename_all = "snake_case")]
251pub enum ResolveOutcome {
252    Accepted,
253    AlreadyResolved { terminal: Resolution },
254    UnknownOrRevoked,
255}
256
257#[derive(Debug)]
258struct AwaitEventEntry {
259    terminal: Option<Resolution>,
260    notify: Arc<Notify>,
261}
262
263#[derive(Debug)]
264struct AwaitEventRegistryState {
265    entries: HashMap<String, AwaitEventEntry>,
266    revoked_key_ids: HashSet<String>,
267    revoked_session_ids: HashSet<String>,
268}
269
270#[derive(Debug)]
271struct AwaitEventRegistry {
272    secret: Vec<u8>,
273    state: std::sync::Mutex<AwaitEventRegistryState>,
274}
275
276impl AwaitEventRegistry {
277    fn new() -> Self {
278        Self {
279            secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
280            state: std::sync::Mutex::new(AwaitEventRegistryState {
281                entries: HashMap::new(),
282                revoked_key_ids: HashSet::new(),
283                revoked_session_ids: HashSet::new(),
284            }),
285        }
286    }
287
288    fn key_for(
289        &self,
290        scope: &ExecutionScope,
291        wait: AwaitEventWaitIdentity,
292    ) -> Result<AwaitEventKey, RuntimeError> {
293        scope.validate()?;
294        wait.validate()?;
295        let key_id =
296            crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
297                RuntimeError::new(
298                    "await_event_key_hash",
299                    format!("failed to hash await-event identity: {err}"),
300                )
301            })?;
302        let signature = self.signature(scope, &wait, &key_id)?;
303        Ok(AwaitEventKey {
304            scope: scope.clone(),
305            wait,
306            key_id,
307            signature,
308        })
309    }
310
311    fn signature(
312        &self,
313        scope: &ExecutionScope,
314        wait: &AwaitEventWaitIdentity,
315        key_id: &str,
316    ) -> Result<String, RuntimeError> {
317        let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
318            RuntimeError::new(
319                "await_event_key_sign",
320                format!("failed to initialize await-event key signer: {err}"),
321            )
322        })?;
323        let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
324            RuntimeError::new(
325                "await_event_key_sign",
326                format!("failed to serialize await-event key identity: {err}"),
327            )
328        })?;
329        mac.update(&canonical);
330        Ok(format!("{:x}", mac.finalize().into_bytes()))
331    }
332
333    fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
334        let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
335        Ok(expected == key.signature)
336    }
337
338    fn resolve(
339        &self,
340        key: &AwaitEventKey,
341        resolution: Resolution,
342    ) -> Result<ResolveOutcome, RuntimeError> {
343        if !self.verify(key)? {
344            return Ok(ResolveOutcome::UnknownOrRevoked);
345        }
346        let mut state = self.state.lock().map_err(|_| {
347            RuntimeError::new(
348                "await_event_registry_poisoned",
349                "await-event registry lock poisoned",
350            )
351        })?;
352        if state.revoked_key_ids.contains(&key.key_id)
353            || key
354                .scope
355                .session_id()
356                .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
357        {
358            return Ok(ResolveOutcome::UnknownOrRevoked);
359        }
360        let entry = state
361            .entries
362            .entry(key.key_id.clone())
363            .or_insert_with(|| AwaitEventEntry {
364                terminal: None,
365                notify: Arc::new(Notify::new()),
366            });
367        if let Some(terminal) = &entry.terminal {
368            return Ok(ResolveOutcome::AlreadyResolved {
369                terminal: terminal.clone(),
370            });
371        }
372        entry.terminal = Some(resolution);
373        entry.notify.notify_waiters();
374        Ok(ResolveOutcome::Accepted)
375    }
376
377    async fn await_resolution(
378        &self,
379        key: &AwaitEventKey,
380        cancel: CancellationToken,
381        deadline: Option<Instant>,
382        clock: &dyn crate::Clock,
383    ) -> Result<Resolution, RuntimeError> {
384        if !self.verify(key)? {
385            return Err(RuntimeError::new(
386                "await_event_unknown_or_revoked",
387                "await-event key is invalid or revoked",
388            ));
389        }
390        loop {
391            let notify =
392                {
393                    let mut state = self.state.lock().map_err(|_| {
394                        RuntimeError::new(
395                            "await_event_registry_poisoned",
396                            "await-event registry lock poisoned",
397                        )
398                    })?;
399                    if state.revoked_key_ids.contains(&key.key_id)
400                        || key.scope.session_id().is_some_and(|session_id| {
401                            state.revoked_session_ids.contains(session_id)
402                        })
403                    {
404                        return Err(RuntimeError::new(
405                            "await_event_unknown_or_revoked",
406                            "await-event key is invalid or revoked",
407                        ));
408                    }
409                    let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
410                        AwaitEventEntry {
411                            terminal: None,
412                            notify: Arc::new(Notify::new()),
413                        }
414                    });
415                    if let Some(terminal) = entry.terminal.clone() {
416                        return Ok(terminal);
417                    }
418                    Arc::clone(&entry.notify)
419                };
420            if let Some(deadline) = deadline {
421                tokio::select! {
422                    _ = cancel.cancelled() => {
423                        let _ = self.resolve(key, Resolution::Cancelled)?;
424                    }
425                    _ = clock.sleep_until(deadline) => {
426                        let _ = self.resolve(key, Resolution::Timeout)?;
427                    }
428                    _ = notify.notified() => {}
429                }
430            } else {
431                tokio::select! {
432                    _ = cancel.cancelled() => {
433                        let _ = self.resolve(key, Resolution::Cancelled)?;
434                    }
435                    _ = notify.notified() => {}
436                }
437            }
438        }
439    }
440
441    fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
442        let mut state = self.state.lock().map_err(|_| {
443            RuntimeError::new(
444                "await_event_registry_poisoned",
445                "await-event registry lock poisoned",
446            )
447        })?;
448        state.revoked_session_ids.insert(session_id.to_string());
449        for entry in state.entries.values() {
450            entry.notify.notify_waiters();
451        }
452        Ok(())
453    }
454}
455
456enum ScopedEffectControllerInner<'run> {
457    Borrowed(&'run dyn RuntimeEffectController),
458    Shared(Arc<dyn RuntimeEffectController>),
459}
460
461impl Clone for ScopedEffectControllerInner<'_> {
462    fn clone(&self) -> Self {
463        match self {
464            Self::Borrowed(controller) => Self::Borrowed(*controller),
465            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
466        }
467    }
468}
469
470/// Scoped low-level controller plus the semantic execution scope it is serving.
471#[derive(Clone)]
472pub struct ScopedEffectController<'run> {
473    controller: ScopedEffectControllerInner<'run>,
474    scope: ExecutionScope,
475}
476
477impl<'run> ScopedEffectController<'run> {
478    pub fn borrowed(
479        controller: &'run dyn RuntimeEffectController,
480        scope: ExecutionScope,
481    ) -> Result<Self, RuntimeError> {
482        scope.validate()?;
483        Ok(Self {
484            controller: ScopedEffectControllerInner::Borrowed(controller),
485            scope,
486        })
487    }
488
489    pub fn shared(
490        controller: Arc<dyn RuntimeEffectController>,
491        scope: ExecutionScope,
492    ) -> Result<Self, RuntimeError> {
493        scope.validate()?;
494        Ok(Self {
495            controller: ScopedEffectControllerInner::Shared(controller),
496            scope,
497        })
498    }
499
500    pub fn controller(&self) -> &dyn RuntimeEffectController {
501        match &self.controller {
502            ScopedEffectControllerInner::Borrowed(controller) => *controller,
503            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
504        }
505    }
506
507    pub fn execution_scope(&self) -> &ExecutionScope {
508        &self.scope
509    }
510
511    pub fn scope_id(&self) -> &str {
512        self.scope.id()
513    }
514
515    pub fn turn_id(&self) -> Option<&str> {
516        self.scope.turn_id()
517    }
518}
519
520/// Deployment-level factory for scoped effect controllers.
521#[async_trait::async_trait]
522pub trait EffectHost: Send + Sync {
523    fn durability_tier(&self) -> crate::DurabilityTier {
524        crate::DurabilityTier::Inline
525    }
526
527    fn requires_durable_attachment_store(&self) -> bool {
528        false
529    }
530
531    fn supports_durable_effects(&self) -> bool {
532        false
533    }
534
535    fn scoped<'run>(
536        &'run self,
537        scope: ExecutionScope,
538    ) -> Result<ScopedEffectController<'run>, RuntimeError>;
539
540    fn scoped_static(
541        &self,
542        _scope: ExecutionScope,
543    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
544        Ok(None)
545    }
546
547    async fn await_event_key(
548        &self,
549        _scope: &ExecutionScope,
550        _wait: AwaitEventWaitIdentity,
551    ) -> Result<AwaitEventKey, RuntimeError> {
552        Err(RuntimeError::new(
553            "await_event_unsupported",
554            "this effect host does not support await-event keys",
555        ))
556    }
557
558    async fn resolve_await_event(
559        &self,
560        _key: &AwaitEventKey,
561        _resolution: Resolution,
562    ) -> Result<ResolveOutcome, RuntimeError> {
563        Ok(ResolveOutcome::UnknownOrRevoked)
564    }
565
566    async fn await_await_event(
567        &self,
568        _key: &AwaitEventKey,
569        _cancel: CancellationToken,
570        _deadline: Option<Instant>,
571    ) -> Result<Resolution, RuntimeError> {
572        Err(RuntimeError::new(
573            "await_event_unsupported",
574            "this effect host does not support await-event waits",
575        ))
576    }
577
578    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
579        Ok(())
580    }
581}
582
583/// Boundary for nondeterministic runtime work.
584#[async_trait::async_trait]
585pub trait RuntimeEffectController: Send + Sync {
586    /// Durability tier this controller provides; defaults to
587    /// [`DurabilityTier::Inline`].
588    fn durability_tier(&self) -> crate::DurabilityTier {
589        crate::DurabilityTier::Inline
590    }
591
592    fn requires_durable_attachment_store(&self) -> bool {
593        false
594    }
595
596    fn supports_durable_effects(&self) -> bool {
597        false
598    }
599
600    /// Whether this controller can safely accept overlapping `execute_effect`
601    /// calls from one runtime coordinator.
602    ///
603    /// Local and store-backed controllers can usually fan out independent
604    /// effects. Some workflow substrates expose a single ordered journal
605    /// context where native operations must be awaited immediately before the
606    /// next context call is issued. Those controllers should return `false` so
607    /// coordinators serialize child effects while still replaying each child by
608    /// its own stable key.
609    fn supports_concurrent_effects(&self) -> bool {
610        true
611    }
612
613    async fn execute_effect(
614        &self,
615        envelope: RuntimeEffectEnvelope,
616        local_executor: RuntimeEffectLocalExecutor<'_>,
617    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
618
619    async fn await_event_key(
620        &self,
621        _scope: &ExecutionScope,
622        _wait: AwaitEventWaitIdentity,
623    ) -> Result<AwaitEventKey, RuntimeError> {
624        Err(RuntimeError::new(
625            "await_event_unsupported",
626            "this effect controller does not support await-event keys",
627        ))
628    }
629
630    async fn resolve_await_event(
631        &self,
632        _key: &AwaitEventKey,
633        _resolution: Resolution,
634    ) -> Result<ResolveOutcome, RuntimeError> {
635        Ok(ResolveOutcome::UnknownOrRevoked)
636    }
637
638    async fn await_await_event(
639        &self,
640        _key: &AwaitEventKey,
641        _cancel: CancellationToken,
642        _deadline: Option<Instant>,
643    ) -> Result<Resolution, RuntimeError> {
644        Err(RuntimeError::new(
645            "await_event_unsupported",
646            "this effect controller does not support await-event waits",
647        ))
648    }
649
650    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
651        Ok(())
652    }
653}
654
655/// Runtime-internal handle for effect-controller references carried through
656/// per-turn execution contexts.
657#[derive(Clone)]
658pub(crate) enum RuntimeEffectControllerHandle<'run> {
659    Borrowed(ScopedEffectController<'run>),
660    #[cfg(any(test, feature = "testing"))]
661    Shared {
662        controller: Arc<dyn RuntimeEffectController>,
663        scope: ExecutionScope,
664    },
665}
666
667impl<'run> RuntimeEffectControllerHandle<'run> {
668    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
669        Self::Borrowed(scoped)
670    }
671
672    #[cfg(any(test, feature = "testing"))]
673    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
674        Self::Shared {
675            controller,
676            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
677        }
678    }
679
680    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
681        match self {
682            Self::Borrowed(scoped) => scoped.controller(),
683            #[cfg(any(test, feature = "testing"))]
684            Self::Shared { controller, .. } => controller.as_ref(),
685        }
686    }
687
688    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
689        match self {
690            Self::Borrowed(scoped) => scoped.clone(),
691            #[cfg(any(test, feature = "testing"))]
692            Self::Shared { controller, scope } => {
693                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
694                    .expect("runtime effect controller handle carries a valid scope")
695            }
696        }
697    }
698
699    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
700        self.clone()
701    }
702}
703
704#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
705#[error("{code}: {message}")]
706pub struct RuntimeEffectControllerError {
707    pub code: String,
708    pub message: String,
709}
710
711impl RuntimeEffectControllerError {
712    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
713        Self {
714            code: code.into(),
715            message: message.into(),
716        }
717    }
718
719    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
720        Self::new(
721            "runtime_effect_wrong_outcome",
722            format!(
723                "expected {} outcome, got {}",
724                expected.as_str(),
725                actual.as_str()
726            ),
727        )
728    }
729
730    pub(crate) fn into_runtime_error(self) -> RuntimeError {
731        RuntimeError::new(self.code, self.message)
732    }
733}
734
735impl From<RuntimeError> for RuntimeEffectControllerError {
736    fn from(err: RuntimeError) -> Self {
737        Self::new(err.code.as_str(), err.message)
738    }
739}
740
741impl From<PluginError> for RuntimeEffectControllerError {
742    fn from(err: PluginError) -> Self {
743        Self::new("plugin", err.to_string())
744    }
745}
746
747impl From<crate::StoreError> for RuntimeEffectControllerError {
748    fn from(err: crate::StoreError) -> Self {
749        Self::new("runtime_store", err.to_string())
750    }
751}
752
753// =============================================================================
754// Local executor (per-effect borrowed runner state)
755// =============================================================================
756
757#[async_trait::async_trait]
758pub(crate) trait ProcessRunner: Send + Sync {
759    async fn run_process(
760        &self,
761        registration: crate::ProcessRegistration,
762        execution_context: crate::ProcessExecutionContext,
763        registry: Arc<dyn ProcessRegistry>,
764        scoped_effect_controller: crate::ScopedEffectController<'_>,
765        cancellation: CancellationToken,
766    ) -> crate::ProcessAwaitOutput;
767}
768
769pub struct ProcessLocalExecution {
770    pub registry: Arc<dyn ProcessRegistry>,
771    pub process_work_driver: Option<crate::ProcessWorkDriver>,
772}
773
774pub(super) struct LocalTurnEffectRunner<'a, 'run> {
775    driver: &'a mut RuntimeTurnDriver<'run>,
776    machine: &'a mut crate::TurnMachine,
777    event_tx: mpsc::Sender<RuntimeStreamEvent>,
778    cancellation: CancellationToken,
779}
780
781pub(super) struct LocalDirectEffectRunner {
782    provider: ProviderHandle,
783    attachment_store: Arc<dyn AttachmentStore>,
784}
785
786struct LocalToolBatchEffectRunner<'run> {
787    context: crate::RuntimeExecutionContext<'run>,
788    child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
789}
790
791#[async_trait::async_trait]
792trait RuntimeEffectLocalRunner: Send {
793    async fn execute(
794        self: Box<Self>,
795        envelope: RuntimeEffectEnvelope,
796    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
797}
798
799#[cfg(any(test, feature = "testing"))]
800type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
801        RuntimeEffectEnvelope,
802    ) -> Pin<
803        Box<
804            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
805                + Send
806                + 'run,
807        >,
808    > + Send
809    + 'run;
810
811#[cfg(any(test, feature = "testing"))]
812struct TestingRuntimeEffectLocalRunner<'run> {
813    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
814}
815
816type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
817        serde_json::Value,
818    ) -> Pin<
819        Box<
820            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
821                + Send
822                + 'run,
823        >,
824    > + Send
825    + 'run;
826
827struct DurableStepLocalRunner<'run> {
828    run: Box<DurableStepLocalRunnerFn<'run>>,
829}
830
831enum RuntimeEffectLocalExecutorState<'run> {
832    Unavailable,
833    SleepOnly {
834        cancellation: CancellationToken,
835        clock: Arc<dyn crate::Clock>,
836    },
837    ExternalWaitOptions {
838        cancellation: CancellationToken,
839        deadline: Option<Instant>,
840        clock: Arc<dyn crate::Clock>,
841    },
842    Process(ProcessLocalExecution),
843    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
844}
845
846/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
847///
848/// Durable controllers may ignore it and replay their own recorded result. The
849/// default inline controller delegates to it, so local provider/tool/checkpoint
850/// work still crosses the same `execute_effect` boundary as durable controllers.
851pub struct RuntimeEffectLocalExecutor<'run> {
852    state: RuntimeEffectLocalExecutorState<'run>,
853}
854
855impl<'run> RuntimeEffectLocalExecutor<'run> {
856    pub fn unavailable() -> Self {
857        Self {
858            state: RuntimeEffectLocalExecutorState::Unavailable,
859        }
860    }
861
862    pub fn sleep(cancellation: CancellationToken) -> Self {
863        Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
864    }
865
866    pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
867        Self {
868            state: RuntimeEffectLocalExecutorState::SleepOnly {
869                cancellation,
870                clock,
871            },
872        }
873    }
874
875    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
876        Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
877    }
878
879    pub fn await_event_with_clock(
880        cancellation: CancellationToken,
881        deadline: Option<Instant>,
882        clock: Arc<dyn crate::Clock>,
883    ) -> Self {
884        Self {
885            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
886                cancellation,
887                deadline,
888                clock,
889            },
890        }
891    }
892
893    pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
894        Self::processes_with_driver(registry, None)
895    }
896
897    pub fn processes_with_driver(
898        registry: Arc<dyn ProcessRegistry>,
899        process_work_driver: Option<crate::ProcessWorkDriver>,
900    ) -> Self {
901        Self {
902            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
903                registry,
904                process_work_driver,
905            }),
906        }
907    }
908
909    pub fn durable_step<F, Fut>(run: F) -> Self
910    where
911        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
912        Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
913    {
914        Self {
915            state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
916                run: Box::new(move |input| {
917                    Box::pin(async move { run(input).await.map_err(Into::into) })
918                }),
919            })),
920        }
921    }
922
923    #[cfg(any(test, feature = "testing"))]
924    pub fn testing<F, Fut>(run: F) -> Self
925    where
926        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
927        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
928            + Send
929            + 'run,
930    {
931        Self {
932            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
933                TestingRuntimeEffectLocalRunner {
934                    run: Box::new(move |envelope| Box::pin(run(envelope))),
935                },
936            )),
937        }
938    }
939
940    pub(in crate::runtime) fn turn<'scope>(
941        driver: &'run mut RuntimeTurnDriver<'scope>,
942        machine: &'run mut crate::TurnMachine,
943        event_tx: mpsc::Sender<RuntimeStreamEvent>,
944        cancellation: CancellationToken,
945    ) -> Self
946    where
947        'scope: 'run,
948    {
949        Self {
950            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
951                driver,
952                machine,
953                event_tx,
954                cancellation,
955            })),
956        }
957    }
958
959    pub(in crate::runtime) fn direct(
960        provider: ProviderHandle,
961        attachment_store: Arc<dyn AttachmentStore>,
962    ) -> Self {
963        Self {
964            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
965                provider,
966                attachment_store,
967            })),
968        }
969    }
970
971    pub(crate) fn tool_batch(
972        context: crate::RuntimeExecutionContext<'run>,
973        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
974    ) -> Self {
975        Self {
976            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
977                context,
978                child_trace_hooks,
979            })),
980        }
981    }
982
983    pub async fn execute(
984        self,
985        envelope: RuntimeEffectEnvelope,
986    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
987        match self.state {
988            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
989            RuntimeEffectLocalExecutorState::SleepOnly {
990                cancellation,
991                clock,
992            } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
993            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
994                Err(RuntimeEffectControllerError::new(
995                    "runtime_effect_local_executor_mismatch",
996                    format!(
997                        "local await-event options cannot execute {} command directly",
998                        envelope.command.kind().as_str()
999                    ),
1000                ))
1001            }
1002            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
1003                "runtime_effect_local_executor_unavailable",
1004                format!(
1005                    "no local executor is available for {}",
1006                    envelope.command.kind().as_str()
1007                ),
1008            )),
1009            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
1010                "runtime_effect_local_executor_mismatch",
1011                format!(
1012                    "process executor cannot execute {} command directly",
1013                    envelope.command.kind().as_str()
1014                ),
1015            )),
1016        }
1017    }
1018
1019    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
1020        match self.state {
1021            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
1022            _ => Err(RuntimeEffectControllerError::new(
1023                "runtime_effect_local_executor_unavailable",
1024                "no process executor is available for process command",
1025            )),
1026        }
1027    }
1028
1029    fn into_await_event_options(self) -> Result<AwaitEventOptions, RuntimeEffectControllerError> {
1030        match self.state {
1031            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
1032                cancellation,
1033                deadline,
1034                clock,
1035            } => Ok((cancellation, deadline, clock)),
1036            _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
1037        }
1038    }
1039}
1040
1041#[cfg(any(test, feature = "testing"))]
1042#[async_trait::async_trait]
1043impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
1044    async fn execute(
1045        self: Box<Self>,
1046        envelope: RuntimeEffectEnvelope,
1047    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1048        (self.run)(envelope).await
1049    }
1050}
1051
1052#[async_trait::async_trait]
1053impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
1054    async fn execute(
1055        self: Box<Self>,
1056        envelope: RuntimeEffectEnvelope,
1057    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1058        match envelope.command {
1059            RuntimeEffectCommand::DurableStep { input, .. } => {
1060                let value = (self.run)(input).await?;
1061                Ok(RuntimeEffectOutcome::DurableStep { value })
1062            }
1063            command => Err(RuntimeEffectControllerError::new(
1064                "runtime_effect_local_executor_mismatch",
1065                format!(
1066                    "local durable step executor cannot execute {} command",
1067                    command.kind().as_str()
1068                ),
1069            )),
1070        }
1071    }
1072}
1073
1074#[async_trait::async_trait]
1075impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
1076    async fn execute(
1077        self: Box<Self>,
1078        envelope: RuntimeEffectEnvelope,
1079    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1080        match envelope.command {
1081            RuntimeEffectCommand::ToolBatch { batch } => {
1082                let outcome = self
1083                    .context
1084                    .execute_prepared_tool_batch_launches(
1085                        batch,
1086                        envelope.invocation,
1087                        self.child_trace_hooks,
1088                    )
1089                    .await?;
1090                Ok(RuntimeEffectOutcome::ToolBatch {
1091                    launches: outcome.launches,
1092                    triggers: outcome.triggers,
1093                })
1094            }
1095            RuntimeEffectCommand::ToolAttempt {
1096                call,
1097                execution_grant,
1098                attempt,
1099                max_attempts,
1100            } => {
1101                let child_execution_trace_hook = self.child_trace_hooks.get(&call.call_id).cloned();
1102                let outcome = self
1103                    .context
1104                    .execute_prepared_tool_attempt_effect(
1105                        call,
1106                        execution_grant,
1107                        attempt,
1108                        max_attempts,
1109                        envelope.invocation,
1110                        child_execution_trace_hook,
1111                    )
1112                    .await?;
1113                Ok(RuntimeEffectOutcome::ToolAttempt {
1114                    launch: outcome.launch,
1115                    triggers: outcome.triggers,
1116                })
1117            }
1118            command => Err(RuntimeEffectControllerError::new(
1119                "runtime_effect_local_executor_mismatch",
1120                format!(
1121                    "local tool executor cannot execute {} command",
1122                    command.kind().as_str()
1123                ),
1124            )),
1125        }
1126    }
1127}
1128
1129#[async_trait::async_trait]
1130impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1131    async fn execute(
1132        self: Box<Self>,
1133        envelope: RuntimeEffectEnvelope,
1134    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1135        let runner = *self;
1136        match envelope.command {
1137            RuntimeEffectCommand::LlmCall { request } => {
1138                let protocol_iteration = runner.machine.protocol_iteration();
1139                let (result, text_streamed) = runner
1140                    .driver
1141                    .run_llm_call(
1142                        Arc::new((*request).into_request(None, None)),
1143                        protocol_iteration,
1144                        envelope.invocation,
1145                        &runner.event_tx,
1146                        &runner.cancellation,
1147                    )
1148                    .await;
1149                Ok(RuntimeEffectOutcome::LlmCall {
1150                    result,
1151                    text_streamed,
1152                })
1153            }
1154            RuntimeEffectCommand::ToolBatch { batch } => {
1155                let outcome = runner
1156                    .driver
1157                    .run_tool_batch(
1158                        batch,
1159                        envelope.invocation,
1160                        &runner.event_tx,
1161                        &runner.cancellation,
1162                    )
1163                    .await?;
1164                Ok(RuntimeEffectOutcome::ToolBatch {
1165                    launches: outcome.launches,
1166                    triggers: outcome.triggers,
1167                })
1168            }
1169            RuntimeEffectCommand::ExecCode { language, code } => {
1170                let protocol_iteration = runner.machine.protocol_iteration();
1171                let messages = runner.machine.message_sequence();
1172                Ok(RuntimeEffectOutcome::ExecCode {
1173                    result: runner
1174                        .driver
1175                        .run_exec_code(
1176                            language,
1177                            &code,
1178                            messages,
1179                            protocol_iteration,
1180                            envelope.invocation,
1181                            &runner.event_tx,
1182                        )
1183                        .await,
1184                })
1185            }
1186            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1187                Ok(RuntimeEffectOutcome::Checkpoint {
1188                    result: runner
1189                        .driver
1190                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1191                        .await
1192                        .map_err(RuntimeEffectControllerError::from),
1193                })
1194            }
1195            RuntimeEffectCommand::SyncExecutionEnvironment {
1196                update_machine_config,
1197            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1198                result: runner
1199                    .driver
1200                    .refresh_execution_environment(runner.machine, update_machine_config)
1201                    .await
1202                    .map_err(|err| err.to_string()),
1203            }),
1204            RuntimeEffectCommand::Sleep { duration_ms } => {
1205                sleep_with_cancellation(
1206                    duration_ms,
1207                    &runner.cancellation,
1208                    runner.driver.host.core.clock.as_ref(),
1209                )
1210                .await?;
1211                Ok(RuntimeEffectOutcome::Sleep)
1212            }
1213            command => Err(RuntimeEffectControllerError::new(
1214                "runtime_effect_local_executor_mismatch",
1215                format!(
1216                    "local turn executor cannot execute {} command",
1217                    command.kind().as_str()
1218                ),
1219            )),
1220        }
1221    }
1222}
1223
1224#[async_trait::async_trait]
1225impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1226    async fn execute(
1227        mut self: Box<Self>,
1228        envelope: RuntimeEffectEnvelope,
1229    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1230        match envelope.command {
1231            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1232                result: self
1233                    .run_direct_llm_request((*request).into_request(
1234                        crate::session_model::transport_stream_events(&self.provider, None),
1235                        None,
1236                    ))
1237                    .await,
1238            }),
1239            RuntimeEffectCommand::Sleep { duration_ms } => {
1240                sleep_with_cancellation(
1241                    duration_ms,
1242                    &CancellationToken::new(),
1243                    &crate::SystemClock,
1244                )
1245                .await?;
1246                Ok(RuntimeEffectOutcome::Sleep)
1247            }
1248            command => Err(RuntimeEffectControllerError::new(
1249                "runtime_effect_local_executor_mismatch",
1250                format!(
1251                    "local direct executor cannot execute {} command",
1252                    command.kind().as_str()
1253                ),
1254            )),
1255        }
1256    }
1257}
1258
1259impl LocalDirectEffectRunner {
1260    async fn run_direct_llm_request(
1261        &mut self,
1262        request: CoreLlmRequest,
1263    ) -> Result<LlmResponse, LlmCallError> {
1264        let request = crate::attachments::resolve_llm_request_attachments(
1265            request,
1266            self.attachment_store.as_ref(),
1267        )
1268        .await
1269        .map_err(|err| LlmCallError {
1270            message: err.to_string(),
1271            retryable: false,
1272            raw: None,
1273            code: Some("attachment_resolution_failed".to_string()),
1274            terminal_reason: crate::LlmTerminalReason::ProviderError,
1275            request_body: None,
1276        })?;
1277        self.provider
1278            .complete(request)
1279            .await
1280            .map_err(llm_call_error_from_transport)
1281    }
1282}
1283
1284async fn execute_local_sleep(
1285    envelope: RuntimeEffectEnvelope,
1286    cancellation: CancellationToken,
1287    clock: &dyn crate::Clock,
1288) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1289    match envelope.command {
1290        RuntimeEffectCommand::Sleep { duration_ms } => {
1291            sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1292            Ok(RuntimeEffectOutcome::Sleep)
1293        }
1294        command => Err(RuntimeEffectControllerError::new(
1295            "runtime_effect_local_executor_mismatch",
1296            format!(
1297                "local sleep executor cannot execute {} command",
1298                command.kind().as_str()
1299            ),
1300        )),
1301    }
1302}
1303
1304async fn sleep_with_cancellation(
1305    duration_ms: u64,
1306    cancellation: &CancellationToken,
1307    clock: &dyn crate::Clock,
1308) -> Result<(), RuntimeEffectControllerError> {
1309    let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1310    tokio::pin!(sleep);
1311    tokio::select! {
1312        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1313            "runtime_effect_sleep_cancelled",
1314            "runtime effect sleep was cancelled",
1315        )),
1316        _ = &mut sleep => Ok(()),
1317    }
1318}
1319
1320// =============================================================================
1321// Default in-process effect controller
1322// =============================================================================
1323
1324/// Default in-process effect controller.
1325///
1326/// The inline controller executes local runners in process, provides in-memory
1327/// await-event resolution, and exposes durable-tool-effect semantics for local
1328/// runs. It does not make in-flight effects crash durable; workflow adapters
1329/// provide that by recording outcomes in their own history.
1330#[derive(Clone, Default)]
1331pub struct InlineRuntimeEffectController;
1332
1333#[async_trait::async_trait]
1334impl RuntimeEffectController for InlineRuntimeEffectController {
1335    fn supports_durable_effects(&self) -> bool {
1336        true
1337    }
1338
1339    async fn execute_effect(
1340        &self,
1341        envelope: RuntimeEffectEnvelope,
1342        local_executor: RuntimeEffectLocalExecutor<'_>,
1343    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1344        match envelope.command {
1345            RuntimeEffectCommand::AwaitEvent { key } => {
1346                let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1347                let resolution = inline_await_events()
1348                    .await_resolution(&key, cancellation, deadline, clock.as_ref())
1349                    .await
1350                    .map_err(RuntimeEffectControllerError::from)?;
1351                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1352            }
1353            RuntimeEffectCommand::Process { command } => {
1354                let execution = local_executor.into_process()?;
1355                let registry = execution.registry;
1356                let process_work_driver = execution.process_work_driver;
1357                let result = tokio::task::spawn(async move {
1358                    Self::execute_process_command(registry, process_work_driver, *command).await
1359                })
1360                .await
1361                .map_err(|err| {
1362                    RuntimeEffectControllerError::new(
1363                        "runtime_effect_process_task_join",
1364                        format!("inline process effect task failed: {err}"),
1365                    )
1366                })??;
1367                Ok(RuntimeEffectOutcome::Process { result })
1368            }
1369            _ => local_executor.execute(envelope).await,
1370        }
1371    }
1372
1373    async fn await_event_key(
1374        &self,
1375        scope: &ExecutionScope,
1376        wait: AwaitEventWaitIdentity,
1377    ) -> Result<AwaitEventKey, RuntimeError> {
1378        inline_await_events().key_for(scope, wait)
1379    }
1380
1381    async fn resolve_await_event(
1382        &self,
1383        key: &AwaitEventKey,
1384        resolution: Resolution,
1385    ) -> Result<ResolveOutcome, RuntimeError> {
1386        inline_await_events().resolve(key, resolution)
1387    }
1388
1389    async fn await_await_event(
1390        &self,
1391        key: &AwaitEventKey,
1392        cancel: CancellationToken,
1393        deadline: Option<Instant>,
1394    ) -> Result<Resolution, RuntimeError> {
1395        inline_await_events()
1396            .await_resolution(key, cancel, deadline, &crate::SystemClock)
1397            .await
1398    }
1399
1400    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1401        inline_await_events().revoke_session(session_id)
1402    }
1403}
1404
1405/// In-process deployment effect host.
1406#[derive(Clone)]
1407pub struct InlineEffectHost {
1408    controller: Arc<dyn RuntimeEffectController>,
1409}
1410
1411impl InlineEffectHost {
1412    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1413        Self { controller }
1414    }
1415}
1416
1417impl Default for InlineEffectHost {
1418    fn default() -> Self {
1419        Self::new(Arc::new(InlineRuntimeEffectController))
1420    }
1421}
1422
1423#[async_trait::async_trait]
1424impl EffectHost for InlineEffectHost {
1425    fn durability_tier(&self) -> crate::DurabilityTier {
1426        self.controller.durability_tier()
1427    }
1428
1429    fn requires_durable_attachment_store(&self) -> bool {
1430        self.controller.requires_durable_attachment_store()
1431    }
1432
1433    fn supports_durable_effects(&self) -> bool {
1434        self.controller.supports_durable_effects()
1435    }
1436
1437    fn scoped<'run>(
1438        &'run self,
1439        scope: ExecutionScope,
1440    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1441        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1442    }
1443
1444    fn scoped_static(
1445        &self,
1446        scope: ExecutionScope,
1447    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1448        Ok(Some(ScopedEffectController::shared(
1449            Arc::clone(&self.controller),
1450            scope,
1451        )?))
1452    }
1453
1454    async fn await_event_key(
1455        &self,
1456        scope: &ExecutionScope,
1457        wait: AwaitEventWaitIdentity,
1458    ) -> Result<AwaitEventKey, RuntimeError> {
1459        self.controller.await_event_key(scope, wait).await
1460    }
1461
1462    async fn resolve_await_event(
1463        &self,
1464        key: &AwaitEventKey,
1465        resolution: Resolution,
1466    ) -> Result<ResolveOutcome, RuntimeError> {
1467        self.controller.resolve_await_event(key, resolution).await
1468    }
1469
1470    async fn await_await_event(
1471        &self,
1472        key: &AwaitEventKey,
1473        cancel: CancellationToken,
1474        deadline: Option<Instant>,
1475    ) -> Result<Resolution, RuntimeError> {
1476        self.controller
1477            .await_await_event(key, cancel, deadline)
1478            .await
1479    }
1480
1481    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1482        self.controller
1483            .revoke_await_events_for_session(session_id)
1484            .await
1485    }
1486}
1487
1488impl InlineRuntimeEffectController {
1489    /// Register the process (and any handle grant) into the durable registry.
1490    ///
1491    /// The inline controller no longer runs the process here: the registry's
1492    /// non-terminal row *is* the durable work queue, and the host-owned
1493    /// [`ProcessWorkDriver`](crate::ProcessWorkDriver) is the sole executor.
1494    /// Registering the row is all this path does; the control seam drives the
1495    /// host driver after a successful start.
1496    pub(crate) async fn start_process(
1497        registry: Arc<dyn crate::ProcessRegistry>,
1498        registration: crate::ProcessRegistration,
1499        grant: Option<crate::ProcessStartGrant>,
1500    ) -> Result<ProcessRecord, PluginError> {
1501        let registration_for_record = registration.clone();
1502        let record = registry.register_process(registration_for_record).await?;
1503        if let Some(grant) = grant {
1504            registry
1505                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1506                .await?;
1507        }
1508        Ok(record)
1509    }
1510
1511    pub(crate) async fn request_process_cancel(
1512        &self,
1513        registry: Arc<dyn crate::ProcessRegistry>,
1514        process_id: &str,
1515        reason: Option<String>,
1516    ) -> Result<ProcessRecord, PluginError> {
1517        // Cancellation is a durable signal: the cancel event is what the
1518        // runner-run process observes, so the inline controller appends it and
1519        // no longer tracks an in-process cancellation token.
1520        registry
1521            .append_event(
1522                process_id,
1523                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1524            )
1525            .await?;
1526        registry
1527            .get_process(process_id)
1528            .await
1529            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1530    }
1531
1532    async fn execute_process_command(
1533        registry: Arc<dyn crate::ProcessRegistry>,
1534        process_work_driver: Option<crate::ProcessWorkDriver>,
1535        command: ProcessCommand,
1536    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1537        match command {
1538            ProcessCommand::Start {
1539                registration,
1540                grant,
1541                execution_context: _,
1542            } => {
1543                let record = Self::start_process(registry, registration, grant).await?;
1544                if let Some(driver) = process_work_driver.as_ref() {
1545                    driver.claim_and_run_pending("process_start").await?;
1546                }
1547                Ok(ProcessEffectOutcome::Start { record })
1548            }
1549            ProcessCommand::List {
1550                session_scope,
1551                mode,
1552            } => {
1553                let entries = match mode {
1554                    crate::ProcessListMode::Live => {
1555                        registry.list_live_handle_grants(&session_scope).await?
1556                    }
1557                    crate::ProcessListMode::All => {
1558                        registry.list_handle_grants(&session_scope).await?
1559                    }
1560                };
1561                Ok(ProcessEffectOutcome::List { entries })
1562            }
1563            ProcessCommand::Transfer {
1564                from_scope,
1565                to_scope,
1566                process_ids,
1567            } => {
1568                registry
1569                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1570                    .await?;
1571                Ok(ProcessEffectOutcome::Transfer)
1572            }
1573            ProcessCommand::DeleteSession { session_id } => {
1574                let report = registry.delete_session_process_state(&session_id).await?;
1575                Ok(ProcessEffectOutcome::DeleteSession { report })
1576            }
1577            ProcessCommand::Await { process_id } => {
1578                let output = registry.await_process(&process_id).await?;
1579                Ok(ProcessEffectOutcome::Await { output })
1580            }
1581            ProcessCommand::Cancel { process_id, reason } => {
1582                let record = InlineRuntimeEffectController
1583                    .request_process_cancel(registry, &process_id, reason)
1584                    .await?;
1585                Ok(ProcessEffectOutcome::Cancel { record })
1586            }
1587            ProcessCommand::Signal {
1588                process_id,
1589                request,
1590                ..
1591            } => {
1592                let result = registry.append_event(&process_id, request).await?;
1593                Ok(ProcessEffectOutcome::Signal {
1594                    event: result.event,
1595                })
1596            }
1597        }
1598    }
1599}
1600
1601impl std::fmt::Debug for InlineRuntimeEffectController {
1602    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1603        f.debug_struct("InlineRuntimeEffectController").finish()
1604    }
1605}