Skip to main content

lash_core/runtime/effect/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23    RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
27type HmacSha256 = Hmac<sha2::Sha256>;
28
29fn inline_await_events() -> &'static AwaitEventRegistry {
30    static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
31    REGISTRY.get_or_init(AwaitEventRegistry::new)
32}
33
34// =============================================================================
35// Effect host + controller trait + scope + error
36// =============================================================================
37
/// Stable semantic identity for one effectful runtime operation.
///
/// The scope is chosen by the host boundary before any nondeterministic work is
/// planned. It is intentionally generic: Restate, an inline test host, or a
/// future durable effect host all receive the same Lash scope vocabulary.
///
/// Serialized as an internally tagged value: the variant name is written in
/// snake_case under the `type` field.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionScope {
    /// Scope carrying a session id and a turn id; `id()` reports the turn id.
    Turn {
        session_id: String,
        turn_id: String,
    },
    /// Scope carrying only a process id; it has no session id.
    Process {
        process_id: String,
    },
    /// Scope carrying a session id and a drain id; `id()` reports the drain id.
    QueueDrain {
        session_id: String,
        drain_id: String,
    },
    /// Scope carrying only a session id, which also serves as its `id()`.
    SessionDelete {
        session_id: String,
    },
    /// Scope carrying only an operation id; it has no session id.
    RuntimeOperation {
        operation_id: String,
    },
}
64
65impl ExecutionScope {
66    pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
67        Self::Turn {
68            session_id: session_id.into(),
69            turn_id: turn_id.into(),
70        }
71    }
72
73    pub fn process(process_id: impl Into<String>) -> Self {
74        Self::Process {
75            process_id: process_id.into(),
76        }
77    }
78
79    pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
80        Self::QueueDrain {
81            session_id: session_id.into(),
82            drain_id: drain_id.into(),
83        }
84    }
85
86    pub fn session_delete(session_id: impl Into<String>) -> Self {
87        Self::SessionDelete {
88            session_id: session_id.into(),
89        }
90    }
91
92    pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
93        Self::RuntimeOperation {
94            operation_id: operation_id.into(),
95        }
96    }
97
98    pub fn id(&self) -> &str {
99        match self {
100            Self::Turn { turn_id, .. } => turn_id,
101            Self::Process { process_id } => process_id,
102            Self::QueueDrain { drain_id, .. } => drain_id,
103            Self::SessionDelete { session_id } => session_id,
104            Self::RuntimeOperation { operation_id } => operation_id,
105        }
106    }
107
108    pub fn session_id(&self) -> Option<&str> {
109        match self {
110            Self::Turn { session_id, .. }
111            | Self::QueueDrain { session_id, .. }
112            | Self::SessionDelete { session_id } => Some(session_id),
113            Self::Process { .. } | Self::RuntimeOperation { .. } => None,
114        }
115    }
116
117    pub fn turn_id(&self) -> Option<&str> {
118        match self {
119            Self::Turn { turn_id, .. } => Some(turn_id),
120            _ => None,
121        }
122    }
123
124    pub fn validates_turn_trace_id(&self) -> bool {
125        matches!(self, Self::Turn { .. })
126    }
127
128    fn validate(&self) -> Result<(), RuntimeError> {
129        let missing = match self {
130            Self::Turn {
131                session_id,
132                turn_id,
133            } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
134            Self::Process { process_id } => process_id.trim().is_empty(),
135            Self::QueueDrain {
136                session_id,
137                drain_id,
138            } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
139            Self::SessionDelete { session_id } => session_id.trim().is_empty(),
140            Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
141        };
142        if missing {
143            return Err(RuntimeError::new(
144                RuntimeErrorCode::MissingExecutionScopeId,
145                "execution scopes require non-empty stable ids",
146            ));
147        }
148        Ok(())
149    }
150}
151
/// Identifies what an await-event wait is waiting for.
///
/// Serialized as an internally tagged value: the variant name is written in
/// snake_case under the `type` field.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AwaitEventWaitIdentity {
    /// Wait keyed by a tool call id.
    ToolCompletion {
        tool_call_id: String,
    },
    /// Wait keyed by a process id, a signal name and an ordinal. Validation
    /// rejects an ordinal of `0`.
    ProcessSignal {
        process_id: String,
        signal_name: String,
        ordinal: u64,
    },
    /// Wait keyed by a caller-chosen string.
    Custom {
        key: String,
    },
}
167
168impl AwaitEventWaitIdentity {
169    pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
170        Self::ToolCompletion {
171            tool_call_id: tool_call_id.into(),
172        }
173    }
174
175    pub fn process_signal(
176        process_id: impl Into<String>,
177        signal_name: impl Into<String>,
178        ordinal: u64,
179    ) -> Self {
180        Self::ProcessSignal {
181            process_id: process_id.into(),
182            signal_name: signal_name.into(),
183            ordinal,
184        }
185    }
186
187    fn validate(&self) -> Result<(), RuntimeError> {
188        let invalid = match self {
189            Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
190            Self::ProcessSignal {
191                process_id,
192                signal_name,
193                ordinal,
194            } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
195            Self::Custom { key } => key.trim().is_empty(),
196        };
197        if invalid {
198            return Err(RuntimeError::new(
199                "invalid_await_event_wait_identity",
200                "await-event wait identity requires non-empty stable ids",
201            ));
202        }
203        Ok(())
204    }
205}
206
/// Signed handle naming one await-event wait.
///
/// The in-file registry fills `key_id` with a stable hash of `(scope, wait)`
/// and `signature` with a hex HMAC-SHA256 over `(scope, wait, key_id)`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AwaitEventKey {
    /// Execution scope the wait belongs to.
    pub scope: ExecutionScope,
    /// What is being waited for inside that scope.
    pub wait: AwaitEventWaitIdentity,
    /// Identifier used to look the wait up and to build the promise key.
    pub key_id: String,
    /// Signature checked before the key is honoured.
    pub signature: String,
}
214
215impl AwaitEventKey {
216    pub fn promise_key(&self) -> String {
217        format!("lash-await-event:{}", self.key_id)
218    }
219}
220
/// Error payload carried by a failed external completion.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalCompletionError {
    /// Error code string.
    pub code: String,
    /// Error message string.
    pub message: String,
    /// Optional extra JSON payload; defaults to `None` when absent on input
    /// and is omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<serde_json::Value>,
}
228
229impl ExternalCompletionError {
230    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
231        Self {
232            code: code.into(),
233            message: message.into(),
234            raw: None,
235        }
236    }
237}
238
/// Terminal value of an await-event wait.
///
/// Serialized as an adjacently tagged value: the snake_case variant name goes
/// under `status` and any variant data under `payload`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
pub enum Resolution {
    /// Completed with a JSON value.
    Ok(serde_json::Value),
    /// Completed with an error payload.
    Err(ExternalCompletionError),
    /// Recorded by the in-file registry when a wait's deadline elapses.
    Timeout,
    /// Recorded by the in-file registry when a wait's cancellation token fires.
    Cancelled,
}
247
/// Result of attempting to resolve an await-event key.
///
/// Serialized as an internally tagged value with the snake_case variant name
/// under `status`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ResolveOutcome {
    /// The resolution was stored as the terminal value.
    Accepted,
    /// A terminal value was already stored; it is returned unchanged.
    AlreadyResolved { terminal: Resolution },
    /// The key failed verification or is revoked. This is also the default
    /// answer of hosts and controllers that do not implement resolution.
    UnknownOrRevoked,
}
255
/// Registry state for a single await-event key id.
#[derive(Debug)]
struct AwaitEventEntry {
    /// `None` until the first accepted resolution is stored.
    terminal: Option<Resolution>,
    /// Signalled with `notify_waiters` when a terminal is stored or a session
    /// is revoked.
    notify: Arc<Notify>,
}
261
/// Mutable contents of the await-event registry, held behind its mutex.
#[derive(Debug)]
struct AwaitEventRegistryState {
    /// Entries keyed by `AwaitEventKey::key_id`.
    entries: HashMap<String, AwaitEventEntry>,
    /// Key ids that must be refused.
    // NOTE(review): nothing in the visible part of this file inserts here —
    // confirm where individual keys get revoked.
    revoked_key_ids: HashSet<String>,
    /// Session ids whose keys must be refused; filled by `revoke_session`.
    revoked_session_ids: HashSet<String>,
}
268
/// In-process registry that mints, verifies and resolves await-event keys.
#[derive(Debug)]
struct AwaitEventRegistry {
    /// HMAC-SHA256 key used to sign and verify await-event keys.
    secret: Vec<u8>,
    /// Entries and revocation sets, guarded by a standard-library mutex.
    state: std::sync::Mutex<AwaitEventRegistryState>,
}
274
275impl AwaitEventRegistry {
276    fn new() -> Self {
277        Self {
278            secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
279            state: std::sync::Mutex::new(AwaitEventRegistryState {
280                entries: HashMap::new(),
281                revoked_key_ids: HashSet::new(),
282                revoked_session_ids: HashSet::new(),
283            }),
284        }
285    }
286
287    fn key_for(
288        &self,
289        scope: &ExecutionScope,
290        wait: AwaitEventWaitIdentity,
291    ) -> Result<AwaitEventKey, RuntimeError> {
292        scope.validate()?;
293        wait.validate()?;
294        let key_id =
295            crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
296                RuntimeError::new(
297                    "await_event_key_hash",
298                    format!("failed to hash await-event identity: {err}"),
299                )
300            })?;
301        let signature = self.signature(scope, &wait, &key_id)?;
302        Ok(AwaitEventKey {
303            scope: scope.clone(),
304            wait,
305            key_id,
306            signature,
307        })
308    }
309
310    fn signature(
311        &self,
312        scope: &ExecutionScope,
313        wait: &AwaitEventWaitIdentity,
314        key_id: &str,
315    ) -> Result<String, RuntimeError> {
316        let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
317            RuntimeError::new(
318                "await_event_key_sign",
319                format!("failed to initialize await-event key signer: {err}"),
320            )
321        })?;
322        let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
323            RuntimeError::new(
324                "await_event_key_sign",
325                format!("failed to serialize await-event key identity: {err}"),
326            )
327        })?;
328        mac.update(&canonical);
329        Ok(format!("{:x}", mac.finalize().into_bytes()))
330    }
331
332    fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
333        let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
334        Ok(expected == key.signature)
335    }
336
337    fn resolve(
338        &self,
339        key: &AwaitEventKey,
340        resolution: Resolution,
341    ) -> Result<ResolveOutcome, RuntimeError> {
342        if !self.verify(key)? {
343            return Ok(ResolveOutcome::UnknownOrRevoked);
344        }
345        let mut state = self.state.lock().map_err(|_| {
346            RuntimeError::new(
347                "await_event_registry_poisoned",
348                "await-event registry lock poisoned",
349            )
350        })?;
351        if state.revoked_key_ids.contains(&key.key_id)
352            || key
353                .scope
354                .session_id()
355                .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
356        {
357            return Ok(ResolveOutcome::UnknownOrRevoked);
358        }
359        let entry = state
360            .entries
361            .entry(key.key_id.clone())
362            .or_insert_with(|| AwaitEventEntry {
363                terminal: None,
364                notify: Arc::new(Notify::new()),
365            });
366        if let Some(terminal) = &entry.terminal {
367            return Ok(ResolveOutcome::AlreadyResolved {
368                terminal: terminal.clone(),
369            });
370        }
371        entry.terminal = Some(resolution);
372        entry.notify.notify_waiters();
373        Ok(ResolveOutcome::Accepted)
374    }
375
376    async fn await_resolution(
377        &self,
378        key: &AwaitEventKey,
379        cancel: CancellationToken,
380        deadline: Option<Instant>,
381    ) -> Result<Resolution, RuntimeError> {
382        if !self.verify(key)? {
383            return Err(RuntimeError::new(
384                "await_event_unknown_or_revoked",
385                "await-event key is invalid or revoked",
386            ));
387        }
388        loop {
389            let notify =
390                {
391                    let mut state = self.state.lock().map_err(|_| {
392                        RuntimeError::new(
393                            "await_event_registry_poisoned",
394                            "await-event registry lock poisoned",
395                        )
396                    })?;
397                    if state.revoked_key_ids.contains(&key.key_id)
398                        || key.scope.session_id().is_some_and(|session_id| {
399                            state.revoked_session_ids.contains(session_id)
400                        })
401                    {
402                        return Err(RuntimeError::new(
403                            "await_event_unknown_or_revoked",
404                            "await-event key is invalid or revoked",
405                        ));
406                    }
407                    let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
408                        AwaitEventEntry {
409                            terminal: None,
410                            notify: Arc::new(Notify::new()),
411                        }
412                    });
413                    if let Some(terminal) = entry.terminal.clone() {
414                        return Ok(terminal);
415                    }
416                    Arc::clone(&entry.notify)
417                };
418            if let Some(deadline) = deadline {
419                tokio::select! {
420                    _ = cancel.cancelled() => {
421                        let _ = self.resolve(key, Resolution::Cancelled)?;
422                    }
423                    _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
424                        let _ = self.resolve(key, Resolution::Timeout)?;
425                    }
426                    _ = notify.notified() => {}
427                }
428            } else {
429                tokio::select! {
430                    _ = cancel.cancelled() => {
431                        let _ = self.resolve(key, Resolution::Cancelled)?;
432                    }
433                    _ = notify.notified() => {}
434                }
435            }
436        }
437    }
438
439    fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
440        let mut state = self.state.lock().map_err(|_| {
441            RuntimeError::new(
442                "await_event_registry_poisoned",
443                "await-event registry lock poisoned",
444            )
445        })?;
446        state.revoked_session_ids.insert(session_id.to_string());
447        for entry in state.entries.values() {
448            entry.notify.notify_waiters();
449        }
450        Ok(())
451    }
452}
453
/// How a [`ScopedEffectController`] holds its controller: by reference for
/// the `'run` lifetime, or through shared ownership.
enum ScopedEffectControllerInner<'run> {
    /// Controller borrowed for `'run`.
    Borrowed(&'run dyn RuntimeEffectController),
    /// Controller held through an `Arc`.
    Shared(Arc<dyn RuntimeEffectController>),
}
458
459impl Clone for ScopedEffectControllerInner<'_> {
460    fn clone(&self) -> Self {
461        match self {
462            Self::Borrowed(controller) => Self::Borrowed(*controller),
463            Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
464        }
465    }
466}
467
/// Scoped low-level controller plus the semantic execution scope it is serving.
///
/// Both constructors validate the scope, so a value of this type always
/// carries a scope that passed validation.
#[derive(Clone)]
pub struct ScopedEffectController<'run> {
    /// The underlying controller, borrowed or shared.
    controller: ScopedEffectControllerInner<'run>,
    /// The execution scope this controller is serving.
    scope: ExecutionScope,
}
474
475impl<'run> ScopedEffectController<'run> {
476    pub fn borrowed(
477        controller: &'run dyn RuntimeEffectController,
478        scope: ExecutionScope,
479    ) -> Result<Self, RuntimeError> {
480        scope.validate()?;
481        Ok(Self {
482            controller: ScopedEffectControllerInner::Borrowed(controller),
483            scope,
484        })
485    }
486
487    pub fn shared(
488        controller: Arc<dyn RuntimeEffectController>,
489        scope: ExecutionScope,
490    ) -> Result<Self, RuntimeError> {
491        scope.validate()?;
492        Ok(Self {
493            controller: ScopedEffectControllerInner::Shared(controller),
494            scope,
495        })
496    }
497
498    pub fn controller(&self) -> &dyn RuntimeEffectController {
499        match &self.controller {
500            ScopedEffectControllerInner::Borrowed(controller) => *controller,
501            ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
502        }
503    }
504
505    pub fn execution_scope(&self) -> &ExecutionScope {
506        &self.scope
507    }
508
509    pub fn scope_id(&self) -> &str {
510        self.scope.id()
511    }
512
513    pub fn turn_id(&self) -> Option<&str> {
514        self.scope.turn_id()
515    }
516}
517
/// Deployment-level factory for scoped effect controllers.
///
/// Only [`EffectHost::scoped`] is required. Every other method has a default:
/// the capability queries report an inline, non-durable host, and the
/// await-event methods report that await-events are unsupported.
#[async_trait::async_trait]
pub trait EffectHost: Send + Sync {
    /// Durability tier of this host; defaults to `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this host needs a durable attachment store; defaults to `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this host supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Produces a controller bound to `scope` that may borrow from the host.
    fn scoped<'run>(
        &'run self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, RuntimeError>;

