1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23 RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
27type HmacSha256 = Hmac<sha2::Sha256>;
28
29fn inline_await_events() -> &'static AwaitEventRegistry {
30 static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
31 REGISTRY.get_or_init(AwaitEventRegistry::new)
32}
33
34#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(tag = "type", rename_all = "snake_case")]
45pub enum ExecutionScope {
46 Turn {
47 session_id: String,
48 turn_id: String,
49 },
50 Process {
51 process_id: String,
52 },
53 QueueDrain {
54 session_id: String,
55 drain_id: String,
56 },
57 SessionDelete {
58 session_id: String,
59 },
60 RuntimeOperation {
61 operation_id: String,
62 },
63}
64
65impl ExecutionScope {
66 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
67 Self::Turn {
68 session_id: session_id.into(),
69 turn_id: turn_id.into(),
70 }
71 }
72
73 pub fn process(process_id: impl Into<String>) -> Self {
74 Self::Process {
75 process_id: process_id.into(),
76 }
77 }
78
79 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
80 Self::QueueDrain {
81 session_id: session_id.into(),
82 drain_id: drain_id.into(),
83 }
84 }
85
86 pub fn session_delete(session_id: impl Into<String>) -> Self {
87 Self::SessionDelete {
88 session_id: session_id.into(),
89 }
90 }
91
92 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
93 Self::RuntimeOperation {
94 operation_id: operation_id.into(),
95 }
96 }
97
98 pub fn id(&self) -> &str {
99 match self {
100 Self::Turn { turn_id, .. } => turn_id,
101 Self::Process { process_id } => process_id,
102 Self::QueueDrain { drain_id, .. } => drain_id,
103 Self::SessionDelete { session_id } => session_id,
104 Self::RuntimeOperation { operation_id } => operation_id,
105 }
106 }
107
108 pub fn session_id(&self) -> Option<&str> {
109 match self {
110 Self::Turn { session_id, .. }
111 | Self::QueueDrain { session_id, .. }
112 | Self::SessionDelete { session_id } => Some(session_id),
113 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
114 }
115 }
116
117 pub fn turn_id(&self) -> Option<&str> {
118 match self {
119 Self::Turn { turn_id, .. } => Some(turn_id),
120 _ => None,
121 }
122 }
123
124 pub fn validates_turn_trace_id(&self) -> bool {
125 matches!(self, Self::Turn { .. })
126 }
127
128 fn validate(&self) -> Result<(), RuntimeError> {
129 let missing = match self {
130 Self::Turn {
131 session_id,
132 turn_id,
133 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
134 Self::Process { process_id } => process_id.trim().is_empty(),
135 Self::QueueDrain {
136 session_id,
137 drain_id,
138 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
139 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
140 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
141 };
142 if missing {
143 return Err(RuntimeError::new(
144 RuntimeErrorCode::MissingExecutionScopeId,
145 "execution scopes require non-empty stable ids",
146 ));
147 }
148 Ok(())
149 }
150}
151
152#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
153#[serde(tag = "type", rename_all = "snake_case")]
154pub enum AwaitEventWaitIdentity {
155 ToolCompletion {
156 tool_call_id: String,
157 },
158 ProcessSignal {
159 process_id: String,
160 signal_name: String,
161 ordinal: u64,
162 },
163 Custom {
164 key: String,
165 },
166}
167
168impl AwaitEventWaitIdentity {
169 pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
170 Self::ToolCompletion {
171 tool_call_id: tool_call_id.into(),
172 }
173 }
174
175 pub fn process_signal(
176 process_id: impl Into<String>,
177 signal_name: impl Into<String>,
178 ordinal: u64,
179 ) -> Self {
180 Self::ProcessSignal {
181 process_id: process_id.into(),
182 signal_name: signal_name.into(),
183 ordinal,
184 }
185 }
186
187 fn validate(&self) -> Result<(), RuntimeError> {
188 let invalid = match self {
189 Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
190 Self::ProcessSignal {
191 process_id,
192 signal_name,
193 ordinal,
194 } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
195 Self::Custom { key } => key.trim().is_empty(),
196 };
197 if invalid {
198 return Err(RuntimeError::new(
199 "invalid_await_event_wait_identity",
200 "await-event wait identity requires non-empty stable ids",
201 ));
202 }
203 Ok(())
204 }
205}
206
207#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
208pub struct AwaitEventKey {
209 pub scope: ExecutionScope,
210 pub wait: AwaitEventWaitIdentity,
211 pub key_id: String,
212 pub signature: String,
213}
214
215impl AwaitEventKey {
216 pub fn promise_key(&self) -> String {
217 format!("lash-await-event:{}", self.key_id)
218 }
219}
220
221#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
222pub struct ExternalCompletionError {
223 pub code: String,
224 pub message: String,
225 #[serde(default, skip_serializing_if = "Option::is_none")]
226 pub raw: Option<serde_json::Value>,
227}
228
229impl ExternalCompletionError {
230 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
231 Self {
232 code: code.into(),
233 message: message.into(),
234 raw: None,
235 }
236 }
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
240#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
241pub enum Resolution {
242 Ok(serde_json::Value),
243 Err(ExternalCompletionError),
244 Timeout,
245 Cancelled,
246}
247
248#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
249#[serde(tag = "status", rename_all = "snake_case")]
250pub enum ResolveOutcome {
251 Accepted,
252 AlreadyResolved { terminal: Resolution },
253 UnknownOrRevoked,
254}
255
256#[derive(Debug)]
257struct AwaitEventEntry {
258 terminal: Option<Resolution>,
259 notify: Arc<Notify>,
260}
261
262#[derive(Debug)]
263struct AwaitEventRegistryState {
264 entries: HashMap<String, AwaitEventEntry>,
265 revoked_key_ids: HashSet<String>,
266 revoked_session_ids: HashSet<String>,
267}
268
269#[derive(Debug)]
270struct AwaitEventRegistry {
271 secret: Vec<u8>,
272 state: std::sync::Mutex<AwaitEventRegistryState>,
273}
274
275impl AwaitEventRegistry {
276 fn new() -> Self {
277 Self {
278 secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
279 state: std::sync::Mutex::new(AwaitEventRegistryState {
280 entries: HashMap::new(),
281 revoked_key_ids: HashSet::new(),
282 revoked_session_ids: HashSet::new(),
283 }),
284 }
285 }
286
287 fn key_for(
288 &self,
289 scope: &ExecutionScope,
290 wait: AwaitEventWaitIdentity,
291 ) -> Result<AwaitEventKey, RuntimeError> {
292 scope.validate()?;
293 wait.validate()?;
294 let key_id =
295 crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
296 RuntimeError::new(
297 "await_event_key_hash",
298 format!("failed to hash await-event identity: {err}"),
299 )
300 })?;
301 let signature = self.signature(scope, &wait, &key_id)?;
302 Ok(AwaitEventKey {
303 scope: scope.clone(),
304 wait,
305 key_id,
306 signature,
307 })
308 }
309
310 fn signature(
311 &self,
312 scope: &ExecutionScope,
313 wait: &AwaitEventWaitIdentity,
314 key_id: &str,
315 ) -> Result<String, RuntimeError> {
316 let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
317 RuntimeError::new(
318 "await_event_key_sign",
319 format!("failed to initialize await-event key signer: {err}"),
320 )
321 })?;
322 let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
323 RuntimeError::new(
324 "await_event_key_sign",
325 format!("failed to serialize await-event key identity: {err}"),
326 )
327 })?;
328 mac.update(&canonical);
329 Ok(format!("{:x}", mac.finalize().into_bytes()))
330 }
331
332 fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
333 let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
334 Ok(expected == key.signature)
335 }
336
337 fn resolve(
338 &self,
339 key: &AwaitEventKey,
340 resolution: Resolution,
341 ) -> Result<ResolveOutcome, RuntimeError> {
342 if !self.verify(key)? {
343 return Ok(ResolveOutcome::UnknownOrRevoked);
344 }
345 let mut state = self.state.lock().map_err(|_| {
346 RuntimeError::new(
347 "await_event_registry_poisoned",
348 "await-event registry lock poisoned",
349 )
350 })?;
351 if state.revoked_key_ids.contains(&key.key_id)
352 || key
353 .scope
354 .session_id()
355 .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
356 {
357 return Ok(ResolveOutcome::UnknownOrRevoked);
358 }
359 let entry = state
360 .entries
361 .entry(key.key_id.clone())
362 .or_insert_with(|| AwaitEventEntry {
363 terminal: None,
364 notify: Arc::new(Notify::new()),
365 });
366 if let Some(terminal) = &entry.terminal {
367 return Ok(ResolveOutcome::AlreadyResolved {
368 terminal: terminal.clone(),
369 });
370 }
371 entry.terminal = Some(resolution);
372 entry.notify.notify_waiters();
373 Ok(ResolveOutcome::Accepted)
374 }
375
376 async fn await_resolution(
377 &self,
378 key: &AwaitEventKey,
379 cancel: CancellationToken,
380 deadline: Option<Instant>,
381 clock: &dyn crate::Clock,
382 ) -> Result<Resolution, RuntimeError> {
383 if !self.verify(key)? {
384 return Err(RuntimeError::new(
385 "await_event_unknown_or_revoked",
386 "await-event key is invalid or revoked",
387 ));
388 }
389 loop {
390 let notify =
391 {
392 let mut state = self.state.lock().map_err(|_| {
393 RuntimeError::new(
394 "await_event_registry_poisoned",
395 "await-event registry lock poisoned",
396 )
397 })?;
398 if state.revoked_key_ids.contains(&key.key_id)
399 || key.scope.session_id().is_some_and(|session_id| {
400 state.revoked_session_ids.contains(session_id)
401 })
402 {
403 return Err(RuntimeError::new(
404 "await_event_unknown_or_revoked",
405 "await-event key is invalid or revoked",
406 ));
407 }
408 let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
409 AwaitEventEntry {
410 terminal: None,
411 notify: Arc::new(Notify::new()),
412 }
413 });
414 if let Some(terminal) = entry.terminal.clone() {
415 return Ok(terminal);
416 }
417 Arc::clone(&entry.notify)
418 };
419 if let Some(deadline) = deadline {
420 tokio::select! {
421 _ = cancel.cancelled() => {
422 let _ = self.resolve(key, Resolution::Cancelled)?;
423 }
424 _ = clock.sleep_until(deadline) => {
425 let _ = self.resolve(key, Resolution::Timeout)?;
426 }
427 _ = notify.notified() => {}
428 }
429 } else {
430 tokio::select! {
431 _ = cancel.cancelled() => {
432 let _ = self.resolve(key, Resolution::Cancelled)?;
433 }
434 _ = notify.notified() => {}
435 }
436 }
437 }
438 }
439
440 fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
441 let mut state = self.state.lock().map_err(|_| {
442 RuntimeError::new(
443 "await_event_registry_poisoned",
444 "await-event registry lock poisoned",
445 )
446 })?;
447 state.revoked_session_ids.insert(session_id.to_string());
448 for entry in state.entries.values() {
449 entry.notify.notify_waiters();
450 }
451 Ok(())
452 }
453}
454
455enum ScopedEffectControllerInner<'run> {
456 Borrowed(&'run dyn RuntimeEffectController),
457 Shared(Arc<dyn RuntimeEffectController>),
458}
459
460impl Clone for ScopedEffectControllerInner<'_> {
461 fn clone(&self) -> Self {
462 match self {
463 Self::Borrowed(controller) => Self::Borrowed(*controller),
464 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
465 }
466 }
467}
468
469#[derive(Clone)]
471pub struct ScopedEffectController<'run> {
472 controller: ScopedEffectControllerInner<'run>,
473 scope: ExecutionScope,
474}
475
476impl<'run> ScopedEffectController<'run> {
477 pub fn borrowed(
478 controller: &'run dyn RuntimeEffectController,
479 scope: ExecutionScope,
480 ) -> Result<Self, RuntimeError> {
481 scope.validate()?;
482 Ok(Self {
483 controller: ScopedEffectControllerInner::Borrowed(controller),
484 scope,
485 })
486 }
487
488 pub fn shared(
489 controller: Arc<dyn RuntimeEffectController>,
490 scope: ExecutionScope,
491 ) -> Result<Self, RuntimeError> {
492 scope.validate()?;
493 Ok(Self {
494 controller: ScopedEffectControllerInner::Shared(controller),
495 scope,
496 })
497 }
498
499 pub fn controller(&self) -> &dyn RuntimeEffectController {
500 match &self.controller {
501 ScopedEffectControllerInner::Borrowed(controller) => *controller,
502 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
503 }
504 }
505
506 pub fn execution_scope(&self) -> &ExecutionScope {
507 &self.scope
508 }
509
510 pub fn scope_id(&self) -> &str {
511 self.scope.id()
512 }
513
514 pub fn turn_id(&self) -> Option<&str> {
515 self.scope.turn_id()
516 }
517}
518
519#[async_trait::async_trait]
521pub trait EffectHost: Send + Sync {
522 fn durability_tier(&self) -> crate::DurabilityTier {
523 crate::DurabilityTier::Inline
524 }
525
526 fn requires_durable_attachment_store(&self) -> bool {
527 false
528 }
529
530 fn supports_durable_effects(&self) -> bool {
531 false
532 }
533
534 fn scoped<'run>(
535 &'run self,
536 scope: ExecutionScope,
537 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
538
539 fn scoped_static(
540 &self,
541 _scope: ExecutionScope,
542 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
543 Ok(None)
544 }
545
546 async fn await_event_key(
547 &self,
548 _scope: &ExecutionScope,
549 _wait: AwaitEventWaitIdentity,
550 ) -> Result<AwaitEventKey, RuntimeError> {
551 Err(RuntimeError::new(
552 "await_event_unsupported",
553 "this effect host does not support await-event keys",
554 ))
555 }
556
557 async fn resolve_await_event(
558 &self,
559 _key: &AwaitEventKey,
560 _resolution: Resolution,
561 ) -> Result<ResolveOutcome, RuntimeError> {
562 Ok(ResolveOutcome::UnknownOrRevoked)
563 }
564
565 async fn await_await_event(
566 &self,
567 _key: &AwaitEventKey,
568 _cancel: CancellationToken,
569 _deadline: Option<Instant>,
570 ) -> Result<Resolution, RuntimeError> {
571 Err(RuntimeError::new(
572 "await_event_unsupported",
573 "this effect host does not support await-event waits",
574 ))
575 }
576
577 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
578 Ok(())
579 }
580}
581
582#[async_trait::async_trait]
584pub trait RuntimeEffectController: Send + Sync {
585 fn durability_tier(&self) -> crate::DurabilityTier {
588 crate::DurabilityTier::Inline
589 }
590
591 fn requires_durable_attachment_store(&self) -> bool {
592 false
593 }
594
595 fn supports_durable_effects(&self) -> bool {
596 false
597 }
598
599 async fn execute_effect(
600 &self,
601 envelope: RuntimeEffectEnvelope,
602 local_executor: RuntimeEffectLocalExecutor<'_>,
603 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
604
605 async fn await_event_key(
606 &self,
607 _scope: &ExecutionScope,
608 _wait: AwaitEventWaitIdentity,
609 ) -> Result<AwaitEventKey, RuntimeError> {
610 Err(RuntimeError::new(
611 "await_event_unsupported",
612 "this effect controller does not support await-event keys",
613 ))
614 }
615
616 async fn resolve_await_event(
617 &self,
618 _key: &AwaitEventKey,
619 _resolution: Resolution,
620 ) -> Result<ResolveOutcome, RuntimeError> {
621 Ok(ResolveOutcome::UnknownOrRevoked)
622 }
623
624 async fn await_await_event(
625 &self,
626 _key: &AwaitEventKey,
627 _cancel: CancellationToken,
628 _deadline: Option<Instant>,
629 ) -> Result<Resolution, RuntimeError> {
630 Err(RuntimeError::new(
631 "await_event_unsupported",
632 "this effect controller does not support await-event waits",
633 ))
634 }
635
636 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
637 Ok(())
638 }
639}
640
641#[derive(Clone)]
644pub(crate) enum RuntimeEffectControllerHandle<'run> {
645 Borrowed(ScopedEffectController<'run>),
646 #[cfg(any(test, feature = "testing"))]
647 Shared {
648 controller: Arc<dyn RuntimeEffectController>,
649 scope: ExecutionScope,
650 },
651}
652
653impl<'run> RuntimeEffectControllerHandle<'run> {
654 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
655 Self::Borrowed(scoped)
656 }
657
658 #[cfg(any(test, feature = "testing"))]
659 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
660 Self::Shared {
661 controller,
662 scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
663 }
664 }
665
666 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
667 match self {
668 Self::Borrowed(scoped) => scoped.controller(),
669 #[cfg(any(test, feature = "testing"))]
670 Self::Shared { controller, .. } => controller.as_ref(),
671 }
672 }
673
674 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
675 match self {
676 Self::Borrowed(scoped) => scoped.clone(),
677 #[cfg(any(test, feature = "testing"))]
678 Self::Shared { controller, scope } => {
679 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
680 .expect("runtime effect controller handle carries a valid scope")
681 }
682 }
683 }
684
685 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
686 self.clone()
687 }
688}
689
690#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
691#[error("{code}: {message}")]
692pub struct RuntimeEffectControllerError {
693 pub code: String,
694 pub message: String,
695}
696
697impl RuntimeEffectControllerError {
698 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
699 Self {
700 code: code.into(),
701 message: message.into(),
702 }
703 }
704
705 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
706 Self::new(
707 "runtime_effect_wrong_outcome",
708 format!(
709 "expected {} outcome, got {}",
710 expected.as_str(),
711 actual.as_str()
712 ),
713 )
714 }
715
716 pub(crate) fn into_runtime_error(self) -> RuntimeError {
717 RuntimeError::new(self.code, self.message)
718 }
719}
720
721impl From<RuntimeError> for RuntimeEffectControllerError {
722 fn from(err: RuntimeError) -> Self {
723 Self::new(err.code.as_str(), err.message)
724 }
725}
726
727impl From<PluginError> for RuntimeEffectControllerError {
728 fn from(err: PluginError) -> Self {
729 Self::new("plugin", err.to_string())
730 }
731}
732
733impl From<crate::StoreError> for RuntimeEffectControllerError {
734 fn from(err: crate::StoreError) -> Self {
735 Self::new("runtime_store", err.to_string())
736 }
737}
738
739#[async_trait::async_trait]
744pub(crate) trait ProcessRunner: Send + Sync {
745 async fn run_process(
746 &self,
747 registration: crate::ProcessRegistration,
748 execution_context: crate::ProcessExecutionContext,
749 registry: Arc<dyn ProcessRegistry>,
750 scoped_effect_controller: crate::ScopedEffectController<'_>,
751 cancellation: CancellationToken,
752 ) -> crate::ProcessAwaitOutput;
753}
754
755pub struct ProcessLocalExecution {
756 pub registry: Arc<dyn ProcessRegistry>,
757 pub process_work_driver: Option<crate::ProcessWorkDriver>,
758}
759
760pub(super) struct LocalTurnEffectRunner<'a, 'run> {
761 driver: &'a mut RuntimeTurnDriver<'run>,
762 machine: &'a mut crate::TurnMachine,
763 event_tx: mpsc::Sender<RuntimeStreamEvent>,
764 cancellation: CancellationToken,
765}
766
767pub(super) struct LocalDirectEffectRunner {
768 provider: ProviderHandle,
769 attachment_store: Arc<dyn AttachmentStore>,
770}
771
772struct LocalToolBatchEffectRunner<'run> {
773 context: crate::RuntimeExecutionContext<'run>,
774 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
775}
776
777#[async_trait::async_trait]
778trait RuntimeEffectLocalRunner: Send {
779 async fn execute(
780 self: Box<Self>,
781 envelope: RuntimeEffectEnvelope,
782 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
783}
784
785#[cfg(any(test, feature = "testing"))]
786type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
787 RuntimeEffectEnvelope,
788 ) -> Pin<
789 Box<
790 dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
791 + Send
792 + 'run,
793 >,
794 > + Send
795 + 'run;
796
797#[cfg(any(test, feature = "testing"))]
798struct TestingRuntimeEffectLocalRunner<'run> {
799 run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
800}
801
802type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
803 serde_json::Value,
804 ) -> Pin<
805 Box<
806 dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
807 + Send
808 + 'run,
809 >,
810 > + Send
811 + 'run;
812
813struct DurableStepLocalRunner<'run> {
814 run: Box<DurableStepLocalRunnerFn<'run>>,
815}
816
817enum RuntimeEffectLocalExecutorState<'run> {
818 Unavailable,
819 SleepOnly {
820 cancellation: CancellationToken,
821 clock: Arc<dyn crate::Clock>,
822 },
823 ExternalWaitOptions {
824 cancellation: CancellationToken,
825 deadline: Option<Instant>,
826 clock: Arc<dyn crate::Clock>,
827 },
828 Process(ProcessLocalExecution),
829 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
830}
831
832pub struct RuntimeEffectLocalExecutor<'run> {
838 state: RuntimeEffectLocalExecutorState<'run>,
839}
840
841impl<'run> RuntimeEffectLocalExecutor<'run> {
842 pub fn unavailable() -> Self {
843 Self {
844 state: RuntimeEffectLocalExecutorState::Unavailable,
845 }
846 }
847
848 pub fn sleep(cancellation: CancellationToken) -> Self {
849 Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
850 }
851
852 pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
853 Self {
854 state: RuntimeEffectLocalExecutorState::SleepOnly {
855 cancellation,
856 clock,
857 },
858 }
859 }
860
861 pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
862 Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
863 }
864
865 pub fn await_event_with_clock(
866 cancellation: CancellationToken,
867 deadline: Option<Instant>,
868 clock: Arc<dyn crate::Clock>,
869 ) -> Self {
870 Self {
871 state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
872 cancellation,
873 deadline,
874 clock,
875 },
876 }
877 }
878
879 pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
880 Self::processes_with_driver(registry, None)
881 }
882
883 pub fn processes_with_driver(
884 registry: Arc<dyn ProcessRegistry>,
885 process_work_driver: Option<crate::ProcessWorkDriver>,
886 ) -> Self {
887 Self {
888 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
889 registry,
890 process_work_driver,
891 }),
892 }
893 }
894
895 pub fn durable_step<F, Fut>(run: F) -> Self
896 where
897 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
898 Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
899 {
900 Self {
901 state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
902 run: Box::new(move |input| {
903 Box::pin(async move { run(input).await.map_err(Into::into) })
904 }),
905 })),
906 }
907 }
908
909 #[cfg(any(test, feature = "testing"))]
910 pub fn testing<F, Fut>(run: F) -> Self
911 where
912 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
913 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
914 + Send
915 + 'run,
916 {
917 Self {
918 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
919 TestingRuntimeEffectLocalRunner {
920 run: Box::new(move |envelope| Box::pin(run(envelope))),
921 },
922 )),
923 }
924 }
925
926 pub(in crate::runtime) fn turn<'scope>(
927 driver: &'run mut RuntimeTurnDriver<'scope>,
928 machine: &'run mut crate::TurnMachine,
929 event_tx: mpsc::Sender<RuntimeStreamEvent>,
930 cancellation: CancellationToken,
931 ) -> Self
932 where
933 'scope: 'run,
934 {
935 Self {
936 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
937 driver,
938 machine,
939 event_tx,
940 cancellation,
941 })),
942 }
943 }
944
945 pub(in crate::runtime) fn direct(
946 provider: ProviderHandle,
947 attachment_store: Arc<dyn AttachmentStore>,
948 ) -> Self {
949 Self {
950 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
951 provider,
952 attachment_store,
953 })),
954 }
955 }
956
957 pub(crate) fn tool_batch(
958 context: crate::RuntimeExecutionContext<'run>,
959 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
960 ) -> Self {
961 Self {
962 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
963 context,
964 child_trace_hooks,
965 })),
966 }
967 }
968
969 pub async fn execute(
970 self,
971 envelope: RuntimeEffectEnvelope,
972 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
973 match self.state {
974 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
975 RuntimeEffectLocalExecutorState::SleepOnly {
976 cancellation,
977 clock,
978 } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
979 RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
980 Err(RuntimeEffectControllerError::new(
981 "runtime_effect_local_executor_mismatch",
982 format!(
983 "local await-event options cannot execute {} command directly",
984 envelope.command.kind().as_str()
985 ),
986 ))
987 }
988 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
989 "runtime_effect_local_executor_unavailable",
990 format!(
991 "no local executor is available for {}",
992 envelope.command.kind().as_str()
993 ),
994 )),
995 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
996 "runtime_effect_local_executor_mismatch",
997 format!(
998 "process executor cannot execute {} command directly",
999 envelope.command.kind().as_str()
1000 ),
1001 )),
1002 }
1003 }
1004
1005 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
1006 match self.state {
1007 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
1008 _ => Err(RuntimeEffectControllerError::new(
1009 "runtime_effect_local_executor_unavailable",
1010 "no process executor is available for process command",
1011 )),
1012 }
1013 }
1014
1015 fn into_await_event_options(
1016 self,
1017 ) -> Result<
1018 (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>),
1019 RuntimeEffectControllerError,
1020 > {
1021 match self.state {
1022 RuntimeEffectLocalExecutorState::ExternalWaitOptions {
1023 cancellation,
1024 deadline,
1025 clock,
1026 } => Ok((cancellation, deadline, clock)),
1027 _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
1028 }
1029 }
1030}
1031
1032#[cfg(any(test, feature = "testing"))]
1033#[async_trait::async_trait]
1034impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
1035 async fn execute(
1036 self: Box<Self>,
1037 envelope: RuntimeEffectEnvelope,
1038 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1039 (self.run)(envelope).await
1040 }
1041}
1042
1043#[async_trait::async_trait]
1044impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
1045 async fn execute(
1046 self: Box<Self>,
1047 envelope: RuntimeEffectEnvelope,
1048 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1049 match envelope.command {
1050 RuntimeEffectCommand::DurableStep { input, .. } => {
1051 let value = (self.run)(input).await?;
1052 Ok(RuntimeEffectOutcome::DurableStep { value })
1053 }
1054 command => Err(RuntimeEffectControllerError::new(
1055 "runtime_effect_local_executor_mismatch",
1056 format!(
1057 "local durable step executor cannot execute {} command",
1058 command.kind().as_str()
1059 ),
1060 )),
1061 }
1062 }
1063}
1064
1065#[async_trait::async_trait]
1066impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
1067 async fn execute(
1068 self: Box<Self>,
1069 envelope: RuntimeEffectEnvelope,
1070 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1071 match envelope.command {
1072 RuntimeEffectCommand::ToolBatch { batch } => {
1073 let outcome = self
1074 .context
1075 .execute_prepared_tool_batch_launches(
1076 batch,
1077 envelope.invocation,
1078 self.child_trace_hooks,
1079 )
1080 .await?;
1081 Ok(RuntimeEffectOutcome::ToolBatch {
1082 launches: outcome.launches,
1083 triggers: outcome.triggers,
1084 })
1085 }
1086 command => Err(RuntimeEffectControllerError::new(
1087 "runtime_effect_local_executor_mismatch",
1088 format!(
1089 "local tool-batch executor cannot execute {} command",
1090 command.kind().as_str()
1091 ),
1092 )),
1093 }
1094 }
1095}
1096
1097#[async_trait::async_trait]
1098impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1099 async fn execute(
1100 self: Box<Self>,
1101 envelope: RuntimeEffectEnvelope,
1102 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1103 let runner = *self;
1104 match envelope.command {
1105 RuntimeEffectCommand::LlmCall { request } => {
1106 let protocol_iteration = runner.machine.protocol_iteration();
1107 let (result, text_streamed) = runner
1108 .driver
1109 .run_llm_call(
1110 Arc::new((*request).into_request(None, None)),
1111 protocol_iteration,
1112 envelope.invocation,
1113 &runner.event_tx,
1114 &runner.cancellation,
1115 )
1116 .await;
1117 Ok(RuntimeEffectOutcome::LlmCall {
1118 result,
1119 text_streamed,
1120 })
1121 }
1122 RuntimeEffectCommand::ToolCall { call } => {
1123 let tool_name = call.tool_name.clone();
1124 let mut outcome = runner
1125 .driver
1126 .run_tool_calls(
1127 vec![(call, envelope.invocation)],
1128 &runner.event_tx,
1129 &runner.cancellation,
1130 )
1131 .await?;
1132 let launch = outcome.launches.pop().ok_or_else(|| {
1133 RuntimeEffectControllerError::new(
1134 "tool_result_missing",
1135 format!("tool `{tool_name}` completed without a launch result"),
1136 )
1137 })?;
1138 Ok(RuntimeEffectOutcome::ToolCall {
1139 launch,
1140 triggers: outcome.triggers,
1141 })
1142 }
1143 RuntimeEffectCommand::ToolBatch { batch } => {
1144 let outcome = runner
1145 .driver
1146 .run_tool_batch(
1147 batch,
1148 envelope.invocation,
1149 &runner.event_tx,
1150 &runner.cancellation,
1151 )
1152 .await?;
1153 Ok(RuntimeEffectOutcome::ToolBatch {
1154 launches: outcome.launches,
1155 triggers: outcome.triggers,
1156 })
1157 }
1158 RuntimeEffectCommand::ExecCode { language, code } => {
1159 let protocol_iteration = runner.machine.protocol_iteration();
1160 let messages = runner.machine.message_sequence();
1161 Ok(RuntimeEffectOutcome::ExecCode {
1162 result: runner
1163 .driver
1164 .run_exec_code(
1165 language,
1166 &code,
1167 messages,
1168 protocol_iteration,
1169 envelope.invocation,
1170 &runner.event_tx,
1171 )
1172 .await,
1173 })
1174 }
1175 RuntimeEffectCommand::Checkpoint { checkpoint } => {
1176 Ok(RuntimeEffectOutcome::Checkpoint {
1177 result: runner
1178 .driver
1179 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1180 .await
1181 .map_err(RuntimeEffectControllerError::from),
1182 })
1183 }
1184 RuntimeEffectCommand::SyncExecutionEnvironment {
1185 update_machine_config,
1186 } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1187 result: runner
1188 .driver
1189 .refresh_execution_environment(runner.machine, update_machine_config)
1190 .await
1191 .map_err(|err| err.to_string()),
1192 }),
1193 RuntimeEffectCommand::Sleep { duration_ms } => {
1194 sleep_with_cancellation(
1195 duration_ms,
1196 &runner.cancellation,
1197 runner.driver.host.core.clock.as_ref(),
1198 )
1199 .await?;
1200 Ok(RuntimeEffectOutcome::Sleep)
1201 }
1202 command => Err(RuntimeEffectControllerError::new(
1203 "runtime_effect_local_executor_mismatch",
1204 format!(
1205 "local turn executor cannot execute {} command",
1206 command.kind().as_str()
1207 ),
1208 )),
1209 }
1210 }
1211}
1212
1213#[async_trait::async_trait]
1214impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1215 async fn execute(
1216 mut self: Box<Self>,
1217 envelope: RuntimeEffectEnvelope,
1218 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1219 match envelope.command {
1220 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1221 result: self
1222 .run_direct_llm_request((*request).into_request(
1223 crate::session_model::transport_stream_events(&self.provider, None),
1224 None,
1225 ))
1226 .await,
1227 }),
1228 RuntimeEffectCommand::Sleep { duration_ms } => {
1229 sleep_with_cancellation(
1230 duration_ms,
1231 &CancellationToken::new(),
1232 &crate::SystemClock,
1233 )
1234 .await?;
1235 Ok(RuntimeEffectOutcome::Sleep)
1236 }
1237 command => Err(RuntimeEffectControllerError::new(
1238 "runtime_effect_local_executor_mismatch",
1239 format!(
1240 "local direct executor cannot execute {} command",
1241 command.kind().as_str()
1242 ),
1243 )),
1244 }
1245 }
1246}
1247
1248impl LocalDirectEffectRunner {
1249 async fn run_direct_llm_request(
1250 &mut self,
1251 request: CoreLlmRequest,
1252 ) -> Result<LlmResponse, LlmCallError> {
1253 let request = crate::attachments::resolve_llm_request_attachments(
1254 request,
1255 self.attachment_store.as_ref(),
1256 )
1257 .await
1258 .map_err(|err| LlmCallError {
1259 message: err.to_string(),
1260 retryable: false,
1261 raw: None,
1262 code: Some("attachment_resolution_failed".to_string()),
1263 terminal_reason: crate::LlmTerminalReason::ProviderError,
1264 request_body: None,
1265 })?;
1266 self.provider
1267 .complete(request)
1268 .await
1269 .map_err(llm_call_error_from_transport)
1270 }
1271}
1272
1273async fn execute_local_sleep(
1274 envelope: RuntimeEffectEnvelope,
1275 cancellation: CancellationToken,
1276 clock: &dyn crate::Clock,
1277) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1278 match envelope.command {
1279 RuntimeEffectCommand::Sleep { duration_ms } => {
1280 sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1281 Ok(RuntimeEffectOutcome::Sleep)
1282 }
1283 command => Err(RuntimeEffectControllerError::new(
1284 "runtime_effect_local_executor_mismatch",
1285 format!(
1286 "local sleep executor cannot execute {} command",
1287 command.kind().as_str()
1288 ),
1289 )),
1290 }
1291}
1292
1293async fn sleep_with_cancellation(
1294 duration_ms: u64,
1295 cancellation: &CancellationToken,
1296 clock: &dyn crate::Clock,
1297) -> Result<(), RuntimeEffectControllerError> {
1298 let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1299 tokio::pin!(sleep);
1300 tokio::select! {
1301 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1302 "runtime_effect_sleep_cancelled",
1303 "runtime effect sleep was cancelled",
1304 )),
1305 _ = &mut sleep => Ok(()),
1306 }
1307}
1308
1309#[derive(Clone, Default)]
1320pub struct InlineRuntimeEffectController;
1321
1322#[async_trait::async_trait]
1323impl RuntimeEffectController for InlineRuntimeEffectController {
1324 fn supports_durable_effects(&self) -> bool {
1325 true
1326 }
1327
1328 async fn execute_effect(
1329 &self,
1330 envelope: RuntimeEffectEnvelope,
1331 local_executor: RuntimeEffectLocalExecutor<'_>,
1332 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1333 match envelope.command {
1334 RuntimeEffectCommand::AwaitEvent { key } => {
1335 let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1336 let resolution = inline_await_events()
1337 .await_resolution(&key, cancellation, deadline, clock.as_ref())
1338 .await
1339 .map_err(RuntimeEffectControllerError::from)?;
1340 Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1341 }
1342 RuntimeEffectCommand::Process { command } => {
1343 let execution = local_executor.into_process()?;
1344 let registry = execution.registry;
1345 let process_work_driver = execution.process_work_driver;
1346 let result = tokio::task::spawn(async move {
1347 Self::execute_process_command(registry, process_work_driver, *command).await
1348 })
1349 .await
1350 .map_err(|err| {
1351 RuntimeEffectControllerError::new(
1352 "runtime_effect_process_task_join",
1353 format!("inline process effect task failed: {err}"),
1354 )
1355 })??;
1356 Ok(RuntimeEffectOutcome::Process { result })
1357 }
1358 _ => local_executor.execute(envelope).await,
1359 }
1360 }
1361
1362 async fn await_event_key(
1363 &self,
1364 scope: &ExecutionScope,
1365 wait: AwaitEventWaitIdentity,
1366 ) -> Result<AwaitEventKey, RuntimeError> {
1367 inline_await_events().key_for(scope, wait)
1368 }
1369
1370 async fn resolve_await_event(
1371 &self,
1372 key: &AwaitEventKey,
1373 resolution: Resolution,
1374 ) -> Result<ResolveOutcome, RuntimeError> {
1375 inline_await_events().resolve(key, resolution)
1376 }
1377
1378 async fn await_await_event(
1379 &self,
1380 key: &AwaitEventKey,
1381 cancel: CancellationToken,
1382 deadline: Option<Instant>,
1383 ) -> Result<Resolution, RuntimeError> {
1384 inline_await_events()
1385 .await_resolution(key, cancel, deadline, &crate::SystemClock)
1386 .await
1387 }
1388
1389 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1390 inline_await_events().revoke_session(session_id)
1391 }
1392}
1393
1394#[derive(Clone)]
1396pub struct InlineEffectHost {
1397 controller: Arc<dyn RuntimeEffectController>,
1398}
1399
1400impl InlineEffectHost {
1401 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1402 Self { controller }
1403 }
1404}
1405
1406impl Default for InlineEffectHost {
1407 fn default() -> Self {
1408 Self::new(Arc::new(InlineRuntimeEffectController))
1409 }
1410}
1411
1412#[async_trait::async_trait]
1413impl EffectHost for InlineEffectHost {
1414 fn durability_tier(&self) -> crate::DurabilityTier {
1415 self.controller.durability_tier()
1416 }
1417
1418 fn requires_durable_attachment_store(&self) -> bool {
1419 self.controller.requires_durable_attachment_store()
1420 }
1421
1422 fn supports_durable_effects(&self) -> bool {
1423 self.controller.supports_durable_effects()
1424 }
1425
1426 fn scoped<'run>(
1427 &'run self,
1428 scope: ExecutionScope,
1429 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1430 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1431 }
1432
1433 fn scoped_static(
1434 &self,
1435 scope: ExecutionScope,
1436 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1437 Ok(Some(ScopedEffectController::shared(
1438 Arc::clone(&self.controller),
1439 scope,
1440 )?))
1441 }
1442
1443 async fn await_event_key(
1444 &self,
1445 scope: &ExecutionScope,
1446 wait: AwaitEventWaitIdentity,
1447 ) -> Result<AwaitEventKey, RuntimeError> {
1448 self.controller.await_event_key(scope, wait).await
1449 }
1450
1451 async fn resolve_await_event(
1452 &self,
1453 key: &AwaitEventKey,
1454 resolution: Resolution,
1455 ) -> Result<ResolveOutcome, RuntimeError> {
1456 self.controller.resolve_await_event(key, resolution).await
1457 }
1458
1459 async fn await_await_event(
1460 &self,
1461 key: &AwaitEventKey,
1462 cancel: CancellationToken,
1463 deadline: Option<Instant>,
1464 ) -> Result<Resolution, RuntimeError> {
1465 self.controller
1466 .await_await_event(key, cancel, deadline)
1467 .await
1468 }
1469
1470 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1471 self.controller
1472 .revoke_await_events_for_session(session_id)
1473 .await
1474 }
1475}
1476
1477impl InlineRuntimeEffectController {
1478 pub(crate) async fn start_process(
1486 registry: Arc<dyn crate::ProcessRegistry>,
1487 registration: crate::ProcessRegistration,
1488 grant: Option<crate::ProcessStartGrant>,
1489 ) -> Result<ProcessRecord, PluginError> {
1490 let registration_for_record = registration.clone();
1491 let record = registry.register_process(registration_for_record).await?;
1492 if let Some(grant) = grant {
1493 registry
1494 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
1495 .await?;
1496 }
1497 Ok(record)
1498 }
1499
1500 pub(crate) async fn request_process_cancel(
1501 &self,
1502 registry: Arc<dyn crate::ProcessRegistry>,
1503 process_id: &str,
1504 reason: Option<String>,
1505 ) -> Result<ProcessRecord, PluginError> {
1506 registry
1510 .append_event(
1511 process_id,
1512 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1513 )
1514 .await?;
1515 registry
1516 .get_process(process_id)
1517 .await
1518 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1519 }
1520
1521 async fn execute_process_command(
1522 registry: Arc<dyn crate::ProcessRegistry>,
1523 process_work_driver: Option<crate::ProcessWorkDriver>,
1524 command: ProcessCommand,
1525 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1526 match command {
1527 ProcessCommand::Start {
1528 registration,
1529 grant,
1530 execution_context: _,
1531 } => {
1532 let record = Self::start_process(registry, registration, grant).await?;
1533 if let Some(driver) = process_work_driver.as_ref() {
1534 driver.claim_and_run_pending("process_start").await?;
1535 }
1536 Ok(ProcessEffectOutcome::Start { record })
1537 }
1538 ProcessCommand::List {
1539 session_scope,
1540 mode,
1541 } => {
1542 let entries = match mode {
1543 crate::ProcessListMode::Live => {
1544 registry.list_live_handle_grants(&session_scope).await?
1545 }
1546 crate::ProcessListMode::All => {
1547 registry.list_handle_grants(&session_scope).await?
1548 }
1549 };
1550 Ok(ProcessEffectOutcome::List { entries })
1551 }
1552 ProcessCommand::Transfer {
1553 from_scope,
1554 to_scope,
1555 process_ids,
1556 } => {
1557 registry
1558 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1559 .await?;
1560 Ok(ProcessEffectOutcome::Transfer)
1561 }
1562 ProcessCommand::DeleteSession { session_id } => {
1563 let report = registry.delete_session_process_state(&session_id).await?;
1564 Ok(ProcessEffectOutcome::DeleteSession { report })
1565 }
1566 ProcessCommand::Await { process_id } => {
1567 let output = registry.await_process(&process_id).await?;
1568 Ok(ProcessEffectOutcome::Await { output })
1569 }
1570 ProcessCommand::Cancel { process_id, reason } => {
1571 let record = InlineRuntimeEffectController
1572 .request_process_cancel(registry, &process_id, reason)
1573 .await?;
1574 Ok(ProcessEffectOutcome::Cancel { record })
1575 }
1576 ProcessCommand::Signal {
1577 process_id,
1578 request,
1579 ..
1580 } => {
1581 let result = registry.append_event(&process_id, request).await?;
1582 Ok(ProcessEffectOutcome::Signal {
1583 event: result.event,
1584 })
1585 }
1586 }
1587 }
1588}
1589
1590impl std::fmt::Debug for InlineRuntimeEffectController {
1591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1592 f.debug_struct("InlineRuntimeEffectController").finish()
1593 }
1594}