Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23    RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
/// HMAC-SHA256 instance; the await-event registry uses it to sign keys.
type HmacSha256 = Hmac<sha2::Sha256>;
/// Cancellation token, optional deadline, and clock bundled as one tuple.
// NOTE(review): presumably the options extracted for an await-event wait; the
// use site is outside this part of the file.
type AwaitEventOptions = (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>);
29
30fn inline_await_events() -> &'static AwaitEventRegistry {
31    static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
32    REGISTRY.get_or_init(AwaitEventRegistry::new)
33}
34
35// =============================================================================
36// Effect host + controller trait + scope + error
37// =============================================================================
38
/// Stable semantic identity for one effectful runtime operation.
///
/// The scope is chosen by the host boundary before any nondeterministic work is
/// planned. It is intentionally generic: Restate, an inline test host, or a
/// future durable effect host all receive the same Lash scope vocabulary.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case variant name and the variant's ids are sibling fields.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionScope {
    /// Scope identified by a session id plus a turn id.
    Turn {
        session_id: String,
        turn_id: String,
    },
    /// Scope identified by a process id alone (no session id is attached).
    Process {
        process_id: String,
    },
    /// Scope identified by a session id plus a drain id.
    QueueDrain {
        session_id: String,
        drain_id: String,
    },
    /// Scope identified by the id of the session being deleted.
    SessionDelete {
        session_id: String,
    },
    /// Scope identified by an operation id alone (no session id is attached).
    RuntimeOperation {
        operation_id: String,
    },
}
65
66impl ExecutionScope {
67    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
68        Self::Turn {
69            session_id: session_id.into(),
70            turn_id: turn_id.into(),
71        }
72    }
73
74    pub fn process(process_id: impl Into<String>) -> Self {
75        Self::Process {
76            process_id: process_id.into(),
77        }
78    }
79
80    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
81        Self::QueueDrain {
82            session_id: session_id.into(),
83            drain_id: drain_id.into(),
84        }
85    }
86
87    pub fn session_delete(session_id: impl Into<String>) -> Self {
88        Self::SessionDelete {
89            session_id: session_id.into(),
90        }
91    }
92
93    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
94        Self::RuntimeOperation {
95            operation_id: operation_id.into(),
96        }
97    }
98
99    pub fn id(&self) -> &str {
100        match self {
101            Self::Turn { turn_id, .. } => turn_id,
102            Self::Process { process_id } => process_id,
103            Self::QueueDrain { drain_id, .. } => drain_id,
104            Self::SessionDelete { session_id } => session_id,
105            Self::RuntimeOperation { operation_id } => operation_id,
106        }
107    }
108
109    pub fn session_id(&self) -> Option<&str> {
110        match self {
111            Self::Turn { session_id, .. }
112            | Self::QueueDrain { session_id, .. }
113            | Self::SessionDelete { session_id } => Some(session_id),
114            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
115        }
116    }
117
118    pub fn turn_id(&self) -> Option<&str> {
119        match self {
120            Self::Turn { turn_id, .. } => Some(turn_id),
121            _ => None,
122        }
123    }
124
125    pub fn validates_turn_trace_id(&self) -> bool {
126        matches!(self, Self::Turn { .. })
127    }
128
129    fn validate(&self) -> Result<(), RuntimeError> {
130        let missing = match self {
131            Self::Turn {
132                session_id,
133                turn_id,
134            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
135            Self::Process { process_id } => process_id.trim().is_empty(),
136            Self::QueueDrain {
137                session_id,
138                drain_id,
139            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
140            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
141            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
142        };
143        if missing {
144            return Err(RuntimeError::new(
145                RuntimeErrorCode::MissingExecutionScopeId,
146                "execution scopes require non-empty stable ids",
147            ));
148        }
149        Ok(())
150    }
151}
152
/// Identifies what an await-event wait is waiting for within a scope.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case variant name.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AwaitEventWaitIdentity {
    /// Wait identified by a tool call id.
    ToolCompletion {
        tool_call_id: String,
    },
    /// Wait identified by a process id, a signal name, and an ordinal.
    /// Validation rejects an ordinal of `0`.
    ProcessSignal {
        process_id: String,
        signal_name: String,
        ordinal: u64,
    },
    /// Wait identified by a caller-chosen key.
    Custom {
        key: String,
    },
}
168
169impl AwaitEventWaitIdentity {
170    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
171        Self::ToolCompletion {
172            tool_call_id: tool_call_id.into(),
173        }
174    }
175
176    pub fn process_signal(
177        process_id: impl Into<String>,
178        signal_name: impl Into<String>,
179        ordinal: u64,
180    ) -> Self {
181        Self::ProcessSignal {
182            process_id: process_id.into(),
183            signal_name: signal_name.into(),
184            ordinal,
185        }
186    }
187
188    fn validate(&self) -> Result<(), RuntimeError> {
189        let invalid = match self {
190            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
191            Self::ProcessSignal {
192                process_id,
193                signal_name,
194                ordinal,
195            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
196            Self::Custom { key } => key.trim().is_empty(),
197        };
198        if invalid {
199            return Err(RuntimeError::new(
200                "invalid_await_event_wait_identity",
201                "await-event wait identity requires non-empty stable ids",
202            ));
203        }
204        Ok(())
205    }
206}
207
/// Signed handle naming one await-event wait.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AwaitEventKey {
    /// Execution scope the wait belongs to.
    pub scope: ExecutionScope,
    /// What is being waited for inside that scope.
    pub wait: AwaitEventWaitIdentity,
    /// Identifier for the wait; the in-file registry derives it as a stable
    /// hash of `(scope, wait)` and uses it as the entry key.
    pub key_id: String,
    /// Signature over `(scope, wait, key_id)`; the in-file registry produces a
    /// lowercase-hex HMAC-SHA256 and checks it before honouring the key.
    pub signature: String,
}
215
216impl AwaitEventKey {
217    pub fn promise_key(&self) -> String {
218        format!("lash-await-event:{}", self.key_id)
219    }
220}
221
/// Error payload carried by [`Resolution::Err`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalCompletionError {
    /// Error code string.
    pub code: String,
    /// Error message string.
    pub message: String,
    /// Optional raw JSON value; omitted from serialized output when `None`
    /// and defaulted to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<serde_json::Value>,
}
229
230impl ExternalCompletionError {
231    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
232        Self {
233            code: code.into(),
234            message: message.into(),
235            raw: None,
236        }
237    }
238}
239
/// Terminal result of an await-event wait.
///
/// Serialized adjacently tagged: `status` holds the snake_case variant name and
/// `payload` holds the variant's content, when it has any.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
pub enum Resolution {
    /// Completed with a JSON value.
    Ok(serde_json::Value),
    /// Completed with an error payload.
    Err(ExternalCompletionError),
    /// The in-file registry records this when the wait's deadline elapses.
    Timeout,
    /// The in-file registry records this when the wait's cancellation token fires.
    Cancelled,
}
248
/// Result of attempting to resolve an await-event key.
///
/// Serialized internally tagged on `status` with snake_case variant names.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ResolveOutcome {
    /// The resolution was recorded as the wait's terminal value.
    Accepted,
    /// A terminal value was already recorded; it is returned unchanged.
    AlreadyResolved { terminal: Resolution },
    /// The key failed verification or has been revoked.
    UnknownOrRevoked,
}
256
/// Per-key registry slot: the recorded outcome plus a wake-up handle.
#[derive(Debug)]
struct AwaitEventEntry {
    /// Terminal resolution once one has been accepted; `None` while pending.
    terminal: Option<Resolution>,
    /// Notified (all waiters) when a resolution is accepted or a session is revoked.
    notify: Arc<Notify>,
}
262
/// Mutable registry state guarded by the registry's mutex.
#[derive(Debug)]
struct AwaitEventRegistryState {
    /// Entries keyed by `AwaitEventKey::key_id`.
    entries: HashMap<String, AwaitEventEntry>,
    /// Key ids treated as revoked by `resolve` and `await_resolution`.
    // NOTE(review): nothing in the visible impl inserts into this set — confirm
    // where individual keys are revoked.
    revoked_key_ids: HashSet<String>,
    /// Session ids whose keys are treated as revoked; filled by `revoke_session`.
    revoked_session_ids: HashSet<String>,
}
269
/// In-memory registry that signs await-event keys and tracks their resolution.
#[derive(Debug)]
struct AwaitEventRegistry {
    /// HMAC key used to sign and verify await-event keys; generated in `new`.
    secret: Vec<u8>,
    /// Entries and revocation sets, behind a standard-library mutex.
    state: std::sync::Mutex<AwaitEventRegistryState>,
}
275
276impl AwaitEventRegistry {
277    fn new() -> Self {
278        Self {
279            secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
280            state: std::sync::Mutex::new(AwaitEventRegistryState {
281                entries: HashMap::new(),
282                revoked_key_ids: HashSet::new(),
283                revoked_session_ids: HashSet::new(),
284            }),
285        }
286    }
287
288    fn key_for(
289        &self,
290        scope: &ExecutionScope,
291        wait: AwaitEventWaitIdentity,
292    ) -> Result<AwaitEventKey, RuntimeError> {
293        scope.validate()?;
294        wait.validate()?;
295        let key_id =
296            crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
297                RuntimeError::new(
298                    "await_event_key_hash",
299                    format!("failed to hash await-event identity: {err}"),
300                )
301            })?;
302        let signature = self.signature(scope, &wait, &key_id)?;
303        Ok(AwaitEventKey {
304            scope: scope.clone(),
305            wait,
306            key_id,
307            signature,
308        })
309    }
310
311    fn signature(
312        &self,
313        scope: &ExecutionScope,
314        wait: &AwaitEventWaitIdentity,
315        key_id: &str,
316    ) -> Result<String, RuntimeError> {
317        let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
318            RuntimeError::new(
319                "await_event_key_sign",
320                format!("failed to initialize await-event key signer: {err}"),
321            )
322        })?;
323        let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
324            RuntimeError::new(
325                "await_event_key_sign",
326                format!("failed to serialize await-event key identity: {err}"),
327            )
328        })?;
329        mac.update(&canonical);
330        Ok(format!("{:x}", mac.finalize().into_bytes()))
331    }
332
333    fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
334        let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
335        Ok(expected == key.signature)
336    }
337
338    fn resolve(
339        &self,
340        key: &AwaitEventKey,
341        resolution: Resolution,
342    ) -> Result<ResolveOutcome, RuntimeError> {
343        if !self.verify(key)? {
344            return Ok(ResolveOutcome::UnknownOrRevoked);
345        }
346        let mut state = self.state.lock().map_err(|_| {
347            RuntimeError::new(
348                "await_event_registry_poisoned",
349                "await-event registry lock poisoned",
350            )
351        })?;
352        if state.revoked_key_ids.contains(&key.key_id)
353            || key
354                .scope
355                .session_id()
356                .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
357        {
358            return Ok(ResolveOutcome::UnknownOrRevoked);
359        }
360        let entry = state
361            .entries
362            .entry(key.key_id.clone())
363            .or_insert_with(|| AwaitEventEntry {
364                terminal: None,
365                notify: Arc::new(Notify::new()),
366            });
367        if let Some(terminal) = &entry.terminal {
368            return Ok(ResolveOutcome::AlreadyResolved {
369                terminal: terminal.clone(),
370            });
371        }
372        entry.terminal = Some(resolution);
373        entry.notify.notify_waiters();
374        Ok(ResolveOutcome::Accepted)
375    }
376
377    async fn await_resolution(
378        &self,
379        key: &AwaitEventKey,
380        cancel: CancellationToken,
381        deadline: Option<Instant>,
382        clock: &dyn crate::Clock,
383    ) -> Result<Resolution, RuntimeError> {
384        if !self.verify(key)? {
385            return Err(RuntimeError::new(
386                "await_event_unknown_or_revoked",
387                "await-event key is invalid or revoked",
388            ));
389        }
390        loop {
391            let notify =
392                {
393                    let mut state = self.state.lock().map_err(|_| {
394                        RuntimeError::new(
395                            "await_event_registry_poisoned",
396                            "await-event registry lock poisoned",
397                        )
398                    })?;
399                    if state.revoked_key_ids.contains(&key.key_id)
400                        || key.scope.session_id().is_some_and(|session_id| {
401                            state.revoked_session_ids.contains(session_id)
402                        })
403                    {
404                        return Err(RuntimeError::new(
405                            "await_event_unknown_or_revoked",
406                            "await-event key is invalid or revoked",
407                        ));
408                    }
409                    let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
410                        AwaitEventEntry {
411                            terminal: None,
412                            notify: Arc::new(Notify::new()),
413                        }
414                    });
415                    if let Some(terminal) = entry.terminal.clone() {
416                        return Ok(terminal);
417                    }
418                    Arc::clone(&entry.notify)
419                };
420            if let Some(deadline) = deadline {
421                tokio::select! {
422                    _ = cancel.cancelled() => {
423                        let _ = self.resolve(key, Resolution::Cancelled)?;
424                    }
425                    _ = clock.sleep_until(deadline) => {
426                        let _ = self.resolve(key, Resolution::Timeout)?;
427                    }
428                    _ = notify.notified() => {}
429                }
430            } else {
431                tokio::select! {
432                    _ = cancel.cancelled() => {
433                        let _ = self.resolve(key, Resolution::Cancelled)?;
434                    }
435                    _ = notify.notified() => {}
436                }
437            }
438        }
439    }
440
441    fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
442        let mut state = self.state.lock().map_err(|_| {
443            RuntimeError::new(
444                "await_event_registry_poisoned",
445                "await-event registry lock poisoned",
446            )
447        })?;
448        state.revoked_session_ids.insert(session_id.to_string());
449        for entry in state.entries.values() {
450            entry.notify.notify_waiters();
451        }
452        Ok(())
453    }
454}
455
/// How a scoped controller holds its underlying effect controller.
enum ScopedEffectControllerInner<'run> {
    /// Reference valid for the `'run` lifetime.
    Borrowed(&'run dyn RuntimeEffectController),
    /// Reference-counted, shared ownership.
    Shared(Arc<dyn RuntimeEffectController>),
}
460
461impl Clone for ScopedEffectControllerInner<'_> {
462    fn clone(&self) -> Self {
463        match self {
464            Self::Borrowed(controller) => Self::Borrowed(*controller),
465            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
466        }
467    }
468}
469
/// Scoped low-level controller plus the semantic execution scope it is serving.
#[derive(Clone)]
pub struct ScopedEffectController<'run> {
    /// Borrowed or shared handle to the underlying controller.
    controller: ScopedEffectControllerInner<'run>,
    /// Execution scope; validated by both constructors before being stored.
    scope: ExecutionScope,
}
476
477impl<'run> ScopedEffectController<'run> {
478    pub fn borrowed(
479        controller: &'run dyn RuntimeEffectController,
480        scope: ExecutionScope,
481    ) -> Result<Self, RuntimeError> {
482        scope.validate()?;
483        Ok(Self {
484            controller: ScopedEffectControllerInner::Borrowed(controller),
485            scope,
486        })
487    }
488
489    pub fn shared(
490        controller: Arc<dyn RuntimeEffectController>,
491        scope: ExecutionScope,
492    ) -> Result<Self, RuntimeError> {
493        scope.validate()?;
494        Ok(Self {
495            controller: ScopedEffectControllerInner::Shared(controller),
496            scope,
497        })
498    }
499
500    pub fn controller(&self) -> &dyn RuntimeEffectController {
501        match &self.controller {
502            ScopedEffectControllerInner::Borrowed(controller) => *controller,
503            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
504        }
505    }
506
507    pub fn execution_scope(&self) -> &ExecutionScope {
508        &self.scope
509    }
510
511    pub fn scope_id(&self) -> &str {
512        self.scope.id()
513    }
514
515    pub fn turn_id(&self) -> Option<&str> {
516        self.scope.turn_id()
517    }
518}
519
/// Deployment-level factory for scoped effect controllers.
///
/// Only [`EffectHost::scoped`] is required; every other method has a default
/// that describes a non-durable host without await-event support.
#[async_trait::async_trait]
pub trait EffectHost: Send + Sync {
    /// Durability tier this host provides; defaults to `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this host needs a durable attachment store; defaults to `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this host supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Returns a controller bound to `scope` that borrows from this host.
    fn scoped<'run>(
        &'run self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, RuntimeError>;