    /// Produces a `'static` controller bound to the scope, if the host can.
    /// The default returns `Ok(None)`.
    fn scoped_static(
        &self,
        _scope: ExecutionScope,
    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
        Ok(None)
    }

    /// Mints an await-event key for a wait inside a scope. The default fails
    /// with `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event keys",
        ))
    }

    /// Resolves an await-event key. The default accepts nothing and answers
    /// `ResolveOutcome::UnknownOrRevoked`.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for an await-event key to resolve. The default fails with
    /// `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event waits",
        ))
    }

    /// Revokes the await-event keys of a session. The default does nothing
    /// and succeeds.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
580
/// Boundary for nondeterministic runtime work.
///
/// Only [`RuntimeEffectController::execute_effect`] is required. The
/// capability queries default to an inline, non-durable controller, and the
/// await-event methods default to reporting that await-events are unsupported.
#[async_trait::async_trait]
pub trait RuntimeEffectController: Send + Sync {
    /// Durability tier this controller provides; defaults to
    /// [`DurabilityTier::Inline`].
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this controller needs a durable attachment store; defaults to
    /// `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this controller supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Executes one effect described by `envelope`. `local_executor` is the
    /// scoped local executor supplied for this single effect.
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;

    /// Mints an await-event key for a wait inside a scope. The default fails
    /// with `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event keys",
        ))
    }

    /// Resolves an await-event key. The default accepts nothing and answers
    /// `ResolveOutcome::UnknownOrRevoked`.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for an await-event key to resolve. The default fails with
    /// `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event waits",
        ))
    }

    /// Revokes the await-event keys of a session. The default does nothing
    /// and succeeds.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
