Skip to main content

lash_core/runtime/effect/
executor.rs

1#[cfg(any(test, feature = "testing"))]
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7use tokio_util::sync::CancellationToken;
8
9use crate::AttachmentStore;
10use crate::LlmRequest as CoreLlmRequest;
11use crate::LlmResponse;
12use crate::ProcessRecord;
13use crate::ProcessRegistry;
14use crate::provider::ProviderHandle;
15use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
16use crate::sansio::LlmCallError;
17use crate::{PluginError, RuntimeError, RuntimeErrorCode};
18
19use super::envelope::{
20    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
21    RuntimeEffectKind, RuntimeEffectOutcome,
22};
23use super::outcome::llm_call_error_from_transport;
24
25// =============================================================================
26// Effect host + controller trait + scope + error
27// =============================================================================
28
/// Stable semantic identity for one effectful runtime operation.
///
/// The scope is chosen by the host boundary before any nondeterministic work is
/// planned. It is intentionally generic: Restate, an inline test host, or a
/// future durable effect host all receive the same Lash scope vocabulary.
///
/// Serialized with an internal `type` tag whose value is the snake_case
/// variant name (for example `queue_drain`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum EffectScope {
    /// One turn of a session; `turn_id` is the scope id.
    Turn {
        session_id: String,
        turn_id: String,
    },
    /// One process; `process_id` is the scope id.
    Process {
        process_id: String,
    },
    /// One queue drain of a session; `drain_id` is the scope id.
    QueueDrain {
        session_id: String,
        drain_id: String,
    },
    /// Deletion of a session; `session_id` doubles as the scope id.
    SessionDelete {
        session_id: String,
    },
    /// Any other runtime operation; `operation_id` is the scope id.
    RuntimeOperation {
        operation_id: String,
    },
}
55
56impl EffectScope {
57    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
58        Self::Turn {
59            session_id: session_id.into(),
60            turn_id: turn_id.into(),
61        }
62    }
63
64    pub fn process(process_id: impl Into<String>) -> Self {
65        Self::Process {
66            process_id: process_id.into(),
67        }
68    }
69
70    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
71        Self::QueueDrain {
72            session_id: session_id.into(),
73            drain_id: drain_id.into(),
74        }
75    }
76
77    pub fn session_delete(session_id: impl Into<String>) -> Self {
78        Self::SessionDelete {
79            session_id: session_id.into(),
80        }
81    }
82
83    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
84        Self::RuntimeOperation {
85            operation_id: operation_id.into(),
86        }
87    }
88
89    pub fn id(&self) -> &str {
90        match self {
91            Self::Turn { turn_id, .. } => turn_id,
92            Self::Process { process_id } => process_id,
93            Self::QueueDrain { drain_id, .. } => drain_id,
94            Self::SessionDelete { session_id } => session_id,
95            Self::RuntimeOperation { operation_id } => operation_id,
96        }
97    }
98
99    pub fn session_id(&self) -> Option<&str> {
100        match self {
101            Self::Turn { session_id, .. }
102            | Self::QueueDrain { session_id, .. }
103            | Self::SessionDelete { session_id } => Some(session_id),
104            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
105        }
106    }
107
108    pub fn turn_id(&self) -> Option<&str> {
109        match self {
110            Self::Turn { turn_id, .. } => Some(turn_id),
111            _ => None,
112        }
113    }
114
115    pub fn validates_turn_trace_id(&self) -> bool {
116        matches!(self, Self::Turn { .. })
117    }
118
119    fn validate(&self) -> Result<(), RuntimeError> {
120        let missing = match self {
121            Self::Turn {
122                session_id,
123                turn_id,
124            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
125            Self::Process { process_id } => process_id.trim().is_empty(),
126            Self::QueueDrain {
127                session_id,
128                drain_id,
129            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
130            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
131            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
132        };
133        if missing {
134            return Err(RuntimeError::new(
135                RuntimeErrorCode::MissingEffectScopeId,
136                "effect scopes require non-empty stable ids",
137            ));
138        }
139        Ok(())
140    }
141}
142
/// How a `ScopedEffectController` holds its controller: as a borrow tied to
/// `'run`, or as a shared `Arc`.
enum ScopedEffectControllerInner<'run> {
    /// Controller borrowed for the `'run` lifetime.
    Borrowed(&'run dyn RuntimeEffectController),
    /// Reference-counted controller.
    Shared(Arc<dyn RuntimeEffectController>),
}
147
148impl Clone for ScopedEffectControllerInner<'_> {
149    fn clone(&self) -> Self {
150        match self {
151            Self::Borrowed(controller) => Self::Borrowed(*controller),
152            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
153        }
154    }
155}
156
/// Scoped low-level controller plus the semantic effect scope it is serving.
///
/// Built through [`ScopedEffectController::borrowed`] or
/// [`ScopedEffectController::shared`], both of which validate the scope.
#[derive(Clone)]
pub struct ScopedEffectController<'run> {
    /// Borrowed or shared handle to the underlying controller.
    controller: ScopedEffectControllerInner<'run>,
    /// Scope this controller is serving.
    scope: EffectScope,
}
163
164impl<'run> ScopedEffectController<'run> {
165    pub fn borrowed(
166        controller: &'run dyn RuntimeEffectController,
167        scope: EffectScope,
168    ) -> Result<Self, RuntimeError> {
169        scope.validate()?;
170        Ok(Self {
171            controller: ScopedEffectControllerInner::Borrowed(controller),
172            scope,
173        })
174    }
175
176    pub fn shared(
177        controller: Arc<dyn RuntimeEffectController>,
178        scope: EffectScope,
179    ) -> Result<Self, RuntimeError> {
180        scope.validate()?;
181        Ok(Self {
182            controller: ScopedEffectControllerInner::Shared(controller),
183            scope,
184        })
185    }
186
187    pub fn controller(&self) -> &dyn RuntimeEffectController {
188        match &self.controller {
189            ScopedEffectControllerInner::Borrowed(controller) => *controller,
190            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
191        }
192    }
193
194    pub fn effect_scope(&self) -> &EffectScope {
195        &self.scope
196    }
197
198    pub fn scope_id(&self) -> &str {
199        self.scope.id()
200    }
201
202    pub fn turn_id(&self) -> Option<&str> {
203        self.scope.turn_id()
204    }
205}
206
/// Deployment-level factory for scoped effect controllers.
#[async_trait::async_trait]
pub trait EffectHost: Send + Sync {
    /// Durability tier this host provides; defaults to
    /// `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this host requires a durable attachment store; defaults to
    /// `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Returns a controller bound to `scope` that may borrow from `self` for
    /// `'run`.
    fn scoped<'run>(
        &'run self,
        scope: EffectScope,
    ) -> Result<ScopedEffectController<'run>, RuntimeError>;

    /// Returns a `'static` controller bound to `scope` when the host can
    /// provide one. The default implementation returns `Ok(None)`.
    fn scoped_static(
        &self,
        _scope: EffectScope,
    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
        Ok(None)
    }
}
230
/// Boundary for nondeterministic runtime work.
#[async_trait::async_trait]
pub trait RuntimeEffectController: Send + Sync {
    /// Durability tier this controller provides; defaults to
    /// [`DurabilityTier::Inline`].
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this controller requires a durable attachment store; defaults
    /// to `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Executes the effect described by `envelope`.
    ///
    /// `local_executor` is the local executor scoped to this one effect; an
    /// implementation may delegate to it or ignore it.
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
250
/// Runtime-internal handle for effect-controller references carried through
/// per-turn execution contexts.
#[derive(Clone)]
pub(crate) enum RuntimeEffectControllerHandle<'run> {
    /// An already-scoped controller.
    Borrowed(ScopedEffectController<'run>),
    /// Shared controller plus its scope; only compiled for tests or the
    /// `testing` feature.
    #[cfg(any(test, feature = "testing"))]
    Shared {
        controller: Arc<dyn RuntimeEffectController>,
        scope: EffectScope,
    },
}
262
263impl<'run> RuntimeEffectControllerHandle<'run> {
264    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
265        Self::Borrowed(scoped)
266    }
267
268    #[cfg(any(test, feature = "testing"))]
269    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
270        Self::Shared {
271            controller,
272            scope: EffectScope::runtime_operation("test-runtime-effect-controller"),
273        }
274    }
275
276    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
277        match self {
278            Self::Borrowed(scoped) => scoped.controller(),
279            #[cfg(any(test, feature = "testing"))]
280            Self::Shared { controller, .. } => controller.as_ref(),
281        }
282    }
283
284    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
285        match self {
286            Self::Borrowed(scoped) => scoped.clone(),
287            #[cfg(any(test, feature = "testing"))]
288            Self::Shared { controller, scope } => {
289                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
290                    .expect("runtime effect controller handle carries a valid scope")
291            }
292        }
293    }
294
295    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
296        self.clone()
297    }
298}
299
/// Error returned from the effect-controller boundary.
///
/// Displays as `{code}: {message}`.
#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[error("{code}: {message}")]
pub struct RuntimeEffectControllerError {
    /// Error code string, e.g. `runtime_effect_wrong_outcome`.
    pub code: String,
    /// Human-readable description.
    pub message: String,
}
306
307impl RuntimeEffectControllerError {
308    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
309        Self {
310            code: code.into(),
311            message: message.into(),
312        }
313    }
314
315    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
316        Self::new(
317            "runtime_effect_wrong_outcome",
318            format!(
319                "expected {} outcome, got {}",
320                expected.as_str(),
321                actual.as_str()
322            ),
323        )
324    }
325
326    pub(crate) fn into_runtime_error(self) -> RuntimeError {
327        RuntimeError::new(self.code, self.message)
328    }
329}
330
331impl From<RuntimeError> for RuntimeEffectControllerError {
332    fn from(err: RuntimeError) -> Self {
333        Self::new(err.code.as_str(), err.message)
334    }
335}
336
337impl From<PluginError> for RuntimeEffectControllerError {
338    fn from(err: PluginError) -> Self {
339        Self::new("plugin", err.to_string())
340    }
341}
342
343impl From<crate::StoreError> for RuntimeEffectControllerError {
344    fn from(err: crate::StoreError) -> Self {
345        Self::new("runtime_store", err.to_string())
346    }
347}
348
349// =============================================================================
350// Local executor (per-effect borrowed runner state)
351// =============================================================================
352
/// Crate-internal abstraction over running one registered process.
#[async_trait::async_trait]
pub(crate) trait ProcessRunner: Send + Sync {
    /// Runs the process described by `registration` and resolves to its
    /// `ProcessAwaitOutput`.
    ///
    /// The registry, scoped effect controller and cancellation token are
    /// handed to the implementation; how each is used is up to the
    /// implementor.
    async fn run_process(
        &self,
        registration: crate::ProcessRegistration,
        execution_context: crate::ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        scoped_effect_controller: crate::ScopedEffectController<'_>,
        cancellation: CancellationToken,
    ) -> crate::ProcessAwaitOutput;
}
364
/// Local state handed to a controller for process commands; obtained from
/// `RuntimeEffectLocalExecutor::into_process`.
pub struct ProcessLocalExecution {
    /// Registry the process command operates on.
    pub registry: Arc<dyn ProcessRegistry>,
}
368
/// Local runner for turn-level effects; borrows the turn driver and turn
/// machine for the duration of one effect.
pub(super) struct LocalTurnEffectRunner<'a, 'run> {
    /// Driver that performs the LLM, tool, code, checkpoint and surface work.
    driver: &'a mut RuntimeTurnDriver<'run>,
    /// Turn state machine read from and passed to the driver.
    machine: &'a mut crate::TurnMachine,
    /// Sender for runtime stream events, passed to driver calls.
    event_tx: mpsc::Sender<RuntimeStreamEvent>,
    /// Token passed to driver calls and used to interrupt sleeps.
    cancellation: CancellationToken,
}
375
/// Local runner for direct LLM requests made outside a turn.
pub(super) struct LocalDirectEffectRunner {
    /// Provider that completes the request.
    provider: ProviderHandle,
    /// Store used to resolve request attachments before the provider call.
    attachment_store: Arc<dyn AttachmentStore>,
}
380
/// One-shot local runner: `execute` consumes the boxed runner.
#[async_trait::async_trait]
trait RuntimeEffectLocalRunner: Send {
    /// Executes the effect described by `envelope` and returns its outcome.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
388
/// Signature of the closure wrapped by the testing runner: takes the effect
/// envelope once and returns a pinned, boxed, `Send` future of the effect
/// result, all bounded by `'run`.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
400
/// Test-only local runner that delegates to a caller-supplied closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked once with the effect envelope.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
405
/// What a `RuntimeEffectLocalExecutor` is able to do for one effect.
enum RuntimeEffectLocalExecutorState<'run> {
    /// No local execution is possible; `execute` reports an error.
    Unavailable,
    /// Only `Sleep` commands can be executed, interruptible via `cancellation`.
    SleepOnly { cancellation: CancellationToken },
    /// Process-command state, retrieved through `into_process`.
    Process(ProcessLocalExecution),
    /// A boxed runner that executes the envelope itself.
    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
}
412
/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
///
/// Durable controllers may ignore it and replay their own recorded result. The
/// default inline controller delegates to it, so local provider/tool/checkpoint
/// work still crosses the same `execute_effect` boundary as durable controllers.
///
/// The executor is consumed by use: both `execute` and `into_process` take
/// `self` by value.
pub struct RuntimeEffectLocalExecutor<'run> {
    /// Capability selected by the constructor that built this executor.
    state: RuntimeEffectLocalExecutorState<'run>,
}
421
422impl<'run> RuntimeEffectLocalExecutor<'run> {
423    pub fn unavailable() -> Self {
424        Self {
425            state: RuntimeEffectLocalExecutorState::Unavailable,
426        }
427    }
428
429    pub fn sleep(cancellation: CancellationToken) -> Self {
430        Self {
431            state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
432        }
433    }
434
435    pub fn process_control(registry: Arc<dyn ProcessRegistry>) -> Self {
436        Self {
437            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
438        }
439    }
440
441    #[cfg(any(test, feature = "testing"))]
442    pub fn testing<F, Fut>(run: F) -> Self
443    where
444        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
445        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
446            + Send
447            + 'run,
448    {
449        Self {
450            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
451                TestingRuntimeEffectLocalRunner {
452                    run: Box::new(move |envelope| Box::pin(run(envelope))),
453                },
454            )),
455        }
456    }
457
458    pub(in crate::runtime) fn turn<'scope>(
459        driver: &'run mut RuntimeTurnDriver<'scope>,
460        machine: &'run mut crate::TurnMachine,
461        event_tx: mpsc::Sender<RuntimeStreamEvent>,
462        cancellation: CancellationToken,
463    ) -> Self
464    where
465        'scope: 'run,
466    {
467        Self {
468            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
469                driver,
470                machine,
471                event_tx,
472                cancellation,
473            })),
474        }
475    }
476
477    pub(in crate::runtime) fn direct(
478        provider: ProviderHandle,
479        attachment_store: Arc<dyn AttachmentStore>,
480    ) -> Self {
481        Self {
482            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
483                provider,
484                attachment_store,
485            })),
486        }
487    }
488
489    pub async fn execute(
490        self,
491        envelope: RuntimeEffectEnvelope,
492    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
493        match self.state {
494            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
495            RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
496                execute_local_sleep(envelope, cancellation).await
497            }
498            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
499                "runtime_effect_local_executor_unavailable",
500                format!(
501                    "no local executor is available for {}",
502                    envelope.command.kind().as_str()
503                ),
504            )),
505            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
506                "runtime_effect_local_executor_mismatch",
507                format!(
508                    "process executor cannot execute {} command directly",
509                    envelope.command.kind().as_str()
510                ),
511            )),
512        }
513    }
514
515    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
516        match self.state {
517            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
518            _ => Err(RuntimeEffectControllerError::new(
519                "runtime_effect_local_executor_unavailable",
520                "no process executor is available for process command",
521            )),
522        }
523    }
524}
525
/// Runs the wrapped test closure with the envelope.
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let TestingRuntimeEffectLocalRunner { run } = *self;
        run(envelope).await
    }
}
536
537#[async_trait::async_trait]
538impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
539    async fn execute(
540        self: Box<Self>,
541        envelope: RuntimeEffectEnvelope,
542    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
543        let runner = *self;
544        match envelope.command {
545            RuntimeEffectCommand::LlmCall { request } => {
546                let protocol_iteration = runner.machine.protocol_iteration();
547                let (result, text_streamed) = runner
548                    .driver
549                    .run_llm_call(
550                        Arc::new((*request).into_request(None, None)),
551                        protocol_iteration,
552                        envelope.invocation,
553                        &runner.event_tx,
554                        &runner.cancellation,
555                    )
556                    .await;
557                Ok(RuntimeEffectOutcome::LlmCall {
558                    result,
559                    text_streamed,
560                })
561            }
562            RuntimeEffectCommand::ToolCall { call } => {
563                let tool_name = call.tool_name.clone();
564                let mut outcome = runner
565                    .driver
566                    .run_tool_calls(
567                        vec![(call, envelope.invocation)],
568                        &runner.event_tx,
569                        &runner.cancellation,
570                    )
571                    .await?;
572                let result = outcome.completed.pop().ok_or_else(|| {
573                    RuntimeEffectControllerError::new(
574                        "tool_result_missing",
575                        format!("tool `{tool_name}` completed without a result"),
576                    )
577                })?;
578                Ok(RuntimeEffectOutcome::ToolCall {
579                    result,
580                    host_events: outcome.host_events,
581                })
582            }
583            RuntimeEffectCommand::ExecCode { code } => {
584                let protocol_iteration = runner.machine.protocol_iteration();
585                let messages = runner.machine.message_sequence();
586                Ok(RuntimeEffectOutcome::ExecCode {
587                    result: runner
588                        .driver
589                        .run_exec_code(
590                            &code,
591                            messages,
592                            protocol_iteration,
593                            envelope.invocation,
594                            &runner.event_tx,
595                        )
596                        .await,
597                })
598            }
599            RuntimeEffectCommand::Checkpoint { checkpoint } => {
600                Ok(RuntimeEffectOutcome::Checkpoint {
601                    result: runner
602                        .driver
603                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
604                        .await
605                        .map_err(RuntimeEffectControllerError::from),
606                })
607            }
608            RuntimeEffectCommand::SyncExecutionSurface {
609                update_machine_config,
610            } => Ok(RuntimeEffectOutcome::SyncExecutionSurface {
611                result: runner
612                    .driver
613                    .refresh_execution_surface(runner.machine, update_machine_config)
614                    .await
615                    .map_err(|err| err.to_string()),
616            }),
617            RuntimeEffectCommand::Sleep { duration_ms } => {
618                sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
619                Ok(RuntimeEffectOutcome::Sleep)
620            }
621            command => Err(RuntimeEffectControllerError::new(
622                "runtime_effect_local_executor_mismatch",
623                format!(
624                    "local turn executor cannot execute {} command",
625                    command.kind().as_str()
626                ),
627            )),
628        }
629    }
630}
631
632#[async_trait::async_trait]
633impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
634    async fn execute(
635        mut self: Box<Self>,
636        envelope: RuntimeEffectEnvelope,
637    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
638        match envelope.command {
639            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
640                result: self
641                    .run_direct_llm_request((*request).into_request(
642                        crate::session_model::transport_stream_events(&self.provider, None),
643                        None,
644                    ))
645                    .await,
646            }),
647            RuntimeEffectCommand::Sleep { duration_ms } => {
648                sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
649                Ok(RuntimeEffectOutcome::Sleep)
650            }
651            command => Err(RuntimeEffectControllerError::new(
652                "runtime_effect_local_executor_mismatch",
653                format!(
654                    "local direct executor cannot execute {} command",
655                    command.kind().as_str()
656                ),
657            )),
658        }
659    }
660}
661
662impl LocalDirectEffectRunner {
663    async fn run_direct_llm_request(
664        &mut self,
665        request: CoreLlmRequest,
666    ) -> Result<LlmResponse, LlmCallError> {
667        let request = crate::attachments::resolve_llm_request_attachments(
668            request,
669            self.attachment_store.as_ref(),
670        )
671        .map_err(|err| LlmCallError {
672            message: err.to_string(),
673            retryable: false,
674            raw: None,
675            code: Some("attachment_resolution_failed".to_string()),
676            terminal_reason: crate::LlmTerminalReason::ProviderError,
677            request_body: None,
678        })?;
679        self.provider
680            .complete(request)
681            .await
682            .map_err(llm_call_error_from_transport)
683    }
684}
685
686async fn execute_local_sleep(
687    envelope: RuntimeEffectEnvelope,
688    cancellation: CancellationToken,
689) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
690    match envelope.command {
691        RuntimeEffectCommand::Sleep { duration_ms } => {
692            sleep_with_cancellation(duration_ms, &cancellation).await?;
693            Ok(RuntimeEffectOutcome::Sleep)
694        }
695        command => Err(RuntimeEffectControllerError::new(
696            "runtime_effect_local_executor_mismatch",
697            format!(
698                "local sleep executor cannot execute {} command",
699                command.kind().as_str()
700            ),
701        )),
702    }
703}
704
705async fn sleep_with_cancellation(
706    duration_ms: u64,
707    cancellation: &CancellationToken,
708) -> Result<(), RuntimeEffectControllerError> {
709    let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
710    tokio::pin!(sleep);
711    tokio::select! {
712        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
713            "runtime_effect_sleep_cancelled",
714            "runtime effect sleep was cancelled",
715        )),
716        _ = &mut sleep => Ok(()),
717    }
718}
719
720// =============================================================================
721// Default in-process effect controller
722// =============================================================================
723
/// Default in-process effect controller.
///
/// Stateless: the inline controller only registers process rows; the
/// lease-protected [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole
/// executor.
///
/// A unit struct: it carries no fields, so cloning it or taking its
/// `Default` copies no state.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
731
732#[async_trait::async_trait]
733impl RuntimeEffectController for InlineRuntimeEffectController {
734    async fn execute_effect(
735        &self,
736        envelope: RuntimeEffectEnvelope,
737        local_executor: RuntimeEffectLocalExecutor<'_>,
738    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
739        match envelope.command {
740            RuntimeEffectCommand::Process { command } => {
741                let result = self
742                    .execute_process_command(command, local_executor)
743                    .await?;
744                Ok(RuntimeEffectOutcome::Process { result })
745            }
746            _ => local_executor.execute(envelope).await,
747        }
748    }
749}
750
/// In-process deployment effect host.
///
/// Hands out scoped controllers that all share one `Arc`-held controller.
#[derive(Clone)]
pub struct InlineEffectHost {
    /// Controller shared by every scope this host produces.
    controller: Arc<dyn RuntimeEffectController>,
}
756
757impl InlineEffectHost {
758    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
759        Self { controller }
760    }
761}
762
763impl Default for InlineEffectHost {
764    fn default() -> Self {
765        Self::new(Arc::new(InlineRuntimeEffectController))
766    }
767}
768
769impl EffectHost for InlineEffectHost {
770    fn durability_tier(&self) -> crate::DurabilityTier {
771        self.controller.durability_tier()
772    }
773
774    fn requires_durable_attachment_store(&self) -> bool {
775        self.controller.requires_durable_attachment_store()
776    }
777
778    fn scoped<'run>(
779        &'run self,
780        scope: EffectScope,
781    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
782        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
783    }
784
785    fn scoped_static(
786        &self,
787        scope: EffectScope,
788    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
789        Ok(Some(ScopedEffectController::shared(
790            Arc::clone(&self.controller),
791            scope,
792        )?))
793    }
794}
795
796impl InlineRuntimeEffectController {
797    /// Register the process (and any handle grant) into the durable registry.
798    ///
799    /// The inline controller no longer runs the process here: the registry's
800    /// non-terminal row *is* the durable work queue, and the lease-protected
801    /// [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole executor. The
802    /// control seam pokes that runner after a successful start, so registering
803    /// the row is all this path does.
804    pub(crate) async fn start_process(
805        &self,
806        registry: Arc<dyn crate::ProcessRegistry>,
807        registration: crate::ProcessRegistration,
808        grant: Option<crate::ProcessStartGrant>,
809    ) -> Result<ProcessRecord, PluginError> {
810        let registration_for_record = registration.clone();
811        let record = registry.register_process(registration_for_record).await?;
812        if let Some(grant) = grant {
813            registry
814                .grant_handle(&grant.owner_scope, &registration.id, grant.descriptor)
815                .await?;
816        }
817        Ok(record)
818    }
819
820    pub(crate) async fn request_process_cancel(
821        &self,
822        registry: Arc<dyn crate::ProcessRegistry>,
823        process_id: &str,
824        reason: Option<String>,
825    ) -> Result<ProcessRecord, PluginError> {
826        // Cancellation is a durable signal: the cancel event is what the
827        // runner-run process observes, so the inline controller appends it and
828        // no longer tracks an in-process cancellation token.
829        registry
830            .append_event(
831                process_id,
832                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
833            )
834            .await?;
835        registry
836            .get_process(process_id)
837            .await
838            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
839    }
840
841    async fn execute_process_command(
842        &self,
843        command: ProcessCommand,
844        local_executor: RuntimeEffectLocalExecutor<'_>,
845    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
846        let execution = local_executor.into_process()?;
847        let registry = execution.registry;
848        match command {
849            ProcessCommand::Start {
850                registration,
851                grant,
852                execution_context: _,
853            } => {
854                let record = self.start_process(registry, registration, grant).await?;
855                Ok(ProcessEffectOutcome::Start { record })
856            }
857            ProcessCommand::List { owner_scope, mode } => {
858                let entries = match mode {
859                    crate::ProcessListMode::Live => {
860                        registry.list_live_handle_grants(&owner_scope).await?
861                    }
862                    crate::ProcessListMode::All => {
863                        registry.list_handle_grants(&owner_scope).await?
864                    }
865                };
866                Ok(ProcessEffectOutcome::List { entries })
867            }
868            ProcessCommand::Transfer {
869                from_scope,
870                to_scope,
871                process_ids,
872            } => {
873                registry
874                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
875                    .await?;
876                Ok(ProcessEffectOutcome::Transfer)
877            }
878            ProcessCommand::DeleteSession { session_id } => {
879                let report = registry.delete_session_process_state(&session_id).await?;
880                for process_id in &report.cancel_process_ids {
881                    registry
882                        .append_event(
883                            process_id,
884                            crate::ProcessEventAppendRequest::cancel_requested(
885                                process_id,
886                                Some("session deleted".to_string()),
887                            ),
888                        )
889                        .await?;
890                }
891                Ok(ProcessEffectOutcome::DeleteSession { report })
892            }
893            ProcessCommand::Await { process_id } => {
894                let output = registry.await_process(&process_id).await?;
895                Ok(ProcessEffectOutcome::Await { output })
896            }
897            ProcessCommand::Cancel { process_id, reason } => {
898                let record = self
899                    .request_process_cancel(registry, &process_id, reason)
900                    .await?;
901                Ok(ProcessEffectOutcome::Cancel { record })
902            }
903            ProcessCommand::Signal {
904                process_id,
905                request,
906                ..
907            } => {
908                let result = registry.append_event(&process_id, request).await?;
909                Ok(ProcessEffectOutcome::Signal {
910                    event: result.event,
911                })
912            }
913        }
914    }
915}
916
917impl std::fmt::Debug for InlineRuntimeEffectController {
918    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919        f.debug_struct("InlineRuntimeEffectController").finish()
920    }
921}