Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23    RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
/// HMAC-SHA256 instance type used by the await-event registry to sign keys.
type HmacSha256 = Hmac<sha2::Sha256>;
28
29fn inline_await_events() -> &'static AwaitEventRegistry {
30    static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
31    REGISTRY.get_or_init(AwaitEventRegistry::new)
32}
33
34// =============================================================================
35// Effect host + controller trait + scope + error
36// =============================================================================
37
/// Stable semantic identity for one effectful runtime operation.
///
/// The scope is chosen by the host boundary before any nondeterministic work is
/// planned. It is intentionally generic: Restate, an inline test host, or a
/// future durable effect host all receive the same Lash scope vocabulary.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case variant name and the remaining fields carry the ids.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionScope {
    /// Scope identified by a turn id, together with its owning session id.
    Turn {
        session_id: String,
        turn_id: String,
    },
    /// Scope identified by a process id; carries no session id.
    Process {
        process_id: String,
    },
    /// Scope identified by a drain id, together with its owning session id.
    QueueDrain {
        session_id: String,
        drain_id: String,
    },
    /// Scope identified by the id of the session being deleted.
    SessionDelete {
        session_id: String,
    },
    /// Scope identified by an operation id; carries no session id.
    RuntimeOperation {
        operation_id: String,
    },
}
64
65impl ExecutionScope {
66    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
67        Self::Turn {
68            session_id: session_id.into(),
69            turn_id: turn_id.into(),
70        }
71    }
72
73    pub fn process(process_id: impl Into<String>) -> Self {
74        Self::Process {
75            process_id: process_id.into(),
76        }
77    }
78
79    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
80        Self::QueueDrain {
81            session_id: session_id.into(),
82            drain_id: drain_id.into(),
83        }
84    }
85
86    pub fn session_delete(session_id: impl Into<String>) -> Self {
87        Self::SessionDelete {
88            session_id: session_id.into(),
89        }
90    }
91
92    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
93        Self::RuntimeOperation {
94            operation_id: operation_id.into(),
95        }
96    }
97
98    pub fn id(&self) -> &str {
99        match self {
100            Self::Turn { turn_id, .. } => turn_id,
101            Self::Process { process_id } => process_id,
102            Self::QueueDrain { drain_id, .. } => drain_id,
103            Self::SessionDelete { session_id } => session_id,
104            Self::RuntimeOperation { operation_id } => operation_id,
105        }
106    }
107
108    pub fn session_id(&self) -> Option<&str> {
109        match self {
110            Self::Turn { session_id, .. }
111            | Self::QueueDrain { session_id, .. }
112            | Self::SessionDelete { session_id } => Some(session_id),
113            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
114        }
115    }
116
117    pub fn turn_id(&self) -> Option<&str> {
118        match self {
119            Self::Turn { turn_id, .. } => Some(turn_id),
120            _ => None,
121        }
122    }
123
124    pub fn validates_turn_trace_id(&self) -> bool {
125        matches!(self, Self::Turn { .. })
126    }
127
128    fn validate(&self) -> Result<(), RuntimeError> {
129        let missing = match self {
130            Self::Turn {
131                session_id,
132                turn_id,
133            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
134            Self::Process { process_id } => process_id.trim().is_empty(),
135            Self::QueueDrain {
136                session_id,
137                drain_id,
138            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
139            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
140            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
141        };
142        if missing {
143            return Err(RuntimeError::new(
144                RuntimeErrorCode::MissingExecutionScopeId,
145                "execution scopes require non-empty stable ids",
146            ));
147        }
148        Ok(())
149    }
150}
151
/// Identifies what an await-event wait is waiting for within an execution scope.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case variant name.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AwaitEventWaitIdentity {
    /// Wait keyed by a tool call id.
    ToolCompletion {
        tool_call_id: String,
    },
    /// Wait keyed by a process id, a signal name and an ordinal.
    /// Validation rejects an `ordinal` of zero.
    ProcessSignal {
        process_id: String,
        signal_name: String,
        ordinal: u64,
    },
    /// Wait keyed by a caller-chosen string.
    Custom {
        key: String,
    },
}
167
168impl AwaitEventWaitIdentity {
169    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
170        Self::ToolCompletion {
171            tool_call_id: tool_call_id.into(),
172        }
173    }
174
175    pub fn process_signal(
176        process_id: impl Into<String>,
177        signal_name: impl Into<String>,
178        ordinal: u64,
179    ) -> Self {
180        Self::ProcessSignal {
181            process_id: process_id.into(),
182            signal_name: signal_name.into(),
183            ordinal,
184        }
185    }
186
187    fn validate(&self) -> Result<(), RuntimeError> {
188        let invalid = match self {
189            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
190            Self::ProcessSignal {
191                process_id,
192                signal_name,
193                ordinal,
194            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
195            Self::Custom { key } => key.trim().is_empty(),
196        };
197        if invalid {
198            return Err(RuntimeError::new(
199                "invalid_await_event_wait_identity",
200                "await-event wait identity requires non-empty stable ids",
201            ));
202        }
203        Ok(())
204    }
205}
206
/// Key that names one await-event wait and proves who issued it.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AwaitEventKey {
    /// Execution scope the wait belongs to.
    pub scope: ExecutionScope,
    /// What is being waited for inside `scope`.
    pub wait: AwaitEventWaitIdentity,
    /// Identifier of the wait. The in-process registry in this file derives it
    /// as a stable hash of `(scope, wait)`; other hosts may choose differently.
    pub key_id: String,
    /// Issuer signature. The in-process registry in this file produces a
    /// hex-encoded HMAC over `(scope, wait, key_id)`.
    pub signature: String,
}
214
215impl AwaitEventKey {
216    pub fn promise_key(&self) -> String {
217        format!("lash-await-event:{}", self.key_id)
218    }
219}
220
/// Error payload carried by a [`Resolution::Err`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalCompletionError {
    /// Error code string.
    pub code: String,
    /// Error message string.
    pub message: String,
    /// Optional raw JSON payload; omitted from the serialized form when `None`
    /// and defaulted to `None` when absent on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<serde_json::Value>,
}
228
229impl ExternalCompletionError {
230    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
231        Self {
232            code: code.into(),
233            message: message.into(),
234            raw: None,
235        }
236    }
237}
238
/// Terminal result of an await-event wait.
///
/// Serialized adjacently tagged: `status` holds the snake_case variant name and
/// `payload` holds the variant's content, if any.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
pub enum Resolution {
    /// Resolved with a JSON value.
    Ok(serde_json::Value),
    /// Resolved with an error payload.
    Err(ExternalCompletionError),
    /// Recorded by the in-process registry when a wait's deadline elapses.
    Timeout,
    /// Recorded by the in-process registry when a wait's cancellation token fires.
    Cancelled,
}
247
/// Result of attempting to resolve an await-event key.
///
/// Serialized as an internally tagged object: `status` holds the snake_case
/// variant name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ResolveOutcome {
    /// The resolution was recorded as the terminal result.
    Accepted,
    /// A terminal result was already recorded; `terminal` is that earlier result.
    AlreadyResolved { terminal: Resolution },
    /// The key did not verify, was revoked, or the host does not support
    /// await-event resolution.
    UnknownOrRevoked,
}
255
/// Per-key state tracked by the in-process await-event registry.
#[derive(Debug)]
struct AwaitEventEntry {
    /// Terminal resolution, once one has been recorded.
    terminal: Option<Resolution>,
    /// Signalled when a terminal resolution is recorded or a session is revoked.
    notify: Arc<Notify>,
}
261
/// Mutable state of the in-process await-event registry, guarded by its mutex.
#[derive(Debug)]
struct AwaitEventRegistryState {
    /// Entries keyed by `AwaitEventKey::key_id`.
    entries: HashMap<String, AwaitEventEntry>,
    /// Key ids whose waits and resolutions are rejected.
    revoked_key_ids: HashSet<String>,
    /// Session ids whose scoped keys are rejected.
    revoked_session_ids: HashSet<String>,
}
268
/// In-process registry that issues, verifies, resolves and awaits await-event keys.
#[derive(Debug)]
struct AwaitEventRegistry {
    /// HMAC key used to sign and verify the keys this registry issues.
    secret: Vec<u8>,
    /// Entries and revocation sets; a std mutex, so it is never held across an await.
    state: std::sync::Mutex<AwaitEventRegistryState>,
}
274
275impl AwaitEventRegistry {
276    fn new() -> Self {
277        Self {
278            secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
279            state: std::sync::Mutex::new(AwaitEventRegistryState {
280                entries: HashMap::new(),
281                revoked_key_ids: HashSet::new(),
282                revoked_session_ids: HashSet::new(),
283            }),
284        }
285    }
286
287    fn key_for(
288        &self,
289        scope: &ExecutionScope,
290        wait: AwaitEventWaitIdentity,
291    ) -> Result<AwaitEventKey, RuntimeError> {
292        scope.validate()?;
293        wait.validate()?;
294        let key_id =
295            crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
296                RuntimeError::new(
297                    "await_event_key_hash",
298                    format!("failed to hash await-event identity: {err}"),
299                )
300            })?;
301        let signature = self.signature(scope, &wait, &key_id)?;
302        Ok(AwaitEventKey {
303            scope: scope.clone(),
304            wait,
305            key_id,
306            signature,
307        })
308    }
309
310    fn signature(
311        &self,
312        scope: &ExecutionScope,
313        wait: &AwaitEventWaitIdentity,
314        key_id: &str,
315    ) -> Result<String, RuntimeError> {
316        let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
317            RuntimeError::new(
318                "await_event_key_sign",
319                format!("failed to initialize await-event key signer: {err}"),
320            )
321        })?;
322        let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
323            RuntimeError::new(
324                "await_event_key_sign",
325                format!("failed to serialize await-event key identity: {err}"),
326            )
327        })?;
328        mac.update(&canonical);
329        Ok(format!("{:x}", mac.finalize().into_bytes()))
330    }
331
332    fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
333        let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
334        Ok(expected == key.signature)
335    }
336
337    fn resolve(
338        &self,
339        key: &AwaitEventKey,
340        resolution: Resolution,
341    ) -> Result<ResolveOutcome, RuntimeError> {
342        if !self.verify(key)? {
343            return Ok(ResolveOutcome::UnknownOrRevoked);
344        }
345        let mut state = self.state.lock().map_err(|_| {
346            RuntimeError::new(
347                "await_event_registry_poisoned",
348                "await-event registry lock poisoned",
349            )
350        })?;
351        if state.revoked_key_ids.contains(&key.key_id)
352            || key
353                .scope
354                .session_id()
355                .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
356        {
357            return Ok(ResolveOutcome::UnknownOrRevoked);
358        }
359        let entry = state
360            .entries
361            .entry(key.key_id.clone())
362            .or_insert_with(|| AwaitEventEntry {
363                terminal: None,
364                notify: Arc::new(Notify::new()),
365            });
366        if let Some(terminal) = &entry.terminal {
367            return Ok(ResolveOutcome::AlreadyResolved {
368                terminal: terminal.clone(),
369            });
370        }
371        entry.terminal = Some(resolution);
372        entry.notify.notify_waiters();
373        Ok(ResolveOutcome::Accepted)
374    }
375
376    async fn await_resolution(
377        &self,
378        key: &AwaitEventKey,
379        cancel: CancellationToken,
380        deadline: Option<Instant>,
381    ) -> Result<Resolution, RuntimeError> {
382        if !self.verify(key)? {
383            return Err(RuntimeError::new(
384                "await_event_unknown_or_revoked",
385                "await-event key is invalid or revoked",
386            ));
387        }
388        loop {
389            let notify =
390                {
391                    let mut state = self.state.lock().map_err(|_| {
392                        RuntimeError::new(
393                            "await_event_registry_poisoned",
394                            "await-event registry lock poisoned",
395                        )
396                    })?;
397                    if state.revoked_key_ids.contains(&key.key_id)
398                        || key.scope.session_id().is_some_and(|session_id| {
399                            state.revoked_session_ids.contains(session_id)
400                        })
401                    {
402                        return Err(RuntimeError::new(
403                            "await_event_unknown_or_revoked",
404                            "await-event key is invalid or revoked",
405                        ));
406                    }
407                    let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
408                        AwaitEventEntry {
409                            terminal: None,
410                            notify: Arc::new(Notify::new()),
411                        }
412                    });
413                    if let Some(terminal) = entry.terminal.clone() {
414                        return Ok(terminal);
415                    }
416                    Arc::clone(&entry.notify)
417                };
418            if let Some(deadline) = deadline {
419                tokio::select! {
420                    _ = cancel.cancelled() => {
421                        let _ = self.resolve(key, Resolution::Cancelled)?;
422                    }
423                    _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
424                        let _ = self.resolve(key, Resolution::Timeout)?;
425                    }
426                    _ = notify.notified() => {}
427                }
428            } else {
429                tokio::select! {
430                    _ = cancel.cancelled() => {
431                        let _ = self.resolve(key, Resolution::Cancelled)?;
432                    }
433                    _ = notify.notified() => {}
434                }
435            }
436        }
437    }
438
439    fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
440        let mut state = self.state.lock().map_err(|_| {
441            RuntimeError::new(
442                "await_event_registry_poisoned",
443                "await-event registry lock poisoned",
444            )
445        })?;
446        state.revoked_session_ids.insert(session_id.to_string());
447        for entry in state.entries.values() {
448            entry.notify.notify_waiters();
449        }
450        Ok(())
451    }
452}
453
/// How a [`ScopedEffectController`] holds its controller: by reference for the
/// `'run` lifetime, or through shared ownership.
enum ScopedEffectControllerInner<'run> {
    /// Controller borrowed for `'run`.
    Borrowed(&'run dyn RuntimeEffectController),
    /// Reference-counted controller.
    Shared(Arc<dyn RuntimeEffectController>),
}
458
459impl Clone for ScopedEffectControllerInner<'_> {
460    fn clone(&self) -> Self {
461        match self {
462            Self::Borrowed(controller) => Self::Borrowed(*controller),
463            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
464        }
465    }
466}
467
/// Scoped low-level controller plus the semantic execution scope it is serving.
///
/// Both constructors validate the scope, so a value of this type always carries
/// a scope with non-blank ids.
#[derive(Clone)]
pub struct ScopedEffectController<'run> {
    /// The underlying controller, borrowed or shared.
    controller: ScopedEffectControllerInner<'run>,
    /// The execution scope this controller is serving.
    scope: ExecutionScope,
}
474
475impl<'run> ScopedEffectController<'run> {
476    pub fn borrowed(
477        controller: &'run dyn RuntimeEffectController,
478        scope: ExecutionScope,
479    ) -> Result<Self, RuntimeError> {
480        scope.validate()?;
481        Ok(Self {
482            controller: ScopedEffectControllerInner::Borrowed(controller),
483            scope,
484        })
485    }
486
487    pub fn shared(
488        controller: Arc<dyn RuntimeEffectController>,
489        scope: ExecutionScope,
490    ) -> Result<Self, RuntimeError> {
491        scope.validate()?;
492        Ok(Self {
493            controller: ScopedEffectControllerInner::Shared(controller),
494            scope,
495        })
496    }
497
498    pub fn controller(&self) -> &dyn RuntimeEffectController {
499        match &self.controller {
500            ScopedEffectControllerInner::Borrowed(controller) => *controller,
501            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
502        }
503    }
504
505    pub fn execution_scope(&self) -> &ExecutionScope {
506        &self.scope
507    }
508
509    pub fn scope_id(&self) -> &str {
510        self.scope.id()
511    }
512
513    pub fn turn_id(&self) -> Option<&str> {
514        self.scope.turn_id()
515    }
516}
517
/// Deployment-level factory for scoped effect controllers.
///
/// Only [`EffectHost::scoped`] is required. Every other method has a default
/// that describes an inline host with no durable effects and no await-event
/// support.
#[async_trait::async_trait]
pub trait EffectHost: Send + Sync {
    /// Durability tier of this host; defaults to `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this host needs a durable attachment store; defaults to `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this host supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Returns a controller bound to `scope` that may borrow from the host for `'run`.
    fn scoped<'run>(
        &'run self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, RuntimeError>;

