1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23 RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
27type HmacSha256 = Hmac<sha2::Sha256>;
28type AwaitEventOptions = (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>);
29
30fn inline_await_events() -> &'static AwaitEventRegistry {
31 static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
32 REGISTRY.get_or_init(AwaitEventRegistry::new)
33}
34
35#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum ExecutionScope {
47 Turn {
48 session_id: String,
49 turn_id: String,
50 },
51 Process {
52 process_id: String,
53 },
54 QueueDrain {
55 session_id: String,
56 drain_id: String,
57 },
58 SessionDelete {
59 session_id: String,
60 },
61 RuntimeOperation {
62 operation_id: String,
63 },
64}
65
66impl ExecutionScope {
67 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
68 Self::Turn {
69 session_id: session_id.into(),
70 turn_id: turn_id.into(),
71 }
72 }
73
74 pub fn process(process_id: impl Into<String>) -> Self {
75 Self::Process {
76 process_id: process_id.into(),
77 }
78 }
79
80 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
81 Self::QueueDrain {
82 session_id: session_id.into(),
83 drain_id: drain_id.into(),
84 }
85 }
86
87 pub fn session_delete(session_id: impl Into<String>) -> Self {
88 Self::SessionDelete {
89 session_id: session_id.into(),
90 }
91 }
92
93 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
94 Self::RuntimeOperation {
95 operation_id: operation_id.into(),
96 }
97 }
98
99 pub fn id(&self) -> &str {
100 match self {
101 Self::Turn { turn_id, .. } => turn_id,
102 Self::Process { process_id } => process_id,
103 Self::QueueDrain { drain_id, .. } => drain_id,
104 Self::SessionDelete { session_id } => session_id,
105 Self::RuntimeOperation { operation_id } => operation_id,
106 }
107 }
108
109 pub fn session_id(&self) -> Option<&str> {
110 match self {
111 Self::Turn { session_id, .. }
112 | Self::QueueDrain { session_id, .. }
113 | Self::SessionDelete { session_id } => Some(session_id),
114 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
115 }
116 }
117
118 pub fn turn_id(&self) -> Option<&str> {
119 match self {
120 Self::Turn { turn_id, .. } => Some(turn_id),
121 _ => None,
122 }
123 }
124
125 pub fn validates_turn_trace_id(&self) -> bool {
126 matches!(self, Self::Turn { .. })
127 }
128
129 fn validate(&self) -> Result<(), RuntimeError> {
130 let missing = match self {
131 Self::Turn {
132 session_id,
133 turn_id,
134 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
135 Self::Process { process_id } => process_id.trim().is_empty(),
136 Self::QueueDrain {
137 session_id,
138 drain_id,
139 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
140 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
141 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
142 };
143 if missing {
144 return Err(RuntimeError::new(
145 RuntimeErrorCode::MissingExecutionScopeId,
146 "execution scopes require non-empty stable ids",
147 ));
148 }
149 Ok(())
150 }
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
154#[serde(tag = "type", rename_all = "snake_case")]
155pub enum AwaitEventWaitIdentity {
156 ToolCompletion {
157 tool_call_id: String,
158 },
159 ProcessSignal {
160 process_id: String,
161 signal_name: String,
162 ordinal: u64,
163 },
164 Custom {
165 key: String,
166 },
167}
168
169impl AwaitEventWaitIdentity {
170 pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
171 Self::ToolCompletion {
172 tool_call_id: tool_call_id.into(),
173 }
174 }
175
176 pub fn process_signal(
177 process_id: impl Into<String>,
178 signal_name: impl Into<String>,
179 ordinal: u64,
180 ) -> Self {
181 Self::ProcessSignal {
182 process_id: process_id.into(),
183 signal_name: signal_name.into(),
184 ordinal,
185 }
186 }
187
188 fn validate(&self) -> Result<(), RuntimeError> {
189 let invalid = match self {
190 Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
191 Self::ProcessSignal {
192 process_id,
193 signal_name,
194 ordinal,
195 } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
196 Self::Custom { key } => key.trim().is_empty(),
197 };
198 if invalid {
199 return Err(RuntimeError::new(
200 "invalid_await_event_wait_identity",
201 "await-event wait identity requires non-empty stable ids",
202 ));
203 }
204 Ok(())
205 }
206}
207
208#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
209pub struct AwaitEventKey {
210 pub scope: ExecutionScope,
211 pub wait: AwaitEventWaitIdentity,
212 pub key_id: String,
213 pub signature: String,
214}
215
216impl AwaitEventKey {
217 pub fn promise_key(&self) -> String {
218 format!("lash-await-event:{}", self.key_id)
219 }
220}
221
222#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
223pub struct ExternalCompletionError {
224 pub code: String,
225 pub message: String,
226 #[serde(default, skip_serializing_if = "Option::is_none")]
227 pub raw: Option<serde_json::Value>,
228}
229
230impl ExternalCompletionError {
231 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
232 Self {
233 code: code.into(),
234 message: message.into(),
235 raw: None,
236 }
237 }
238}
239
240#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
241#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
242pub enum Resolution {
243 Ok(serde_json::Value),
244 Err(ExternalCompletionError),
245 Timeout,
246 Cancelled,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "status", rename_all = "snake_case")]
251pub enum ResolveOutcome {
252 Accepted,
253 AlreadyResolved { terminal: Resolution },
254 UnknownOrRevoked,
255}
256
257#[derive(Debug)]
258struct AwaitEventEntry {
259 terminal: Option<Resolution>,
260 notify: Arc<Notify>,
261}
262
263#[derive(Debug)]
264struct AwaitEventRegistryState {
265 entries: HashMap<String, AwaitEventEntry>,
266 revoked_key_ids: HashSet<String>,
267 revoked_session_ids: HashSet<String>,
268}
269
270#[derive(Debug)]
271struct AwaitEventRegistry {
272 secret: Vec<u8>,
273 state: std::sync::Mutex<AwaitEventRegistryState>,
274}
275
276impl AwaitEventRegistry {
277 fn new() -> Self {
278 Self {
279 secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
280 state: std::sync::Mutex::new(AwaitEventRegistryState {
281 entries: HashMap::new(),
282 revoked_key_ids: HashSet::new(),
283 revoked_session_ids: HashSet::new(),
284 }),
285 }
286 }
287
288 fn key_for(
289 &self,
290 scope: &ExecutionScope,
291 wait: AwaitEventWaitIdentity,
292 ) -> Result<AwaitEventKey, RuntimeError> {
293 scope.validate()?;
294 wait.validate()?;
295 let key_id =
296 crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
297 RuntimeError::new(
298 "await_event_key_hash",
299 format!("failed to hash await-event identity: {err}"),
300 )
301 })?;
302 let signature = self.signature(scope, &wait, &key_id)?;
303 Ok(AwaitEventKey {
304 scope: scope.clone(),
305 wait,
306 key_id,
307 signature,
308 })
309 }
310
311 fn signature(
312 &self,
313 scope: &ExecutionScope,
314 wait: &AwaitEventWaitIdentity,
315 key_id: &str,
316 ) -> Result<String, RuntimeError> {
317 let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
318 RuntimeError::new(
319 "await_event_key_sign",
320 format!("failed to initialize await-event key signer: {err}"),
321 )
322 })?;
323 let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
324 RuntimeError::new(
325 "await_event_key_sign",
326 format!("failed to serialize await-event key identity: {err}"),
327 )
328 })?;
329 mac.update(&canonical);
330 Ok(format!("{:x}", mac.finalize().into_bytes()))
331 }
332
333 fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
334 let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
335 Ok(expected == key.signature)
336 }
337
338 fn resolve(
339 &self,
340 key: &AwaitEventKey,
341 resolution: Resolution,
342 ) -> Result<ResolveOutcome, RuntimeError> {
343 if !self.verify(key)? {
344 return Ok(ResolveOutcome::UnknownOrRevoked);
345 }
346 let mut state = self.state.lock().map_err(|_| {
347 RuntimeError::new(
348 "await_event_registry_poisoned",
349 "await-event registry lock poisoned",
350 )
351 })?;
352 if state.revoked_key_ids.contains(&key.key_id)
353 || key
354 .scope
355 .session_id()
356 .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
357 {
358 return Ok(ResolveOutcome::UnknownOrRevoked);
359 }
360 let entry = state
361 .entries
362 .entry(key.key_id.clone())
363 .or_insert_with(|| AwaitEventEntry {
364 terminal: None,
365 notify: Arc::new(Notify::new()),
366 });
367 if let Some(terminal) = &entry.terminal {
368 return Ok(ResolveOutcome::AlreadyResolved {
369 terminal: terminal.clone(),
370 });
371 }
372 entry.terminal = Some(resolution);
373 entry.notify.notify_waiters();
374 Ok(ResolveOutcome::Accepted)
375 }
376
377 async fn await_resolution(
378 &self,
379 key: &AwaitEventKey,
380 cancel: CancellationToken,
381 deadline: Option<Instant>,
382 clock: &dyn crate::Clock,
383 ) -> Result<Resolution, RuntimeError> {
384 if !self.verify(key)? {
385 return Err(RuntimeError::new(
386 "await_event_unknown_or_revoked",
387 "await-event key is invalid or revoked",
388 ));
389 }
390 loop {
391 let notify =
392 {
393 let mut state = self.state.lock().map_err(|_| {
394 RuntimeError::new(
395 "await_event_registry_poisoned",
396 "await-event registry lock poisoned",
397 )
398 })?;
399 if state.revoked_key_ids.contains(&key.key_id)
400 || key.scope.session_id().is_some_and(|session_id| {
401 state.revoked_session_ids.contains(session_id)
402 })
403 {
404 return Err(RuntimeError::new(
405 "await_event_unknown_or_revoked",
406 "await-event key is invalid or revoked",
407 ));
408 }
409 let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
410 AwaitEventEntry {
411 terminal: None,
412 notify: Arc::new(Notify::new()),
413 }
414 });
415 if let Some(terminal) = entry.terminal.clone() {
416 return Ok(terminal);
417 }
418 Arc::clone(&entry.notify)
419 };
420 if let Some(deadline) = deadline {
421 tokio::select! {
422 _ = cancel.cancelled() => {
423 let _ = self.resolve(key, Resolution::Cancelled)?;
424 }
425 _ = clock.sleep_until(deadline) => {
426 let _ = self.resolve(key, Resolution::Timeout)?;
427 }
428 _ = notify.notified() => {}
429 }
430 } else {
431 tokio::select! {
432 _ = cancel.cancelled() => {
433 let _ = self.resolve(key, Resolution::Cancelled)?;
434 }
435 _ = notify.notified() => {}
436 }
437 }
438 }
439 }
440
441 fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
442 let mut state = self.state.lock().map_err(|_| {
443 RuntimeError::new(
444 "await_event_registry_poisoned",
445 "await-event registry lock poisoned",
446 )
447 })?;
448 state.revoked_session_ids.insert(session_id.to_string());
449 for entry in state.entries.values() {
450 entry.notify.notify_waiters();
451 }
452 Ok(())
453 }
454}
455
456enum ScopedEffectControllerInner<'run> {
457 Borrowed(&'run dyn RuntimeEffectController),
458 Shared(Arc<dyn RuntimeEffectController>),
459}
460
461impl Clone for ScopedEffectControllerInner<'_> {
462 fn clone(&self) -> Self {
463 match self {
464 Self::Borrowed(controller) => Self::Borrowed(*controller),
465 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
466 }
467 }
468}
469
470#[derive(Clone)]
472pub struct ScopedEffectController<'run> {
473 controller: ScopedEffectControllerInner<'run>,
474 scope: ExecutionScope,
475}
476
477impl<'run> ScopedEffectController<'run> {
478 pub fn borrowed(
479 controller: &'run dyn RuntimeEffectController,
480 scope: ExecutionScope,
481 ) -> Result<Self, RuntimeError> {
482 scope.validate()?;
483 Ok(Self {
484 controller: ScopedEffectControllerInner::Borrowed(controller),
485 scope,
486 })
487 }
488
489 pub fn shared(
490 controller: Arc<dyn RuntimeEffectController>,
491 scope: ExecutionScope,
492 ) -> Result<Self, RuntimeError> {
493 scope.validate()?;
494 Ok(Self {
495 controller: ScopedEffectControllerInner::Shared(controller),
496 scope,
497 })
498 }
499
500 pub fn controller(&self) -> &dyn RuntimeEffectController {
501 match &self.controller {
502 ScopedEffectControllerInner::Borrowed(controller) => *controller,
503 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
504 }
505 }
506
507 pub fn execution_scope(&self) -> &ExecutionScope {
508 &self.scope
509 }
510
511 pub fn scope_id(&self) -> &str {
512 self.scope.id()
513 }
514
515 pub fn turn_id(&self) -> Option<&str> {
516 self.scope.turn_id()
517 }
518}
519
520#[async_trait::async_trait]
522pub trait EffectHost: Send + Sync {
523 fn durability_tier(&self) -> crate::DurabilityTier {
524 crate::DurabilityTier::Inline
525 }
526
527 fn requires_durable_attachment_store(&self) -> bool {
528 false
529 }
530
531 fn supports_durable_effects(&self) -> bool {
532 false
533 }
534
535 fn scoped<'run>(
536 &'run self,
537 scope: ExecutionScope,
538 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
539
540 fn scoped_static(
541 &self,
542 _scope: ExecutionScope,
543 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
544 Ok(None)
545 }
546
547 async fn await_event_key(
548 &self,
549 _scope: &ExecutionScope,
550 _wait: AwaitEventWaitIdentity,
551 ) -> Result<AwaitEventKey, RuntimeError> {
552 Err(RuntimeError::new(
553 "await_event_unsupported",
554 "this effect host does not support await-event keys",
555 ))
556 }
557
558 async fn resolve_await_event(
559 &self,
560 _key: &AwaitEventKey,
561 _resolution: Resolution,
562 ) -> Result<ResolveOutcome, RuntimeError> {
563 Ok(ResolveOutcome::UnknownOrRevoked)
564 }
565
566 async fn await_await_event(
567 &self,
568 _key: &AwaitEventKey,
569 _cancel: CancellationToken,
570 _deadline: Option<Instant>,
571 ) -> Result<Resolution, RuntimeError> {
572 Err(RuntimeError::new(
573 "await_event_unsupported",
574 "this effect host does not support await-event waits",
575 ))
576 }
577
578 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
579 Ok(())
580 }
581}
582
583#[async_trait::async_trait]
585pub trait RuntimeEffectController: Send + Sync {
586 fn durability_tier(&self) -> crate::DurabilityTier {
589 crate::DurabilityTier::Inline
590 }
591
592 fn requires_durable_attachment_store(&self) -> bool {
593 false
594 }
595
596 fn supports_durable_effects(&self) -> bool {
597 false
598 }
599
600 fn supports_concurrent_effects(&self) -> bool {
610 true
611 }
612
613 async fn execute_effect(
614 &self,
615 envelope: RuntimeEffectEnvelope,
616 local_executor: RuntimeEffectLocalExecutor<'_>,
617 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
618
619 async fn await_event_key(
620 &self,
621 _scope: &ExecutionScope,
622 _wait: AwaitEventWaitIdentity,
623 ) -> Result<AwaitEventKey, RuntimeError> {
624 Err(RuntimeError::new(
625 "await_event_unsupported",
626 "this effect controller does not support await-event keys",
627 ))
628 }
629
630 async fn resolve_await_event(
631 &self,
632 _key: &AwaitEventKey,
633 _resolution: Resolution,
634 ) -> Result<ResolveOutcome, RuntimeError> {
635 Ok(ResolveOutcome::UnknownOrRevoked)
636 }
637
638 async fn await_await_event(
639 &self,
640 _key: &AwaitEventKey,
641 _cancel: CancellationToken,
642 _deadline: Option<Instant>,
643 ) -> Result<Resolution, RuntimeError> {
644 Err(RuntimeError::new(
645 "await_event_unsupported",
646 "this effect controller does not support await-event waits",
647 ))
648 }
649
650 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
651 Ok(())
652 }
653}
654
655#[derive(Clone)]
658pub(crate) enum RuntimeEffectControllerHandle<'run> {
659 Borrowed(ScopedEffectController<'run>),
660 #[cfg(any(test, feature = "testing"))]
661 Shared {
662 controller: Arc<dyn RuntimeEffectController>,
663 scope: ExecutionScope,
664 },
665}
666
667impl<'run> RuntimeEffectControllerHandle<'run> {
668 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
669 Self::Borrowed(scoped)
670 }
671
672 #[cfg(any(test, feature = "testing"))]
673 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
674 Self::Shared {
675 controller,
676 scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
677 }
678 }
679
680 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
681 match self {
682 Self::Borrowed(scoped) => scoped.controller(),
683 #[cfg(any(test, feature = "testing"))]
684 Self::Shared { controller, .. } => controller.as_ref(),
685 }
686 }
687
688 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
689 match self {
690 Self::Borrowed(scoped) => scoped.clone(),
691 #[cfg(any(test, feature = "testing"))]
692 Self::Shared { controller, scope } => {
693 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
694 .expect("runtime effect controller handle carries a valid scope")
695 }
696 }
697 }
698
699 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
700 self.clone()
701 }
702}
703
704#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
705#[error("{code}: {message}")]
706pub struct RuntimeEffectControllerError {
707 pub code: String,
708 pub message: String,
709}
710
711impl RuntimeEffectControllerError {
712 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
713 Self {
714 code: code.into(),
715 message: message.into(),
716 }
717 }
718
719 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
720 Self::new(
721 "runtime_effect_wrong_outcome",
722 format!(
723 "expected {} outcome, got {}",
724 expected.as_str(),
725 actual.as_str()
726 ),
727 )
728 }
729
730 pub(crate) fn into_runtime_error(self) -> RuntimeError {
731 RuntimeError::new(self.code, self.message)
732 }
733}
734
735impl From<RuntimeError> for RuntimeEffectControllerError {
736 fn from(err: RuntimeError) -> Self {
737 Self::new(err.code.as_str(), err.message)
738 }
739}
740
741impl From<PluginError> for RuntimeEffectControllerError {
742 fn from(err: PluginError) -> Self {
743 Self::new("plugin", err.to_string())
744 }
745}
746
747impl From<crate::StoreError> for RuntimeEffectControllerError {
748 fn from(err: crate::StoreError) -> Self {
749 Self::new("runtime_store", err.to_string())
750 }
751}
752
753#[async_trait::async_trait]
758pub(crate) trait ProcessRunner: Send + Sync {
759 async fn run_process(
760 &self,
761 registration: crate::ProcessRegistration,
762 execution_context: crate::ProcessExecutionContext,
763 registry: Arc<dyn ProcessRegistry>,
764 scoped_effect_controller: crate::ScopedEffectController<'_>,
765 cancellation: CancellationToken,
766 ) -> crate::ProcessAwaitOutput;
767}
768
769pub struct ProcessLocalExecution {
770 pub registry: Arc<dyn ProcessRegistry>,
771 pub process_work_driver: Option<crate::ProcessWorkDriver>,
772}
773
774pub(super) struct LocalTurnEffectRunner<'a, 'run> {
775 driver: &'a mut RuntimeTurnDriver<'run>,
776 machine: &'a mut crate::TurnMachine,
777 event_tx: mpsc::Sender<RuntimeStreamEvent>,
778 cancellation: CancellationToken,
779}
780
781pub(super) struct LocalDirectEffectRunner {
782 provider: ProviderHandle,
783 attachment_store: Arc<dyn AttachmentStore>,
784}
785
786struct LocalToolBatchEffectRunner<'run> {
787 context: crate::RuntimeExecutionContext<'run>,
788 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
789}
790
791#[async_trait::async_trait]
792trait RuntimeEffectLocalRunner: Send {
793 async fn execute(
794 self: Box<Self>,
795 envelope: RuntimeEffectEnvelope,
796 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
797}
798
799#[cfg(any(test, feature = "testing"))]
800type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
801 RuntimeEffectEnvelope,
802 ) -> Pin<
803 Box<
804 dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
805 + Send
806 + 'run,
807 >,
808 > + Send
809 + 'run;
810
811#[cfg(any(test, feature = "testing"))]
812struct TestingRuntimeEffectLocalRunner<'run> {
813 run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
814}
815
816type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
817 serde_json::Value,
818 ) -> Pin<
819 Box<
820 dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
821 + Send
822 + 'run,
823 >,
824 > + Send
825 + 'run;
826
827struct DurableStepLocalRunner<'run> {
828 run: Box<DurableStepLocalRunnerFn<'run>>,
829}
830
831enum RuntimeEffectLocalExecutorState<'run> {
832 Unavailable,
833 SleepOnly {
834 cancellation: CancellationToken,
835 clock: Arc<dyn crate::Clock>,
836 },
837 ExternalWaitOptions {
838 cancellation: CancellationToken,
839 deadline: Option<Instant>,
840 clock: Arc<dyn crate::Clock>,
841 },
842 Process(ProcessLocalExecution),
843 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
844}
845
846pub struct RuntimeEffectLocalExecutor<'run> {
852 state: RuntimeEffectLocalExecutorState<'run>,
853}
854
855impl<'run> RuntimeEffectLocalExecutor<'run> {
856 pub fn unavailable() -> Self {
857 Self {
858 state: RuntimeEffectLocalExecutorState::Unavailable,
859 }
860 }
861
862 pub fn sleep(cancellation: CancellationToken) -> Self {
863 Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
864 }
865
866 pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
867 Self {
868 state: RuntimeEffectLocalExecutorState::SleepOnly {
869 cancellation,
870 clock,
871 },
872 }
873 }
874
875 pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
876 Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
877 }
878
879 pub fn await_event_with_clock(
880 cancellation: CancellationToken,
881 deadline: Option<Instant>,
882 clock: Arc<dyn crate::Clock>,
883 ) -> Self {
884 Self {
885 state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
886 cancellation,
887 deadline,
888 clock,
889 },
890 }
891 }
892
893 pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
894 Self::processes_with_driver(registry, None)
895 }
896
897 pub fn processes_with_driver(
898 registry: Arc<dyn ProcessRegistry>,
899 process_work_driver: Option<crate::ProcessWorkDriver>,
900 ) -> Self {
901 Self {
902 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
903 registry,
904 process_work_driver,
905 }),
906 }
907 }
908
909 pub fn durable_step<F, Fut>(run: F) -> Self
910 where
911 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
912 Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
913 {
914 Self {
915 state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
916 run: Box::new(move |input| {
917 Box::pin(async move { run(input).await.map_err(Into::into) })
918 }),
919 })),
920 }
921 }
922
923 #[cfg(any(test, feature = "testing"))]
924 pub fn testing<F, Fut>(run: F) -> Self
925 where
926 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
927 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
928 + Send
929 + 'run,
930 {
931 Self {
932 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
933 TestingRuntimeEffectLocalRunner {
934 run: Box::new(move |envelope| Box::pin(run(envelope))),
935 },
936 )),
937 }
938 }
939
940 pub(in crate::runtime) fn turn<'scope>(
941 driver: &'run mut RuntimeTurnDriver<'scope>,
942 machine: &'run mut crate::TurnMachine,
943 event_tx: mpsc::Sender<RuntimeStreamEvent>,
944 cancellation: CancellationToken,
945 ) -> Self
946 where
947 'scope: 'run,
948 {
949 Self {
950 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
951 driver,
952 machine,
953 event_tx,
954 cancellation,
955 })),
956 }
957 }
958
959 pub(in crate::runtime) fn direct(
960 provider: ProviderHandle,
961 attachment_store: Arc<dyn AttachmentStore>,
962 ) -> Self {
963 Self {
964 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
965 provider,
966 attachment_store,
967 })),
968 }
969 }
970
971 pub(crate) fn tool_batch(
972 context: crate::RuntimeExecutionContext<'run>,
973 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
974 ) -> Self {
975 Self {
976 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
977 context,
978 child_trace_hooks,
979 })),
980 }
981 }
982
983 pub async fn execute(
984 self,
985 envelope: RuntimeEffectEnvelope,
986 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
987 match self.state {
988 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
989 RuntimeEffectLocalExecutorState::SleepOnly {
990 cancellation,
991 clock,
992 } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
993 RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
994 Err(RuntimeEffectControllerError::new(
995 "runtime_effect_local_executor_mismatch",
996 format!(
997 "local await-event options cannot execute {} command directly",
998 envelope.command.kind().as_str()
999 ),
1000 ))
1001 }
1002 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
1003 "runtime_effect_local_executor_unavailable",
1004 format!(
1005 "no local executor is available for {}",
1006 envelope.command.kind().as_str()
1007 ),
1008 )),
1009 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
1010 "runtime_effect_local_executor_mismatch",
1011 format!(
1012 "process executor cannot execute {} command directly",
1013 envelope.command.kind().as_str()
1014 ),
1015 )),
1016 }
1017 }
1018
1019 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
1020 match self.state {
1021 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
1022 _ => Err(RuntimeEffectControllerError::new(
1023 "runtime_effect_local_executor_unavailable",
1024 "no process executor is available for process command",
1025 )),
1026 }
1027 }
1028
1029 fn into_await_event_options(self) -> Result<AwaitEventOptions, RuntimeEffectControllerError> {
1030 match self.state {
1031 RuntimeEffectLocalExecutorState::ExternalWaitOptions {
1032 cancellation,
1033 deadline,
1034 clock,
1035 } => Ok((cancellation, deadline, clock)),
1036 _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
1037 }
1038 }
1039}
1040
1041#[cfg(any(test, feature = "testing"))]
1042#[async_trait::async_trait]
1043impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
1044 async fn execute(
1045 self: Box<Self>,
1046 envelope: RuntimeEffectEnvelope,
1047 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1048 (self.run)(envelope).await
1049 }
1050}
1051
1052#[async_trait::async_trait]
1053impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
1054 async fn execute(
1055 self: Box<Self>,
1056 envelope: RuntimeEffectEnvelope,
1057 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1058 match envelope.command {
1059 RuntimeEffectCommand::DurableStep { input, .. } => {
1060 let value = (self.run)(input).await?;
1061 Ok(RuntimeEffectOutcome::DurableStep { value })
1062 }
1063 command => Err(RuntimeEffectControllerError::new(
1064 "runtime_effect_local_executor_mismatch",
1065 format!(
1066 "local durable step executor cannot execute {} command",
1067 command.kind().as_str()
1068 ),
1069 )),
1070 }
1071 }
1072}
1073
1074#[async_trait::async_trait]
1075impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
1076 async fn execute(
1077 self: Box<Self>,
1078 envelope: RuntimeEffectEnvelope,
1079 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1080 match envelope.command {
1081 RuntimeEffectCommand::ToolBatch { batch } => {
1082 let outcome = self
1083 .context
1084 .execute_prepared_tool_batch_launches(
1085 batch,
1086 envelope.invocation,
1087 self.child_trace_hooks,
1088 )
1089 .await?;
1090 Ok(RuntimeEffectOutcome::ToolBatch {
1091 launches: outcome.launches,
1092 triggers: outcome.triggers,
1093 })
1094 }
1095 RuntimeEffectCommand::ToolAttempt {
1096 call,
1097 attempt,
1098 max_attempts,
1099 } => {
1100 let child_execution_trace_hook = self.child_trace_hooks.get(&call.call_id).cloned();
1101 let outcome = self
1102 .context
1103 .execute_prepared_tool_attempt_effect(
1104 call,
1105 attempt,
1106 max_attempts,
1107 envelope.invocation,
1108 child_execution_trace_hook,
1109 )
1110 .await?;
1111 Ok(RuntimeEffectOutcome::ToolAttempt {
1112 launch: outcome.launch,
1113 triggers: outcome.triggers,
1114 })
1115 }
1116 command => Err(RuntimeEffectControllerError::new(
1117 "runtime_effect_local_executor_mismatch",
1118 format!(
1119 "local tool executor cannot execute {} command",
1120 command.kind().as_str()
1121 ),
1122 )),
1123 }
1124 }
1125}
1126
1127#[async_trait::async_trait]
1128impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1129 async fn execute(
1130 self: Box<Self>,
1131 envelope: RuntimeEffectEnvelope,
1132 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1133 let runner = *self;
1134 match envelope.command {
1135 RuntimeEffectCommand::LlmCall { request } => {
1136 let protocol_iteration = runner.machine.protocol_iteration();
1137 let (result, text_streamed) = runner
1138 .driver
1139 .run_llm_call(
1140 Arc::new((*request).into_request(None, None)),
1141 protocol_iteration,
1142 envelope.invocation,
1143 &runner.event_tx,
1144 &runner.cancellation,
1145 )
1146 .await;
1147 Ok(RuntimeEffectOutcome::LlmCall {
1148 result,
1149 text_streamed,
1150 })
1151 }
1152 RuntimeEffectCommand::ToolBatch { batch } => {
1153 let outcome = runner
1154 .driver
1155 .run_tool_batch(
1156 batch,
1157 envelope.invocation,
1158 &runner.event_tx,
1159 &runner.cancellation,
1160 )
1161 .await?;
1162 Ok(RuntimeEffectOutcome::ToolBatch {
1163 launches: outcome.launches,
1164 triggers: outcome.triggers,
1165 })
1166 }
1167 RuntimeEffectCommand::ExecCode { language, code } => {
1168 let protocol_iteration = runner.machine.protocol_iteration();
1169 let messages = runner.machine.message_sequence();
1170 Ok(RuntimeEffectOutcome::ExecCode {
1171 result: runner
1172 .driver
1173 .run_exec_code(
1174 language,
1175 &code,
1176 messages,
1177 protocol_iteration,
1178 envelope.invocation,
1179 &runner.event_tx,
1180 )
1181 .await,
1182 })
1183 }
1184 RuntimeEffectCommand::Checkpoint { checkpoint } => {
1185 Ok(RuntimeEffectOutcome::Checkpoint {
1186 result: runner
1187 .driver
1188 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1189 .await
1190 .map_err(RuntimeEffectControllerError::from),
1191 })
1192 }
1193 RuntimeEffectCommand::SyncExecutionEnvironment {
1194 update_machine_config,
1195 } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1196 result: runner
1197 .driver
1198 .refresh_execution_environment(runner.machine, update_machine_config)
1199 .await
1200 .map_err(|err| err.to_string()),
1201 }),
1202 RuntimeEffectCommand::Sleep { duration_ms } => {
1203 sleep_with_cancellation(
1204 duration_ms,
1205 &runner.cancellation,
1206 runner.driver.host.core.clock.as_ref(),
1207 )
1208 .await?;
1209 Ok(RuntimeEffectOutcome::Sleep)
1210 }
1211 command => Err(RuntimeEffectControllerError::new(
1212 "runtime_effect_local_executor_mismatch",
1213 format!(
1214 "local turn executor cannot execute {} command",
1215 command.kind().as_str()
1216 ),
1217 )),
1218 }
1219 }
1220}
1221
1222#[async_trait::async_trait]
1223impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1224 async fn execute(
1225 mut self: Box<Self>,
1226 envelope: RuntimeEffectEnvelope,
1227 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1228 match envelope.command {
1229 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1230 result: self
1231 .run_direct_llm_request((*request).into_request(
1232 crate::session_model::transport_stream_events(&self.provider, None),
1233 None,
1234 ))
1235 .await,
1236 }),
1237 RuntimeEffectCommand::Sleep { duration_ms } => {
1238 sleep_with_cancellation(
1239 duration_ms,
1240 &CancellationToken::new(),
1241 &crate::SystemClock,
1242 )
1243 .await?;
1244 Ok(RuntimeEffectOutcome::Sleep)
1245 }
1246 command => Err(RuntimeEffectControllerError::new(
1247 "runtime_effect_local_executor_mismatch",
1248 format!(
1249 "local direct executor cannot execute {} command",
1250 command.kind().as_str()
1251 ),
1252 )),
1253 }
1254 }
1255}
1256
1257impl LocalDirectEffectRunner {
1258 async fn run_direct_llm_request(
1259 &mut self,
1260 request: CoreLlmRequest,
1261 ) -> Result<LlmResponse, LlmCallError> {
1262 let request = crate::attachments::resolve_llm_request_attachments(
1263 request,
1264 self.attachment_store.as_ref(),
1265 )
1266 .await
1267 .map_err(|err| LlmCallError {
1268 message: err.to_string(),
1269 retryable: false,
1270 raw: None,
1271 code: Some("attachment_resolution_failed".to_string()),
1272 terminal_reason: crate::LlmTerminalReason::ProviderError,
1273 request_body: None,
1274 })?;
1275 self.provider
1276 .complete(request)
1277 .await
1278 .map_err(llm_call_error_from_transport)
1279 }
1280}
1281
1282async fn execute_local_sleep(
1283 envelope: RuntimeEffectEnvelope,
1284 cancellation: CancellationToken,
1285 clock: &dyn crate::Clock,
1286) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1287 match envelope.command {
1288 RuntimeEffectCommand::Sleep { duration_ms } => {
1289 sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1290 Ok(RuntimeEffectOutcome::Sleep)
1291 }
1292 command => Err(RuntimeEffectControllerError::new(
1293 "runtime_effect_local_executor_mismatch",
1294 format!(
1295 "local sleep executor cannot execute {} command",
1296 command.kind().as_str()
1297 ),
1298 )),
1299 }
1300}
1301
1302async fn sleep_with_cancellation(
1303 duration_ms: u64,
1304 cancellation: &CancellationToken,
1305 clock: &dyn crate::Clock,
1306) -> Result<(), RuntimeEffectControllerError> {
1307 let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1308 tokio::pin!(sleep);
1309 tokio::select! {
1310 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1311 "runtime_effect_sleep_cancelled",
1312 "runtime effect sleep was cancelled",
1313 )),
1314 _ = &mut sleep => Ok(()),
1315 }
1316}
1317
1318#[derive(Clone, Default)]
1329pub struct InlineRuntimeEffectController;
1330
1331#[async_trait::async_trait]
1332impl RuntimeEffectController for InlineRuntimeEffectController {
1333 fn supports_durable_effects(&self) -> bool {
1334 true
1335 }
1336
1337 async fn execute_effect(
1338 &self,
1339 envelope: RuntimeEffectEnvelope,
1340 local_executor: RuntimeEffectLocalExecutor<'_>,
1341 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1342 match envelope.command {
1343 RuntimeEffectCommand::AwaitEvent { key } => {
1344 let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1345 let resolution = inline_await_events()
1346 .await_resolution(&key, cancellation, deadline, clock.as_ref())
1347 .await
1348 .map_err(RuntimeEffectControllerError::from)?;
1349 Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1350 }
1351 RuntimeEffectCommand::Process { command } => {
1352 let execution = local_executor.into_process()?;
1353 let registry = execution.registry;
1354 let process_work_driver = execution.process_work_driver;
1355 let result = tokio::task::spawn(async move {
1356 Self::execute_process_command(registry, process_work_driver, *command).await
1357 })
1358 .await
1359 .map_err(|err| {
1360 RuntimeEffectControllerError::new(
1361 "runtime_effect_process_task_join",
1362 format!("inline process effect task failed: {err}"),
1363 )
1364 })??;
1365 Ok(RuntimeEffectOutcome::Process { result })
1366 }
1367 _ => local_executor.execute(envelope).await,
1368 }
1369 }
1370
1371 async fn await_event_key(
1372 &self,
1373 scope: &ExecutionScope,
1374 wait: AwaitEventWaitIdentity,
1375 ) -> Result<AwaitEventKey, RuntimeError> {
1376 inline_await_events().key_for(scope, wait)
1377 }
1378
1379 async fn resolve_await_event(
1380 &self,
1381 key: &AwaitEventKey,
1382 resolution: Resolution,
1383 ) -> Result<ResolveOutcome, RuntimeError> {
1384 inline_await_events().resolve(key, resolution)
1385 }
1386
1387 async fn await_await_event(
1388 &self,
1389 key: &AwaitEventKey,
1390 cancel: CancellationToken,
1391 deadline: Option<Instant>,
1392 ) -> Result<Resolution, RuntimeError> {
1393 inline_await_events()
1394 .await_resolution(key, cancel, deadline, &crate::SystemClock)
1395 .await
1396 }
1397
1398 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1399 inline_await_events().revoke_session(session_id)
1400 }
1401}
1402
1403#[derive(Clone)]
1405pub struct InlineEffectHost {
1406 controller: Arc<dyn RuntimeEffectController>,
1407}
1408
1409impl InlineEffectHost {
1410 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1411 Self { controller }
1412 }
1413}
1414
1415impl Default for InlineEffectHost {
1416 fn default() -> Self {
1417 Self::new(Arc::new(InlineRuntimeEffectController))
1418 }
1419}
1420
1421#[async_trait::async_trait]
1422impl EffectHost for InlineEffectHost {
1423 fn durability_tier(&self) -> crate::DurabilityTier {
1424 self.controller.durability_tier()
1425 }
1426
1427 fn requires_durable_attachment_store(&self) -> bool {
1428 self.controller.requires_durable_attachment_store()
1429 }
1430
1431 fn supports_durable_effects(&self) -> bool {
1432 self.controller.supports_durable_effects()
1433 }
1434
1435 fn scoped<'run>(
1436 &'run self,
1437 scope: ExecutionScope,
1438 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1439 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1440 }
1441
1442 fn scoped_static(
1443 &self,
1444 scope: ExecutionScope,
1445 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1446 Ok(Some(ScopedEffectController::shared(
1447 Arc::clone(&self.controller),
1448 scope,
1449 )?))
1450 }
1451
1452 async fn await_event_key(
1453 &self,
1454 scope: &ExecutionScope,
1455 wait: AwaitEventWaitIdentity,
1456 ) -> Result<AwaitEventKey, RuntimeError> {
1457 self.controller.await_event_key(scope, wait).await
1458 }
1459
1460 async fn resolve_await_event(
1461 &self,
1462 key: &AwaitEventKey,
1463 resolution: Resolution,
1464 ) -> Result<ResolveOutcome, RuntimeError> {
1465 self.controller.resolve_await_event(key, resolution).await
1466 }
1467
1468 async fn await_await_event(
1469 &self,
1470 key: &AwaitEventKey,
1471 cancel: CancellationToken,
1472 deadline: Option<Instant>,
1473 ) -> Result<Resolution, RuntimeError> {
1474 self.controller
1475 .await_await_event(key, cancel, deadline)
1476 .await
1477 }
1478
1479 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1480 self.controller
1481 .revoke_await_events_for_session(session_id)
1482 .await
1483 }
1484}
1485
1486impl InlineRuntimeEffectController {
1487 pub(crate) async fn start_process(
1495 registry: Arc<dyn crate::ProcessRegistry>,
1496 registration: crate::ProcessRegistration,
1497 grant: Option<crate::ProcessStartGrant>,
1498 ) -> Result<ProcessRecord, PluginError> {
1499 let registration_for_record = registration.clone();
1500 let record = registry.register_process(registration_for_record).await?;
1501 if let Some(grant) = grant {
1502 registry
1503 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
1504 .await?;
1505 }
1506 Ok(record)
1507 }
1508
1509 pub(crate) async fn request_process_cancel(
1510 &self,
1511 registry: Arc<dyn crate::ProcessRegistry>,
1512 process_id: &str,
1513 reason: Option<String>,
1514 ) -> Result<ProcessRecord, PluginError> {
1515 registry
1519 .append_event(
1520 process_id,
1521 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1522 )
1523 .await?;
1524 registry
1525 .get_process(process_id)
1526 .await
1527 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1528 }
1529
1530 async fn execute_process_command(
1531 registry: Arc<dyn crate::ProcessRegistry>,
1532 process_work_driver: Option<crate::ProcessWorkDriver>,
1533 command: ProcessCommand,
1534 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1535 match command {
1536 ProcessCommand::Start {
1537 registration,
1538 grant,
1539 execution_context: _,
1540 } => {
1541 let record = Self::start_process(registry, registration, grant).await?;
1542 if let Some(driver) = process_work_driver.as_ref() {
1543 driver.claim_and_run_pending("process_start").await?;
1544 }
1545 Ok(ProcessEffectOutcome::Start { record })
1546 }
1547 ProcessCommand::List {
1548 session_scope,
1549 mode,
1550 } => {
1551 let entries = match mode {
1552 crate::ProcessListMode::Live => {
1553 registry.list_live_handle_grants(&session_scope).await?
1554 }
1555 crate::ProcessListMode::All => {
1556 registry.list_handle_grants(&session_scope).await?
1557 }
1558 };
1559 Ok(ProcessEffectOutcome::List { entries })
1560 }
1561 ProcessCommand::Transfer {
1562 from_scope,
1563 to_scope,
1564 process_ids,
1565 } => {
1566 registry
1567 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1568 .await?;
1569 Ok(ProcessEffectOutcome::Transfer)
1570 }
1571 ProcessCommand::DeleteSession { session_id } => {
1572 let report = registry.delete_session_process_state(&session_id).await?;
1573 Ok(ProcessEffectOutcome::DeleteSession { report })
1574 }
1575 ProcessCommand::Await { process_id } => {
1576 let output = registry.await_process(&process_id).await?;
1577 Ok(ProcessEffectOutcome::Await { output })
1578 }
1579 ProcessCommand::Cancel { process_id, reason } => {
1580 let record = InlineRuntimeEffectController
1581 .request_process_cancel(registry, &process_id, reason)
1582 .await?;
1583 Ok(ProcessEffectOutcome::Cancel { record })
1584 }
1585 ProcessCommand::Signal {
1586 process_id,
1587 request,
1588 ..
1589 } => {
1590 let result = registry.append_event(&process_id, request).await?;
1591 Ok(ProcessEffectOutcome::Signal {
1592 event: result.event,
1593 })
1594 }
1595 }
1596 }
1597}
1598
1599impl std::fmt::Debug for InlineRuntimeEffectController {
1600 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1601 f.debug_struct("InlineRuntimeEffectController").finish()
1602 }
1603}