Skip to main content

lash_core/runtime/effect/
executor.rs

1#[cfg(any(test, feature = "testing"))]
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7use tokio_util::sync::CancellationToken;
8
9use crate::AttachmentStore;
10use crate::LlmRequest as CoreLlmRequest;
11use crate::LlmResponse;
12use crate::ProcessRecord;
13use crate::ProcessRegistry;
14use crate::provider::ProviderHandle;
15use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
16use crate::sansio::LlmCallError;
17use crate::{PluginError, RuntimeError, RuntimeErrorCode};
18
19use super::envelope::{
20    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
21    RuntimeEffectKind, RuntimeEffectOutcome,
22};
23use super::outcome::llm_call_error_from_transport;
24
25// =============================================================================
26// Effect host + controller trait + scope + error
27// =============================================================================
28
29/// Stable semantic identity for one effectful runtime operation.
30///
31/// The scope is chosen by the host boundary before any nondeterministic work is
32/// planned. It is intentionally generic: Restate, an inline test host, or a
33/// future durable effect host all receive the same Lash scope vocabulary.
34#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(tag = "type", rename_all = "snake_case")]
36pub enum EffectScope {
37    Turn {
38        session_id: String,
39        turn_id: String,
40    },
41    Process {
42        process_id: String,
43    },
44    HostEvent {
45        session_id: String,
46        event_id: String,
47    },
48    QueueDrain {
49        session_id: String,
50        drain_id: String,
51    },
52    Cron {
53        job_id: String,
54        execution_id: String,
55    },
56    SessionDelete {
57        session_id: String,
58    },
59    RuntimeOperation {
60        operation_id: String,
61    },
62}
63
64impl EffectScope {
65    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
66        Self::Turn {
67            session_id: session_id.into(),
68            turn_id: turn_id.into(),
69        }
70    }
71
72    pub fn process(process_id: impl Into<String>) -> Self {
73        Self::Process {
74            process_id: process_id.into(),
75        }
76    }
77
78    pub fn host_event(session_id: impl Into<String>, event_id: impl Into<String>) -> Self {
79        Self::HostEvent {
80            session_id: session_id.into(),
81            event_id: event_id.into(),
82        }
83    }
84
85    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
86        Self::QueueDrain {
87            session_id: session_id.into(),
88            drain_id: drain_id.into(),
89        }
90    }
91
92    pub fn cron(job_id: impl Into<String>, execution_id: impl Into<String>) -> Self {
93        Self::Cron {
94            job_id: job_id.into(),
95            execution_id: execution_id.into(),
96        }
97    }
98
99    pub fn session_delete(session_id: impl Into<String>) -> Self {
100        Self::SessionDelete {
101            session_id: session_id.into(),
102        }
103    }
104
105    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
106        Self::RuntimeOperation {
107            operation_id: operation_id.into(),
108        }
109    }
110
111    pub fn id(&self) -> &str {
112        match self {
113            Self::Turn { turn_id, .. } => turn_id,
114            Self::Process { process_id } => process_id,
115            Self::HostEvent { event_id, .. } => event_id,
116            Self::QueueDrain { drain_id, .. } => drain_id,
117            Self::Cron { execution_id, .. } => execution_id,
118            Self::SessionDelete { session_id } => session_id,
119            Self::RuntimeOperation { operation_id } => operation_id,
120        }
121    }
122
123    pub fn session_id(&self) -> Option<&str> {
124        match self {
125            Self::Turn { session_id, .. }
126            | Self::HostEvent { session_id, .. }
127            | Self::QueueDrain { session_id, .. }
128            | Self::SessionDelete { session_id } => Some(session_id),
129            Self::Process { .. } | Self::Cron { .. } | Self::RuntimeOperation { .. } => None,
130        }
131    }
132
133    pub fn turn_id(&self) -> Option<&str> {
134        match self {
135            Self::Turn { turn_id, .. } => Some(turn_id),
136            _ => None,
137        }
138    }
139
140    pub fn validates_turn_trace_id(&self) -> bool {
141        matches!(self, Self::Turn { .. })
142    }
143
144    fn validate(&self) -> Result<(), RuntimeError> {
145        let missing = match self {
146            Self::Turn {
147                session_id,
148                turn_id,
149            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
150            Self::Process { process_id } => process_id.trim().is_empty(),
151            Self::HostEvent {
152                session_id,
153                event_id,
154            } => session_id.trim().is_empty() || event_id.trim().is_empty(),
155            Self::QueueDrain {
156                session_id,
157                drain_id,
158            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
159            Self::Cron {
160                job_id,
161                execution_id,
162            } => job_id.trim().is_empty() || execution_id.trim().is_empty(),
163            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
164            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
165        };
166        if missing {
167            return Err(RuntimeError::new(
168                RuntimeErrorCode::MissingEffectScopeId,
169                "effect scopes require non-empty stable ids",
170            ));
171        }
172        Ok(())
173    }
174}
175
176enum ScopedEffectControllerInner<'run> {
177    Borrowed(&'run dyn RuntimeEffectController),
178    Shared(Arc<dyn RuntimeEffectController>),
179}
180
181impl Clone for ScopedEffectControllerInner<'_> {
182    fn clone(&self) -> Self {
183        match self {
184            Self::Borrowed(controller) => Self::Borrowed(*controller),
185            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
186        }
187    }
188}
189
190/// Scoped low-level controller plus the semantic effect scope it is serving.
191#[derive(Clone)]
192pub struct ScopedEffectController<'run> {
193    controller: ScopedEffectControllerInner<'run>,
194    scope: EffectScope,
195}
196
197impl<'run> ScopedEffectController<'run> {
198    pub fn borrowed(
199        controller: &'run dyn RuntimeEffectController,
200        scope: EffectScope,
201    ) -> Result<Self, RuntimeError> {
202        scope.validate()?;
203        Ok(Self {
204            controller: ScopedEffectControllerInner::Borrowed(controller),
205            scope,
206        })
207    }
208
209    pub fn shared(
210        controller: Arc<dyn RuntimeEffectController>,
211        scope: EffectScope,
212    ) -> Result<Self, RuntimeError> {
213        scope.validate()?;
214        Ok(Self {
215            controller: ScopedEffectControllerInner::Shared(controller),
216            scope,
217        })
218    }
219
220    pub fn controller(&self) -> &dyn RuntimeEffectController {
221        match &self.controller {
222            ScopedEffectControllerInner::Borrowed(controller) => *controller,
223            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
224        }
225    }
226
227    pub fn effect_scope(&self) -> &EffectScope {
228        &self.scope
229    }
230
231    pub fn scope_id(&self) -> &str {
232        self.scope.id()
233    }
234
235    pub fn turn_id(&self) -> Option<&str> {
236        self.scope.turn_id()
237    }
238}
239
240/// Deployment-level factory for scoped effect controllers.
241#[async_trait::async_trait]
242pub trait EffectHost: Send + Sync {
243    fn durability_tier(&self) -> crate::DurabilityTier {
244        crate::DurabilityTier::Inline
245    }
246
247    fn requires_durable_attachment_store(&self) -> bool {
248        false
249    }
250
251    fn scoped<'run>(
252        &'run self,
253        scope: EffectScope,
254    ) -> Result<ScopedEffectController<'run>, RuntimeError>;
255
256    fn scoped_static(
257        &self,
258        _scope: EffectScope,
259    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
260        Ok(None)
261    }
262}
263
264/// Boundary for nondeterministic runtime work.
265#[async_trait::async_trait]
266pub trait RuntimeEffectController: Send + Sync {
267    /// Durability tier this controller provides; defaults to
268    /// [`DurabilityTier::Inline`].
269    fn durability_tier(&self) -> crate::DurabilityTier {
270        crate::DurabilityTier::Inline
271    }
272
273    fn requires_durable_attachment_store(&self) -> bool {
274        false
275    }
276
277    async fn execute_effect(
278        &self,
279        envelope: RuntimeEffectEnvelope,
280        local_executor: RuntimeEffectLocalExecutor<'_>,
281    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
282}
283
284/// Runtime-internal handle for effect-controller references carried through
285/// per-turn execution contexts.
286#[derive(Clone)]
287pub(crate) enum RuntimeEffectControllerHandle<'run> {
288    Borrowed(ScopedEffectController<'run>),
289    #[cfg(any(test, feature = "testing"))]
290    Shared {
291        controller: Arc<dyn RuntimeEffectController>,
292        scope: EffectScope,
293    },
294}
295
296impl<'run> RuntimeEffectControllerHandle<'run> {
297    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
298        Self::Borrowed(scoped)
299    }
300
301    #[cfg(any(test, feature = "testing"))]
302    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
303        Self::Shared {
304            controller,
305            scope: EffectScope::runtime_operation("test-runtime-effect-controller"),
306        }
307    }
308
309    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
310        match self {
311            Self::Borrowed(scoped) => scoped.controller(),
312            #[cfg(any(test, feature = "testing"))]
313            Self::Shared { controller, .. } => controller.as_ref(),
314        }
315    }
316
317    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
318        match self {
319            Self::Borrowed(scoped) => scoped.clone(),
320            #[cfg(any(test, feature = "testing"))]
321            Self::Shared { controller, scope } => {
322                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
323                    .expect("runtime effect controller handle carries a valid scope")
324            }
325        }
326    }
327
328    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
329        self.clone()
330    }
331}
332
333#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
334#[error("{code}: {message}")]
335pub struct RuntimeEffectControllerError {
336    pub code: String,
337    pub message: String,
338}
339
340impl RuntimeEffectControllerError {
341    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
342        Self {
343            code: code.into(),
344            message: message.into(),
345        }
346    }
347
348    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
349        Self::new(
350            "runtime_effect_wrong_outcome",
351            format!(
352                "expected {} outcome, got {}",
353                expected.as_str(),
354                actual.as_str()
355            ),
356        )
357    }
358
359    pub(crate) fn into_runtime_error(self) -> RuntimeError {
360        RuntimeError::new(self.code, self.message)
361    }
362}
363
364impl From<RuntimeError> for RuntimeEffectControllerError {
365    fn from(err: RuntimeError) -> Self {
366        Self::new(err.code.as_str(), err.message)
367    }
368}
369
370impl From<PluginError> for RuntimeEffectControllerError {
371    fn from(err: PluginError) -> Self {
372        Self::new("plugin", err.to_string())
373    }
374}
375
376impl From<crate::StoreError> for RuntimeEffectControllerError {
377    fn from(err: crate::StoreError) -> Self {
378        Self::new("runtime_store", err.to_string())
379    }
380}
381
382// =============================================================================
383// Local executor (per-effect borrowed runner state)
384// =============================================================================
385
386#[async_trait::async_trait]
387pub(crate) trait ProcessRunner: Send + Sync {
388    async fn run_process(
389        &self,
390        registration: crate::ProcessRegistration,
391        execution_context: crate::ProcessExecutionContext,
392        registry: Arc<dyn ProcessRegistry>,
393        scoped_effect_controller: crate::ScopedEffectController<'_>,
394        cancellation: CancellationToken,
395    ) -> crate::ProcessAwaitOutput;
396}
397
398pub struct ProcessLocalExecution {
399    pub registry: Arc<dyn ProcessRegistry>,
400}
401
402pub(super) struct LocalTurnEffectRunner<'a, 'run> {
403    driver: &'a mut RuntimeTurnDriver<'run>,
404    machine: &'a mut crate::TurnMachine,
405    event_tx: mpsc::Sender<RuntimeStreamEvent>,
406    cancellation: CancellationToken,
407}
408
409pub(super) struct LocalDirectEffectRunner {
410    provider: ProviderHandle,
411    attachment_store: Arc<dyn AttachmentStore>,
412}
413
414#[async_trait::async_trait]
415trait RuntimeEffectLocalRunner: Send {
416    async fn execute(
417        self: Box<Self>,
418        envelope: RuntimeEffectEnvelope,
419    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
420}
421
/// Unsized closure type stored by [`TestingRuntimeEffectLocalRunner`]: a
/// one-shot `Send` function from an envelope to a pinned, boxed `Send` future
/// that resolves to the effect result. Closure and future may borrow for `'run`.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
433
/// Test-only local runner that forwards the envelope to a caller-supplied
/// closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
438
439enum RuntimeEffectLocalExecutorState<'run> {
440    Unavailable,
441    SleepOnly { cancellation: CancellationToken },
442    Process(ProcessLocalExecution),
443    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
444}
445
446/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
447///
448/// Durable controllers may ignore it and replay their own recorded result. The
449/// default inline controller delegates to it, so local provider/tool/checkpoint
450/// work still crosses the same `execute_effect` boundary as durable controllers.
451pub struct RuntimeEffectLocalExecutor<'run> {
452    state: RuntimeEffectLocalExecutorState<'run>,
453}
454
455impl<'run> RuntimeEffectLocalExecutor<'run> {
456    pub fn unavailable() -> Self {
457        Self {
458            state: RuntimeEffectLocalExecutorState::Unavailable,
459        }
460    }
461
462    pub fn sleep(cancellation: CancellationToken) -> Self {
463        Self {
464            state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
465        }
466    }
467
468    pub fn process_control(registry: Arc<dyn ProcessRegistry>) -> Self {
469        Self {
470            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
471        }
472    }
473
474    #[cfg(any(test, feature = "testing"))]
475    pub fn testing<F, Fut>(run: F) -> Self
476    where
477        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
478        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
479            + Send
480            + 'run,
481    {
482        Self {
483            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
484                TestingRuntimeEffectLocalRunner {
485                    run: Box::new(move |envelope| Box::pin(run(envelope))),
486                },
487            )),
488        }
489    }
490
491    pub(in crate::runtime) fn turn<'scope>(
492        driver: &'run mut RuntimeTurnDriver<'scope>,
493        machine: &'run mut crate::TurnMachine,
494        event_tx: mpsc::Sender<RuntimeStreamEvent>,
495        cancellation: CancellationToken,
496    ) -> Self
497    where
498        'scope: 'run,
499    {
500        Self {
501            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
502                driver,
503                machine,
504                event_tx,
505                cancellation,
506            })),
507        }
508    }
509
510    pub(in crate::runtime) fn direct(
511        provider: ProviderHandle,
512        attachment_store: Arc<dyn AttachmentStore>,
513    ) -> Self {
514        Self {
515            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
516                provider,
517                attachment_store,
518            })),
519        }
520    }
521
522    pub async fn execute(
523        self,
524        envelope: RuntimeEffectEnvelope,
525    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
526        match self.state {
527            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
528            RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
529                execute_local_sleep(envelope, cancellation).await
530            }
531            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
532                "runtime_effect_local_executor_unavailable",
533                format!(
534                    "no local executor is available for {}",
535                    envelope.command.kind().as_str()
536                ),
537            )),
538            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
539                "runtime_effect_local_executor_mismatch",
540                format!(
541                    "process executor cannot execute {} command directly",
542                    envelope.command.kind().as_str()
543                ),
544            )),
545        }
546    }
547
548    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
549        match self.state {
550            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
551            _ => Err(RuntimeEffectControllerError::new(
552                "runtime_effect_local_executor_unavailable",
553                "no process executor is available for process command",
554            )),
555        }
556    }
557}
558
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Calls the stored closure with `envelope` and awaits its future.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let TestingRuntimeEffectLocalRunner { run } = *self;
        run(envelope).await
    }
}
569
570#[async_trait::async_trait]
571impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
572    async fn execute(
573        self: Box<Self>,
574        envelope: RuntimeEffectEnvelope,
575    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
576        let runner = *self;
577        match envelope.command {
578            RuntimeEffectCommand::LlmCall { request } => {
579                let protocol_iteration = runner.machine.protocol_iteration();
580                let (result, text_streamed) = runner
581                    .driver
582                    .run_llm_call(
583                        Arc::new((*request).into_request(None, None)),
584                        protocol_iteration,
585                        envelope.invocation,
586                        &runner.event_tx,
587                        &runner.cancellation,
588                    )
589                    .await;
590                Ok(RuntimeEffectOutcome::LlmCall {
591                    result,
592                    text_streamed,
593                })
594            }
595            RuntimeEffectCommand::ToolCall { call } => {
596                let tool_name = call.tool_name.clone();
597                let mut outcome = runner
598                    .driver
599                    .run_tool_calls(
600                        vec![(call, envelope.invocation)],
601                        &runner.event_tx,
602                        &runner.cancellation,
603                    )
604                    .await?;
605                let result = outcome.completed.pop().ok_or_else(|| {
606                    RuntimeEffectControllerError::new(
607                        "tool_result_missing",
608                        format!("tool `{tool_name}` completed without a result"),
609                    )
610                })?;
611                Ok(RuntimeEffectOutcome::ToolCall {
612                    result,
613                    host_events: outcome.host_events,
614                })
615            }
616            RuntimeEffectCommand::ExecCode { code } => {
617                let protocol_iteration = runner.machine.protocol_iteration();
618                let messages = runner.machine.message_sequence();
619                Ok(RuntimeEffectOutcome::ExecCode {
620                    result: runner
621                        .driver
622                        .run_exec_code(
623                            &code,
624                            messages,
625                            protocol_iteration,
626                            envelope.invocation,
627                            &runner.event_tx,
628                        )
629                        .await,
630                })
631            }
632            RuntimeEffectCommand::Checkpoint { checkpoint } => {
633                Ok(RuntimeEffectOutcome::Checkpoint {
634                    result: runner
635                        .driver
636                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
637                        .await
638                        .map_err(RuntimeEffectControllerError::from),
639                })
640            }
641            RuntimeEffectCommand::SyncExecutionSurface {
642                update_machine_config,
643            } => Ok(RuntimeEffectOutcome::SyncExecutionSurface {
644                result: runner
645                    .driver
646                    .refresh_execution_surface(runner.machine, update_machine_config)
647                    .await
648                    .map_err(|err| err.to_string()),
649            }),
650            RuntimeEffectCommand::Sleep { duration_ms } => {
651                sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
652                Ok(RuntimeEffectOutcome::Sleep)
653            }
654            command => Err(RuntimeEffectControllerError::new(
655                "runtime_effect_local_executor_mismatch",
656                format!(
657                    "local turn executor cannot execute {} command",
658                    command.kind().as_str()
659                ),
660            )),
661        }
662    }
663}
664
665#[async_trait::async_trait]
666impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
667    async fn execute(
668        mut self: Box<Self>,
669        envelope: RuntimeEffectEnvelope,
670    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
671        match envelope.command {
672            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
673                result: self
674                    .run_direct_llm_request((*request).into_request(
675                        crate::session_model::transport_stream_events(&self.provider, None),
676                        None,
677                    ))
678                    .await,
679            }),
680            RuntimeEffectCommand::Sleep { duration_ms } => {
681                sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
682                Ok(RuntimeEffectOutcome::Sleep)
683            }
684            command => Err(RuntimeEffectControllerError::new(
685                "runtime_effect_local_executor_mismatch",
686                format!(
687                    "local direct executor cannot execute {} command",
688                    command.kind().as_str()
689                ),
690            )),
691        }
692    }
693}
694
695impl LocalDirectEffectRunner {
696    async fn run_direct_llm_request(
697        &mut self,
698        request: CoreLlmRequest,
699    ) -> Result<LlmResponse, LlmCallError> {
700        let request = crate::attachments::resolve_llm_request_attachments(
701            request,
702            self.attachment_store.as_ref(),
703        )
704        .map_err(|err| LlmCallError {
705            message: err.to_string(),
706            retryable: false,
707            raw: None,
708            code: Some("attachment_resolution_failed".to_string()),
709            terminal_reason: crate::LlmTerminalReason::ProviderError,
710            request_body: None,
711        })?;
712        self.provider
713            .complete(request)
714            .await
715            .map_err(llm_call_error_from_transport)
716    }
717}
718
719async fn execute_local_sleep(
720    envelope: RuntimeEffectEnvelope,
721    cancellation: CancellationToken,
722) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
723    match envelope.command {
724        RuntimeEffectCommand::Sleep { duration_ms } => {
725            sleep_with_cancellation(duration_ms, &cancellation).await?;
726            Ok(RuntimeEffectOutcome::Sleep)
727        }
728        command => Err(RuntimeEffectControllerError::new(
729            "runtime_effect_local_executor_mismatch",
730            format!(
731                "local sleep executor cannot execute {} command",
732                command.kind().as_str()
733            ),
734        )),
735    }
736}
737
738async fn sleep_with_cancellation(
739    duration_ms: u64,
740    cancellation: &CancellationToken,
741) -> Result<(), RuntimeEffectControllerError> {
742    let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
743    tokio::pin!(sleep);
744    tokio::select! {
745        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
746            "runtime_effect_sleep_cancelled",
747            "runtime effect sleep was cancelled",
748        )),
749        _ = &mut sleep => Ok(()),
750    }
751}
752
753// =============================================================================
754// Default in-process effect controller
755// =============================================================================
756
/// The default, in-process effect controller.
///
/// It holds no state. For process starts it only registers the process row;
/// running the process is left to the lease-protected
/// [`ProcessWorkRunner`](crate::ProcessWorkRunner), the sole executor.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
764
765#[async_trait::async_trait]
766impl RuntimeEffectController for InlineRuntimeEffectController {
767    async fn execute_effect(
768        &self,
769        envelope: RuntimeEffectEnvelope,
770        local_executor: RuntimeEffectLocalExecutor<'_>,
771    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
772        match envelope.command {
773            RuntimeEffectCommand::Process { command } => {
774                let result = self
775                    .execute_process_command(command, local_executor)
776                    .await?;
777                Ok(RuntimeEffectOutcome::Process { result })
778            }
779            _ => local_executor.execute(envelope).await,
780        }
781    }
782}
783
784/// In-process deployment effect host.
785#[derive(Clone)]
786pub struct InlineEffectHost {
787    controller: Arc<dyn RuntimeEffectController>,
788}
789
790impl InlineEffectHost {
791    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
792        Self { controller }
793    }
794}
795
796impl Default for InlineEffectHost {
797    fn default() -> Self {
798        Self::new(Arc::new(InlineRuntimeEffectController))
799    }
800}
801
802impl EffectHost for InlineEffectHost {
803    fn durability_tier(&self) -> crate::DurabilityTier {
804        self.controller.durability_tier()
805    }
806
807    fn requires_durable_attachment_store(&self) -> bool {
808        self.controller.requires_durable_attachment_store()
809    }
810
811    fn scoped<'run>(
812        &'run self,
813        scope: EffectScope,
814    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
815        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
816    }
817
818    fn scoped_static(
819        &self,
820        scope: EffectScope,
821    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
822        Ok(Some(ScopedEffectController::shared(
823            Arc::clone(&self.controller),
824            scope,
825        )?))
826    }
827}
828
829impl InlineRuntimeEffectController {
830    /// Register the process (and any handle grant) into the durable registry.
831    ///
832    /// The inline controller no longer runs the process here: the registry's
833    /// non-terminal row *is* the durable work queue, and the lease-protected
834    /// [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole executor. The
835    /// control seam pokes that runner after a successful start, so registering
836    /// the row is all this path does.
837    pub(crate) async fn start_process(
838        &self,
839        registry: Arc<dyn crate::ProcessRegistry>,
840        registration: crate::ProcessRegistration,
841        grant: Option<crate::ProcessStartGrant>,
842    ) -> Result<ProcessRecord, PluginError> {
843        let registration_for_record = registration.clone();
844        let record = registry.register_process(registration_for_record).await?;
845        if let Some(grant) = grant {
846            registry
847                .grant_handle(&grant.owner_scope, &registration.id, grant.descriptor)
848                .await?;
849        }
850        Ok(record)
851    }
852
853    pub(crate) async fn request_process_cancel(
854        &self,
855        registry: Arc<dyn crate::ProcessRegistry>,
856        process_id: &str,
857        reason: Option<String>,
858    ) -> Result<ProcessRecord, PluginError> {
859        // Cancellation is a durable signal: the cancel event is what the
860        // runner-run process observes, so the inline controller appends it and
861        // no longer tracks an in-process cancellation token.
862        registry
863            .append_event(
864                process_id,
865                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
866            )
867            .await?;
868        registry
869            .get_process(process_id)
870            .await
871            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
872    }
873
874    async fn execute_process_command(
875        &self,
876        command: ProcessCommand,
877        local_executor: RuntimeEffectLocalExecutor<'_>,
878    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
879        let execution = local_executor.into_process()?;
880        let registry = execution.registry;
881        match command {
882            ProcessCommand::Start {
883                registration,
884                grant,
885                execution_context: _,
886            } => {
887                let record = self.start_process(registry, registration, grant).await?;
888                Ok(ProcessEffectOutcome::Start { record })
889            }
890            ProcessCommand::List { owner_scope, mode } => {
891                let entries = match mode {
892                    crate::ProcessListMode::Live => {
893                        registry.list_live_handle_grants(&owner_scope).await?
894                    }
895                    crate::ProcessListMode::All => {
896                        registry.list_handle_grants(&owner_scope).await?
897                    }
898                };
899                Ok(ProcessEffectOutcome::List { entries })
900            }
901            ProcessCommand::Transfer {
902                from_scope,
903                to_scope,
904                process_ids,
905            } => {
906                registry
907                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
908                    .await?;
909                Ok(ProcessEffectOutcome::Transfer)
910            }
911            ProcessCommand::DeleteSession { session_id } => {
912                let report = registry.delete_session_process_state(&session_id).await?;
913                for process_id in &report.cancel_process_ids {
914                    registry
915                        .append_event(
916                            process_id,
917                            crate::ProcessEventAppendRequest::cancel_requested(
918                                process_id,
919                                Some("session deleted".to_string()),
920                            ),
921                        )
922                        .await?;
923                }
924                Ok(ProcessEffectOutcome::DeleteSession { report })
925            }
926            ProcessCommand::Await { process_id } => {
927                let output = registry.await_process(&process_id).await?;
928                Ok(ProcessEffectOutcome::Await { output })
929            }
930            ProcessCommand::Cancel { process_id, reason } => {
931                let record = self
932                    .request_process_cancel(registry, &process_id, reason)
933                    .await?;
934                Ok(ProcessEffectOutcome::Cancel { record })
935            }
936            ProcessCommand::Signal {
937                process_id,
938                request,
939                ..
940            } => {
941                let result = registry.append_event(&process_id, request).await?;
942                Ok(ProcessEffectOutcome::Signal {
943                    event: result.event,
944                })
945            }
946        }
947    }
948}
949
950impl std::fmt::Debug for InlineRuntimeEffectController {
951    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
952        f.debug_struct("InlineRuntimeEffectController").finish()
953    }
954}