    /// Returns a `'static` controller bound to the scope when the host can
    /// provide one; the default returns `Ok(None)`.
    fn scoped_static(
        &self,
        _scope: ExecutionScope,
    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
        Ok(None)
    }

    /// Produces an await-event key for `(scope, wait)`.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event keys",
        ))
    }

    /// Resolves an await-event key with the given resolution.
    ///
    /// The default accepts nothing and reports `ResolveOutcome::UnknownOrRevoked`.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for an await-event key to resolve, subject to cancellation and an
    /// optional deadline.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event waits",
        ))
    }

    /// Revokes await-event keys belonging to a session; the default is a no-op.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
582
/// Boundary for nondeterministic runtime work.
///
/// Only [`RuntimeEffectController::execute_effect`] is required; the remaining
/// methods default to a non-durable controller without await-event support.
#[async_trait::async_trait]
pub trait RuntimeEffectController: Send + Sync {
    /// Durability tier this controller provides; defaults to
    /// [`DurabilityTier::Inline`].
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this controller needs a durable attachment store; defaults to
    /// `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this controller supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Whether this controller can safely accept overlapping `execute_effect`
    /// calls from one runtime coordinator.
    ///
    /// Local and store-backed controllers can usually fan out independent
    /// effects. Some workflow substrates expose a single ordered journal
    /// context where native operations must be awaited immediately before the
    /// next context call is issued. Those controllers should return `false` so
    /// coordinators serialize child effects while still replaying each child by
    /// its own stable key.
    fn supports_concurrent_effects(&self) -> bool {
        true
    }

