Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::{HashMap, HashSet};
2#[cfg(any(test, feature = "testing"))]
3use std::pin::Pin;
4use std::sync::{Arc, OnceLock};
5use std::time::Instant;
6
7use hmac::{Hmac, Mac};
8use serde::{Deserialize, Serialize};
9use tokio::sync::{Notify, mpsc};
10use tokio_util::sync::CancellationToken;
11
12use crate::AttachmentStore;
13use crate::LlmRequest as CoreLlmRequest;
14use crate::LlmResponse;
15use crate::ProcessRecord;
16use crate::ProcessRegistry;
17use crate::provider::ProviderHandle;
18use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
19use crate::sansio::LlmCallError;
20use crate::{PluginError, RuntimeError, RuntimeErrorCode};
21
22use super::envelope::{
23    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
24    RuntimeEffectKind, RuntimeEffectOutcome,
25};
26use super::outcome::llm_call_error_from_transport;
27
28type HmacSha256 = Hmac<sha2::Sha256>;
29
30fn inline_await_events() -> &'static AwaitEventRegistry {
31    static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
32    REGISTRY.get_or_init(AwaitEventRegistry::new)
33}
34
35// =============================================================================
36// Effect host + controller trait + scope + error
37// =============================================================================
38
39/// Stable semantic identity for one effectful runtime operation.
40///
41/// The scope is chosen by the host boundary before any nondeterministic work is
42/// planned. It is intentionally generic: Restate, an inline test host, or a
43/// future durable effect host all receive the same Lash scope vocabulary.
44#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum ExecutionScope {
47    Turn {
48        session_id: String,
49        turn_id: String,
50    },
51    Process {
52        process_id: String,
53    },
54    QueueDrain {
55        session_id: String,
56        drain_id: String,
57    },
58    SessionDelete {
59        session_id: String,
60    },
61    RuntimeOperation {
62        operation_id: String,
63    },
64}
65
66impl ExecutionScope {
67    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
68        Self::Turn {
69            session_id: session_id.into(),
70            turn_id: turn_id.into(),
71        }
72    }
73
74    pub fn process(process_id: impl Into<String>) -> Self {
75        Self::Process {
76            process_id: process_id.into(),
77        }
78    }
79
80    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
81        Self::QueueDrain {
82            session_id: session_id.into(),
83            drain_id: drain_id.into(),
84        }
85    }
86
87    pub fn session_delete(session_id: impl Into<String>) -> Self {
88        Self::SessionDelete {
89            session_id: session_id.into(),
90        }
91    }
92
93    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
94        Self::RuntimeOperation {
95            operation_id: operation_id.into(),
96        }
97    }
98
99    pub fn id(&self) -> &str {
100        match self {
101            Self::Turn { turn_id, .. } => turn_id,
102            Self::Process { process_id } => process_id,
103            Self::QueueDrain { drain_id, .. } => drain_id,
104            Self::SessionDelete { session_id } => session_id,
105            Self::RuntimeOperation { operation_id } => operation_id,
106        }
107    }
108
109    pub fn session_id(&self) -> Option<&str> {
110        match self {
111            Self::Turn { session_id, .. }
112            | Self::QueueDrain { session_id, .. }
113            | Self::SessionDelete { session_id } => Some(session_id),
114            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
115        }
116    }
117
118    pub fn turn_id(&self) -> Option<&str> {
119        match self {
120            Self::Turn { turn_id, .. } => Some(turn_id),
121            _ => None,
122        }
123    }
124
125    pub fn validates_turn_trace_id(&self) -> bool {
126        matches!(self, Self::Turn { .. })
127    }
128
129    fn validate(&self) -> Result<(), RuntimeError> {
130        let missing = match self {
131            Self::Turn {
132                session_id,
133                turn_id,
134            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
135            Self::Process { process_id } => process_id.trim().is_empty(),
136            Self::QueueDrain {
137                session_id,
138                drain_id,
139            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
140            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
141            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
142        };
143        if missing {
144            return Err(RuntimeError::new(
145                RuntimeErrorCode::MissingExecutionScopeId,
146                "execution scopes require non-empty stable ids",
147            ));
148        }
149        Ok(())
150    }
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
154#[serde(tag = "type", rename_all = "snake_case")]
155pub enum AwaitEventWaitIdentity {
156    ToolCompletion {
157        tool_call_id: String,
158    },
159    ProcessSignal {
160        process_id: String,
161        signal_name: String,
162        ordinal: u64,
163    },
164    Custom {
165        key: String,
166    },
167}
168
169impl AwaitEventWaitIdentity {
170    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
171        Self::ToolCompletion {
172            tool_call_id: tool_call_id.into(),
173        }
174    }
175
176    pub fn process_signal(
177        process_id: impl Into<String>,
178        signal_name: impl Into<String>,
179        ordinal: u64,
180    ) -> Self {
181        Self::ProcessSignal {
182            process_id: process_id.into(),
183            signal_name: signal_name.into(),
184            ordinal,
185        }
186    }
187
188    fn validate(&self) -> Result<(), RuntimeError> {
189        let invalid = match self {
190            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
191            Self::ProcessSignal {
192                process_id,
193                signal_name,
194                ordinal,
195            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
196            Self::Custom { key } => key.trim().is_empty(),
197        };
198        if invalid {
199            return Err(RuntimeError::new(
200                "invalid_await_event_wait_identity",
201                "await-event wait identity requires non-empty stable ids",
202            ));
203        }
204        Ok(())
205    }
206}
207
208#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
209pub struct AwaitEventKey {
210    pub scope: ExecutionScope,
211    pub wait: AwaitEventWaitIdentity,
212    pub key_id: String,
213    pub signature: String,
214}
215
216impl AwaitEventKey {
217    pub fn promise_key(&self) -> String {
218        format!("lash-await-event:{}", self.key_id)
219    }
220}
221
222#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
223pub struct ExternalCompletionError {
224    pub code: String,
225    pub message: String,
226    #[serde(default, skip_serializing_if = "Option::is_none")]
227    pub raw: Option<serde_json::Value>,
228}
229
230impl ExternalCompletionError {
231    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
232        Self {
233            code: code.into(),
234            message: message.into(),
235            raw: None,
236        }
237    }
238}
239
240#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
241#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
242pub enum Resolution {
243    Ok(serde_json::Value),
244    Err(ExternalCompletionError),
245    Timeout,
246    Cancelled,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "status", rename_all = "snake_case")]
251pub enum ResolveOutcome {
252    Accepted,
253    AlreadyResolved { terminal: Resolution },
254    UnknownOrRevoked,
255}
256
257#[derive(Debug)]
258struct AwaitEventEntry {
259    terminal: Option<Resolution>,
260    notify: Arc<Notify>,
261}
262
263#[derive(Debug)]
264struct AwaitEventRegistryState {
265    entries: HashMap<String, AwaitEventEntry>,
266    revoked_key_ids: HashSet<String>,
267    revoked_session_ids: HashSet<String>,
268}
269
270#[derive(Debug)]
271struct AwaitEventRegistry {
272    secret: Vec<u8>,
273    state: std::sync::Mutex<AwaitEventRegistryState>,
274}
275
276impl AwaitEventRegistry {
277    fn new() -> Self {
278        Self {
279            secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
280            state: std::sync::Mutex::new(AwaitEventRegistryState {
281                entries: HashMap::new(),
282                revoked_key_ids: HashSet::new(),
283                revoked_session_ids: HashSet::new(),
284            }),
285        }
286    }
287
288    fn key_for(
289        &self,
290        scope: &ExecutionScope,
291        wait: AwaitEventWaitIdentity,
292    ) -> Result<AwaitEventKey, RuntimeError> {
293        scope.validate()?;
294        wait.validate()?;
295        let key_id =
296            crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
297                RuntimeError::new(
298                    "await_event_key_hash",
299                    format!("failed to hash await-event identity: {err}"),
300                )
301            })?;
302        let signature = self.signature(scope, &wait, &key_id)?;
303        Ok(AwaitEventKey {
304            scope: scope.clone(),
305            wait,
306            key_id,
307            signature,
308        })
309    }
310
311    fn signature(
312        &self,
313        scope: &ExecutionScope,
314        wait: &AwaitEventWaitIdentity,
315        key_id: &str,
316    ) -> Result<String, RuntimeError> {
317        let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
318            RuntimeError::new(
319                "await_event_key_sign",
320                format!("failed to initialize await-event key signer: {err}"),
321            )
322        })?;
323        let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
324            RuntimeError::new(
325                "await_event_key_sign",
326                format!("failed to serialize await-event key identity: {err}"),
327            )
328        })?;
329        mac.update(&canonical);
330        Ok(format!("{:x}", mac.finalize().into_bytes()))
331    }
332
333    fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
334        let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
335        Ok(expected == key.signature)
336    }
337
338    fn resolve(
339        &self,
340        key: &AwaitEventKey,
341        resolution: Resolution,
342    ) -> Result<ResolveOutcome, RuntimeError> {
343        if !self.verify(key)? {
344            return Ok(ResolveOutcome::UnknownOrRevoked);
345        }
346        let mut state = self.state.lock().map_err(|_| {
347            RuntimeError::new(
348                "await_event_registry_poisoned",
349                "await-event registry lock poisoned",
350            )
351        })?;
352        if state.revoked_key_ids.contains(&key.key_id)
353            || key
354                .scope
355                .session_id()
356                .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
357        {
358            return Ok(ResolveOutcome::UnknownOrRevoked);
359        }
360        let entry = state
361            .entries
362            .entry(key.key_id.clone())
363            .or_insert_with(|| AwaitEventEntry {
364                terminal: None,
365                notify: Arc::new(Notify::new()),
366            });
367        if let Some(terminal) = &entry.terminal {
368            return Ok(ResolveOutcome::AlreadyResolved {
369                terminal: terminal.clone(),
370            });
371        }
372        entry.terminal = Some(resolution);
373        entry.notify.notify_waiters();
374        Ok(ResolveOutcome::Accepted)
375    }
376
377    async fn await_resolution(
378        &self,
379        key: &AwaitEventKey,
380        cancel: CancellationToken,
381        deadline: Option<Instant>,
382    ) -> Result<Resolution, RuntimeError> {
383        if !self.verify(key)? {
384            return Err(RuntimeError::new(
385                "await_event_unknown_or_revoked",
386                "await-event key is invalid or revoked",
387            ));
388        }
389        loop {
390            let notify =
391                {
392                    let mut state = self.state.lock().map_err(|_| {
393                        RuntimeError::new(
394                            "await_event_registry_poisoned",
395                            "await-event registry lock poisoned",
396                        )
397                    })?;
398                    if state.revoked_key_ids.contains(&key.key_id)
399                        || key.scope.session_id().is_some_and(|session_id| {
400                            state.revoked_session_ids.contains(session_id)
401                        })
402                    {
403                        return Err(RuntimeError::new(
404                            "await_event_unknown_or_revoked",
405                            "await-event key is invalid or revoked",
406                        ));
407                    }
408                    let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
409                        AwaitEventEntry {
410                            terminal: None,
411                            notify: Arc::new(Notify::new()),
412                        }
413                    });
414                    if let Some(terminal) = entry.terminal.clone() {
415                        return Ok(terminal);
416                    }
417                    Arc::clone(&entry.notify)
418                };
419            if let Some(deadline) = deadline {
420                tokio::select! {
421                    _ = cancel.cancelled() => {
422                        let _ = self.resolve(key, Resolution::Cancelled)?;
423                    }
424                    _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
425                        let _ = self.resolve(key, Resolution::Timeout)?;
426                    }
427                    _ = notify.notified() => {}
428                }
429            } else {
430                tokio::select! {
431                    _ = cancel.cancelled() => {
432                        let _ = self.resolve(key, Resolution::Cancelled)?;
433                    }
434                    _ = notify.notified() => {}
435                }
436            }
437        }
438    }
439
440    fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
441        let mut state = self.state.lock().map_err(|_| {
442            RuntimeError::new(
443                "await_event_registry_poisoned",
444                "await-event registry lock poisoned",
445            )
446        })?;
447        state.revoked_session_ids.insert(session_id.to_string());
448        for entry in state.entries.values() {
449            entry.notify.notify_waiters();
450        }
451        Ok(())
452    }
453}
454
455enum ScopedEffectControllerInner<'run> {
456    Borrowed(&'run dyn RuntimeEffectController),
457    Shared(Arc<dyn RuntimeEffectController>),
458}
459
460impl Clone for ScopedEffectControllerInner<'_> {
461    fn clone(&self) -> Self {
462        match self {
463            Self::Borrowed(controller) => Self::Borrowed(*controller),
464            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
465        }
466    }
467}
468
469/// Scoped low-level controller plus the semantic execution scope it is serving.
470#[derive(Clone)]
471pub struct ScopedEffectController<'run> {
472    controller: ScopedEffectControllerInner<'run>,
473    scope: ExecutionScope,
474}
475
476impl<'run> ScopedEffectController<'run> {
477    pub fn borrowed(
478        controller: &'run dyn RuntimeEffectController,
479        scope: ExecutionScope,
480    ) -> Result<Self, RuntimeError> {
481        scope.validate()?;
482        Ok(Self {
483            controller: ScopedEffectControllerInner::Borrowed(controller),
484            scope,
485        })
486    }
487
488    pub fn shared(
489        controller: Arc<dyn RuntimeEffectController>,
490        scope: ExecutionScope,
491    ) -> Result<Self, RuntimeError> {
492        scope.validate()?;
493        Ok(Self {
494            controller: ScopedEffectControllerInner::Shared(controller),
495            scope,
496        })
497    }
498
499    pub fn controller(&self) -> &dyn RuntimeEffectController {
500        match &self.controller {
501            ScopedEffectControllerInner::Borrowed(controller) => *controller,
502            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
503        }
504    }
505
506    pub fn execution_scope(&self) -> &ExecutionScope {
507        &self.scope
508    }
509
510    pub fn scope_id(&self) -> &str {
511        self.scope.id()
512    }
513
514    pub fn turn_id(&self) -> Option<&str> {
515        self.scope.turn_id()
516    }
517}
518
519/// Deployment-level factory for scoped effect controllers.
520#[async_trait::async_trait]
521pub trait EffectHost: Send + Sync {
522    fn durability_tier(&self) -> crate::DurabilityTier {
523        crate::DurabilityTier::Inline
524    }
525
526    fn requires_durable_attachment_store(&self) -> bool {
527        false
528    }
529
530    fn scoped<'run>(
531        &'run self,
532        scope: ExecutionScope,
533    ) -> Result<ScopedEffectController<'run>, RuntimeError>;
534
535    fn scoped_static(
536        &self,
537        _scope: ExecutionScope,
538    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
539        Ok(None)
540    }
541
542    async fn await_event_key(
543        &self,
544        _scope: &ExecutionScope,
545        _wait: AwaitEventWaitIdentity,
546    ) -> Result<AwaitEventKey, RuntimeError> {
547        Err(RuntimeError::new(
548            "await_event_unsupported",
549            "this effect host does not support await-event keys",
550        ))
551    }
552
553    async fn resolve_await_event(
554        &self,
555        _key: &AwaitEventKey,
556        _resolution: Resolution,
557    ) -> Result<ResolveOutcome, RuntimeError> {
558        Ok(ResolveOutcome::UnknownOrRevoked)
559    }
560
561    async fn await_await_event(
562        &self,
563        _key: &AwaitEventKey,
564        _cancel: CancellationToken,
565        _deadline: Option<Instant>,
566    ) -> Result<Resolution, RuntimeError> {
567        Err(RuntimeError::new(
568            "await_event_unsupported",
569            "this effect host does not support await-event waits",
570        ))
571    }
572
573    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
574        Ok(())
575    }
576}
577
578/// Boundary for nondeterministic runtime work.
579#[async_trait::async_trait]
580pub trait RuntimeEffectController: Send + Sync {
581    /// Durability tier this controller provides; defaults to
582    /// [`DurabilityTier::Inline`].
583    fn durability_tier(&self) -> crate::DurabilityTier {
584        crate::DurabilityTier::Inline
585    }
586
587    fn requires_durable_attachment_store(&self) -> bool {
588        false
589    }
590
591    async fn execute_effect(
592        &self,
593        envelope: RuntimeEffectEnvelope,
594        local_executor: RuntimeEffectLocalExecutor<'_>,
595    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
596
597    async fn await_event_key(
598        &self,
599        _scope: &ExecutionScope,
600        _wait: AwaitEventWaitIdentity,
601    ) -> Result<AwaitEventKey, RuntimeError> {
602        Err(RuntimeError::new(
603            "await_event_unsupported",
604            "this effect controller does not support await-event keys",
605        ))
606    }
607
608    async fn resolve_await_event(
609        &self,
610        _key: &AwaitEventKey,
611        _resolution: Resolution,
612    ) -> Result<ResolveOutcome, RuntimeError> {
613        Ok(ResolveOutcome::UnknownOrRevoked)
614    }
615
616    async fn await_await_event(
617        &self,
618        _key: &AwaitEventKey,
619        _cancel: CancellationToken,
620        _deadline: Option<Instant>,
621    ) -> Result<Resolution, RuntimeError> {
622        Err(RuntimeError::new(
623            "await_event_unsupported",
624            "this effect controller does not support await-event waits",
625        ))
626    }
627
628    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
629        Ok(())
630    }
631}
632
633/// Runtime-internal handle for effect-controller references carried through
634/// per-turn execution contexts.
635#[derive(Clone)]
636pub(crate) enum RuntimeEffectControllerHandle<'run> {
637    Borrowed(ScopedEffectController<'run>),
638    #[cfg(any(test, feature = "testing"))]
639    Shared {
640        controller: Arc<dyn RuntimeEffectController>,
641        scope: ExecutionScope,
642    },
643}
644
645impl<'run> RuntimeEffectControllerHandle<'run> {
646    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
647        Self::Borrowed(scoped)
648    }
649
650    #[cfg(any(test, feature = "testing"))]
651    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
652        Self::Shared {
653            controller,
654            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
655        }
656    }
657
658    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
659        match self {
660            Self::Borrowed(scoped) => scoped.controller(),
661            #[cfg(any(test, feature = "testing"))]
662            Self::Shared { controller, .. } => controller.as_ref(),
663        }
664    }
665
666    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
667        match self {
668            Self::Borrowed(scoped) => scoped.clone(),
669            #[cfg(any(test, feature = "testing"))]
670            Self::Shared { controller, scope } => {
671                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
672                    .expect("runtime effect controller handle carries a valid scope")
673            }
674        }
675    }
676
677    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
678        self.clone()
679    }
680}
681
682#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
683#[error("{code}: {message}")]
684pub struct RuntimeEffectControllerError {
685    pub code: String,
686    pub message: String,
687}
688
689impl RuntimeEffectControllerError {
690    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
691        Self {
692            code: code.into(),
693            message: message.into(),
694        }
695    }
696
697    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
698        Self::new(
699            "runtime_effect_wrong_outcome",
700            format!(
701                "expected {} outcome, got {}",
702                expected.as_str(),
703                actual.as_str()
704            ),
705        )
706    }
707
708    pub(crate) fn into_runtime_error(self) -> RuntimeError {
709        RuntimeError::new(self.code, self.message)
710    }
711}
712
713impl From<RuntimeError> for RuntimeEffectControllerError {
714    fn from(err: RuntimeError) -> Self {
715        Self::new(err.code.as_str(), err.message)
716    }
717}
718
719impl From<PluginError> for RuntimeEffectControllerError {
720    fn from(err: PluginError) -> Self {
721        Self::new("plugin", err.to_string())
722    }
723}
724
725impl From<crate::StoreError> for RuntimeEffectControllerError {
726    fn from(err: crate::StoreError) -> Self {
727        Self::new("runtime_store", err.to_string())
728    }
729}
730
731// =============================================================================
732// Local executor (per-effect borrowed runner state)
733// =============================================================================
734
735#[async_trait::async_trait]
736pub(crate) trait ProcessRunner: Send + Sync {
737    async fn run_process(
738        &self,
739        registration: crate::ProcessRegistration,
740        execution_context: crate::ProcessExecutionContext,
741        registry: Arc<dyn ProcessRegistry>,
742        scoped_effect_controller: crate::ScopedEffectController<'_>,
743        cancellation: CancellationToken,
744    ) -> crate::ProcessAwaitOutput;
745}
746
747pub struct ProcessLocalExecution {
748    pub registry: Arc<dyn ProcessRegistry>,
749}
750
751pub(super) struct LocalTurnEffectRunner<'a, 'run> {
752    driver: &'a mut RuntimeTurnDriver<'run>,
753    machine: &'a mut crate::TurnMachine,
754    event_tx: mpsc::Sender<RuntimeStreamEvent>,
755    cancellation: CancellationToken,
756}
757
758pub(super) struct LocalDirectEffectRunner {
759    provider: ProviderHandle,
760    attachment_store: Arc<dyn AttachmentStore>,
761}
762
763#[async_trait::async_trait]
764trait RuntimeEffectLocalRunner: Send {
765    async fn execute(
766        self: Box<Self>,
767        envelope: RuntimeEffectEnvelope,
768    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
769}
770
/// Signature of the one-shot closure behind the testing runner: it takes the
/// effect envelope and returns a boxed, `Send` future producing the outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
782
/// Local runner backed by a caller-supplied closure; only compiled for tests
/// or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// The closure invoked, once, to execute the effect.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
787
788enum RuntimeEffectLocalExecutorState<'run> {
789    Unavailable,
790    SleepOnly {
791        cancellation: CancellationToken,
792    },
793    ExternalWaitOptions {
794        cancellation: CancellationToken,
795        deadline: Option<Instant>,
796    },
797    Process(ProcessLocalExecution),
798    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
799}
800
801/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
802///
803/// Durable controllers may ignore it and replay their own recorded result. The
804/// default inline controller delegates to it, so local provider/tool/checkpoint
805/// work still crosses the same `execute_effect` boundary as durable controllers.
806pub struct RuntimeEffectLocalExecutor<'run> {
807    state: RuntimeEffectLocalExecutorState<'run>,
808}
809
810impl<'run> RuntimeEffectLocalExecutor<'run> {
811    pub fn unavailable() -> Self {
812        Self {
813            state: RuntimeEffectLocalExecutorState::Unavailable,
814        }
815    }
816
817    pub fn sleep(cancellation: CancellationToken) -> Self {
818        Self {
819            state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
820        }
821    }
822
823    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
824        Self {
825            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
826                cancellation,
827                deadline,
828            },
829        }
830    }
831
832    pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
833        Self {
834            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
835        }
836    }
837
838    #[cfg(any(test, feature = "testing"))]
839    pub fn testing<F, Fut>(run: F) -> Self
840    where
841        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
842        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
843            + Send
844            + 'run,
845    {
846        Self {
847            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
848                TestingRuntimeEffectLocalRunner {
849                    run: Box::new(move |envelope| Box::pin(run(envelope))),
850                },
851            )),
852        }
853    }
854
855    pub(in crate::runtime) fn turn<'scope>(
856        driver: &'run mut RuntimeTurnDriver<'scope>,
857        machine: &'run mut crate::TurnMachine,
858        event_tx: mpsc::Sender<RuntimeStreamEvent>,
859        cancellation: CancellationToken,
860    ) -> Self
861    where
862        'scope: 'run,
863    {
864        Self {
865            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
866                driver,
867                machine,
868                event_tx,
869                cancellation,
870            })),
871        }
872    }
873
874    pub(in crate::runtime) fn direct(
875        provider: ProviderHandle,
876        attachment_store: Arc<dyn AttachmentStore>,
877    ) -> Self {
878        Self {
879            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
880                provider,
881                attachment_store,
882            })),
883        }
884    }
885
886    pub async fn execute(
887        self,
888        envelope: RuntimeEffectEnvelope,
889    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
890        match self.state {
891            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
892            RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
893                execute_local_sleep(envelope, cancellation).await
894            }
895            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
896                Err(RuntimeEffectControllerError::new(
897                    "runtime_effect_local_executor_mismatch",
898                    format!(
899                        "local await-event options cannot execute {} command directly",
900                        envelope.command.kind().as_str()
901                    ),
902                ))
903            }
904            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
905                "runtime_effect_local_executor_unavailable",
906                format!(
907                    "no local executor is available for {}",
908                    envelope.command.kind().as_str()
909                ),
910            )),
911            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
912                "runtime_effect_local_executor_mismatch",
913                format!(
914                    "process executor cannot execute {} command directly",
915                    envelope.command.kind().as_str()
916                ),
917            )),
918        }
919    }
920
921    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
922        match self.state {
923            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
924            _ => Err(RuntimeEffectControllerError::new(
925                "runtime_effect_local_executor_unavailable",
926                "no process executor is available for process command",
927            )),
928        }
929    }
930
931    fn into_await_event_options(
932        self,
933    ) -> Result<(CancellationToken, Option<Instant>), RuntimeEffectControllerError> {
934        match self.state {
935            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
936                cancellation,
937                deadline,
938            } => Ok((cancellation, deadline)),
939            _ => Ok((CancellationToken::new(), None)),
940        }
941    }
942}
943
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Hands the envelope to the test-supplied `run` callback and awaits the
    /// future it returns.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let pending = (self.run)(envelope);
        pending.await
    }
}
954
955#[async_trait::async_trait]
956impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
957    async fn execute(
958        self: Box<Self>,
959        envelope: RuntimeEffectEnvelope,
960    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
961        let runner = *self;
962        match envelope.command {
963            RuntimeEffectCommand::LlmCall { request } => {
964                let protocol_iteration = runner.machine.protocol_iteration();
965                let (result, text_streamed) = runner
966                    .driver
967                    .run_llm_call(
968                        Arc::new((*request).into_request(None, None)),
969                        protocol_iteration,
970                        envelope.invocation,
971                        &runner.event_tx,
972                        &runner.cancellation,
973                    )
974                    .await;
975                Ok(RuntimeEffectOutcome::LlmCall {
976                    result,
977                    text_streamed,
978                })
979            }
980            RuntimeEffectCommand::ToolCall { call } => {
981                let tool_name = call.tool_name.clone();
982                let mut outcome = runner
983                    .driver
984                    .run_tool_calls(
985                        vec![(call, envelope.invocation)],
986                        &runner.event_tx,
987                        &runner.cancellation,
988                    )
989                    .await?;
990                let launch = outcome.launches.pop().ok_or_else(|| {
991                    RuntimeEffectControllerError::new(
992                        "tool_result_missing",
993                        format!("tool `{tool_name}` completed without a launch result"),
994                    )
995                })?;
996                Ok(RuntimeEffectOutcome::ToolCall {
997                    launch,
998                    triggers: outcome.triggers,
999                })
1000            }
1001            RuntimeEffectCommand::ExecCode { code } => {
1002                let protocol_iteration = runner.machine.protocol_iteration();
1003                let messages = runner.machine.message_sequence();
1004                Ok(RuntimeEffectOutcome::ExecCode {
1005                    result: runner
1006                        .driver
1007                        .run_exec_code(
1008                            &code,
1009                            messages,
1010                            protocol_iteration,
1011                            envelope.invocation,
1012                            &runner.event_tx,
1013                        )
1014                        .await,
1015                })
1016            }
1017            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1018                Ok(RuntimeEffectOutcome::Checkpoint {
1019                    result: runner
1020                        .driver
1021                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1022                        .await
1023                        .map_err(RuntimeEffectControllerError::from),
1024                })
1025            }
1026            RuntimeEffectCommand::SyncExecutionEnvironment {
1027                update_machine_config,
1028            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1029                result: runner
1030                    .driver
1031                    .refresh_execution_environment(runner.machine, update_machine_config)
1032                    .await
1033                    .map_err(|err| err.to_string()),
1034            }),
1035            RuntimeEffectCommand::Sleep { duration_ms } => {
1036                sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
1037                Ok(RuntimeEffectOutcome::Sleep)
1038            }
1039            command => Err(RuntimeEffectControllerError::new(
1040                "runtime_effect_local_executor_mismatch",
1041                format!(
1042                    "local turn executor cannot execute {} command",
1043                    command.kind().as_str()
1044                ),
1045            )),
1046        }
1047    }
1048}
1049
1050#[async_trait::async_trait]
1051impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1052    async fn execute(
1053        mut self: Box<Self>,
1054        envelope: RuntimeEffectEnvelope,
1055    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1056        match envelope.command {
1057            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1058                result: self
1059                    .run_direct_llm_request((*request).into_request(
1060                        crate::session_model::transport_stream_events(&self.provider, None),
1061                        None,
1062                    ))
1063                    .await,
1064            }),
1065            RuntimeEffectCommand::Sleep { duration_ms } => {
1066                sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
1067                Ok(RuntimeEffectOutcome::Sleep)
1068            }
1069            command => Err(RuntimeEffectControllerError::new(
1070                "runtime_effect_local_executor_mismatch",
1071                format!(
1072                    "local direct executor cannot execute {} command",
1073                    command.kind().as_str()
1074                ),
1075            )),
1076        }
1077    }
1078}
1079
1080impl LocalDirectEffectRunner {
1081    async fn run_direct_llm_request(
1082        &mut self,
1083        request: CoreLlmRequest,
1084    ) -> Result<LlmResponse, LlmCallError> {
1085        let request = crate::attachments::resolve_llm_request_attachments(
1086            request,
1087            self.attachment_store.as_ref(),
1088        )
1089        .await
1090        .map_err(|err| LlmCallError {
1091            message: err.to_string(),
1092            retryable: false,
1093            raw: None,
1094            code: Some("attachment_resolution_failed".to_string()),
1095            terminal_reason: crate::LlmTerminalReason::ProviderError,
1096            request_body: None,
1097        })?;
1098        self.provider
1099            .complete(request)
1100            .await
1101            .map_err(llm_call_error_from_transport)
1102    }
1103}
1104
1105async fn execute_local_sleep(
1106    envelope: RuntimeEffectEnvelope,
1107    cancellation: CancellationToken,
1108) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1109    match envelope.command {
1110        RuntimeEffectCommand::Sleep { duration_ms } => {
1111            sleep_with_cancellation(duration_ms, &cancellation).await?;
1112            Ok(RuntimeEffectOutcome::Sleep)
1113        }
1114        command => Err(RuntimeEffectControllerError::new(
1115            "runtime_effect_local_executor_mismatch",
1116            format!(
1117                "local sleep executor cannot execute {} command",
1118                command.kind().as_str()
1119            ),
1120        )),
1121    }
1122}
1123
1124async fn sleep_with_cancellation(
1125    duration_ms: u64,
1126    cancellation: &CancellationToken,
1127) -> Result<(), RuntimeEffectControllerError> {
1128    let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
1129    tokio::pin!(sleep);
1130    tokio::select! {
1131        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1132            "runtime_effect_sleep_cancelled",
1133            "runtime effect sleep was cancelled",
1134        )),
1135        _ = &mut sleep => Ok(()),
1136    }
1137}
1138
1139// =============================================================================
1140// Default in-process effect controller
1141// =============================================================================
1142
/// Default in-process effect controller.
///
/// Stateless: the inline controller only registers process rows; the
/// lease-protected [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole
/// executor.
///
/// The type is a unit struct and holds no data of its own. Its await-event
/// operations are served by the registry returned from
/// `inline_await_events()`.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
1150
1151#[async_trait::async_trait]
1152impl RuntimeEffectController for InlineRuntimeEffectController {
1153    async fn execute_effect(
1154        &self,
1155        envelope: RuntimeEffectEnvelope,
1156        local_executor: RuntimeEffectLocalExecutor<'_>,
1157    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1158        match envelope.command {
1159            RuntimeEffectCommand::AwaitEvent { key } => {
1160                let (cancellation, deadline) = local_executor.into_await_event_options()?;
1161                let resolution = self
1162                    .await_await_event(&key, cancellation, deadline)
1163                    .await
1164                    .map_err(RuntimeEffectControllerError::from)?;
1165                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1166            }
1167            RuntimeEffectCommand::Process { command } => {
1168                let execution = local_executor.into_process()?;
1169                let registry = execution.registry;
1170                let result = tokio::task::spawn(async move {
1171                    Self::execute_process_command(registry, *command).await
1172                })
1173                .await
1174                .map_err(|err| {
1175                    RuntimeEffectControllerError::new(
1176                        "runtime_effect_process_task_join",
1177                        format!("inline process effect task failed: {err}"),
1178                    )
1179                })??;
1180                Ok(RuntimeEffectOutcome::Process { result })
1181            }
1182            _ => local_executor.execute(envelope).await,
1183        }
1184    }
1185
1186    async fn await_event_key(
1187        &self,
1188        scope: &ExecutionScope,
1189        wait: AwaitEventWaitIdentity,
1190    ) -> Result<AwaitEventKey, RuntimeError> {
1191        inline_await_events().key_for(scope, wait)
1192    }
1193
1194    async fn resolve_await_event(
1195        &self,
1196        key: &AwaitEventKey,
1197        resolution: Resolution,
1198    ) -> Result<ResolveOutcome, RuntimeError> {
1199        inline_await_events().resolve(key, resolution)
1200    }
1201
1202    async fn await_await_event(
1203        &self,
1204        key: &AwaitEventKey,
1205        cancel: CancellationToken,
1206        deadline: Option<Instant>,
1207    ) -> Result<Resolution, RuntimeError> {
1208        inline_await_events()
1209            .await_resolution(key, cancel, deadline)
1210            .await
1211    }
1212
1213    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1214        inline_await_events().revoke_session(session_id)
1215    }
1216}
1217
/// In-process deployment effect host.
///
/// A thin wrapper around a shared [`RuntimeEffectController`]: cloning the
/// host clones the `Arc`, so clones share the same controller.
#[derive(Clone)]
pub struct InlineEffectHost {
    /// Controller that the host's `EffectHost` operations are forwarded to.
    controller: Arc<dyn RuntimeEffectController>,
}
1223
impl InlineEffectHost {
    /// Creates a host that forwards its operations to `controller`.
    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
        Self { controller }
    }
}
1229
1230impl Default for InlineEffectHost {
1231    fn default() -> Self {
1232        Self::new(Arc::new(InlineRuntimeEffectController))
1233    }
1234}
1235
1236#[async_trait::async_trait]
1237impl EffectHost for InlineEffectHost {
1238    fn durability_tier(&self) -> crate::DurabilityTier {
1239        self.controller.durability_tier()
1240    }
1241
1242    fn requires_durable_attachment_store(&self) -> bool {
1243        self.controller.requires_durable_attachment_store()
1244    }
1245
1246    fn scoped<'run>(
1247        &'run self,
1248        scope: ExecutionScope,
1249    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1250        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1251    }
1252
1253    fn scoped_static(
1254        &self,
1255        scope: ExecutionScope,
1256    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1257        Ok(Some(ScopedEffectController::shared(
1258            Arc::clone(&self.controller),
1259            scope,
1260        )?))
1261    }
1262
1263    async fn await_event_key(
1264        &self,
1265        scope: &ExecutionScope,
1266        wait: AwaitEventWaitIdentity,
1267    ) -> Result<AwaitEventKey, RuntimeError> {
1268        self.controller.await_event_key(scope, wait).await
1269    }
1270
1271    async fn resolve_await_event(
1272        &self,
1273        key: &AwaitEventKey,
1274        resolution: Resolution,
1275    ) -> Result<ResolveOutcome, RuntimeError> {
1276        self.controller.resolve_await_event(key, resolution).await
1277    }
1278
1279    async fn await_await_event(
1280        &self,
1281        key: &AwaitEventKey,
1282        cancel: CancellationToken,
1283        deadline: Option<Instant>,
1284    ) -> Result<Resolution, RuntimeError> {
1285        self.controller
1286            .await_await_event(key, cancel, deadline)
1287            .await
1288    }
1289
1290    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1291        self.controller
1292            .revoke_await_events_for_session(session_id)
1293            .await
1294    }
1295}
1296
1297impl InlineRuntimeEffectController {
1298    /// Register the process (and any handle grant) into the durable registry.
1299    ///
1300    /// The inline controller no longer runs the process here: the registry's
1301    /// non-terminal row *is* the durable work queue, and the lease-protected
1302    /// [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole executor. The
1303    /// control seam pokes that runner after a successful start, so registering
1304    /// the row is all this path does.
1305    pub(crate) async fn start_process(
1306        registry: Arc<dyn crate::ProcessRegistry>,
1307        registration: crate::ProcessRegistration,
1308        grant: Option<crate::ProcessStartGrant>,
1309    ) -> Result<ProcessRecord, PluginError> {
1310        let registration_for_record = registration.clone();
1311        let record = registry.register_process(registration_for_record).await?;
1312        if let Some(grant) = grant {
1313            registry
1314                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1315                .await?;
1316        }
1317        Ok(record)
1318    }
1319
1320    pub(crate) async fn request_process_cancel(
1321        &self,
1322        registry: Arc<dyn crate::ProcessRegistry>,
1323        process_id: &str,
1324        reason: Option<String>,
1325    ) -> Result<ProcessRecord, PluginError> {
1326        // Cancellation is a durable signal: the cancel event is what the
1327        // runner-run process observes, so the inline controller appends it and
1328        // no longer tracks an in-process cancellation token.
1329        registry
1330            .append_event(
1331                process_id,
1332                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1333            )
1334            .await?;
1335        registry
1336            .get_process(process_id)
1337            .await
1338            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1339    }
1340
1341    async fn execute_process_command(
1342        registry: Arc<dyn crate::ProcessRegistry>,
1343        command: ProcessCommand,
1344    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1345        match command {
1346            ProcessCommand::Start {
1347                registration,
1348                grant,
1349                execution_context: _,
1350            } => {
1351                let record = Self::start_process(registry, registration, grant).await?;
1352                Ok(ProcessEffectOutcome::Start { record })
1353            }
1354            ProcessCommand::List {
1355                session_scope,
1356                mode,
1357            } => {
1358                let entries = match mode {
1359                    crate::ProcessListMode::Live => {
1360                        registry.list_live_handle_grants(&session_scope).await?
1361                    }
1362                    crate::ProcessListMode::All => {
1363                        registry.list_handle_grants(&session_scope).await?
1364                    }
1365                };
1366                Ok(ProcessEffectOutcome::List { entries })
1367            }
1368            ProcessCommand::Transfer {
1369                from_scope,
1370                to_scope,
1371                process_ids,
1372            } => {
1373                registry
1374                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1375                    .await?;
1376                Ok(ProcessEffectOutcome::Transfer)
1377            }
1378            ProcessCommand::DeleteSession { session_id } => {
1379                let report = registry.delete_session_process_state(&session_id).await?;
1380                Ok(ProcessEffectOutcome::DeleteSession { report })
1381            }
1382            ProcessCommand::Await { process_id } => {
1383                let output = registry.await_process(&process_id).await?;
1384                Ok(ProcessEffectOutcome::Await { output })
1385            }
1386            ProcessCommand::Cancel { process_id, reason } => {
1387                let record = InlineRuntimeEffectController
1388                    .request_process_cancel(registry, &process_id, reason)
1389                    .await?;
1390                Ok(ProcessEffectOutcome::Cancel { record })
1391            }
1392            ProcessCommand::Signal {
1393                process_id,
1394                request,
1395                ..
1396            } => {
1397                let result = registry.append_event(&process_id, request).await?;
1398                Ok(ProcessEffectOutcome::Signal {
1399                    event: result.event,
1400                })
1401            }
1402        }
1403    }
1404}
1405
1406impl std::fmt::Debug for InlineRuntimeEffectController {
1407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1408        f.debug_struct("InlineRuntimeEffectController").finish()
1409    }
1410}