Skip to main content

lash_core/runtime/effect/
executor.rs

1#[cfg(any(test, feature = "testing"))]
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7use tokio_util::sync::CancellationToken;
8
9use crate::AttachmentStore;
10use crate::LlmRequest as CoreLlmRequest;
11use crate::LlmResponse;
12use crate::ProcessRecord;
13use crate::ProcessRegistry;
14use crate::provider::ProviderHandle;
15use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
16use crate::sansio::LlmCallError;
17use crate::{PluginError, RuntimeError, RuntimeErrorCode};
18
19use super::envelope::{
20    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
21    RuntimeEffectKind, RuntimeEffectOutcome,
22};
23use super::outcome::llm_call_error_from_transport;
24
// =============================================================================
// Effect host + controller trait + scope + error
// =============================================================================
28
29/// Stable semantic identity for one effectful runtime operation.
30///
31/// The scope is chosen by the host boundary before any nondeterministic work is
32/// planned. It is intentionally generic: Restate, an inline test host, or a
33/// future durable effect host all receive the same Lash scope vocabulary.
34#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(tag = "type", rename_all = "snake_case")]
36pub enum EffectScope {
37    Turn {
38        session_id: String,
39        turn_id: String,
40    },
41    Process {
42        process_id: String,
43    },
44    QueueDrain {
45        session_id: String,
46        drain_id: String,
47    },
48    SessionDelete {
49        session_id: String,
50    },
51    RuntimeOperation {
52        operation_id: String,
53    },
54}
55
56impl EffectScope {
57    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
58        Self::Turn {
59            session_id: session_id.into(),
60            turn_id: turn_id.into(),
61        }
62    }
63
64    pub fn process(process_id: impl Into<String>) -> Self {
65        Self::Process {
66            process_id: process_id.into(),
67        }
68    }
69
70    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
71        Self::QueueDrain {
72            session_id: session_id.into(),
73            drain_id: drain_id.into(),
74        }
75    }
76
77    pub fn session_delete(session_id: impl Into<String>) -> Self {
78        Self::SessionDelete {
79            session_id: session_id.into(),
80        }
81    }
82
83    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
84        Self::RuntimeOperation {
85            operation_id: operation_id.into(),
86        }
87    }
88
89    pub fn id(&self) -> &str {
90        match self {
91            Self::Turn { turn_id, .. } => turn_id,
92            Self::Process { process_id } => process_id,
93            Self::QueueDrain { drain_id, .. } => drain_id,
94            Self::SessionDelete { session_id } => session_id,
95            Self::RuntimeOperation { operation_id } => operation_id,
96        }
97    }
98
99    pub fn session_id(&self) -> Option<&str> {
100        match self {
101            Self::Turn { session_id, .. }
102            | Self::QueueDrain { session_id, .. }
103            | Self::SessionDelete { session_id } => Some(session_id),
104            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
105        }
106    }
107
108    pub fn turn_id(&self) -> Option<&str> {
109        match self {
110            Self::Turn { turn_id, .. } => Some(turn_id),
111            _ => None,
112        }
113    }
114
115    pub fn validates_turn_trace_id(&self) -> bool {
116        matches!(self, Self::Turn { .. })
117    }
118
119    fn validate(&self) -> Result<(), RuntimeError> {
120        let missing = match self {
121            Self::Turn {
122                session_id,
123                turn_id,
124            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
125            Self::Process { process_id } => process_id.trim().is_empty(),
126            Self::QueueDrain {
127                session_id,
128                drain_id,
129            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
130            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
131            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
132        };
133        if missing {
134            return Err(RuntimeError::new(
135                RuntimeErrorCode::MissingEffectScopeId,
136                "effect scopes require non-empty stable ids",
137            ));
138        }
139        Ok(())
140    }
141}
142
143enum ScopedEffectControllerInner<'run> {
144    Borrowed(&'run dyn RuntimeEffectController),
145    Shared(Arc<dyn RuntimeEffectController>),
146}
147
148impl Clone for ScopedEffectControllerInner<'_> {
149    fn clone(&self) -> Self {
150        match self {
151            Self::Borrowed(controller) => Self::Borrowed(*controller),
152            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
153        }
154    }
155}
156
157/// Scoped low-level controller plus the semantic effect scope it is serving.
158#[derive(Clone)]
159pub struct ScopedEffectController<'run> {
160    controller: ScopedEffectControllerInner<'run>,
161    scope: EffectScope,
162}
163
164impl<'run> ScopedEffectController<'run> {
165    pub fn borrowed(
166        controller: &'run dyn RuntimeEffectController,
167        scope: EffectScope,
168    ) -> Result<Self, RuntimeError> {
169        scope.validate()?;
170        Ok(Self {
171            controller: ScopedEffectControllerInner::Borrowed(controller),
172            scope,
173        })
174    }
175
176    pub fn shared(
177        controller: Arc<dyn RuntimeEffectController>,
178        scope: EffectScope,
179    ) -> Result<Self, RuntimeError> {
180        scope.validate()?;
181        Ok(Self {
182            controller: ScopedEffectControllerInner::Shared(controller),
183            scope,
184        })
185    }
186
187    pub fn controller(&self) -> &dyn RuntimeEffectController {
188        match &self.controller {
189            ScopedEffectControllerInner::Borrowed(controller) => *controller,
190            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
191        }
192    }
193
194    pub fn effect_scope(&self) -> &EffectScope {
195        &self.scope
196    }
197
198    pub fn scope_id(&self) -> &str {
199        self.scope.id()
200    }
201
202    pub fn turn_id(&self) -> Option<&str> {
203        self.scope.turn_id()
204    }
205}
206
207/// Deployment-level factory for scoped effect controllers.
208#[async_trait::async_trait]
209pub trait EffectHost: Send + Sync {
210    fn durability_tier(&self) -> crate::DurabilityTier {
211        crate::DurabilityTier::Inline
212    }
213
214    fn requires_durable_attachment_store(&self) -> bool {
215        false
216    }
217
218    fn scoped<'run>(
219        &'run self,
220        scope: EffectScope,
221    ) -> Result<ScopedEffectController<'run>, RuntimeError>;
222
223    fn scoped_static(
224        &self,
225        _scope: EffectScope,
226    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
227        Ok(None)
228    }
229}
230
231/// Boundary for nondeterministic runtime work.
232#[async_trait::async_trait]
233pub trait RuntimeEffectController: Send + Sync {
234    /// Durability tier this controller provides; defaults to
235    /// [`DurabilityTier::Inline`].
236    fn durability_tier(&self) -> crate::DurabilityTier {
237        crate::DurabilityTier::Inline
238    }
239
240    fn requires_durable_attachment_store(&self) -> bool {
241        false
242    }
243
244    async fn execute_effect(
245        &self,
246        envelope: RuntimeEffectEnvelope,
247        local_executor: RuntimeEffectLocalExecutor<'_>,
248    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
249}
250
251/// Runtime-internal handle for effect-controller references carried through
252/// per-turn execution contexts.
253#[derive(Clone)]
254pub(crate) enum RuntimeEffectControllerHandle<'run> {
255    Borrowed(ScopedEffectController<'run>),
256    #[cfg(any(test, feature = "testing"))]
257    Shared {
258        controller: Arc<dyn RuntimeEffectController>,
259        scope: EffectScope,
260    },
261}
262
263impl<'run> RuntimeEffectControllerHandle<'run> {
264    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
265        Self::Borrowed(scoped)
266    }
267
268    #[cfg(any(test, feature = "testing"))]
269    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
270        Self::Shared {
271            controller,
272            scope: EffectScope::runtime_operation("test-runtime-effect-controller"),
273        }
274    }
275
276    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
277        match self {
278            Self::Borrowed(scoped) => scoped.controller(),
279            #[cfg(any(test, feature = "testing"))]
280            Self::Shared { controller, .. } => controller.as_ref(),
281        }
282    }
283
284    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
285        match self {
286            Self::Borrowed(scoped) => scoped.clone(),
287            #[cfg(any(test, feature = "testing"))]
288            Self::Shared { controller, scope } => {
289                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
290                    .expect("runtime effect controller handle carries a valid scope")
291            }
292        }
293    }
294
295    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
296        self.clone()
297    }
298}
299
300#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
301#[error("{code}: {message}")]
302pub struct RuntimeEffectControllerError {
303    pub code: String,
304    pub message: String,
305}
306
307impl RuntimeEffectControllerError {
308    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
309        Self {
310            code: code.into(),
311            message: message.into(),
312        }
313    }
314
315    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
316        Self::new(
317            "runtime_effect_wrong_outcome",
318            format!(
319                "expected {} outcome, got {}",
320                expected.as_str(),
321                actual.as_str()
322            ),
323        )
324    }
325
326    pub(crate) fn into_runtime_error(self) -> RuntimeError {
327        RuntimeError::new(self.code, self.message)
328    }
329}
330
331impl From<RuntimeError> for RuntimeEffectControllerError {
332    fn from(err: RuntimeError) -> Self {
333        Self::new(err.code.as_str(), err.message)
334    }
335}
336
337impl From<PluginError> for RuntimeEffectControllerError {
338    fn from(err: PluginError) -> Self {
339        Self::new("plugin", err.to_string())
340    }
341}
342
343impl From<crate::StoreError> for RuntimeEffectControllerError {
344    fn from(err: crate::StoreError) -> Self {
345        Self::new("runtime_store", err.to_string())
346    }
347}
348
// =============================================================================
// Local executor (per-effect borrowed runner state)
// =============================================================================
352
353#[async_trait::async_trait]
354pub(crate) trait ProcessRunner: Send + Sync {
355    async fn run_process(
356        &self,
357        registration: crate::ProcessRegistration,
358        execution_context: crate::ProcessExecutionContext,
359        registry: Arc<dyn ProcessRegistry>,
360        scoped_effect_controller: crate::ScopedEffectController<'_>,
361        cancellation: CancellationToken,
362    ) -> crate::ProcessAwaitOutput;
363}
364
365pub struct ProcessLocalExecution {
366    pub registry: Arc<dyn ProcessRegistry>,
367}
368
369pub(super) struct LocalTurnEffectRunner<'a, 'run> {
370    driver: &'a mut RuntimeTurnDriver<'run>,
371    machine: &'a mut crate::TurnMachine,
372    event_tx: mpsc::Sender<RuntimeStreamEvent>,
373    cancellation: CancellationToken,
374}
375
376pub(super) struct LocalDirectEffectRunner {
377    provider: ProviderHandle,
378    attachment_store: Arc<dyn AttachmentStore>,
379}
380
381#[async_trait::async_trait]
382trait RuntimeEffectLocalRunner: Send {
383    async fn execute(
384        self: Box<Self>,
385        envelope: RuntimeEffectEnvelope,
386    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
387}
388
/// Unsized closure type behind the testing runner: takes the envelope once
/// and returns a boxed, sendable future that resolves to the effect result.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
400
/// Test-only local runner that forwards the envelope to a caller-supplied
/// closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked with the envelope when the runner executes.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
405
406enum RuntimeEffectLocalExecutorState<'run> {
407    Unavailable,
408    SleepOnly {
409        cancellation: CancellationToken,
410    },
411    AwaitEvent {
412        key: String,
413        registry: Arc<dyn ProcessRegistry>,
414        process_id: String,
415        event_type: String,
416        event_ordinal: u64,
417        cancellation: CancellationToken,
418    },
419    Process(ProcessLocalExecution),
420    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
421}
422
423/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
424///
425/// Durable controllers may ignore it and replay their own recorded result. The
426/// default inline controller delegates to it, so local provider/tool/checkpoint
427/// work still crosses the same `execute_effect` boundary as durable controllers.
428pub struct RuntimeEffectLocalExecutor<'run> {
429    state: RuntimeEffectLocalExecutorState<'run>,
430}
431
432impl<'run> RuntimeEffectLocalExecutor<'run> {
433    pub fn unavailable() -> Self {
434        Self {
435            state: RuntimeEffectLocalExecutorState::Unavailable,
436        }
437    }
438
439    pub fn sleep(cancellation: CancellationToken) -> Self {
440        Self {
441            state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
442        }
443    }
444
445    pub fn await_process_event(
446        key: impl Into<String>,
447        registry: Arc<dyn ProcessRegistry>,
448        process_id: impl Into<String>,
449        event_type: impl Into<String>,
450        event_ordinal: u64,
451        cancellation: CancellationToken,
452    ) -> Self {
453        Self {
454            state: RuntimeEffectLocalExecutorState::AwaitEvent {
455                key: key.into(),
456                registry,
457                process_id: process_id.into(),
458                event_type: event_type.into(),
459                event_ordinal,
460                cancellation,
461            },
462        }
463    }
464
465    pub fn process_control(registry: Arc<dyn ProcessRegistry>) -> Self {
466        Self {
467            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
468        }
469    }
470
471    #[cfg(any(test, feature = "testing"))]
472    pub fn testing<F, Fut>(run: F) -> Self
473    where
474        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
475        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
476            + Send
477            + 'run,
478    {
479        Self {
480            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
481                TestingRuntimeEffectLocalRunner {
482                    run: Box::new(move |envelope| Box::pin(run(envelope))),
483                },
484            )),
485        }
486    }
487
488    pub(in crate::runtime) fn turn<'scope>(
489        driver: &'run mut RuntimeTurnDriver<'scope>,
490        machine: &'run mut crate::TurnMachine,
491        event_tx: mpsc::Sender<RuntimeStreamEvent>,
492        cancellation: CancellationToken,
493    ) -> Self
494    where
495        'scope: 'run,
496    {
497        Self {
498            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
499                driver,
500                machine,
501                event_tx,
502                cancellation,
503            })),
504        }
505    }
506
507    pub(in crate::runtime) fn direct(
508        provider: ProviderHandle,
509        attachment_store: Arc<dyn AttachmentStore>,
510    ) -> Self {
511        Self {
512            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
513                provider,
514                attachment_store,
515            })),
516        }
517    }
518
519    pub async fn execute(
520        self,
521        envelope: RuntimeEffectEnvelope,
522    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
523        match self.state {
524            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
525            RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
526                execute_local_sleep(envelope, cancellation).await
527            }
528            RuntimeEffectLocalExecutorState::AwaitEvent {
529                key,
530                registry,
531                process_id,
532                event_type,
533                event_ordinal,
534                cancellation,
535            } => {
536                execute_local_await_event(
537                    envelope,
538                    &key,
539                    registry,
540                    &process_id,
541                    &event_type,
542                    event_ordinal,
543                    cancellation,
544                )
545                .await
546            }
547            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
548                "runtime_effect_local_executor_unavailable",
549                format!(
550                    "no local executor is available for {}",
551                    envelope.command.kind().as_str()
552                ),
553            )),
554            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
555                "runtime_effect_local_executor_mismatch",
556                format!(
557                    "process executor cannot execute {} command directly",
558                    envelope.command.kind().as_str()
559                ),
560            )),
561        }
562    }
563
564    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
565        match self.state {
566            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
567            _ => Err(RuntimeEffectControllerError::new(
568                "runtime_effect_local_executor_unavailable",
569                "no process executor is available for process command",
570            )),
571        }
572    }
573}
574
/// Runs the stored closure with the envelope and awaits the future it returns.
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let TestingRuntimeEffectLocalRunner { run } = *self;
        let pending = run(envelope);
        pending.await
    }
}
585
586#[async_trait::async_trait]
587impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
588    async fn execute(
589        self: Box<Self>,
590        envelope: RuntimeEffectEnvelope,
591    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
592        let runner = *self;
593        match envelope.command {
594            RuntimeEffectCommand::LlmCall { request } => {
595                let protocol_iteration = runner.machine.protocol_iteration();
596                let (result, text_streamed) = runner
597                    .driver
598                    .run_llm_call(
599                        Arc::new((*request).into_request(None, None)),
600                        protocol_iteration,
601                        envelope.invocation,
602                        &runner.event_tx,
603                        &runner.cancellation,
604                    )
605                    .await;
606                Ok(RuntimeEffectOutcome::LlmCall {
607                    result,
608                    text_streamed,
609                })
610            }
611            RuntimeEffectCommand::ToolCall { call } => {
612                let tool_name = call.tool_name.clone();
613                let mut outcome = runner
614                    .driver
615                    .run_tool_calls(
616                        vec![(call, envelope.invocation)],
617                        &runner.event_tx,
618                        &runner.cancellation,
619                    )
620                    .await?;
621                let result = outcome.completed.pop().ok_or_else(|| {
622                    RuntimeEffectControllerError::new(
623                        "tool_result_missing",
624                        format!("tool `{tool_name}` completed without a result"),
625                    )
626                })?;
627                Ok(RuntimeEffectOutcome::ToolCall {
628                    result,
629                    host_events: outcome.host_events,
630                })
631            }
632            RuntimeEffectCommand::ExecCode { code } => {
633                let protocol_iteration = runner.machine.protocol_iteration();
634                let messages = runner.machine.message_sequence();
635                Ok(RuntimeEffectOutcome::ExecCode {
636                    result: runner
637                        .driver
638                        .run_exec_code(
639                            &code,
640                            messages,
641                            protocol_iteration,
642                            envelope.invocation,
643                            &runner.event_tx,
644                        )
645                        .await,
646                })
647            }
648            RuntimeEffectCommand::Checkpoint { checkpoint } => {
649                Ok(RuntimeEffectOutcome::Checkpoint {
650                    result: runner
651                        .driver
652                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
653                        .await
654                        .map_err(RuntimeEffectControllerError::from),
655                })
656            }
657            RuntimeEffectCommand::SyncExecutionSurface {
658                update_machine_config,
659            } => Ok(RuntimeEffectOutcome::SyncExecutionSurface {
660                result: runner
661                    .driver
662                    .refresh_execution_surface(runner.machine, update_machine_config)
663                    .await
664                    .map_err(|err| err.to_string()),
665            }),
666            RuntimeEffectCommand::Sleep { duration_ms } => {
667                sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
668                Ok(RuntimeEffectOutcome::Sleep)
669            }
670            command => Err(RuntimeEffectControllerError::new(
671                "runtime_effect_local_executor_mismatch",
672                format!(
673                    "local turn executor cannot execute {} command",
674                    command.kind().as_str()
675                ),
676            )),
677        }
678    }
679}
680
681#[async_trait::async_trait]
682impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
683    async fn execute(
684        mut self: Box<Self>,
685        envelope: RuntimeEffectEnvelope,
686    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
687        match envelope.command {
688            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
689                result: self
690                    .run_direct_llm_request((*request).into_request(
691                        crate::session_model::transport_stream_events(&self.provider, None),
692                        None,
693                    ))
694                    .await,
695            }),
696            RuntimeEffectCommand::Sleep { duration_ms } => {
697                sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
698                Ok(RuntimeEffectOutcome::Sleep)
699            }
700            command => Err(RuntimeEffectControllerError::new(
701                "runtime_effect_local_executor_mismatch",
702                format!(
703                    "local direct executor cannot execute {} command",
704                    command.kind().as_str()
705                ),
706            )),
707        }
708    }
709}
710
711impl LocalDirectEffectRunner {
712    async fn run_direct_llm_request(
713        &mut self,
714        request: CoreLlmRequest,
715    ) -> Result<LlmResponse, LlmCallError> {
716        let request = crate::attachments::resolve_llm_request_attachments(
717            request,
718            self.attachment_store.as_ref(),
719        )
720        .await
721        .map_err(|err| LlmCallError {
722            message: err.to_string(),
723            retryable: false,
724            raw: None,
725            code: Some("attachment_resolution_failed".to_string()),
726            terminal_reason: crate::LlmTerminalReason::ProviderError,
727            request_body: None,
728        })?;
729        self.provider
730            .complete(request)
731            .await
732            .map_err(llm_call_error_from_transport)
733    }
734}
735
736async fn execute_local_sleep(
737    envelope: RuntimeEffectEnvelope,
738    cancellation: CancellationToken,
739) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
740    match envelope.command {
741        RuntimeEffectCommand::Sleep { duration_ms } => {
742            sleep_with_cancellation(duration_ms, &cancellation).await?;
743            Ok(RuntimeEffectOutcome::Sleep)
744        }
745        command => Err(RuntimeEffectControllerError::new(
746            "runtime_effect_local_executor_mismatch",
747            format!(
748                "local sleep executor cannot execute {} command",
749                command.kind().as_str()
750            ),
751        )),
752    }
753}
754
755async fn execute_local_await_event(
756    envelope: RuntimeEffectEnvelope,
757    expected_key: &str,
758    registry: Arc<dyn ProcessRegistry>,
759    process_id: &str,
760    event_type: &str,
761    event_ordinal: u64,
762    cancellation: CancellationToken,
763) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
764    match envelope.command {
765        RuntimeEffectCommand::AwaitEvent { key } if key == expected_key => {
766            if event_ordinal == 0 {
767                return Err(RuntimeEffectControllerError::new(
768                    "runtime_effect_await_event_invalid_ordinal",
769                    "event ordinal must be one-based",
770                ));
771            }
772            let mut after_sequence = 0;
773            let mut matching_count = 0;
774            let event = loop {
775                let wait = registry.wait_event_after(process_id, event_type, after_sequence);
776                tokio::pin!(wait);
777                let event = tokio::select! {
778                    _ = cancellation.cancelled() => {
779                        return Err(RuntimeEffectControllerError::new(
780                            "runtime_effect_await_event_cancelled",
781                            "runtime effect event wait was cancelled",
782                        ));
783                    }
784                    event = &mut wait => event?,
785                };
786                matching_count += 1;
787                if matching_count == event_ordinal {
788                    break event;
789                }
790                after_sequence = event.sequence;
791            };
792            Ok(RuntimeEffectOutcome::AwaitEvent {
793                payload: event.payload,
794            })
795        }
796        RuntimeEffectCommand::AwaitEvent { key } => Err(RuntimeEffectControllerError::new(
797            "runtime_effect_await_event_key_mismatch",
798            format!("local event wait expected `{expected_key}`, got `{key}`"),
799        )),
800        command => Err(RuntimeEffectControllerError::new(
801            "runtime_effect_local_executor_mismatch",
802            format!(
803                "local event wait executor cannot execute {} command",
804                command.kind().as_str()
805            ),
806        )),
807    }
808}
809
810async fn sleep_with_cancellation(
811    duration_ms: u64,
812    cancellation: &CancellationToken,
813) -> Result<(), RuntimeEffectControllerError> {
814    let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
815    tokio::pin!(sleep);
816    tokio::select! {
817        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
818            "runtime_effect_sleep_cancelled",
819            "runtime effect sleep was cancelled",
820        )),
821        _ = &mut sleep => Ok(()),
822    }
823}
824
// =============================================================================
// Default in-process effect controller
// =============================================================================
828
/// Default in-process effect controller.
///
/// Stateless: the inline controller only registers process rows; the
/// lease-protected [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole
/// executor.
///
/// Derives `Debug` so the public type can be logged and embedded in other
/// `Debug` types.
#[derive(Clone, Debug, Default)]
pub struct InlineRuntimeEffectController;
836
837#[async_trait::async_trait]
838impl RuntimeEffectController for InlineRuntimeEffectController {
839    async fn execute_effect(
840        &self,
841        envelope: RuntimeEffectEnvelope,
842        local_executor: RuntimeEffectLocalExecutor<'_>,
843    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
844        match envelope.command {
845            RuntimeEffectCommand::Process { command } => {
846                let execution = local_executor.into_process()?;
847                let registry = execution.registry;
848                let result = tokio::task::spawn(async move {
849                    Self::execute_process_command(registry, *command).await
850                })
851                .await
852                .map_err(|err| {
853                    RuntimeEffectControllerError::new(
854                        "runtime_effect_process_task_join",
855                        format!("inline process effect task failed: {err}"),
856                    )
857                })??;
858                Ok(RuntimeEffectOutcome::Process { result })
859            }
860            _ => local_executor.execute(envelope).await,
861        }
862    }
863}
864
865/// In-process deployment effect host.
866#[derive(Clone)]
867pub struct InlineEffectHost {
868    controller: Arc<dyn RuntimeEffectController>,
869}
870
871impl InlineEffectHost {
872    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
873        Self { controller }
874    }
875}
876
877impl Default for InlineEffectHost {
878    fn default() -> Self {
879        Self::new(Arc::new(InlineRuntimeEffectController))
880    }
881}
882
883impl EffectHost for InlineEffectHost {
884    fn durability_tier(&self) -> crate::DurabilityTier {
885        self.controller.durability_tier()
886    }
887
888    fn requires_durable_attachment_store(&self) -> bool {
889        self.controller.requires_durable_attachment_store()
890    }
891
892    fn scoped<'run>(
893        &'run self,
894        scope: EffectScope,
895    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
896        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
897    }
898
899    fn scoped_static(
900        &self,
901        scope: EffectScope,
902    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
903        Ok(Some(ScopedEffectController::shared(
904            Arc::clone(&self.controller),
905            scope,
906        )?))
907    }
908}
909
910impl InlineRuntimeEffectController {
911    /// Register the process (and any handle grant) into the durable registry.
912    ///
913    /// The inline controller no longer runs the process here: the registry's
914    /// non-terminal row *is* the durable work queue, and the lease-protected
915    /// [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole executor. The
916    /// control seam pokes that runner after a successful start, so registering
917    /// the row is all this path does.
918    pub(crate) async fn start_process(
919        registry: Arc<dyn crate::ProcessRegistry>,
920        registration: crate::ProcessRegistration,
921        grant: Option<crate::ProcessStartGrant>,
922    ) -> Result<ProcessRecord, PluginError> {
923        let registration_for_record = registration.clone();
924        let record = registry.register_process(registration_for_record).await?;
925        if let Some(grant) = grant {
926            registry
927                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
928                .await?;
929        }
930        Ok(record)
931    }
932
933    pub(crate) async fn request_process_cancel(
934        &self,
935        registry: Arc<dyn crate::ProcessRegistry>,
936        process_id: &str,
937        reason: Option<String>,
938    ) -> Result<ProcessRecord, PluginError> {
939        // Cancellation is a durable signal: the cancel event is what the
940        // runner-run process observes, so the inline controller appends it and
941        // no longer tracks an in-process cancellation token.
942        registry
943            .append_event(
944                process_id,
945                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
946            )
947            .await?;
948        registry
949            .get_process(process_id)
950            .await
951            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
952    }
953
954    async fn execute_process_command(
955        registry: Arc<dyn crate::ProcessRegistry>,
956        command: ProcessCommand,
957    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
958        match command {
959            ProcessCommand::Start {
960                registration,
961                grant,
962                execution_context: _,
963            } => {
964                let record = Self::start_process(registry, registration, grant).await?;
965                Ok(ProcessEffectOutcome::Start { record })
966            }
967            ProcessCommand::List {
968                session_scope,
969                mode,
970            } => {
971                let entries = match mode {
972                    crate::ProcessListMode::Live => {
973                        registry.list_live_handle_grants(&session_scope).await?
974                    }
975                    crate::ProcessListMode::All => {
976                        registry.list_handle_grants(&session_scope).await?
977                    }
978                };
979                Ok(ProcessEffectOutcome::List { entries })
980            }
981            ProcessCommand::Transfer {
982                from_scope,
983                to_scope,
984                process_ids,
985            } => {
986                registry
987                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
988                    .await?;
989                Ok(ProcessEffectOutcome::Transfer)
990            }
991            ProcessCommand::DeleteSession { session_id } => {
992                let report = registry.delete_session_process_state(&session_id).await?;
993                Ok(ProcessEffectOutcome::DeleteSession { report })
994            }
995            ProcessCommand::Await { process_id } => {
996                let output = registry.await_process(&process_id).await?;
997                Ok(ProcessEffectOutcome::Await { output })
998            }
999            ProcessCommand::Cancel { process_id, reason } => {
1000                let record = InlineRuntimeEffectController
1001                    .request_process_cancel(registry, &process_id, reason)
1002                    .await?;
1003                Ok(ProcessEffectOutcome::Cancel { record })
1004            }
1005            ProcessCommand::Signal {
1006                process_id,
1007                request,
1008                ..
1009            } => {
1010                let result = registry.append_event(&process_id, request).await?;
1011                Ok(ProcessEffectOutcome::Signal {
1012                    event: result.event,
1013                })
1014            }
1015        }
1016    }
1017}
1018
1019impl std::fmt::Debug for InlineRuntimeEffectController {
1020    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1021        f.debug_struct("InlineRuntimeEffectController").finish()
1022    }
1023}