639
/// Runtime-internal handle for effect-controller references carried through
/// per-turn execution contexts.
#[derive(Clone)]
pub(crate) enum RuntimeEffectControllerHandle<'run> {
    /// An already-scoped controller.
    Borrowed(ScopedEffectController<'run>),
    /// A shared controller plus the scope to present it under. Only compiled
    /// for tests or with the `testing` feature.
    #[cfg(any(test, feature = "testing"))]
    Shared {
        controller: Arc<dyn RuntimeEffectController>,
        scope: ExecutionScope,
    },
}
651
impl<'run> RuntimeEffectControllerHandle<'run> {
    /// Wraps an already-scoped controller.
    pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
        Self::Borrowed(scoped)
    }

    /// Wraps a shared controller under a fixed runtime-operation scope whose
    /// id is `test-runtime-effect-controller`. Test-only.
    #[cfg(any(test, feature = "testing"))]
    pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
        Self::Shared {
            controller,
            scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
        }
    }

    /// Returns the underlying controller for either variant.
    pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
        match self {
            Self::Borrowed(scoped) => scoped.controller(),
            #[cfg(any(test, feature = "testing"))]
            Self::Shared { controller, .. } => controller.as_ref(),
        }
    }

    /// Returns a scoped controller: a clone of the stored one for `Borrowed`,
    /// or a newly built shared one for `Shared`.
    pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
        match self {
            Self::Borrowed(scoped) => scoped.clone(),
            #[cfg(any(test, feature = "testing"))]
            Self::Shared { controller, scope } => {
                // The only constructor for `Shared` in this impl uses a
                // non-empty literal id, so validation is expected to succeed.
                ScopedEffectController::shared(Arc::clone(controller), scope.clone())
                    .expect("runtime effect controller handle carries a valid scope")
            }
        }
    }

    /// Clones the handle, keeping the full `'run` lifetime.
    pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
        self.clone()
    }
}
688
/// Error returned by effect controllers and local executors.
///
/// Displays as `code: message`.
#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[error("{code}: {message}")]
pub struct RuntimeEffectControllerError {
    /// Error code string.
    pub code: String,
    /// Error message string.
    pub message: String,
}
695
696impl RuntimeEffectControllerError {
697    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
698        Self {
699            code: code.into(),
700            message: message.into(),
701        }
702    }
703
704    pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
705        Self::new(
706            "runtime_effect_wrong_outcome",
707            format!(
708                "expected {} outcome, got {}",
709                expected.as_str(),
710                actual.as_str()
711            ),
712        )
713    }
714
715    pub(crate) fn into_runtime_error(self) -> RuntimeError {
716        RuntimeError::new(self.code, self.message)
717    }
718}
719
720impl From<RuntimeError> for RuntimeEffectControllerError {
721    fn from(err: RuntimeError) -> Self {
722        Self::new(err.code.as_str(), err.message)
723    }
724}
725
726impl From<PluginError> for RuntimeEffectControllerError {
727    fn from(err: PluginError) -> Self {
728        Self::new("plugin", err.to_string())
729    }
730}
731
732impl From<crate::StoreError> for RuntimeEffectControllerError {
733    fn from(err: crate::StoreError) -> Self {
734        Self::new("runtime_store", err.to_string())
735    }
736}
737
738// =============================================================================
739// Local executor (per-effect borrowed runner state)
740// =============================================================================
741
/// Runs one registered process to completion and reports its output.
#[async_trait::async_trait]
pub(crate) trait ProcessRunner: Send + Sync {
    /// Runs the process described by `registration`, with the given execution
    /// context, process registry, scoped effect controller and cancellation
    /// token, and returns its await output.
    async fn run_process(
        &self,
        registration: crate::ProcessRegistration,
        execution_context: crate::ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        scoped_effect_controller: crate::ScopedEffectController<'_>,
        cancellation: CancellationToken,
    ) -> crate::ProcessAwaitOutput;
}
753
/// Local resources handed to a controller for executing process commands.
pub struct ProcessLocalExecution {
    /// Process registry supplied to the local executor.
    pub registry: Arc<dyn ProcessRegistry>,
}
757
/// Local runner state for effects executed on behalf of a turn.
pub(super) struct LocalTurnEffectRunner<'a, 'run> {
    /// Turn driver, borrowed mutably for the duration of the effect.
    driver: &'a mut RuntimeTurnDriver<'run>,
    /// Turn machine, borrowed mutably for the duration of the effect.
    machine: &'a mut crate::TurnMachine,
    /// Sender for runtime stream events.
    event_tx: mpsc::Sender<RuntimeStreamEvent>,
    /// Cancellation token for the effect.
    cancellation: CancellationToken,
}
764
/// Local runner state for effects executed directly against a provider.
pub(super) struct LocalDirectEffectRunner {
    /// Provider handle the runner uses.
    provider: ProviderHandle,
    /// Attachment store the runner uses.
    attachment_store: Arc<dyn AttachmentStore>,
}
769
/// One-shot local runner: consumes itself to execute a single effect envelope.
#[async_trait::async_trait]
trait RuntimeEffectLocalRunner: Send {
    /// Executes `envelope` and returns its outcome. Takes `Box<Self>`, so a
    /// runner can be used at most once.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
777
/// Test-only closure type: takes an effect envelope once and returns a boxed,
/// sendable future that yields the effect outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
789
/// Test-only local runner backed by a caller-supplied closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked with the envelope when the runner executes.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
794
/// Closure type for a durable step: takes a JSON input once and returns a
/// boxed, sendable future that yields a JSON output.
type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
        serde_json::Value,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
805
/// Local runner backed by a caller-supplied JSON-in / JSON-out closure.
struct DurableStepLocalRunner<'run> {
    /// Closure invoked with the step's JSON input.
    run: Box<DurableStepLocalRunnerFn<'run>>,
}
809
/// What a [`RuntimeEffectLocalExecutor`] can do locally for one effect.
enum RuntimeEffectLocalExecutorState<'run> {
    /// No local executor; executing returns an "unavailable" error.
    Unavailable,
    /// Executes through the local sleep path with this cancellation token.
    SleepOnly {
        cancellation: CancellationToken,
    },
    /// Carries options for an await-event wait; cannot execute an envelope
    /// directly.
    ExternalWaitOptions {
        cancellation: CancellationToken,
        deadline: Option<Instant>,
    },
    /// Carries process execution resources; cannot execute an envelope
    /// directly.
    Process(ProcessLocalExecution),
    /// Delegates execution to a boxed one-shot runner.
    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
}
822
/// Scoped local executor provided to a [`RuntimeEffectController`] for one effect.
///
/// Durable controllers may ignore it and replay their own recorded result. The
/// default inline controller delegates to it, so local provider/tool/checkpoint
/// work still crosses the same `execute_effect` boundary as durable controllers.
pub struct RuntimeEffectLocalExecutor<'run> {
    /// Which local capability, if any, this executor carries.
    state: RuntimeEffectLocalExecutorState<'run>,
}
831
832impl<'run> RuntimeEffectLocalExecutor<'run> {
833    pub fn unavailable() -> Self {
834        Self {
835            state: RuntimeEffectLocalExecutorState::Unavailable,
836        }
837    }
838
839    pub fn sleep(cancellation: CancellationToken) -> Self {
840        Self {
841            state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
842        }
843    }
844
845    pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
846        Self {
847            state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
848                cancellation,
849                deadline,
850            },
851        }
852    }
853
854    pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
855        Self {
856            state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
857        }
858    }
859
860    pub fn durable_step<F, Fut>(run: F) -> Self
861    where
862        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
863        Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
864    {
865        Self {
866            state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
867                run: Box::new(move |input| {
868                    Box::pin(async move { run(input).await.map_err(Into::into) })
869                }),
870            })),
871        }
872    }
873
874    #[cfg(any(test, feature = "testing"))]
875    pub fn testing<F, Fut>(run: F) -> Self
876    where
877        F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
878        Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
879            + Send
880            + 'run,
881    {
882        Self {
883            state: RuntimeEffectLocalExecutorState::Runner(Box::new(
884                TestingRuntimeEffectLocalRunner {
885                    run: Box::new(move |envelope| Box::pin(run(envelope))),
886                },
887            )),
888        }
889    }
890
891    pub(in crate::runtime) fn turn<'scope>(
892        driver: &'run mut RuntimeTurnDriver<'scope>,
893        machine: &'run mut crate::TurnMachine,
894        event_tx: mpsc::Sender<RuntimeStreamEvent>,
895        cancellation: CancellationToken,
896    ) -> Self
897    where
898        'scope: 'run,
899    {
900        Self {
901            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
902                driver,
903                machine,
904                event_tx,
905                cancellation,
906            })),
907        }
908    }
909
910    pub(in crate::runtime) fn direct(
911        provider: ProviderHandle,
912        attachment_store: Arc<dyn AttachmentStore>,
913    ) -> Self {
914        Self {
915            state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
916                provider,
917                attachment_store,
918            })),
919        }
920    }
921
922    pub async fn execute(
923        self,
924        envelope: RuntimeEffectEnvelope,
925    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
926        match self.state {
927            RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
928            RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
929                execute_local_sleep(envelope, cancellation).await
930            }
931            RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
932                Err(RuntimeEffectControllerError::new(
933                    "runtime_effect_local_executor_mismatch",
934                    format!(
935                        "local await-event options cannot execute {} command directly",
936                        envelope.command.kind().as_str()
937                    ),
938                ))
939            }
940            RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
941                "runtime_effect_local_executor_unavailable",
942                format!(
943                    "no local executor is available for {}",
944                    envelope.command.kind().as_str()
945                ),
946            )),
947            RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
948                "runtime_effect_local_executor_mismatch",
949                format!(
950                    "process executor cannot execute {} command directly",
951                    envelope.command.kind().as_str()
952                ),
953            )),
954        }
955    }
956
957    pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
958        match self.state {
959            RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
960            _ => Err(RuntimeEffectControllerError::new(
961                "runtime_effect_local_executor_unavailable",
962                "no process executor is available for process command",
963            )),
964        }
965    }
966
967    fn into_await_event_options(
968        self,
969    ) -> Result<(CancellationToken, Option<Instant>), RuntimeEffectControllerError> {
970        match self.state {
971            RuntimeEffectLocalExecutorState::ExternalWaitOptions {
972                cancellation,
973                deadline,
974            } => Ok((cancellation, deadline)),
975            _ => Ok((CancellationToken::new(), None)),
976        }
977    }
978}
979
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Hands the whole envelope to the runner's `run` callback and returns
    /// whatever the callback's future resolves to.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let pending = (self.run)(envelope);
        pending.await
    }
}
990
991#[async_trait::async_trait]
992impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
993    async fn execute(
994        self: Box<Self>,
995        envelope: RuntimeEffectEnvelope,
996    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
997        match envelope.command {
998            RuntimeEffectCommand::DurableStep { input, .. } => {
999                let value = (self.run)(input).await?;
1000                Ok(RuntimeEffectOutcome::DurableStep { value })
1001            }
1002            command => Err(RuntimeEffectControllerError::new(
1003                "runtime_effect_local_executor_mismatch",
1004                format!(
1005                    "local durable step executor cannot execute {} command",
1006                    command.kind().as_str()
1007                ),
1008            )),
1009        }
1010    }
1011}
1012
1013#[async_trait::async_trait]
1014impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1015    async fn execute(
1016        self: Box<Self>,
1017        envelope: RuntimeEffectEnvelope,
1018    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1019        let runner = *self;
1020        match envelope.command {
1021            RuntimeEffectCommand::LlmCall { request } => {
1022                let protocol_iteration = runner.machine.protocol_iteration();
1023                let (result, text_streamed) = runner
1024                    .driver
1025                    .run_llm_call(
1026                        Arc::new((*request).into_request(None, None)),
1027                        protocol_iteration,
1028                        envelope.invocation,
1029                        &runner.event_tx,
1030                        &runner.cancellation,
1031                    )
1032                    .await;
1033                Ok(RuntimeEffectOutcome::LlmCall {
1034                    result,
1035                    text_streamed,
1036                })
1037            }
1038            RuntimeEffectCommand::ToolCall { call } => {
1039                let tool_name = call.tool_name.clone();
1040                let mut outcome = runner
1041                    .driver
1042                    .run_tool_calls(
1043                        vec![(call, envelope.invocation)],
1044                        &runner.event_tx,
1045                        &runner.cancellation,
1046                    )
1047                    .await?;
1048                let launch = outcome.launches.pop().ok_or_else(|| {
1049                    RuntimeEffectControllerError::new(
1050                        "tool_result_missing",
1051                        format!("tool `{tool_name}` completed without a launch result"),
1052                    )
1053                })?;
1054                Ok(RuntimeEffectOutcome::ToolCall {
1055                    launch,
1056                    triggers: outcome.triggers,
1057                })
1058            }
1059            RuntimeEffectCommand::ExecCode { code } => {
1060                let protocol_iteration = runner.machine.protocol_iteration();
1061                let messages = runner.machine.message_sequence();
1062                Ok(RuntimeEffectOutcome::ExecCode {
1063                    result: runner
1064                        .driver
1065                        .run_exec_code(
1066                            &code,
1067                            messages,
1068                            protocol_iteration,
1069                            envelope.invocation,
1070                            &runner.event_tx,
1071                        )
1072                        .await,
1073                })
1074            }
1075            RuntimeEffectCommand::Checkpoint { checkpoint } => {
1076                Ok(RuntimeEffectOutcome::Checkpoint {
1077                    result: runner
1078                        .driver
1079                        .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1080                        .await
1081                        .map_err(RuntimeEffectControllerError::from),
1082                })
1083            }
1084            RuntimeEffectCommand::SyncExecutionEnvironment {
1085                update_machine_config,
1086            } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1087                result: runner
1088                    .driver
1089                    .refresh_execution_environment(runner.machine, update_machine_config)
1090                    .await
1091                    .map_err(|err| err.to_string()),
1092            }),
1093            RuntimeEffectCommand::Sleep { duration_ms } => {
1094                sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
1095                Ok(RuntimeEffectOutcome::Sleep)
1096            }
1097            command => Err(RuntimeEffectControllerError::new(
1098                "runtime_effect_local_executor_mismatch",
1099                format!(
1100                    "local turn executor cannot execute {} command",
1101                    command.kind().as_str()
1102                ),
1103            )),
1104        }
1105    }
1106}
1107
1108#[async_trait::async_trait]
1109impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1110    async fn execute(
1111        mut self: Box<Self>,
1112        envelope: RuntimeEffectEnvelope,
1113    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1114        match envelope.command {
1115            RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1116                result: self
1117                    .run_direct_llm_request((*request).into_request(
1118                        crate::session_model::transport_stream_events(&self.provider, None),
1119                        None,
1120                    ))
1121                    .await,
1122            }),
1123            RuntimeEffectCommand::Sleep { duration_ms } => {
1124                sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
1125                Ok(RuntimeEffectOutcome::Sleep)
1126            }
1127            command => Err(RuntimeEffectControllerError::new(
1128                "runtime_effect_local_executor_mismatch",
1129                format!(
1130                    "local direct executor cannot execute {} command",
1131                    command.kind().as_str()
1132                ),
1133            )),
1134        }
1135    }
1136}
1137
1138impl LocalDirectEffectRunner {
1139    async fn run_direct_llm_request(
1140        &mut self,
1141        request: CoreLlmRequest,
1142    ) -> Result<LlmResponse, LlmCallError> {
1143        let request = crate::attachments::resolve_llm_request_attachments(
1144            request,
1145            self.attachment_store.as_ref(),
1146        )
1147        .await
1148        .map_err(|err| LlmCallError {
1149            message: err.to_string(),
1150            retryable: false,
1151            raw: None,
1152            code: Some("attachment_resolution_failed".to_string()),
1153            terminal_reason: crate::LlmTerminalReason::ProviderError,
1154            request_body: None,
1155        })?;
1156        self.provider
1157            .complete(request)
1158            .await
1159            .map_err(llm_call_error_from_transport)
1160    }
1161}
1162
1163async fn execute_local_sleep(
1164    envelope: RuntimeEffectEnvelope,
1165    cancellation: CancellationToken,
1166) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1167    match envelope.command {
1168        RuntimeEffectCommand::Sleep { duration_ms } => {
1169            sleep_with_cancellation(duration_ms, &cancellation).await?;
1170            Ok(RuntimeEffectOutcome::Sleep)
1171        }
1172        command => Err(RuntimeEffectControllerError::new(
1173            "runtime_effect_local_executor_mismatch",
1174            format!(
1175                "local sleep executor cannot execute {} command",
1176                command.kind().as_str()
1177            ),
1178        )),
1179    }
1180}
1181
1182async fn sleep_with_cancellation(
1183    duration_ms: u64,
1184    cancellation: &CancellationToken,
1185) -> Result<(), RuntimeEffectControllerError> {
1186    let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
1187    tokio::pin!(sleep);
1188    tokio::select! {
1189        _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1190            "runtime_effect_sleep_cancelled",
1191            "runtime effect sleep was cancelled",
1192        )),
1193        _ = &mut sleep => Ok(()),
1194    }
1195}
1196
1197// =============================================================================
1198// Default in-process effect controller
1199// =============================================================================
1200
/// Default in-process effect controller.
///
/// The inline controller executes local runners in process, provides in-memory
/// await-event resolution, and exposes durable-tool-effect semantics for local
/// runs. It does not make in-flight effects crash-durable; workflow adapters
/// provide that by recording outcomes in their own history.
///
/// The type is a field-less unit struct: its await-event operations go through
/// the process-wide registry returned by `inline_await_events()`, so every
/// instance and clone shares the same await-event state.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
1209
1210#[async_trait::async_trait]
1211impl RuntimeEffectController for InlineRuntimeEffectController {
1212    fn supports_durable_effects(&self) -> bool {
1213        true
1214    }
1215
1216    async fn execute_effect(
1217        &self,
1218        envelope: RuntimeEffectEnvelope,
1219        local_executor: RuntimeEffectLocalExecutor<'_>,
1220    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1221        match envelope.command {
1222            RuntimeEffectCommand::AwaitEvent { key } => {
1223                let (cancellation, deadline) = local_executor.into_await_event_options()?;
1224                let resolution = self
1225                    .await_await_event(&key, cancellation, deadline)
1226                    .await
1227                    .map_err(RuntimeEffectControllerError::from)?;
1228                Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1229            }
1230            RuntimeEffectCommand::Process { command } => {
1231                let execution = local_executor.into_process()?;
1232                let registry = execution.registry;
1233                let result = tokio::task::spawn(async move {
1234                    Self::execute_process_command(registry, *command).await
1235                })
1236                .await
1237                .map_err(|err| {
1238                    RuntimeEffectControllerError::new(
1239                        "runtime_effect_process_task_join",
1240                        format!("inline process effect task failed: {err}"),
1241                    )
1242                })??;
1243                Ok(RuntimeEffectOutcome::Process { result })
1244            }
1245            _ => local_executor.execute(envelope).await,
1246        }
1247    }
1248
1249    async fn await_event_key(
1250        &self,
1251        scope: &ExecutionScope,
1252        wait: AwaitEventWaitIdentity,
1253    ) -> Result<AwaitEventKey, RuntimeError> {
1254        inline_await_events().key_for(scope, wait)
1255    }
1256
1257    async fn resolve_await_event(
1258        &self,
1259        key: &AwaitEventKey,
1260        resolution: Resolution,
1261    ) -> Result<ResolveOutcome, RuntimeError> {
1262        inline_await_events().resolve(key, resolution)
1263    }
1264
1265    async fn await_await_event(
1266        &self,
1267        key: &AwaitEventKey,
1268        cancel: CancellationToken,
1269        deadline: Option<Instant>,
1270    ) -> Result<Resolution, RuntimeError> {
1271        inline_await_events()
1272            .await_resolution(key, cancel, deadline)
1273            .await
1274    }
1275
1276    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1277        inline_await_events().revoke_session(session_id)
1278    }
1279}
1280
/// In-process deployment effect host.
///
/// Wraps a shared `RuntimeEffectController`; cloning the host clones the
/// `Arc`, so clones share the same controller.
#[derive(Clone)]
pub struct InlineEffectHost {
    /// Controller that the host's `EffectHost` methods forward to.
    controller: Arc<dyn RuntimeEffectController>,
}
1286
1287impl InlineEffectHost {
1288    pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1289        Self { controller }
1290    }
1291}
1292
1293impl Default for InlineEffectHost {
1294    fn default() -> Self {
1295        Self::new(Arc::new(InlineRuntimeEffectController))
1296    }
1297}
1298
1299#[async_trait::async_trait]
1300impl EffectHost for InlineEffectHost {
1301    fn durability_tier(&self) -> crate::DurabilityTier {
1302        self.controller.durability_tier()
1303    }
1304
1305    fn requires_durable_attachment_store(&self) -> bool {
1306        self.controller.requires_durable_attachment_store()
1307    }
1308
1309    fn supports_durable_effects(&self) -> bool {
1310        self.controller.supports_durable_effects()
1311    }
1312
1313    fn scoped<'run>(
1314        &'run self,
1315        scope: ExecutionScope,
1316    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1317        ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1318    }
1319
1320    fn scoped_static(
1321        &self,
1322        scope: ExecutionScope,
1323    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1324        Ok(Some(ScopedEffectController::shared(
1325            Arc::clone(&self.controller),
1326            scope,
1327        )?))
1328    }
1329
1330    async fn await_event_key(
1331        &self,
1332        scope: &ExecutionScope,
1333        wait: AwaitEventWaitIdentity,
1334    ) -> Result<AwaitEventKey, RuntimeError> {
1335        self.controller.await_event_key(scope, wait).await
1336    }
1337
1338    async fn resolve_await_event(
1339        &self,
1340        key: &AwaitEventKey,
1341        resolution: Resolution,
1342    ) -> Result<ResolveOutcome, RuntimeError> {
1343        self.controller.resolve_await_event(key, resolution).await
1344    }
1345
1346    async fn await_await_event(
1347        &self,
1348        key: &AwaitEventKey,
1349        cancel: CancellationToken,
1350        deadline: Option<Instant>,
1351    ) -> Result<Resolution, RuntimeError> {
1352        self.controller
1353            .await_await_event(key, cancel, deadline)
1354            .await
1355    }
1356
1357    async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1358        self.controller
1359            .revoke_await_events_for_session(session_id)
1360            .await
1361    }
1362}
1363
1364impl InlineRuntimeEffectController {
1365    /// Register the process (and any handle grant) into the durable registry.
1366    ///
1367    /// The inline controller no longer runs the process here: the registry's
1368    /// non-terminal row *is* the durable work queue, and the lease-protected
1369    /// [`ProcessWorkRunner`](crate::ProcessWorkRunner) is the sole executor. The
1370    /// control seam pokes that runner after a successful start, so registering
1371    /// the row is all this path does.
1372    pub(crate) async fn start_process(
1373        registry: Arc<dyn crate::ProcessRegistry>,
1374        registration: crate::ProcessRegistration,
1375        grant: Option<crate::ProcessStartGrant>,
1376    ) -> Result<ProcessRecord, PluginError> {
1377        let registration_for_record = registration.clone();
1378        let record = registry.register_process(registration_for_record).await?;
1379        if let Some(grant) = grant {
1380            registry
1381                .grant_handle(&grant.session_scope, &registration.id, grant.descriptor)
1382                .await?;
1383        }
1384        Ok(record)
1385    }
1386
1387    pub(crate) async fn request_process_cancel(
1388        &self,
1389        registry: Arc<dyn crate::ProcessRegistry>,
1390        process_id: &str,
1391        reason: Option<String>,
1392    ) -> Result<ProcessRecord, PluginError> {
1393        // Cancellation is a durable signal: the cancel event is what the
1394        // runner-run process observes, so the inline controller appends it and
1395        // no longer tracks an in-process cancellation token.
1396        registry
1397            .append_event(
1398                process_id,
1399                crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1400            )
1401            .await?;
1402        registry
1403            .get_process(process_id)
1404            .await
1405            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1406    }
1407
1408    async fn execute_process_command(
1409        registry: Arc<dyn crate::ProcessRegistry>,
1410        command: ProcessCommand,
1411    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1412        match command {
1413            ProcessCommand::Start {
1414                registration,
1415                grant,
1416                execution_context: _,
1417            } => {
1418                let record = Self::start_process(registry, registration, grant).await?;
1419                Ok(ProcessEffectOutcome::Start { record })
1420            }
1421            ProcessCommand::List {
1422                session_scope,
1423                mode,
1424            } => {
1425                let entries = match mode {
1426                    crate::ProcessListMode::Live => {
1427                        registry.list_live_handle_grants(&session_scope).await?
1428                    }
1429                    crate::ProcessListMode::All => {
1430                        registry.list_handle_grants(&session_scope).await?
1431                    }
1432                };
1433                Ok(ProcessEffectOutcome::List { entries })
1434            }
1435            ProcessCommand::Transfer {
1436                from_scope,
1437                to_scope,
1438                process_ids,
1439            } => {
1440                registry
1441                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1442                    .await?;
1443                Ok(ProcessEffectOutcome::Transfer)
1444            }
1445            ProcessCommand::DeleteSession { session_id } => {
1446                let report = registry.delete_session_process_state(&session_id).await?;
1447                Ok(ProcessEffectOutcome::DeleteSession { report })
1448            }
1449            ProcessCommand::Await { process_id } => {
1450                let output = registry.await_process(&process_id).await?;
1451                Ok(ProcessEffectOutcome::Await { output })
1452            }
1453            ProcessCommand::Cancel { process_id, reason } => {
1454                let record = InlineRuntimeEffectController
1455                    .request_process_cancel(registry, &process_id, reason)
1456                    .await?;
1457                Ok(ProcessEffectOutcome::Cancel { record })
1458            }
1459            ProcessCommand::Signal {
1460                process_id,
1461                request,
1462                ..
1463            } => {
1464                let result = registry.append_event(&process_id, request).await?;
1465                Ok(ProcessEffectOutcome::Signal {
1466                    event: result.event,
1467                })
1468            }
1469        }
1470    }
1471}
1472
1473impl std::fmt::Debug for InlineRuntimeEffectController {
1474    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1475        f.debug_struct("InlineRuntimeEffectController").finish()
1476    }
1477}