Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23    RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
27type HmacSha256 = Hmac<sha2::Sha256>;
28
29fn inline_await_events() -> &'static AwaitEventRegistry {
30    static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
31    REGISTRY.get_or_init(AwaitEventRegistry::new)
32}
33
34// =============================================================================
35// Effect host + controller trait + scope + error
36// =============================================================================
37
38/// Stable semantic identity for one effectful runtime operation.
39///
40/// The scope is chosen by the host boundary before any nondeterministic work is
41/// planned. It is intentionally generic: Restate, an inline test host, or a
42/// future durable effect host all receive the same Lash scope vocabulary.
43#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(tag = "type", rename_all = "snake_case")]
45pub enum ExecutionScope {
46    Turn {
47        session_id: String,
48        turn_id: String,
49    },
50    Process {
51        process_id: String,
52    },
53    QueueDrain {
54        session_id: String,
55        drain_id: String,
56    },
57    SessionDelete {
58        session_id: String,
59    },
60    RuntimeOperation {
61        operation_id: String,
62    },
63}
64
65impl ExecutionScope {
66    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
67        Self::Turn {
68            session_id: session_id.into(),
69            turn_id: turn_id.into(),
70        }
71    }
72
73    pub fn process(process_id: impl Into<String>) -> Self {
74        Self::Process {
75            process_id: process_id.into(),
76        }
77    }
78
79    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
80        Self::QueueDrain {
81            session_id: session_id.into(),
82            drain_id: drain_id.into(),
83        }
84    }
85
86    pub fn session_delete(session_id: impl Into<String>) -> Self {
87        Self::SessionDelete {
88            session_id: session_id.into(),
89        }
90    }
91
92    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
93        Self::RuntimeOperation {
94            operation_id: operation_id.into(),
95        }
96    }
97
98    pub fn id(&self) -> &str {
99        match self {
100            Self::Turn { turn_id, .. } => turn_id,
101            Self::Process { process_id } => process_id,
102            Self::QueueDrain { drain_id, .. } => drain_id,
103            Self::SessionDelete { session_id } => session_id,
104            Self::RuntimeOperation { operation_id } => operation_id,
105        }
106    }
107
108    pub fn session_id(&self) -> Option<&str> {
109        match self {
110            Self::Turn { session_id, .. }
111            | Self::QueueDrain { session_id, .. }
112            | Self::SessionDelete { session_id } => Some(session_id),
113            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
114        }
115    }
116
117    pub fn turn_id(&self) -> Option<&str> {
118        match self {
119            Self::Turn { turn_id, .. } => Some(turn_id),
120            _ => None,
121        }
122    }
123
124    pub fn validates_turn_trace_id(&self) -> bool {
125        matches!(self, Self::Turn { .. })
126    }
127
128    fn validate(&self) -> Result<(), RuntimeError> {
129        let missing = match self {
130            Self::Turn {
131                session_id,
132                turn_id,
133            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
134            Self::Process { process_id } => process_id.trim().is_empty(),
135            Self::QueueDrain {
136                session_id,
137                drain_id,
138            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
139            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
140            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
141        };
142        if missing {
143            return Err(RuntimeError::new(
144                RuntimeErrorCode::MissingExecutionScopeId,
145                "execution scopes require non-empty stable ids",
146            ));
147        }
148        Ok(())
149    }
150}
151
152#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
153#[serde(tag = "type", rename_all = "snake_case")]
154pub enum AwaitEventWaitIdentity {
155    ToolCompletion {
156        tool_call_id: String,
157    },
158    ProcessSignal {
159        process_id: String,
160        signal_name: String,
161        ordinal: u64,
162    },
163    Custom {
164        key: String,
165    },
166}
167
168impl AwaitEventWaitIdentity {
169    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
170        Self::ToolCompletion {
171            tool_call_id: tool_call_id.into(),
172        }
173    }
174
175    pub fn process_signal(
176        process_id: impl Into<String>,
177        signal_name: impl Into<String>,
178        ordinal: u64,
179    ) -> Self {
180        Self::ProcessSignal {
181            process_id: process_id.into(),
182            signal_name: signal_name.into(),
183            ordinal,
184        }
185    }
186
187    fn validate(&self) -> Result<(), RuntimeError> {
188        let invalid = match self {
189            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
190            Self::ProcessSignal {
191                process_id,
192                signal_name,
193                ordinal,
194            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
195            Self::Custom { key } => key.trim().is_empty(),
196        };
197        if invalid {
198            return Err(RuntimeError::new(
199                "invalid_await_event_wait_identity",
200                "await-event wait identity requires non-empty stable ids",
201            ));
202        }
203        Ok(())
204    }
205}
206
207#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
208pub struct AwaitEventKey {
209    pub scope: ExecutionScope,
210    pub wait: AwaitEventWaitIdentity,
211    pub key_id: String,
212    pub signature: String,
213}
214
215impl AwaitEventKey {
216    pub fn promise_key(&self) -> String {
217        format!("lash-await-event:{}", self.key_id)
218    }
219}
220
221#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
222pub struct ExternalCompletionError {
223    pub code: String,
224    pub message: String,
225    #[serde(default, skip_serializing_if = "Option::is_none")]
226    pub raw: Option<serde_json::Value>,
227}
228
229impl ExternalCompletionError {
230    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
231        Self {
232            code: code.into(),
233            message: message.into(),
234            raw: None,
235        }
236    }
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
240#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
241pub enum Resolution {
242    Ok(serde_json::Value),
243    Err(ExternalCompletionError),
244    Timeout,
245    Cancelled,
246}
247
248#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
249#[serde(tag = "status", rename_all = "snake_case")]
250pub enum ResolveOutcome {
251    Accepted,
252    AlreadyResolved { terminal: Resolution },
253    UnknownOrRevoked,
254}
255
256#[derive(Debug)]
257struct AwaitEventEntry {
258    terminal: Option<Resolution>,
259    notify: Arc<Notify>,
260}
261
262#[derive(Debug)]
263struct AwaitEventRegistryState {
264    entries: HashMap<String, AwaitEventEntry>,
265    revoked_key_ids: HashSet<String>,
266    revoked_session_ids: HashSet<String>,
267}
268
269#[derive(Debug)]
270struct AwaitEventRegistry {
271    secret: Vec<u8>,
272    state: std::sync::Mutex<AwaitEventRegistryState>,
273}
274
275impl AwaitEventRegistry {
276    fn new() -> Self {
277        Self {
278            secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
279            state: std::sync::Mutex::new(AwaitEventRegistryState {
280                entries: HashMap::new(),
281                revoked_key_ids: HashSet::new(),
282                revoked_session_ids: HashSet::new(),
283            }),
284        }
285    }
286
287    fn key_for(
288        &self,
289        scope: &ExecutionScope,
290        wait: AwaitEventWaitIdentity,
291    ) -> Result<AwaitEventKey, RuntimeError> {
292        scope.validate()?;
293        wait.validate()?;
294        let key_id =
295            crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
296                RuntimeError::new(
297                    "await_event_key_hash",
298                    format!("failed to hash await-event identity: {err}"),
299                )
300            })?;
301        let signature = self.signature(scope, &wait, &key_id)?;
302        Ok(AwaitEventKey {
303            scope: scope.clone(),
304            wait,
305            key_id,
306            signature,
307        })
308    }
309
310    fn signature(
311        &self,
312        scope: &ExecutionScope,
313        wait: &AwaitEventWaitIdentity,
314        key_id: &str,
315    ) -> Result<String, RuntimeError> {
316        let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
317            RuntimeError::new(
318                "await_event_key_sign",
319                format!("failed to initialize await-event key signer: {err}"),
320            )
321        })?;
322        let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
323            RuntimeError::new(
324                "await_event_key_sign",
325                format!("failed to serialize await-event key identity: {err}"),
326            )
327        })?;
328        mac.update(&canonical);
329        Ok(format!("{:x}", mac.finalize().into_bytes()))
330    }
331
332    fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
333        let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
334        Ok(expected == key.signature)
335    }
336
337    fn resolve(
338        &self,
339        key: &AwaitEventKey,
340        resolution: Resolution,
341    ) -> Result<ResolveOutcome, RuntimeError> {
342        if !self.verify(key)? {
343            return Ok(ResolveOutcome::UnknownOrRevoked);
344        }
345        let mut state = self.state.lock().map_err(|_| {
346            RuntimeError::new(
347                "await_event_registry_poisoned",
348                "await-event registry lock poisoned",
349            )
350        })?;
351        if state.revoked_key_ids.contains(&key.key_id)
352            || key
353                .scope
354                .session_id()
355                .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
356        {
357            return Ok(ResolveOutcome::UnknownOrRevoked);
358        }
359        let entry = state
360            .entries
361            .entry(key.key_id.clone())
362            .or_insert_with(|| AwaitEventEntry {
363                terminal: None,
364                notify: Arc::new(Notify::new()),
365            });
366        if let Some(terminal) = &entry.terminal {
367            return Ok(ResolveOutcome::AlreadyResolved {
368                terminal: terminal.clone(),
369            });
370        }
371        entry.terminal = Some(resolution);
372        entry.notify.notify_waiters();
373        Ok(ResolveOutcome::Accepted)
374    }
375
376    async fn await_resolution(
377        &self,
378        key: &AwaitEventKey,
379        cancel: CancellationToken,
380        deadline: Option<Instant>,
381        clock: &dyn crate::Clock,
382    ) -> Result<Resolution, RuntimeError> {
383        if !self.verify(key)? {
384            return Err(RuntimeError::new(
385                "await_event_unknown_or_revoked",
386                "await-event key is invalid or revoked",
387            ));
388        }
389        loop {
390            let notify =
391                {
392                    let mut state = self.state.lock().map_err(|_| {
393                        RuntimeError::new(
394                            "await_event_registry_poisoned",
395                            "await-event registry lock poisoned",
396                        )
397                    })?;
398                    if state.revoked_key_ids.contains(&key.key_id)
399                        || key.scope.session_id().is_some_and(|session_id| {
400                            state.revoked_session_ids.contains(session_id)
401                        })
402                    {
403                        return Err(RuntimeError::new(
404                            "await_event_unknown_or_revoked",
405                            "await-event key is invalid or revoked",
406                        ));
407                    }
408                    let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
409                        AwaitEventEntry {
410                            terminal: None,
411                            notify: Arc::new(Notify::new()),
412                        }
413                    });
414                    if let Some(terminal) = entry.terminal.clone() {
415                        return Ok(terminal);
416                    }
417                    Arc::clone(&entry.notify)
418                };
419            if let Some(deadline) = deadline {
420                tokio::select! {
421                    _ = cancel.cancelled() => {
422                        let _ = self.resolve(key, Resolution::Cancelled)?;
423                    }
424                    _ = clock.sleep_until(deadline) => {
425                        let _ = self.resolve(key, Resolution::Timeout)?;
426                    }
427                    _ = notify.notified() => {}
428                }
429            } else {
430                tokio::select! {
431                    _ = cancel.cancelled() => {
432                        let _ = self.resolve(key, Resolution::Cancelled)?;
433                    }
434                    _ = notify.notified() => {}
435                }
436            }
437        }
438    }
439
440    fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
441        let mut state = self.state.lock().map_err(|_| {
442            RuntimeError::new(
443                "await_event_registry_poisoned",
444                "await-event registry lock poisoned",
445            )
446        })?;
447        state.revoked_session_ids.insert(session_id.to_string());
448        for entry in state.entries.values() {
449            entry.notify.notify_waiters();
450        }
451        Ok(())
452    }
453}
454
455enum ScopedEffectControllerInner<'run> {
456    Borrowed(&'run dyn RuntimeEffectController),
457    Shared(Arc<dyn RuntimeEffectController>),
458}
459
460impl Clone for ScopedEffectControllerInner<'_> {
461    fn clone(&self) -> Self {
462        match self {
463            Self::Borrowed(controller) => Self::Borrowed(*controller),
464            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
465        }
466    }
467}
468
469/// Scoped low-level controller plus the semantic execution scope it is serving.
470#[derive(Clone)]
471pub struct ScopedEffectController<'run> {
472    controller: ScopedEffectControllerInner<'run>,
473    scope: ExecutionScope,
474}
475
476impl<'run> ScopedEffectController<'run> {
477    pub fn borrowed(
478        controller: &'run dyn RuntimeEffectController,
479        scope: ExecutionScope,
480    ) -> Result<Self, RuntimeError> {
481        scope.validate()?;
482        Ok(Self {
483            controller: ScopedEffectControllerInner::Borrowed(controller),
484            scope,
485        })
486    }
487
488    pub fn shared(
489        controller: Arc<dyn RuntimeEffectController>,
490        scope: ExecutionScope,
491    ) -> Result<Self, RuntimeError> {
492        scope.validate()?;
493        Ok(Self {
494            controller: ScopedEffectControllerInner::Shared(controller),
495            scope,
496        })
497    }
498
499    pub fn controller(&self) -> &dyn RuntimeEffectController {
500        match &self.controller {
501            ScopedEffectControllerInner::Borrowed(controller) => *controller,
502            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
503        }
504    }
505
506    pub fn execution_scope(&self) -> &ExecutionScope {
507        &self.scope
508    }
509
510    pub fn scope_id(&self) -> &str {
511        self.scope.id()
512    }
513
514    pub fn turn_id(&self) -> Option<&str> {
515        self.scope.turn_id()
516    }
517}
518
519/// Deployment-level factory for scoped effect controllers.
520#[async_trait::async_trait]
521pub trait EffectHost: Send + Sync {
522    fn durability_tier(&self) -> crate::DurabilityTier {
523        crate::DurabilityTier::Inline
524    }
525
526    fn requires_durable_attachment_store(&self) -> bool {
527        false
528    }
529
530    fn supports_durable_effects(&self) -> bool {
531        false
532    }
533
534    fn scoped<'run>(
535        &'run self,
536        scope: ExecutionScope,
537    ) -> Result<ScopedEffectController<'run>, RuntimeError>;
538
539    fn scoped_static(
540        &self,
541        _scope: ExecutionScope,
542    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
543        Ok(None)
544    }
545
546    async fn await_event_key(
547        &self,
548        _scope: &ExecutionScope,
549        _wait: AwaitEventWaitIdentity,
550    ) -> Result<AwaitEventKey, RuntimeError> {
551        Err(RuntimeError::new(
552            "await_event_unsupported",
553            "this effect host does not support await-event keys",
554        ))
555    }
556
557    async fn resolve_await_event(
558        &self,
559        _key: &AwaitEventKey,
560        _resolution: Resolution,
561    ) -> Result<ResolveOutcome, RuntimeError> {
562        Ok(ResolveOutcome::UnknownOrRevoked)
563    }
564
565    async fn await_await_event(
566        &self,
567        _key: &AwaitEventKey,
568        _cancel: CancellationToken,
569        _deadline: Option<Instant>,
570    ) -> Result<Resolution, RuntimeError> {
571        Err(RuntimeError::new(
572            "await_event_unsupported",
573            "this effect host does not support await-event waits",
574        ))
575    }
576
577    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
578        Ok(())
579    }
580}
581
582/// Boundary for nondeterministic runtime work.
583#[async_trait::async_trait]
584pub trait RuntimeEffectController: Send + Sync {
585    /// Durability tier this controller provides; defaults to
586    /// [`DurabilityTier::Inline`].
587    fn durability_tier(&self) -> crate::DurabilityTier {
588        crate::DurabilityTier::Inline
589    }
590
591    fn requires_durable_attachment_store(&self) -> bool {
592        false
593    }
594
595    fn supports_durable_effects(&self) -> bool {
596        false
597    }
598
599    async fn execute_effect(
600        &self,
601        envelope: RuntimeEffectEnvelope,
602        local_executor: RuntimeEffectLocalExecutor<'_>,
603    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
604
605    async fn await_event_key(
606        &self,
607        _scope: &ExecutionScope,
608        _wait: AwaitEventWaitIdentity,
609    ) -> Result<AwaitEventKey, RuntimeError> {
610        Err(RuntimeError::new(
611            "await_event_unsupported",
612            "this effect controller does not support await-event keys",
613        ))
614    }
615
616    async fn resolve_await_event(
617        &self,
618        _key: &AwaitEventKey,
619        _resolution: Resolution,
620    ) -> Result<ResolveOutcome, RuntimeError> {
621        Ok(ResolveOutcome::UnknownOrRevoked)
622    }
623
624    async fn await_await_event(
625        &self,
626        _key: &AwaitEventKey,
627        _cancel: CancellationToken,
628        _deadline: Option<Instant>,
629    ) -> Result<Resolution, RuntimeError> {
630        Err(RuntimeError::new(
631            "await_event_unsupported",
632            "this effect controller does not support await-event waits",
633        ))
634    }
635
636    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
637        Ok(())
638    }
639}
640
641/// Runtime-internal handle for effect-controller references carried through
642/// per-turn execution contexts.
643#[derive(Clone)]
644pub(crate) enum RuntimeEffectControllerHandle<'run> {
645    Borrowed(ScopedEffectController<'run>),
646    #[cfg(any(test, feature = "testing"))]
647    Shared {
648        controller: Arc<dyn RuntimeEffectController>,
649        scope: ExecutionScope,
650    },
651}
652
653impl<'run> RuntimeEffectControllerHandle<'run> {
654    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
655        Self::Borrowed(scoped)
656    }
657
658    #[cfg(any(test, feature = "testing"))]
659    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
660        Self::Shared {
661            controller,
662            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
663        }
664    }
665
666    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
667        match self {
668            Self::Borrowed(scoped) => scoped.controller(),
669            #[cfg(any(test, feature = "testing"))]
670            Self::Shared { controller, .. } => controller.as_ref(),
671        }
672    }
673
674    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
675        match self {
676            Self::Borrowed(scoped) => scoped.clone(),
677            #[cfg(any(test, feature = "testing"))]
678            Self::Shared { controller, scope } => {
679                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
680                    .expect("runtime effect controller handle carries a valid scope")
681            }
682        }
683    }
684
685    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
686        self.clone()
687    }
688}
689
690#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
691#[error("{code}: {message}")]
692pub struct RuntimeEffectControllerError {
693    pub code: String,
694    pub message: String,
695}
696
697impl RuntimeEffectControllerError {
698    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
699        Self {
700            code: code.into(),
701            message: message.into(),
702        }
703    }
704
705    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
706        Self::new(
707            "runtime_effect_wrong_outcome",
708            format!(
709                "expected {} outcome, got {}",
710                expected.as_str(),
711                actual.as_str()
712            ),
713        )
714    }
715
716    pub(crate) fn into_runtime_error(self) -> RuntimeError {
717        RuntimeError::new(self.code, self.message)
718    }
719}
720
721impl From<RuntimeError> for RuntimeEffectControllerError {
722    fn from(err: RuntimeError) -> Self {
723        Self::new(err.code.as_str(), err.message)
724    }
725}
726
727impl From<PluginError> for RuntimeEffectControllerError {
728    fn from(err: PluginError) -> Self {
729        Self::new("plugin", err.to_string())
730    }
731}
732
733impl From<crate::StoreError> for RuntimeEffectControllerError {
734    fn from(err: crate::StoreError) -> Self {
735        Self::new("runtime_store", err.to_string())
736    }
737}
738
739// =============================================================================
740// Local executor (per-effect borrowed runner state)
741// =============================================================================
742
743#[async_trait::async_trait]
744pub(crate) trait ProcessRunner: Send + Sync {
745    async fn run_process(
746        &self,
747        registration: crate::ProcessRegistration,
748        execution_context: crate::ProcessExecutionContext,
749        registry: Arc<dyn ProcessRegistry>,
750        scoped_effect_controller: crate::ScopedEffectController<'_>,
751        cancellation: CancellationToken,
752    ) -> crate::ProcessAwaitOutput;
753}
754
755pub struct ProcessLocalExecution {
756    pub registry: Arc<dyn ProcessRegistry>,
757    pub process_work_driver: Option<crate::ProcessWorkDriver>,
758}
759
760pub(super) struct LocalTurnEffectRunner<'a, 'run> {
761    driver: &'a mut RuntimeTurnDriver<'run>,
762    machine: &'a mut crate::TurnMachine,
763    event_tx: mpsc::Sender<RuntimeStreamEvent>,
764    cancellation: CancellationToken,
765}
766
767pub(super) struct LocalDirectEffectRunner {
768    provider: ProviderHandle,
769    attachment_store: Arc<dyn AttachmentStore>,
770}
771
772struct LocalToolBatchEffectRunner<'run> {
773    context: crate::RuntimeExecutionContext<'run>,
774    child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
775}
776
777#[async_trait::async_trait]
778trait RuntimeEffectLocalRunner: Send {
779    async fn execute(
780        self: Box<Self>,
781        envelope: RuntimeEffectEnvelope,
782    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
783}
784
785#[cfg(any(test, feature = "testing"))]
786type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
787        RuntimeEffectEnvelope,
788    ) -> Pin<
789        Box<
790            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
791                + Send
792                + 'run,
793        >,
794    > + Send
795    + 'run;
796
797#[cfg(any(test, feature = "testing"))]
798struct TestingRuntimeEffectLocalRunner<'run> {
799    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
800}
801
802type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
803        serde_json::Value,
804    ) -> Pin<
805        Box<
806            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
807                + Send
808                + 'run,
809        >,
810    > + Send
811    + 'run;
812
813struct DurableStepLocalRunner<'run> {
814    run: Box<DurableStepLocalRunnerFn<'run>>,
815}
816
817enum RuntimeEffectLocalExecutorState<'run> {
818    Unavailable,
819    SleepOnly {
820        cancellation: CancellationToken,
821        clock: Arc<dyn crate::Clock>,
822    },
823    ExternalWaitOptions {
824        cancellation: CancellationToken,
825        deadline: Option<Instant>,
826        clock: Arc<dyn crate::Clock>,
827    },
828    Process(ProcessLocalExecution),
829    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
830}
831
832/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
833///
834/// Durable controllers may ignore it and replay their own recorded result. The
835/// default inline controller delegates to it, so local provider/tool/checkpoint
836/// work still crosses the same `execute_effect` boundary as durable controllers.
837pub struct RuntimeEffectLocalExecutor<'run> {
838    state: RuntimeEffectLocalExecutorState<'run>,
839}
840
841impl<'run> RuntimeEffectLocalExecutor<'run> {
842    pub fn unavailable() -> Self {
843        Self {
844            state: RuntimeEffectLocalExecutorState::Unavailable,
845        }
846    }
847
848    pub fn sleep(cancellation: CancellationToken) -> Self {
849        Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
850    }
851
852    pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
853        Self {
854            state: RuntimeEffectLocalExecutorState::SleepOnly {
855                cancellation,
856                clock,
857            },
858        }
859    }
860
861    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
862        Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
863    }
864
865    pub fn await_event_with_clock(
866        cancellation: CancellationToken,
867        deadline: Option<Instant>,
868        clock: Arc<dyn crate::Clock>,
869    ) -> Self {
870        Self {
871            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
872                cancellation,
873                deadline,
874                clock,
875            },
876        }
877    }
878
879    pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
880        Self::processes_with_driver(registry, None)
881    }
882
883    pub fn processes_with_driver(
884        registry: Arc<dyn ProcessRegistry>,
885        process_work_driver: Option<crate::ProcessWorkDriver>,
886    ) -> Self {
887        Self {
888            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
889                registry,
890                process_work_driver,
891            }),
892        }
893    }
894
895    pub fn durable_step<F, Fut>(run: F) -> Self
896    where
897        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
898        Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
899    {
900        Self {
901            state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
902                run: Box::new(move |input| {
903                    Box::pin(async move { run(input).await.map_err(Into::into) })
904                }),
905            })),
906        }
907    }
908
909    #[cfg(any(test, feature = "testing"))]
910    pub fn testing<F, Fut>(run: F) -> Self
911    where
912        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
913        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
914            + Send
915            + 'run,
916    {
917        Self {
918            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
919                TestingRuntimeEffectLocalRunner {
920                    run: Box::new(move |envelope| Box::pin(run(envelope))),
921                },
922            )),
923        }
924    }
925
926    pub(in crate::runtime) fn turn<'scope>(
927        driver: &'run mut RuntimeTurnDriver<'scope>,
928        machine: &'run mut crate::TurnMachine,
929        event_tx: mpsc::Sender<RuntimeStreamEvent>,
930        cancellation: CancellationToken,
931    ) -> Self
932    where
933        'scope: 'run,
934    {
935        Self {
936            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
937                driver,
938                machine,
939                event_tx,
940                cancellation,
941            })),
942        }
943    }
944
945    pub(in crate::runtime) fn direct(
946        provider: ProviderHandle,
947        attachment_store: Arc<dyn AttachmentStore>,
948    ) -> Self {
949        Self {
950            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
951                provider,
952                attachment_store,
953            })),
954        }
955    }
956
957    pub(crate) fn tool_batch(
958        context: crate::RuntimeExecutionContext<'run>,
959        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
960    ) -> Self {
961        Self {
962            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
963                context,
964                child_trace_hooks,
965            })),
966        }
967    }
968
969    pub async fn execute(
970        self,
971        envelope: RuntimeEffectEnvelope,
972    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
973        match self.state {
974            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
975            RuntimeEffectLocalExecutorState::SleepOnly {
976                cancellation,
977                clock,
978            } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
979            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
980                Err(RuntimeEffectControllerError::new(
981                    "runtime_effect_local_executor_mismatch",
982                    format!(
983                        "local await-event options cannot execute {} command directly",
984                        envelope.command.kind().as_str()
985                    ),
986                ))
987            }
988            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
989                "runtime_effect_local_executor_unavailable",
990                format!(
991                    "no local executor is available for {}",
992                    envelope.command.kind().as_str()
993                ),
994            )),
995            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
996                "runtime_effect_local_executor_mismatch",
997                format!(
998                    "process executor cannot execute {} command directly",
999                    envelope.command.kind().as_str()
1000                ),
1001            )),
1002        }
1003    }
1004
1005    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
1006        match self.state {
1007            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
1008            _ => Err(RuntimeEffectControllerError::new(
1009                "runtime_effect_local_executor_unavailable",
1010                "no process executor is available for process command",
1011            )),
1012        }
1013    }
1014
1015    fn into_await_event_options(
1016        self,
1017    ) -> Result<
1018        (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>),
1019        RuntimeEffectControllerError,
1020    > {
1021        match self.state {
1022            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
1023                cancellation,
1024                deadline,
1025                clock,
1026            } => Ok((cancellation, deadline, clock)),
1027            _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
1028        }
1029    }
1030}
1031
1032#[cfg(any(test, feature = "testing"))]
1033#[async_trait::async_trait]
1034impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
1035    async fn execute(
1036        self: Box<Self>,
1037        envelope: RuntimeEffectEnvelope,
1038    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1039        (self.run)(envelope).await
1040    }
1041}
1042
1043#[async_trait::async_trait]
1044impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
1045    async fn execute(
1046        self: Box<Self>,
1047        envelope: RuntimeEffectEnvelope,
1048    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1049        match envelope.command {
1050            RuntimeEffectCommand::DurableStep { input, .. } => {
1051                let value = (self.run)(input).await?;
1052                Ok(RuntimeEffectOutcome::DurableStep { value })
1053            }
1054            command => Err(RuntimeEffectControllerError::new(
1055                "runtime_effect_local_executor_mismatch",
1056                format!(
1057                    "local durable step executor cannot execute {} command",
1058                    command.kind().as_str()
1059                ),
1060            )),
1061        }
1062    }
1063}
1064
1065#[async_trait::async_trait]
1066impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
1067    async fn execute(
1068        self: Box<Self>,
1069        envelope: RuntimeEffectEnvelope,
1070    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1071        match envelope.command {
1072            RuntimeEffectCommand::ToolBatch { batch } => {
1073                let outcome = self
1074                    .context
1075                    .execute_prepared_tool_batch_launches(
1076                        batch,
1077                        envelope.invocation,
1078                        self.child_trace_hooks,
1079                    )
1080                    .await?;
1081                Ok(RuntimeEffectOutcome::ToolBatch {
1082                    launches: outcome.launches,
1083                    triggers: outcome.triggers,
1084                })
1085            }
1086            command => Err(RuntimeEffectControllerError::new(
1087                "runtime_effect_local_executor_mismatch",
1088                format!(
1089                    "local tool-batch executor cannot execute {} command",
1090                    command.kind().as_str()
1091                ),
1092            )),
1093        }
1094    }
1095}
1096
1097#[async_trait::async_trait]
1098impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1099    async fn execute(
1100        self: Box<Self>,
1101        envelope: RuntimeEffectEnvelope,
1102    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1103        let runner = *self;
1104        match envelope.command {
1105            RuntimeEffectCommand::LlmCall { request } => {
1106                let protocol_iteration = runner.machine.protocol_iteration();
1107                let (result, text_streamed) = runner
1108                    .driver
1109                    .run_llm_call(
1110                        Arc::new((*request).into_request(None, None)),
1111                        protocol_iteration,
1112                        envelope.invocation,
1113                        &runner.event_tx,
1114                        &runner.cancellation,
1115                    )
1116                    .await;
1117                Ok(RuntimeEffectOutcome::LlmCall {
1118                    result,
1119                    text_streamed,
1120                })
1121            }
1122            RuntimeEffectCommand::ToolCall { call } => {
1123                let tool_name = call.tool_name.clone();
1124                let mut outcome = runner
1125                    .driver
1126                    .run_tool_calls(
1127                        vec![(call, envelope.invocation)],
1128                        &runner.event_tx,
1129                        &runner.cancellation,
1130                    )
1131                    .await?;
1132                let launch = outcome.launches.pop().ok_or_else(|| {
1133                    RuntimeEffectControllerError::new(
1134                        "tool_result_missing",
1135                        format!("tool `{tool_name}` completed without a launch result"),
1136                    )
1137                })?;
1138                Ok(RuntimeEffectOutcome::ToolCall {
1139                    launch,
1140                    triggers: outcome.triggers,
1141                })
1142            }
1143            RuntimeEffectCommand::ToolBatch { batch } => {
1144                let outcome = runner
1145                    .driver
1146                    .run_tool_batch(
1147                        batch,
1148                        envelope.invocation,
1149                        &runner.event_tx,
1150                        &runner.cancellation,
1151                    )
1152                    .await?;
1153                Ok(RuntimeEffectOutcome::ToolBatch {
1154                    launches: outcome.launches,
1155                    triggers: outcome.triggers,
1156                })
1157            }
1158            RuntimeEffectCommand::ExecCode { language, code } => {
1159                let protocol_iteration = runner.machine.protocol_iteration();
1160                let messages = runner.machine.message_sequence();
1161                Ok(RuntimeEffectOutcome::ExecCode {
1162                    result: runner
1163                        .driver
1164                        .run_exec_code(
1165                            language,
1166                            &code,
1167                            messages,
1168                            protocol_iteration,
1169                            envelope.invocation,
1170                            &runner.event_tx,
1171                        )
1172                        .await,
1173                })
1174            }
1175            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1176                Ok(RuntimeEffectOutcome::Checkpoint {
1177                    result: runner
1178                        .driver
1179                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1180                        .await
1181                        .map_err(RuntimeEffectControllerError::from),
1182                })
1183            }
1184            RuntimeEffectCommand::SyncExecutionEnvironment {
1185                update_machine_config,
1186            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1187                result: runner
1188                    .driver
1189                    .refresh_execution_environment(runner.machine, update_machine_config)
1190                    .await
1191                    .map_err(|err| err.to_string()),
1192            }),
1193            RuntimeEffectCommand::Sleep { duration_ms } => {
1194                sleep_with_cancellation(
1195                    duration_ms,
1196                    &runner.cancellation,
1197                    runner.driver.host.core.clock.as_ref(),
1198                )
1199                .await?;
1200                Ok(RuntimeEffectOutcome::Sleep)
1201            }
1202            command => Err(RuntimeEffectControllerError::new(
1203                "runtime_effect_local_executor_mismatch",
1204                format!(
1205                    "local turn executor cannot execute {} command",
1206                    command.kind().as_str()
1207                ),
1208            )),
1209        }
1210    }
1211}
1212
1213#[async_trait::async_trait]
1214impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1215    async fn execute(
1216        mut self: Box<Self>,
1217        envelope: RuntimeEffectEnvelope,
1218    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1219        match envelope.command {
1220            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1221                result: self
1222                    .run_direct_llm_request((*request).into_request(
1223                        crate::session_model::transport_stream_events(&self.provider, None),
1224                        None,
1225                    ))
1226                    .await,
1227            }),
1228            RuntimeEffectCommand::Sleep { duration_ms } => {
1229                sleep_with_cancellation(
1230                    duration_ms,
1231                    &CancellationToken::new(),
1232                    &crate::SystemClock,
1233                )
1234                .await?;
1235                Ok(RuntimeEffectOutcome::Sleep)
1236            }
1237            command => Err(RuntimeEffectControllerError::new(
1238                "runtime_effect_local_executor_mismatch",
1239                format!(
1240                    "local direct executor cannot execute {} command",
1241                    command.kind().as_str()
1242                ),
1243            )),
1244        }
1245    }
1246}
1247
1248impl LocalDirectEffectRunner {
1249    async fn run_direct_llm_request(
1250        &mut self,
1251        request: CoreLlmRequest,
1252    ) -> Result<LlmResponse, LlmCallError> {
1253        let request = crate::attachments::resolve_llm_request_attachments(
1254            request,
1255            self.attachment_store.as_ref(),
1256        )
1257        .await
1258        .map_err(|err| LlmCallError {
1259            message: err.to_string(),
1260            retryable: false,
1261            raw: None,
1262            code: Some("attachment_resolution_failed".to_string()),
1263            terminal_reason: crate::LlmTerminalReason::ProviderError,
1264            request_body: None,
1265        })?;
1266        self.provider
1267            .complete(request)
1268            .await
1269            .map_err(llm_call_error_from_transport)
1270    }
1271}
1272
1273async fn execute_local_sleep(
1274    envelope: RuntimeEffectEnvelope,
1275    cancellation: CancellationToken,
1276    clock: &dyn crate::Clock,
1277) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1278    match envelope.command {
1279        RuntimeEffectCommand::Sleep { duration_ms } => {
1280            sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1281            Ok(RuntimeEffectOutcome::Sleep)
1282        }
1283        command => Err(RuntimeEffectControllerError::new(
1284            "runtime_effect_local_executor_mismatch",
1285            format!(
1286                "local sleep executor cannot execute {} command",
1287                command.kind().as_str()
1288            ),
1289        )),
1290    }
1291}
1292
1293async fn sleep_with_cancellation(
1294    duration_ms: u64,
1295    cancellation: &CancellationToken,
1296    clock: &dyn crate::Clock,
1297) -> Result<(), RuntimeEffectControllerError> {
1298    let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1299    tokio::pin!(sleep);
1300    tokio::select! {
1301        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1302            "runtime_effect_sleep_cancelled",
1303            "runtime effect sleep was cancelled",
1304        )),
1305        _ = &mut sleep => Ok(()),
1306    }
1307}
1308
1309// =============================================================================
1310// Default in-process effect controller
1311// =============================================================================
1312
1313/// Default in-process effect controller.
1314///
1315/// The inline controller executes local runners in process, provides in-memory
1316/// await-event resolution, and exposes durable-tool-effect semantics for local
1317/// runs. It does not make in-flight effects crash durable; workflow adapters
1318/// provide that by recording outcomes in their own history.
1319#[derive(Clone, Default)]
1320pub struct InlineRuntimeEffectController;
1321
1322#[async_trait::async_trait]
1323impl RuntimeEffectController for InlineRuntimeEffectController {
1324    fn supports_durable_effects(&self) -> bool {
1325        true
1326    }
1327
1328    async fn execute_effect(
1329        &self,
1330        envelope: RuntimeEffectEnvelope,
1331        local_executor: RuntimeEffectLocalExecutor<'_>,
1332    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1333        match envelope.command {
1334            RuntimeEffectCommand::AwaitEvent { key } => {
1335                let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1336                let resolution = inline_await_events()
1337                    .await_resolution(&key, cancellation, deadline, clock.as_ref())
1338                    .await
1339                    .map_err(RuntimeEffectControllerError::from)?;
1340                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1341            }
1342            RuntimeEffectCommand::Process { command } => {
1343                let execution = local_executor.into_process()?;
1344                let registry = execution.registry;
1345                let process_work_driver = execution.process_work_driver;
1346                let result = tokio::task::spawn(async move {
1347                    Self::execute_process_command(registry, process_work_driver, *command).await
1348                })
1349                .await
1350                .map_err(|err| {
1351                    RuntimeEffectControllerError::new(
1352                        "runtime_effect_process_task_join",
1353                        format!("inline process effect task failed: {err}"),
1354                    )
1355                })??;
1356                Ok(RuntimeEffectOutcome::Process { result })
1357            }
1358            _ => local_executor.execute(envelope).await,
1359        }
1360    }
1361
1362    async fn await_event_key(
1363        &self,
1364        scope: &ExecutionScope,
1365        wait: AwaitEventWaitIdentity,
1366    ) -> Result<AwaitEventKey, RuntimeError> {
1367        inline_await_events().key_for(scope, wait)
1368    }
1369
1370    async fn resolve_await_event(
1371        &self,
1372        key: &AwaitEventKey,
1373        resolution: Resolution,
1374    ) -> Result<ResolveOutcome, RuntimeError> {
1375        inline_await_events().resolve(key, resolution)
1376    }
1377
1378    async fn await_await_event(
1379        &self,
1380        key: &AwaitEventKey,
1381        cancel: CancellationToken,
1382        deadline: Option<Instant>,
1383    ) -> Result<Resolution, RuntimeError> {
1384        inline_await_events()
1385            .await_resolution(key, cancel, deadline, &crate::SystemClock)
1386            .await
1387    }
1388
1389    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1390        inline_await_events().revoke_session(session_id)
1391    }
1392}
1393
1394/// In-process deployment effect host.
1395#[derive(Clone)]
1396pub struct InlineEffectHost {
1397    controller: Arc<dyn RuntimeEffectController>,
1398}
1399
1400impl InlineEffectHost {
1401    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1402        Self { controller }
1403    }
1404}
1405
1406impl Default for InlineEffectHost {
1407    fn default() -> Self {
1408        Self::new(Arc::new(InlineRuntimeEffectController))
1409    }
1410}
1411
1412#[async_trait::async_trait]
1413impl EffectHost for InlineEffectHost {
1414    fn durability_tier(&self) -> crate::DurabilityTier {
1415        self.controller.durability_tier()
1416    }
1417
1418    fn requires_durable_attachment_store(&self) -> bool {
1419        self.controller.requires_durable_attachment_store()
1420    }
1421
1422    fn supports_durable_effects(&self) -> bool {
1423        self.controller.supports_durable_effects()
1424    }
1425
1426    fn scoped<'run>(
1427        &'run self,
1428        scope: ExecutionScope,
1429    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1430        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1431    }
1432
1433    fn scoped_static(
1434        &self,
1435        scope: ExecutionScope,
1436    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1437        Ok(Some(ScopedEffectController::shared(
1438            Arc::clone(&self.controller),
1439            scope,
1440        )?))
1441    }
1442
1443    async fn await_event_key(
1444        &self,
1445        scope: &ExecutionScope,
1446        wait: AwaitEventWaitIdentity,
1447    ) -> Result<AwaitEventKey, RuntimeError> {
1448        self.controller.await_event_key(scope, wait).await
1449    }
1450
1451    async fn resolve_await_event(
1452        &self,
1453        key: &AwaitEventKey,
1454        resolution: Resolution,
1455    ) -> Result<ResolveOutcome, RuntimeError> {
1456        self.controller.resolve_await_event(key, resolution).await
1457    }
1458
1459    async fn await_await_event(
1460        &self,
1461        key: &AwaitEventKey,
1462        cancel: CancellationToken,
1463        deadline: Option<Instant>,
1464    ) -> Result<Resolution, RuntimeError> {
1465        self.controller
1466            .await_await_event(key, cancel, deadline)
1467            .await
1468    }
1469
1470    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1471        self.controller
1472            .revoke_await_events_for_session(session_id)
1473            .await
1474    }
1475}
1476
1477impl InlineRuntimeEffectController {
1478    /// Register the process (and any handle grant) into the durable registry.
1479    ///
1480    /// The inline controller no longer runs the process here: the registry's
1481    /// non-terminal row *is* the durable work queue, and the host-owned
1482    /// [`ProcessWorkDriver`](crate::ProcessWorkDriver) is the sole executor.
1483    /// Registering the row is all this path does; the control seam drives the
1484    /// host driver after a successful start.
1485    pub(crate) async fn start_process(
1486        registry: Arc<dyn crate::ProcessRegistry>,
1487        registration: crate::ProcessRegistration,
1488        grant: Option<crate::ProcessStartGrant>,
1489    ) -> Result<ProcessRecord, PluginError> {
1490        let registration_for_record = registration.clone();
1491        let record = registry.register_process(registration_for_record).await?;
1492        if let Some(grant) = grant {
1493            registry
1494                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1495                .await?;
1496        }
1497        Ok(record)
1498    }
1499
1500    pub(crate) async fn request_process_cancel(
1501        &self,
1502        registry: Arc<dyn crate::ProcessRegistry>,
1503        process_id: &str,
1504        reason: Option<String>,
1505    ) -> Result<ProcessRecord, PluginError> {
1506        // Cancellation is a durable signal: the cancel event is what the
1507        // runner-run process observes, so the inline controller appends it and
1508        // no longer tracks an in-process cancellation token.
1509        registry
1510            .append_event(
1511                process_id,
1512                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1513            )
1514            .await?;
1515        registry
1516            .get_process(process_id)
1517            .await
1518            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1519    }
1520
1521    async fn execute_process_command(
1522        registry: Arc<dyn crate::ProcessRegistry>,
1523        process_work_driver: Option<crate::ProcessWorkDriver>,
1524        command: ProcessCommand,
1525    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1526        match command {
1527            ProcessCommand::Start {
1528                registration,
1529                grant,
1530                execution_context: _,
1531            } => {
1532                let record = Self::start_process(registry, registration, grant).await?;
1533                if let Some(driver) = process_work_driver.as_ref() {
1534                    driver.claim_and_run_pending("process_start").await?;
1535                }
1536                Ok(ProcessEffectOutcome::Start { record })
1537            }
1538            ProcessCommand::List {
1539                session_scope,
1540                mode,
1541            } => {
1542                let entries = match mode {
1543                    crate::ProcessListMode::Live => {
1544                        registry.list_live_handle_grants(&session_scope).await?
1545                    }
1546                    crate::ProcessListMode::All => {
1547                        registry.list_handle_grants(&session_scope).await?
1548                    }
1549                };
1550                Ok(ProcessEffectOutcome::List { entries })
1551            }
1552            ProcessCommand::Transfer {
1553                from_scope,
1554                to_scope,
1555                process_ids,
1556            } => {
1557                registry
1558                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1559                    .await?;
1560                Ok(ProcessEffectOutcome::Transfer)
1561            }
1562            ProcessCommand::DeleteSession { session_id } => {
1563                let report = registry.delete_session_process_state(&session_id).await?;
1564                Ok(ProcessEffectOutcome::DeleteSession { report })
1565            }
1566            ProcessCommand::Await { process_id } => {
1567                let output = registry.await_process(&process_id).await?;
1568                Ok(ProcessEffectOutcome::Await { output })
1569            }
1570            ProcessCommand::Cancel { process_id, reason } => {
1571                let record = InlineRuntimeEffectController
1572                    .request_process_cancel(registry, &process_id, reason)
1573                    .await?;
1574                Ok(ProcessEffectOutcome::Cancel { record })
1575            }
1576            ProcessCommand::Signal {
1577                process_id,
1578                request,
1579                ..
1580            } => {
1581                let result = registry.append_event(&process_id, request).await?;
1582                Ok(ProcessEffectOutcome::Signal {
1583                    event: result.event,
1584                })
1585            }
1586        }
1587    }
1588}
1589
1590impl std::fmt::Debug for InlineRuntimeEffectController {
1591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1592        f.debug_struct("InlineRuntimeEffectController").finish()
1593    }
1594}