    /// Returns a `'static` controller bound to the scope, if the host can
    /// provide one; the default returns `Ok(None)`.
    fn scoped_static(
        &self,
        _scope: ExecutionScope,
    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
        Ok(None)
    }

    /// Issues an await-event key for the given scope and wait identity.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event keys",
        ))
    }

    /// Resolves the wait named by the key.
    ///
    /// The default accepts nothing and reports [`ResolveOutcome::UnknownOrRevoked`].
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for the key's resolution, subject to a cancellation token and an
    /// optional deadline.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event waits",
        ))
    }

    /// Revokes await-event keys that belong to the session; the default is a
    /// no-op that returns `Ok(())`.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
580
/// Boundary for nondeterministic runtime work.
///
/// Only [`RuntimeEffectController::execute_effect`] is required. Every other
/// method has a default that describes an inline controller with no durable
/// effects and no await-event support.
#[async_trait::async_trait]
pub trait RuntimeEffectController: Send + Sync {
    /// Durability tier this controller provides; defaults to
    /// [`DurabilityTier::Inline`].
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this controller needs a durable attachment store; defaults to `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this controller supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Executes one effect described by `envelope`.
    ///
    /// `local_executor` is the per-effect local runner the controller may
    /// delegate to (see [`RuntimeEffectLocalExecutor`]).
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;

    /// Issues an await-event key for the given scope and wait identity.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event keys",
        ))
    }

    /// Resolves the wait named by the key.
    ///
    /// The default accepts nothing and reports [`ResolveOutcome::UnknownOrRevoked`].
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for the key's resolution, subject to a cancellation token and an
    /// optional deadline.
    ///
    /// The default fails with code `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event waits",
        ))
    }

    /// Revokes await-event keys that belong to the session; the default is a
    /// no-op that returns `Ok(())`.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