    /// Executes one effect described by `envelope`.
    ///
    /// `local_executor` is a per-effect local runner handed to the controller;
    /// the controller decides whether to delegate to it.
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;

    /// Produces an await-event key for `(scope, wait)`.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event keys",
        ))
    }

    /// Resolves an await-event key with the given resolution.
    ///
    /// The default accepts nothing and reports `ResolveOutcome::UnknownOrRevoked`.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for an await-event key to resolve, subject to cancellation and an
    /// optional deadline.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event waits",
        ))
    }

    /// Revokes await-event keys belonging to a session; the default is a no-op.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
654
/// Runtime-internal handle for effect-controller references carried through
/// per-turn execution contexts.
#[derive(Clone)]
pub(crate) enum RuntimeEffectControllerHandle<'run> {
    /// Wraps an already-scoped controller.
    Borrowed(ScopedEffectController<'run>),
    /// Shared controller plus the scope to present it under; compiled only for
    /// tests or with the `testing` feature.
    #[cfg(any(test, feature = "testing"))]
    Shared {
        controller: Arc<dyn RuntimeEffectController>,
        scope: ExecutionScope,
    },
}
666
667impl<'run> RuntimeEffectControllerHandle<'run> {
668    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
669        Self::Borrowed(scoped)
670    }
671
672    #[cfg(any(test, feature = "testing"))]
673    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
674        Self::Shared {
675            controller,
676            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
677        }
678    }
679
680    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
681        match self {
682            Self::Borrowed(scoped) => scoped.controller(),
683            #[cfg(any(test, feature = "testing"))]
684            Self::Shared { controller, .. } => controller.as_ref(),
685        }
686    }
687
688    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
689        match self {
690            Self::Borrowed(scoped) => scoped.clone(),
691            #[cfg(any(test, feature = "testing"))]
692            Self::Shared { controller, scope } => {
693                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
694                    .expect("runtime effect controller handle carries a valid scope")
695            }
696        }
697    }
698
699    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
700        self.clone()
701    }
702}
703
/// Error returned by effect controllers; displayed as `{code}: {message}`.
#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[error("{code}: {message}")]
pub struct RuntimeEffectControllerError {
    /// Error code string.
    pub code: String,
    /// Error message string.
    pub message: String,
}
710
711impl RuntimeEffectControllerError {
712    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
713        Self {
714            code: code.into(),
715            message: message.into(),
716        }
717    }
718
719    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
720        Self::new(
721            "runtime_effect_wrong_outcome",
722            format!(
723                "expected {} outcome, got {}",
724                expected.as_str(),
725                actual.as_str()
726            ),
727        )
728    }
729
730    pub(crate) fn into_runtime_error(self) -> RuntimeError {
731        RuntimeError::new(self.code, self.message)
732    }
733}
734
735impl From<RuntimeError> for RuntimeEffectControllerError {
736    fn from(err: RuntimeError) -> Self {
737        Self::new(err.code.as_str(), err.message)
738    }
739}
740
741impl From<PluginError> for RuntimeEffectControllerError {
742    fn from(err: PluginError) -> Self {
743        Self::new("plugin", err.to_string())
744    }
745}
746
747impl From<crate::StoreError> for RuntimeEffectControllerError {
748    fn from(err: crate::StoreError) -> Self {
749        Self::new("runtime_store", err.to_string())
750    }
751}
752
753// =============================================================================
754// Local executor (per-effect borrowed runner state)
755// =============================================================================
756
/// Crate-internal abstraction over running a registered process.
#[async_trait::async_trait]
pub(crate) trait ProcessRunner: Send + Sync {
    /// Runs the process described by `registration` and returns its await
    /// output.
    ///
    /// Receives the execution context, the process registry, a scoped effect
    /// controller, and a cancellation token.
    // NOTE(review): how implementations react to `cancellation` is not visible
    // here — confirm against the implementors.
    async fn run_process(
        &self,
        registration: crate::ProcessRegistration,
        execution_context: crate::ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        scoped_effect_controller: crate::ScopedEffectController<'_>,
        cancellation: CancellationToken,
    ) -> crate::ProcessAwaitOutput;
}
768
/// State a local executor carries for process effects.
pub struct ProcessLocalExecution {
    /// Shared process registry.
    pub registry: Arc<dyn ProcessRegistry>,
    /// Optional process work driver; `None` when built via
    /// `RuntimeEffectLocalExecutor::processes`.
    pub process_work_driver: Option<crate::ProcessWorkDriver>,
}
773
/// Local runner state for effects executed on behalf of a turn.
pub(super) struct LocalTurnEffectRunner<'a, 'run> {
    /// Mutable borrow of the turn driver.
    driver: &'a mut RuntimeTurnDriver<'run>,
    /// Mutable borrow of the turn machine.
    machine: &'a mut crate::TurnMachine,
    /// Sender for runtime stream events.
    event_tx: mpsc::Sender<RuntimeStreamEvent>,
    /// Cancellation token supplied when the runner is created.
    cancellation: CancellationToken,
}
780
/// Local runner state holding just a provider handle and an attachment store.
pub(super) struct LocalDirectEffectRunner {
    /// Provider handle.
    provider: ProviderHandle,
    /// Shared attachment store.
    attachment_store: Arc<dyn AttachmentStore>,
}
785
/// Local runner state for tool-batch effects.
struct LocalToolBatchEffectRunner<'run> {
    /// Runtime execution context for the batch.
    context: crate::RuntimeExecutionContext<'run>,
    /// Child-execution trace hooks keyed by string.
    // NOTE(review): presumably keyed by tool call id — confirm at the call site.
    child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
}
790
/// One-shot local runner: consumed by value (boxed) to execute a single effect.
#[async_trait::async_trait]
trait RuntimeEffectLocalRunner: Send {
    /// Executes `envelope`, consuming the runner.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
798
/// Boxable one-shot closure type for test runners: takes the effect envelope
/// and returns a pinned, boxed, `Send` future yielding the effect outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
810
/// Test-only local runner wrapping a caller-supplied closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked with the envelope to produce the outcome.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
815
/// Boxable one-shot closure type for durable steps: takes a JSON input and
/// returns a pinned, boxed, `Send` future yielding a JSON output.
type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
        serde_json::Value,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
826
/// Local runner wrapping a caller-supplied JSON-in/JSON-out closure.
struct DurableStepLocalRunner<'run> {
    /// Closure invoked with the step's JSON input.
    run: Box<DurableStepLocalRunnerFn<'run>>,
}
830
/// Internal state of a [`RuntimeEffectLocalExecutor`], selected by the
/// constructor that built it.
enum RuntimeEffectLocalExecutorState<'run> {
    /// Built by `RuntimeEffectLocalExecutor::unavailable`; carries no state.
    Unavailable,
    /// Carries what a local sleep needs: a cancellation token and a clock.
    SleepOnly {
        cancellation: CancellationToken,
        clock: Arc<dyn crate::Clock>,
    },
    /// Carries await-event wait options (cancellation, optional deadline,
    /// clock). Executing a command directly from this state is rejected with
    /// a `runtime_effect_local_executor_mismatch` error.
    ExternalWaitOptions {
        cancellation: CancellationToken,
        deadline: Option<Instant>,
        clock: Arc<dyn crate::Clock>,
    },
    /// Carries the process registry and optional work driver.
    Process(ProcessLocalExecution),
    /// Carries a boxed one-shot runner that executes the envelope.
    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
}
845
/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
///
/// Durable controllers may ignore it and replay their own recorded result. The
/// default inline controller delegates to it, so local provider/tool/checkpoint
/// work still crosses the same `execute_effect` boundary as durable controllers.
pub struct RuntimeEffectLocalExecutor<'run> {
    /// Which kind of local execution this executor can perform; consumed by
    /// value when the executor runs.
    state: RuntimeEffectLocalExecutorState<'run>,
}
854
855impl<'run> RuntimeEffectLocalExecutor<'run> {
856    pub fn unavailable() -> Self {
857        Self {
858            state: RuntimeEffectLocalExecutorState::Unavailable,
859        }
860    }
861
862    pub fn sleep(cancellation: CancellationToken) -> Self {
863        Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
864    }
865
866    pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
867        Self {
868            state: RuntimeEffectLocalExecutorState::SleepOnly {
869                cancellation,
870                clock,
871            },
872        }
873    }
874
875    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
876        Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
877    }
878
879    pub fn await_event_with_clock(
880        cancellation: CancellationToken,
881        deadline: Option<Instant>,
882        clock: Arc<dyn crate::Clock>,
883    ) -> Self {
884        Self {
885            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
886                cancellation,
887                deadline,
888                clock,
889            },
890        }
891    }
892
893    pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
894        Self::processes_with_driver(registry, None)
895    }
896
897    pub fn processes_with_driver(
898        registry: Arc<dyn ProcessRegistry>,
899        process_work_driver: Option<crate::ProcessWorkDriver>,
900    ) -> Self {
901        Self {
902            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
903                registry,
904                process_work_driver,
905            }),
906        }
907    }
908
909    pub fn durable_step<F, Fut>(run: F) -> Self
910    where
911        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
912        Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
913    {
914        Self {
915            state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
916                run: Box::new(move |input| {
917                    Box::pin(async move { run(input).await.map_err(Into::into) })
918                }),
919            })),
920        }
921    }
922
923    #[cfg(any(test, feature = "testing"))]
924    pub fn testing<F, Fut>(run: F) -> Self
925    where
926        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
927        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
928            + Send
929            + 'run,
930    {
931        Self {
932            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
933                TestingRuntimeEffectLocalRunner {
934                    run: Box::new(move |envelope| Box::pin(run(envelope))),
935                },
936            )),
937        }
938    }
939
940    pub(in crate::runtime) fn turn<'scope>(
941        driver: &'run mut RuntimeTurnDriver<'scope>,
942        machine: &'run mut crate::TurnMachine,
943        event_tx: mpsc::Sender<RuntimeStreamEvent>,
944        cancellation: CancellationToken,
945    ) -> Self
946    where
947        'scope: 'run,
948    {
949        Self {
950            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
951                driver,
952                machine,
953                event_tx,
954                cancellation,
955            })),
956        }
957    }
958
959    pub(in crate::runtime) fn direct(
960        provider: ProviderHandle,
961        attachment_store: Arc<dyn AttachmentStore>,
962    ) -> Self {
963        Self {
964            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
965                provider,
966                attachment_store,
967            })),
968        }
969    }
970
971    pub(crate) fn tool_batch(
972        context: crate::RuntimeExecutionContext<'run>,
973        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
974    ) -> Self {
975        Self {
976            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
977                context,
978                child_trace_hooks,
979            })),
980        }
981    }
982
983    pub async fn execute(
984        self,
985        envelope: RuntimeEffectEnvelope,
986    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
987        match self.state {
988            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
989            RuntimeEffectLocalExecutorState::SleepOnly {
990                cancellation,
991                clock,
992            } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
993            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
994                Err(RuntimeEffectControllerError::new(
995                    "runtime_effect_local_executor_mismatch",
996                    format!(
997                        "local await-event options cannot execute {} command directly",
998                        envelope.command.kind().as_str()
999                    ),
1000                ))
1001            }
1002            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
1003                "runtime_effect_local_executor_unavailable",
1004                format!(
1005                    "no local executor is available for {}",
1006                    envelope.command.kind().as_str()
1007                ),
1008            )),
1009            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
1010                "runtime_effect_local_executor_mismatch",
1011                format!(
1012                    "process executor cannot execute {} command directly",
1013                    envelope.command.kind().as_str()
1014                ),
1015            )),
1016        }
1017    }
1018
1019    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
1020        match self.state {
1021            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
1022            _ => Err(RuntimeEffectControllerError::new(
1023                "runtime_effect_local_executor_unavailable",
1024                "no process executor is available for process command",
1025            )),
1026        }
1027    }
1028
1029    fn into_await_event_options(self) -> Result<AwaitEventOptions, RuntimeEffectControllerError> {
1030        match self.state {
1031            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
1032                cancellation,
1033                deadline,
1034                clock,
1035            } => Ok((cancellation, deadline, clock)),
1036            _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
1037        }
1038    }
1039}
1040
/// Test-only runner: hands the whole envelope to the stored callback and
/// returns whatever that callback produces.
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let callback = self.run;
        callback(envelope).await
    }
}
1051
1052#[async_trait::async_trait]
1053impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
1054    async fn execute(
1055        self: Box<Self>,
1056        envelope: RuntimeEffectEnvelope,
1057    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1058        match envelope.command {
1059            RuntimeEffectCommand::DurableStep { input, .. } => {
1060                let value = (self.run)(input).await?;
1061                Ok(RuntimeEffectOutcome::DurableStep { value })
1062            }
1063            command => Err(RuntimeEffectControllerError::new(
1064                "runtime_effect_local_executor_mismatch",
1065                format!(
1066                    "local durable step executor cannot execute {} command",
1067                    command.kind().as_str()
1068                ),
1069            )),
1070        }
1071    }
1072}
1073
1074#[async_trait::async_trait]
1075impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
1076    async fn execute(
1077        self: Box<Self>,
1078        envelope: RuntimeEffectEnvelope,
1079    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1080        match envelope.command {
1081            RuntimeEffectCommand::ToolBatch { batch } => {
1082                let outcome = self
1083                    .context
1084                    .execute_prepared_tool_batch_launches(
1085                        batch,
1086                        envelope.invocation,
1087                        self.child_trace_hooks,
1088                    )
1089                    .await?;
1090                Ok(RuntimeEffectOutcome::ToolBatch {
1091                    launches: outcome.launches,
1092                    triggers: outcome.triggers,
1093                })
1094            }
1095            RuntimeEffectCommand::ToolAttempt {
1096                call,
1097                attempt,
1098                max_attempts,
1099            } => {
1100                let child_execution_trace_hook = self.child_trace_hooks.get(&call.call_id).cloned();
1101                let outcome = self
1102                    .context
1103                    .execute_prepared_tool_attempt_effect(
1104                        call,
1105                        attempt,
1106                        max_attempts,
1107                        envelope.invocation,
1108                        child_execution_trace_hook,
1109                    )
1110                    .await?;
1111                Ok(RuntimeEffectOutcome::ToolAttempt {
1112                    launch: outcome.launch,
1113                    triggers: outcome.triggers,
1114                })
1115            }
1116            command => Err(RuntimeEffectControllerError::new(
1117                "runtime_effect_local_executor_mismatch",
1118                format!(
1119                    "local tool executor cannot execute {} command",
1120                    command.kind().as_str()
1121                ),
1122            )),
1123        }
1124    }
1125}
1126
1127#[async_trait::async_trait]
1128impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1129    async fn execute(
1130        self: Box<Self>,
1131        envelope: RuntimeEffectEnvelope,
1132    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1133        let runner = *self;
1134        match envelope.command {
1135            RuntimeEffectCommand::LlmCall { request } => {
1136                let protocol_iteration = runner.machine.protocol_iteration();
1137                let (result, text_streamed) = runner
1138                    .driver
1139                    .run_llm_call(
1140                        Arc::new((*request).into_request(None, None)),
1141                        protocol_iteration,
1142                        envelope.invocation,
1143                        &runner.event_tx,
1144                        &runner.cancellation,
1145                    )
1146                    .await;
1147                Ok(RuntimeEffectOutcome::LlmCall {
1148                    result,
1149                    text_streamed,
1150                })
1151            }
1152            RuntimeEffectCommand::ToolBatch { batch } => {
1153                let outcome = runner
1154                    .driver
1155                    .run_tool_batch(
1156                        batch,
1157                        envelope.invocation,
1158                        &runner.event_tx,
1159                        &runner.cancellation,
1160                    )
1161                    .await?;
1162                Ok(RuntimeEffectOutcome::ToolBatch {
1163                    launches: outcome.launches,
1164                    triggers: outcome.triggers,
1165                })
1166            }
1167            RuntimeEffectCommand::ExecCode { language, code } => {
1168                let protocol_iteration = runner.machine.protocol_iteration();
1169                let messages = runner.machine.message_sequence();
1170                Ok(RuntimeEffectOutcome::ExecCode {
1171                    result: runner
1172                        .driver
1173                        .run_exec_code(
1174                            language,
1175                            &code,
1176                            messages,
1177                            protocol_iteration,
1178                            envelope.invocation,
1179                            &runner.event_tx,
1180                        )
1181                        .await,
1182                })
1183            }
1184            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1185                Ok(RuntimeEffectOutcome::Checkpoint {
1186                    result: runner
1187                        .driver
1188                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1189                        .await
1190                        .map_err(RuntimeEffectControllerError::from),
1191                })
1192            }
1193            RuntimeEffectCommand::SyncExecutionEnvironment {
1194                update_machine_config,
1195            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1196                result: runner
1197                    .driver
1198                    .refresh_execution_environment(runner.machine, update_machine_config)
1199                    .await
1200                    .map_err(|err| err.to_string()),
1201            }),
1202            RuntimeEffectCommand::Sleep { duration_ms } => {
1203                sleep_with_cancellation(
1204                    duration_ms,
1205                    &runner.cancellation,
1206                    runner.driver.host.core.clock.as_ref(),
1207                )
1208                .await?;
1209                Ok(RuntimeEffectOutcome::Sleep)
1210            }
1211            command => Err(RuntimeEffectControllerError::new(
1212                "runtime_effect_local_executor_mismatch",
1213                format!(
1214                    "local turn executor cannot execute {} command",
1215                    command.kind().as_str()
1216                ),
1217            )),
1218        }
1219    }
1220}
1221
1222#[async_trait::async_trait]
1223impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1224    async fn execute(
1225        mut self: Box<Self>,
1226        envelope: RuntimeEffectEnvelope,
1227    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1228        match envelope.command {
1229            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1230                result: self
1231                    .run_direct_llm_request((*request).into_request(
1232                        crate::session_model::transport_stream_events(&self.provider, None),
1233                        None,
1234                    ))
1235                    .await,
1236            }),
1237            RuntimeEffectCommand::Sleep { duration_ms } => {
1238                sleep_with_cancellation(
1239                    duration_ms,
1240                    &CancellationToken::new(),
1241                    &crate::SystemClock,
1242                )
1243                .await?;
1244                Ok(RuntimeEffectOutcome::Sleep)
1245            }
1246            command => Err(RuntimeEffectControllerError::new(
1247                "runtime_effect_local_executor_mismatch",
1248                format!(
1249                    "local direct executor cannot execute {} command",
1250                    command.kind().as_str()
1251                ),
1252            )),
1253        }
1254    }
1255}
1256
1257impl LocalDirectEffectRunner {
1258    async fn run_direct_llm_request(
1259        &mut self,
1260        request: CoreLlmRequest,
1261    ) -> Result<LlmResponse, LlmCallError> {
1262        let request = crate::attachments::resolve_llm_request_attachments(
1263            request,
1264            self.attachment_store.as_ref(),
1265        )
1266        .await
1267        .map_err(|err| LlmCallError {
1268            message: err.to_string(),
1269            retryable: false,
1270            raw: None,
1271            code: Some("attachment_resolution_failed".to_string()),
1272            terminal_reason: crate::LlmTerminalReason::ProviderError,
1273            request_body: None,
1274        })?;
1275        self.provider
1276            .complete(request)
1277            .await
1278            .map_err(llm_call_error_from_transport)
1279    }
1280}
1281
1282async fn execute_local_sleep(
1283    envelope: RuntimeEffectEnvelope,
1284    cancellation: CancellationToken,
1285    clock: &dyn crate::Clock,
1286) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1287    match envelope.command {
1288        RuntimeEffectCommand::Sleep { duration_ms } => {
1289            sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1290            Ok(RuntimeEffectOutcome::Sleep)
1291        }
1292        command => Err(RuntimeEffectControllerError::new(
1293            "runtime_effect_local_executor_mismatch",
1294            format!(
1295                "local sleep executor cannot execute {} command",
1296                command.kind().as_str()
1297            ),
1298        )),
1299    }
1300}
1301
1302async fn sleep_with_cancellation(
1303    duration_ms: u64,
1304    cancellation: &CancellationToken,
1305    clock: &dyn crate::Clock,
1306) -> Result<(), RuntimeEffectControllerError> {
1307    let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1308    tokio::pin!(sleep);
1309    tokio::select! {
1310        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1311            "runtime_effect_sleep_cancelled",
1312            "runtime effect sleep was cancelled",
1313        )),
1314        _ = &mut sleep => Ok(()),
1315    }
1316}
1317
1318// =============================================================================
1319// Default in-process effect controller
1320// =============================================================================
1321
1322/// Default in-process effect controller.
1323///
1324/// The inline controller executes local runners in process, provides in-memory
1325/// await-event resolution, and exposes durable-tool-effect semantics for local
1326/// runs. It does not make in-flight effects crash durable; workflow adapters
1327/// provide that by recording outcomes in their own history.
1328#[derive(Clone, Default)]
1329pub struct InlineRuntimeEffectController;
1330
1331#[async_trait::async_trait]
1332impl RuntimeEffectController for InlineRuntimeEffectController {
1333    fn supports_durable_effects(&self) -> bool {
1334        true
1335    }
1336
1337    async fn execute_effect(
1338        &self,
1339        envelope: RuntimeEffectEnvelope,
1340        local_executor: RuntimeEffectLocalExecutor<'_>,
1341    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1342        match envelope.command {
1343            RuntimeEffectCommand::AwaitEvent { key } => {
1344                let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1345                let resolution = inline_await_events()
1346                    .await_resolution(&key, cancellation, deadline, clock.as_ref())
1347                    .await
1348                    .map_err(RuntimeEffectControllerError::from)?;
1349                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1350            }
1351            RuntimeEffectCommand::Process { command } => {
1352                let execution = local_executor.into_process()?;
1353                let registry = execution.registry;
1354                let process_work_driver = execution.process_work_driver;
1355                let result = tokio::task::spawn(async move {
1356                    Self::execute_process_command(registry, process_work_driver, *command).await
1357                })
1358                .await
1359                .map_err(|err| {
1360                    RuntimeEffectControllerError::new(
1361                        "runtime_effect_process_task_join",
1362                        format!("inline process effect task failed: {err}"),
1363                    )
1364                })??;
1365                Ok(RuntimeEffectOutcome::Process { result })
1366            }
1367            _ => local_executor.execute(envelope).await,
1368        }
1369    }
1370
1371    async fn await_event_key(
1372        &self,
1373        scope: &ExecutionScope,
1374        wait: AwaitEventWaitIdentity,
1375    ) -> Result<AwaitEventKey, RuntimeError> {
1376        inline_await_events().key_for(scope, wait)
1377    }
1378
1379    async fn resolve_await_event(
1380        &self,
1381        key: &AwaitEventKey,
1382        resolution: Resolution,
1383    ) -> Result<ResolveOutcome, RuntimeError> {
1384        inline_await_events().resolve(key, resolution)
1385    }
1386
1387    async fn await_await_event(
1388        &self,
1389        key: &AwaitEventKey,
1390        cancel: CancellationToken,
1391        deadline: Option<Instant>,
1392    ) -> Result<Resolution, RuntimeError> {
1393        inline_await_events()
1394            .await_resolution(key, cancel, deadline, &crate::SystemClock)
1395            .await
1396    }
1397
1398    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1399        inline_await_events().revoke_session(session_id)
1400    }
1401}
1402
1403/// In-process deployment effect host.
1404#[derive(Clone)]
1405pub struct InlineEffectHost {
1406    controller: Arc<dyn RuntimeEffectController>,
1407}
1408
1409impl InlineEffectHost {
1410    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1411        Self { controller }
1412    }
1413}
1414
1415impl Default for InlineEffectHost {
1416    fn default() -> Self {
1417        Self::new(Arc::new(InlineRuntimeEffectController))
1418    }
1419}
1420
1421#[async_trait::async_trait]
1422impl EffectHost for InlineEffectHost {
1423    fn durability_tier(&self) -> crate::DurabilityTier {
1424        self.controller.durability_tier()
1425    }
1426
1427    fn requires_durable_attachment_store(&self) -> bool {
1428        self.controller.requires_durable_attachment_store()
1429    }
1430
1431    fn supports_durable_effects(&self) -> bool {
1432        self.controller.supports_durable_effects()
1433    }
1434
1435    fn scoped<'run>(
1436        &'run self,
1437        scope: ExecutionScope,
1438    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1439        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1440    }
1441
1442    fn scoped_static(
1443        &self,
1444        scope: ExecutionScope,
1445    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1446        Ok(Some(ScopedEffectController::shared(
1447            Arc::clone(&self.controller),
1448            scope,
1449        )?))
1450    }
1451
1452    async fn await_event_key(
1453        &self,
1454        scope: &ExecutionScope,
1455        wait: AwaitEventWaitIdentity,
1456    ) -> Result<AwaitEventKey, RuntimeError> {
1457        self.controller.await_event_key(scope, wait).await
1458    }
1459
1460    async fn resolve_await_event(
1461        &self,
1462        key: &AwaitEventKey,
1463        resolution: Resolution,
1464    ) -> Result<ResolveOutcome, RuntimeError> {
1465        self.controller.resolve_await_event(key, resolution).await
1466    }
1467
1468    async fn await_await_event(
1469        &self,
1470        key: &AwaitEventKey,
1471        cancel: CancellationToken,
1472        deadline: Option<Instant>,
1473    ) -> Result<Resolution, RuntimeError> {
1474        self.controller
1475            .await_await_event(key, cancel, deadline)
1476            .await
1477    }
1478
1479    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1480        self.controller
1481            .revoke_await_events_for_session(session_id)
1482            .await
1483    }
1484}
1485
1486impl InlineRuntimeEffectController {
1487    /// Register the process (and any handle grant) into the durable registry.
1488    ///
1489    /// The inline controller no longer runs the process here: the registry's
1490    /// non-terminal row *is* the durable work queue, and the host-owned
1491    /// [`ProcessWorkDriver`](crate::ProcessWorkDriver) is the sole executor.
1492    /// Registering the row is all this path does; the control seam drives the
1493    /// host driver after a successful start.
1494    pub(crate) async fn start_process(
1495        registry: Arc<dyn crate::ProcessRegistry>,
1496        registration: crate::ProcessRegistration,
1497        grant: Option<crate::ProcessStartGrant>,
1498    ) -> Result<ProcessRecord, PluginError> {
1499        let registration_for_record = registration.clone();
1500        let record = registry.register_process(registration_for_record).await?;
1501        if let Some(grant) = grant {
1502            registry
1503                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1504                .await?;
1505        }
1506        Ok(record)
1507    }
1508
1509    pub(crate) async fn request_process_cancel(
1510        &self,
1511        registry: Arc<dyn crate::ProcessRegistry>,
1512        process_id: &str,
1513        reason: Option<String>,
1514    ) -> Result<ProcessRecord, PluginError> {
1515        // Cancellation is a durable signal: the cancel event is what the
1516        // runner-run process observes, so the inline controller appends it and
1517        // no longer tracks an in-process cancellation token.
1518        registry
1519            .append_event(
1520                process_id,
1521                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1522            )
1523            .await?;
1524        registry
1525            .get_process(process_id)
1526            .await
1527            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1528    }
1529
1530    async fn execute_process_command(
1531        registry: Arc<dyn crate::ProcessRegistry>,
1532        process_work_driver: Option<crate::ProcessWorkDriver>,
1533        command: ProcessCommand,
1534    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1535        match command {
1536            ProcessCommand::Start {
1537                registration,
1538                grant,
1539                execution_context: _,
1540            } => {
1541                let record = Self::start_process(registry, registration, grant).await?;
1542                if let Some(driver) = process_work_driver.as_ref() {
1543                    driver.claim_and_run_pending("process_start").await?;
1544                }
1545                Ok(ProcessEffectOutcome::Start { record })
1546            }
1547            ProcessCommand::List {
1548                session_scope,
1549                mode,
1550            } => {
1551                let entries = match mode {
1552                    crate::ProcessListMode::Live => {
1553                        registry.list_live_handle_grants(&session_scope).await?
1554                    }
1555                    crate::ProcessListMode::All => {
1556                        registry.list_handle_grants(&session_scope).await?
1557                    }
1558                };
1559                Ok(ProcessEffectOutcome::List { entries })
1560            }
1561            ProcessCommand::Transfer {
1562                from_scope,
1563                to_scope,
1564                process_ids,
1565            } => {
1566                registry
1567                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1568                    .await?;
1569                Ok(ProcessEffectOutcome::Transfer)
1570            }
1571            ProcessCommand::DeleteSession { session_id } => {
1572                let report = registry.delete_session_process_state(&session_id).await?;
1573                Ok(ProcessEffectOutcome::DeleteSession { report })
1574            }
1575            ProcessCommand::Await { process_id } => {
1576                let output = registry.await_process(&process_id).await?;
1577                Ok(ProcessEffectOutcome::Await { output })
1578            }
1579            ProcessCommand::Cancel { process_id, reason } => {
1580                let record = InlineRuntimeEffectController
1581                    .request_process_cancel(registry, &process_id, reason)
1582                    .await?;
1583                Ok(ProcessEffectOutcome::Cancel { record })
1584            }
1585            ProcessCommand::Signal {
1586                process_id,
1587                request,
1588                ..
1589            } => {
1590                let result = registry.append_event(&process_id, request).await?;
1591                Ok(ProcessEffectOutcome::Signal {
1592                    event: result.event,
1593                })
1594            }
1595        }
1596    }
1597}
1598
1599impl std::fmt::Debug for InlineRuntimeEffectController {
1600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1601        f.debug_struct("InlineRuntimeEffectController").finish()
1602    }
1603}