639
/// Runtime-internal handle for effect-controller references carried through
/// per-turn execution contexts.
#[derive(Clone)]
pub(crate) enum RuntimeEffectControllerHandle<'run> {
    /// Wraps an already scoped controller.
    Borrowed(ScopedEffectController<'run>),
    /// Shared controller plus its scope; only compiled for tests or the
    /// `testing` feature.
    #[cfg(any(test, feature = "testing"))]
    Shared {
        controller: Arc<dyn RuntimeEffectController>,
        scope: ExecutionScope,
    },
}
651
652impl<'run> RuntimeEffectControllerHandle<'run> {
653    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
654        Self::Borrowed(scoped)
655    }
656
657    #[cfg(any(test, feature = "testing"))]
658    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
659        Self::Shared {
660            controller,
661            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
662        }
663    }
664
665    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
666        match self {
667            Self::Borrowed(scoped) => scoped.controller(),
668            #[cfg(any(test, feature = "testing"))]
669            Self::Shared { controller, .. } => controller.as_ref(),
670        }
671    }
672
673    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
674        match self {
675            Self::Borrowed(scoped) => scoped.clone(),
676            #[cfg(any(test, feature = "testing"))]
677            Self::Shared { controller, scope } => {
678                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
679                    .expect("runtime effect controller handle carries a valid scope")
680            }
681        }
682    }
683
684    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
685        self.clone()
686    }
687}
688
/// Error returned by effect controllers and local executors.
///
/// Displays as `{code}: {message}`.
#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[error("{code}: {message}")]
pub struct RuntimeEffectControllerError {
    /// Error code string.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
}
695
696impl RuntimeEffectControllerError {
697    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
698        Self {
699            code: code.into(),
700            message: message.into(),
701        }
702    }
703
704    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
705        Self::new(
706            "runtime_effect_wrong_outcome",
707            format!(
708                "expected {} outcome, got {}",
709                expected.as_str(),
710                actual.as_str()
711            ),
712        )
713    }
714
715    pub(crate) fn into_runtime_error(self) -> RuntimeError {
716        RuntimeError::new(self.code, self.message)
717    }
718}
719
720impl From<RuntimeError> for RuntimeEffectControllerError {
721    fn from(err: RuntimeError) -> Self {
722        Self::new(err.code.as_str(), err.message)
723    }
724}
725
726impl From<PluginError> for RuntimeEffectControllerError {
727    fn from(err: PluginError) -> Self {
728        Self::new("plugin", err.to_string())
729    }
730}
731
732impl From<crate::StoreError> for RuntimeEffectControllerError {
733    fn from(err: crate::StoreError) -> Self {
734        Self::new("runtime_store", err.to_string())
735    }
736}
737
738// =============================================================================
739// Local executor (per-effect borrowed runner state)
740// =============================================================================
741
/// Crate-internal contract for running one process to completion.
///
/// NOTE(review): only the signature is visible here; the exact semantics of
/// each argument are defined by the implementors — confirm against them.
#[async_trait::async_trait]
pub(crate) trait ProcessRunner: Send + Sync {
    /// Runs the process described by `registration` and returns its await output.
    ///
    /// The runner receives the process registry, a scoped effect controller and
    /// a cancellation token alongside the execution context.
    async fn run_process(
        &self,
        registration: crate::ProcessRegistration,
        execution_context: crate::ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        scoped_effect_controller: crate::ScopedEffectController<'_>,
        cancellation: CancellationToken,
    ) -> crate::ProcessAwaitOutput;
}
753
/// Local resources handed to a controller for process commands; obtained from
/// `RuntimeEffectLocalExecutor::into_process`.
pub struct ProcessLocalExecution {
    /// Process registry supplied when the executor was built.
    pub registry: Arc<dyn ProcessRegistry>,
}
757
/// Local runner state for effects executed inside a turn; built by
/// `RuntimeEffectLocalExecutor::turn`.
pub(super) struct LocalTurnEffectRunner<'a, 'run> {
    /// Turn driver, mutably borrowed for the duration of the effect.
    driver: &'a mut RuntimeTurnDriver<'run>,
    /// Turn state machine, mutably borrowed for the duration of the effect.
    machine: &'a mut crate::TurnMachine,
    /// Sender for runtime stream events.
    event_tx: mpsc::Sender<RuntimeStreamEvent>,
    /// Cancellation token for the effect.
    cancellation: CancellationToken,
}
764
/// Local runner state for effects executed outside a turn; built by
/// `RuntimeEffectLocalExecutor::direct`.
pub(super) struct LocalDirectEffectRunner {
    /// Provider handle the runner was built with.
    provider: ProviderHandle,
    /// Attachment store the runner was built with.
    attachment_store: Arc<dyn AttachmentStore>,
}
769
/// One-shot local runner behind `RuntimeEffectLocalExecutorState::Runner`.
#[async_trait::async_trait]
trait RuntimeEffectLocalRunner: Send {
    /// Consumes the boxed runner and executes the effect in `envelope`.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
777
/// One-shot closure type used by the testing runner: takes the effect envelope
/// and returns a boxed, `Send` future yielding the effect outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;

/// Local runner backed by a caller-supplied closure; only compiled for tests or
/// the `testing` feature.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked once with the effect envelope.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
794
/// One-shot closure type used by the durable-step runner: takes a JSON input
/// and returns a boxed, `Send` future yielding a JSON output.
type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
        serde_json::Value,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;

/// Local runner backed by a JSON-in/JSON-out closure; built by
/// `RuntimeEffectLocalExecutor::durable_step`.
struct DurableStepLocalRunner<'run> {
    /// Closure invoked once with the step input.
    run: Box<DurableStepLocalRunnerFn<'run>>,
}
809
/// What a [`RuntimeEffectLocalExecutor`] is able to do locally.
enum RuntimeEffectLocalExecutorState<'run> {
    /// No local executor; `execute` fails with
    /// `runtime_effect_local_executor_unavailable`.
    Unavailable,
    /// Executes through the local sleep path with the given cancellation token.
    SleepOnly {
        cancellation: CancellationToken,
    },
    /// Carries await-event wait options only; `execute` fails with
    /// `runtime_effect_local_executor_mismatch`.
    ExternalWaitOptions {
        cancellation: CancellationToken,
        deadline: Option<Instant>,
    },
    /// Carries process resources, retrieved via `into_process`; `execute` fails
    /// with `runtime_effect_local_executor_mismatch`.
    Process(ProcessLocalExecution),
    /// Delegates `execute` to a boxed one-shot runner.
    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
}
822
/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
///
/// Durable controllers may ignore it and replay their own recorded result. The
/// default inline controller delegates to it, so local provider/tool/checkpoint
/// work still crosses the same `execute_effect` boundary as durable controllers.
///
/// The executor is consumed by use: `execute` and the `into_*` accessors all
/// take `self` by value.
pub struct RuntimeEffectLocalExecutor<'run> {
    /// Which local capability this executor carries.
    state: RuntimeEffectLocalExecutorState<'run>,
}
831
832impl<'run> RuntimeEffectLocalExecutor<'run> {
833    pub fn unavailable() -> Self {
834        Self {
835            state: RuntimeEffectLocalExecutorState::Unavailable,
836        }
837    }
838
839    pub fn sleep(cancellation: CancellationToken) -> Self {
840        Self {
841            state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
842        }
843    }
844
845    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
846        Self {
847            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
848                cancellation,
849                deadline,
850            },
851        }
852    }
853
854    pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
855        Self {
856            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
857        }
858    }
859
860    pub fn durable_step<F, Fut>(run: F) -> Self
861    where
862        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
863        Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
864    {
865        Self {
866            state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
867                run: Box::new(move |input| {
868                    Box::pin(async move { run(input).await.map_err(Into::into) })
869                }),
870            })),
871        }
872    }
873
874    #[cfg(any(test, feature = "testing"))]
875    pub fn testing<F, Fut>(run: F) -> Self
876    where
877        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
878        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
879            + Send
880            + 'run,
881    {
882        Self {
883            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
884                TestingRuntimeEffectLocalRunner {
885                    run: Box::new(move |envelope| Box::pin(run(envelope))),
886                },
887            )),
888        }
889    }
890
891    pub(in crate::runtime) fn turn<'scope>(
892        driver: &'run mut RuntimeTurnDriver<'scope>,
893        machine: &'run mut crate::TurnMachine,
894        event_tx: mpsc::Sender<RuntimeStreamEvent>,
895        cancellation: CancellationToken,
896    ) -> Self
897    where
898        'scope: 'run,
899    {
900        Self {
901            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
902                driver,
903                machine,
904                event_tx,
905                cancellation,
906            })),
907        }
908    }
909
910    pub(in crate::runtime) fn direct(
911        provider: ProviderHandle,
912        attachment_store: Arc<dyn AttachmentStore>,
913    ) -> Self {
914        Self {
915            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
916                provider,
917                attachment_store,
918            })),
919        }
920    }
921
922    pub async fn execute(
923        self,
924        envelope: RuntimeEffectEnvelope,
925    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
926        match self.state {
927            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
928            RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
929                execute_local_sleep(envelope, cancellation).await
930            }
931            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
932                Err(RuntimeEffectControllerError::new(
933                    "runtime_effect_local_executor_mismatch",
934                    format!(
935                        "local await-event options cannot execute {} command directly",
936                        envelope.command.kind().as_str()
937                    ),
938                ))
939            }
940            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
941                "runtime_effect_local_executor_unavailable",
942                format!(
943                    "no local executor is available for {}",
944                    envelope.command.kind().as_str()
945                ),
946            )),
947            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
948                "runtime_effect_local_executor_mismatch",
949                format!(
950                    "process executor cannot execute {} command directly",
951                    envelope.command.kind().as_str()
952                ),
953            )),
954        }
955    }
956
957    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
958        match self.state {
959            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
960            _ => Err(RuntimeEffectControllerError::new(
961                "runtime_effect_local_executor_unavailable",
962                "no process executor is available for process command",
963            )),
964        }
965    }
966
967    fn into_await_event_options(
968        self,
969    ) -> Result<(CancellationToken, Option<Instant>), RuntimeEffectControllerError> {
970        match self.state {
971            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
972                cancellation,
973                deadline,
974            } => Ok((cancellation, deadline)),
975            _ => Ok((CancellationToken::new(), None)),
976        }
977    }
978}
979
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Passes the whole envelope to the runner's `run` callback and returns
    /// whatever that callback resolves to.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let pending = (self.run)(envelope);
        pending.await
    }
}
990
991#[async_trait::async_trait]
992impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
993    async fn execute(
994        self: Box<Self>,
995        envelope: RuntimeEffectEnvelope,
996    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
997        match envelope.command {
998            RuntimeEffectCommand::DurableStep { input, .. } => {
999                let value = (self.run)(input).await?;
1000                Ok(RuntimeEffectOutcome::DurableStep { value })
1001            }
1002            command => Err(RuntimeEffectControllerError::new(
1003                "runtime_effect_local_executor_mismatch",
1004                format!(
1005                    "local durable step executor cannot execute {} command",
1006                    command.kind().as_str()
1007                ),
1008            )),
1009        }
1010    }
1011}
1012
1013#[async_trait::async_trait]
1014impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1015    async fn execute(
1016        self: Box<Self>,
1017        envelope: RuntimeEffectEnvelope,
1018    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1019        let runner = *self;
1020        match envelope.command {
1021            RuntimeEffectCommand::LlmCall { request } => {
1022                let protocol_iteration = runner.machine.protocol_iteration();
1023                let (result, text_streamed) = runner
1024                    .driver
1025                    .run_llm_call(
1026                        Arc::new((*request).into_request(None, None)),
1027                        protocol_iteration,
1028                        envelope.invocation,
1029                        &runner.event_tx,
1030                        &runner.cancellation,
1031                    )
1032                    .await;
1033                Ok(RuntimeEffectOutcome::LlmCall {
1034                    result,
1035                    text_streamed,
1036                })
1037            }
1038            RuntimeEffectCommand::ToolCall { call } => {
1039                let tool_name = call.tool_name.clone();
1040                let mut outcome = runner
1041                    .driver
1042                    .run_tool_calls(
1043                        vec![(call, envelope.invocation)],
1044                        &runner.event_tx,
1045                        &runner.cancellation,
1046                    )
1047                    .await?;
1048                let launch = outcome.launches.pop().ok_or_else(|| {
1049                    RuntimeEffectControllerError::new(
1050                        "tool_result_missing",
1051                        format!("tool `{tool_name}` completed without a launch result"),
1052                    )
1053                })?;
1054                Ok(RuntimeEffectOutcome::ToolCall {
1055                    launch,
1056                    triggers: outcome.triggers,
1057                })
1058            }
1059            RuntimeEffectCommand::ExecCode { language, code } => {
1060                let protocol_iteration = runner.machine.protocol_iteration();
1061                let messages = runner.machine.message_sequence();
1062                Ok(RuntimeEffectOutcome::ExecCode {
1063                    result: runner
1064                        .driver
1065                        .run_exec_code(
1066                            language,
1067                            &code,
1068                            messages,
1069                            protocol_iteration,
1070                            envelope.invocation,
1071                            &runner.event_tx,
1072                        )
1073                        .await,
1074                })
1075            }
1076            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1077                Ok(RuntimeEffectOutcome::Checkpoint {
1078                    result: runner
1079                        .driver
1080                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1081                        .await
1082                        .map_err(RuntimeEffectControllerError::from),
1083                })
1084            }
1085            RuntimeEffectCommand::SyncExecutionEnvironment {
1086                update_machine_config,
1087            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1088                result: runner
1089                    .driver
1090                    .refresh_execution_environment(runner.machine, update_machine_config)
1091                    .await
1092                    .map_err(|err| err.to_string()),
1093            }),
1094            RuntimeEffectCommand::Sleep { duration_ms } => {
1095                sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
1096                Ok(RuntimeEffectOutcome::Sleep)
1097            }
1098            command => Err(RuntimeEffectControllerError::new(
1099                "runtime_effect_local_executor_mismatch",
1100                format!(
1101                    "local turn executor cannot execute {} command",
1102                    command.kind().as_str()
1103                ),
1104            )),
1105        }
1106    }
1107}
1108
1109#[async_trait::async_trait]
1110impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1111    async fn execute(
1112        mut self: Box<Self>,
1113        envelope: RuntimeEffectEnvelope,
1114    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1115        match envelope.command {
1116            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1117                result: self
1118                    .run_direct_llm_request((*request).into_request(
1119                        crate::session_model::transport_stream_events(&self.provider, None),
1120                        None,
1121                    ))
1122                    .await,
1123            }),
1124            RuntimeEffectCommand::Sleep { duration_ms } => {
1125                sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
1126                Ok(RuntimeEffectOutcome::Sleep)
1127            }
1128            command => Err(RuntimeEffectControllerError::new(
1129                "runtime_effect_local_executor_mismatch",
1130                format!(
1131                    "local direct executor cannot execute {} command",
1132                    command.kind().as_str()
1133                ),
1134            )),
1135        }
1136    }
1137}
1138
1139impl LocalDirectEffectRunner {
1140    async fn run_direct_llm_request(
1141        &mut self,
1142        request: CoreLlmRequest,
1143    ) -> Result<LlmResponse, LlmCallError> {
1144        let request = crate::attachments::resolve_llm_request_attachments(
1145            request,
1146            self.attachment_store.as_ref(),
1147        )
1148        .await
1149        .map_err(|err| LlmCallError {
1150            message: err.to_string(),
1151            retryable: false,
1152            raw: None,
1153            code: Some("attachment_resolution_failed".to_string()),
1154            terminal_reason: crate::LlmTerminalReason::ProviderError,
1155            request_body: None,
1156        })?;
1157        self.provider
1158            .complete(request)
1159            .await
1160            .map_err(llm_call_error_from_transport)
1161    }
1162}
1163
1164async fn execute_local_sleep(
1165    envelope: RuntimeEffectEnvelope,
1166    cancellation: CancellationToken,
1167) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1168    match envelope.command {
1169        RuntimeEffectCommand::Sleep { duration_ms } => {
1170            sleep_with_cancellation(duration_ms, &cancellation).await?;
1171            Ok(RuntimeEffectOutcome::Sleep)
1172        }
1173        command => Err(RuntimeEffectControllerError::new(
1174            "runtime_effect_local_executor_mismatch",
1175            format!(
1176                "local sleep executor cannot execute {} command",
1177                command.kind().as_str()
1178            ),
1179        )),
1180    }
1181}
1182
1183async fn sleep_with_cancellation(
1184    duration_ms: u64,
1185    cancellation: &CancellationToken,
1186) -> Result<(), RuntimeEffectControllerError> {
1187    let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
1188    tokio::pin!(sleep);
1189    tokio::select! {
1190        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1191            "runtime_effect_sleep_cancelled",
1192            "runtime effect sleep was cancelled",
1193        )),
1194        _ = &mut sleep => Ok(()),
1195    }
1196}
1197
1198// =============================================================================
1199// Default in-process effect controller
1200// =============================================================================
1201
/// In-process effect controller used by default.
///
/// Local runners are executed in process, await-events are resolved from
/// in-memory state, and durable-tool-effect semantics are exposed for local
/// runs. This controller does not make in-flight effects crash-durable;
/// workflow adapters get that guarantee by recording outcomes in their own
/// history.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
1210
1211#[async_trait::async_trait]
1212impl RuntimeEffectController for InlineRuntimeEffectController {
1213    fn supports_durable_effects(&self) -> bool {
1214        true
1215    }
1216
1217    async fn execute_effect(
1218        &self,
1219        envelope: RuntimeEffectEnvelope,
1220        local_executor: RuntimeEffectLocalExecutor<'_>,
1221    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1222        match envelope.command {
1223            RuntimeEffectCommand::AwaitEvent { key } => {
1224                let (cancellation, deadline) = local_executor.into_await_event_options()?;
1225                let resolution = self
1226                    .await_await_event(&key, cancellation, deadline)
1227                    .await
1228                    .map_err(RuntimeEffectControllerError::from)?;
1229                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1230            }
1231            RuntimeEffectCommand::Process { command } => {
1232                let execution = local_executor.into_process()?;
1233                let registry = execution.registry;
1234                let result = tokio::task::spawn(async move {
1235                    Self::execute_process_command(registry, *command).await
1236                })
1237                .await
1238                .map_err(|err| {
1239                    RuntimeEffectControllerError::new(
1240                        "runtime_effect_process_task_join",
1241                        format!("inline process effect task failed: {err}"),
1242                    )
1243                })??;
1244                Ok(RuntimeEffectOutcome::Process { result })
1245            }
1246            _ => local_executor.execute(envelope).await,
1247        }
1248    }
1249
1250    async fn await_event_key(
1251        &self,
1252        scope: &ExecutionScope,
1253        wait: AwaitEventWaitIdentity,
1254    ) -> Result<AwaitEventKey, RuntimeError> {
1255        inline_await_events().key_for(scope, wait)
1256    }
1257
1258    async fn resolve_await_event(
1259        &self,
1260        key: &AwaitEventKey,
1261        resolution: Resolution,
1262    ) -> Result<ResolveOutcome, RuntimeError> {
1263        inline_await_events().resolve(key, resolution)
1264    }
1265
1266    async fn await_await_event(
1267        &self,
1268        key: &AwaitEventKey,
1269        cancel: CancellationToken,
1270        deadline: Option<Instant>,
1271    ) -> Result<Resolution, RuntimeError> {
1272        inline_await_events()
1273            .await_resolution(key, cancel, deadline)
1274            .await
1275    }
1276
1277    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1278        inline_await_events().revoke_session(session_id)
1279    }
1280}
1281
1282/// In-process deployment effect host.
1283#[derive(Clone)]
1284pub struct InlineEffectHost {
1285    controller: Arc<dyn RuntimeEffectController>,
1286}
1287
1288impl InlineEffectHost {
1289    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1290        Self { controller }
1291    }
1292}
1293
1294impl Default for InlineEffectHost {
1295    fn default() -> Self {
1296        Self::new(Arc::new(InlineRuntimeEffectController))
1297    }
1298}
1299
1300#[async_trait::async_trait]
1301impl EffectHost for InlineEffectHost {
1302    fn durability_tier(&self) -> crate::DurabilityTier {
1303        self.controller.durability_tier()
1304    }
1305
1306    fn requires_durable_attachment_store(&self) -> bool {
1307        self.controller.requires_durable_attachment_store()
1308    }
1309
1310    fn supports_durable_effects(&self) -> bool {
1311        self.controller.supports_durable_effects()
1312    }
1313
1314    fn scoped<'run>(
1315        &'run self,
1316        scope: ExecutionScope,
1317    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1318        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1319    }
1320
1321    fn scoped_static(
1322        &self,
1323        scope: ExecutionScope,
1324    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1325        Ok(Some(ScopedEffectController::shared(
1326            Arc::clone(&self.controller),
1327            scope,
1328        )?))
1329    }
1330
1331    async fn await_event_key(
1332        &self,
1333        scope: &ExecutionScope,
1334        wait: AwaitEventWaitIdentity,
1335    ) -> Result<AwaitEventKey, RuntimeError> {
1336        self.controller.await_event_key(scope, wait).await
1337    }
1338
1339    async fn resolve_await_event(
1340        &self,
1341        key: &AwaitEventKey,
1342        resolution: Resolution,
1343    ) -> Result<ResolveOutcome, RuntimeError> {
1344        self.controller.resolve_await_event(key, resolution).await
1345    }
1346
1347    async fn await_await_event(
1348        &self,
1349        key: &AwaitEventKey,
1350        cancel: CancellationToken,
1351        deadline: Option<Instant>,
1352    ) -> Result<Resolution, RuntimeError> {
1353        self.controller
1354            .await_await_event(key, cancel, deadline)
1355            .await
1356    }
1357
1358    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1359        self.controller
1360            .revoke_await_events_for_session(session_id)
1361            .await
1362    }
1363}
1364
1365impl InlineRuntimeEffectController {
1366    /// Register the process (and any handle grant) into the durable registry.
1367    ///
1368    /// The inline controller no longer runs the process here: the registry's
1369    /// non-terminal row *is* the durable work queue, and the lease-protected
1370    /// [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole executor. The
1371    /// control seam pokes that runner after a successful start, so registering
1372    /// the row is all this path does.
1373    pub(crate) async fn start_process(
1374        registry: Arc<dyn crate::ProcessRegistry>,
1375        registration: crate::ProcessRegistration,
1376        grant: Option<crate::ProcessStartGrant>,
1377    ) -> Result<ProcessRecord, PluginError> {
1378        let registration_for_record = registration.clone();
1379        let record = registry.register_process(registration_for_record).await?;
1380        if let Some(grant) = grant {
1381            registry
1382                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1383                .await?;
1384        }
1385        Ok(record)
1386    }
1387
1388    pub(crate) async fn request_process_cancel(
1389        &self,
1390        registry: Arc<dyn crate::ProcessRegistry>,
1391        process_id: &str,
1392        reason: Option<String>,
1393    ) -> Result<ProcessRecord, PluginError> {
1394        // Cancellation is a durable signal: the cancel event is what the
1395        // runner-run process observes, so the inline controller appends it and
1396        // no longer tracks an in-process cancellation token.
1397        registry
1398            .append_event(
1399                process_id,
1400                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1401            )
1402            .await?;
1403        registry
1404            .get_process(process_id)
1405            .await
1406            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1407    }
1408
1409    async fn execute_process_command(
1410        registry: Arc<dyn crate::ProcessRegistry>,
1411        command: ProcessCommand,
1412    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1413        match command {
1414            ProcessCommand::Start {
1415                registration,
1416                grant,
1417                execution_context: _,
1418            } => {
1419                let record = Self::start_process(registry, registration, grant).await?;
1420                Ok(ProcessEffectOutcome::Start { record })
1421            }
1422            ProcessCommand::List {
1423                session_scope,
1424                mode,
1425            } => {
1426                let entries = match mode {
1427                    crate::ProcessListMode::Live => {
1428                        registry.list_live_handle_grants(&session_scope).await?
1429                    }
1430                    crate::ProcessListMode::All => {
1431                        registry.list_handle_grants(&session_scope).await?
1432                    }
1433                };
1434                Ok(ProcessEffectOutcome::List { entries })
1435            }
1436            ProcessCommand::Transfer {
1437                from_scope,
1438                to_scope,
1439                process_ids,
1440            } => {
1441                registry
1442                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1443                    .await?;
1444                Ok(ProcessEffectOutcome::Transfer)
1445            }
1446            ProcessCommand::DeleteSession { session_id } => {
1447                let report = registry.delete_session_process_state(&session_id).await?;
1448                Ok(ProcessEffectOutcome::DeleteSession { report })
1449            }
1450            ProcessCommand::Await { process_id } => {
1451                let output = registry.await_process(&process_id).await?;
1452                Ok(ProcessEffectOutcome::Await { output })
1453            }
1454            ProcessCommand::Cancel { process_id, reason } => {
1455                let record = InlineRuntimeEffectController
1456                    .request_process_cancel(registry, &process_id, reason)
1457                    .await?;
1458                Ok(ProcessEffectOutcome::Cancel { record })
1459            }
1460            ProcessCommand::Signal {
1461                process_id,
1462                request,
1463                ..
1464            } => {
1465                let result = registry.append_event(&process_id, request).await?;
1466                Ok(ProcessEffectOutcome::Signal {
1467                    event: result.event,
1468                })
1469            }
1470        }
1471    }
1472}
1473
1474impl std::fmt::Debug for InlineRuntimeEffectController {
1475    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1476        f.debug_struct("InlineRuntimeEffectController").finish()
1477    }
1478}