1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23 RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
/// HMAC-SHA256 instance used to sign and verify await-event keys.
type HmacSha256 = Hmac<sha2::Sha256>;
/// Cancellation token, optional deadline, and clock extracted from a local
/// executor for an await-event wait.
type AwaitEventOptions = (CancellationToken, Option<Instant>, Arc<dyn crate::Clock>);
29
30fn inline_await_events() -> &'static AwaitEventRegistry {
31 static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
32 REGISTRY.get_or_init(AwaitEventRegistry::new)
33}
34
/// Identifies the unit of work an effect or await-event belongs to.
///
/// Serialized as an internally tagged object: the variant name goes in a
/// `type` field, in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionScope {
    /// A turn, identified by its session and turn ids.
    Turn {
        session_id: String,
        turn_id: String,
    },
    /// A process, identified by its process id.
    Process {
        process_id: String,
    },
    /// A queue drain, identified by its session and drain ids.
    QueueDrain {
        session_id: String,
        drain_id: String,
    },
    /// Deletion of the session with the given id.
    SessionDelete {
        session_id: String,
    },
    /// A runtime operation that carries only an operation id (no session).
    RuntimeOperation {
        operation_id: String,
    },
}
65
66impl ExecutionScope {
67 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
68 Self::Turn {
69 session_id: session_id.into(),
70 turn_id: turn_id.into(),
71 }
72 }
73
74 pub fn process(process_id: impl Into<String>) -> Self {
75 Self::Process {
76 process_id: process_id.into(),
77 }
78 }
79
80 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
81 Self::QueueDrain {
82 session_id: session_id.into(),
83 drain_id: drain_id.into(),
84 }
85 }
86
87 pub fn session_delete(session_id: impl Into<String>) -> Self {
88 Self::SessionDelete {
89 session_id: session_id.into(),
90 }
91 }
92
93 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
94 Self::RuntimeOperation {
95 operation_id: operation_id.into(),
96 }
97 }
98
99 pub fn id(&self) -> &str {
100 match self {
101 Self::Turn { turn_id, .. } => turn_id,
102 Self::Process { process_id } => process_id,
103 Self::QueueDrain { drain_id, .. } => drain_id,
104 Self::SessionDelete { session_id } => session_id,
105 Self::RuntimeOperation { operation_id } => operation_id,
106 }
107 }
108
109 pub fn session_id(&self) -> Option<&str> {
110 match self {
111 Self::Turn { session_id, .. }
112 | Self::QueueDrain { session_id, .. }
113 | Self::SessionDelete { session_id } => Some(session_id),
114 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
115 }
116 }
117
118 pub fn turn_id(&self) -> Option<&str> {
119 match self {
120 Self::Turn { turn_id, .. } => Some(turn_id),
121 _ => None,
122 }
123 }
124
125 pub fn validates_turn_trace_id(&self) -> bool {
126 matches!(self, Self::Turn { .. })
127 }
128
129 fn validate(&self) -> Result<(), RuntimeError> {
130 let missing = match self {
131 Self::Turn {
132 session_id,
133 turn_id,
134 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
135 Self::Process { process_id } => process_id.trim().is_empty(),
136 Self::QueueDrain {
137 session_id,
138 drain_id,
139 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
140 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
141 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
142 };
143 if missing {
144 return Err(RuntimeError::new(
145 RuntimeErrorCode::MissingExecutionScopeId,
146 "execution scopes require non-empty stable ids",
147 ));
148 }
149 Ok(())
150 }
151}
152
/// Identifies what is being waited on inside an [`ExecutionScope`].
///
/// Serialized as an internally tagged object: the variant name goes in a
/// `type` field, in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AwaitEventWaitIdentity {
    /// Wait keyed by a tool call id.
    ToolCompletion {
        tool_call_id: String,
    },
    /// Wait keyed by a process id, a signal name, and an ordinal
    /// (validation rejects an ordinal of 0).
    ProcessSignal {
        process_id: String,
        signal_name: String,
        ordinal: u64,
    },
    /// Wait keyed by a caller-chosen string.
    Custom {
        key: String,
    },
}
168
169impl AwaitEventWaitIdentity {
170 pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
171 Self::ToolCompletion {
172 tool_call_id: tool_call_id.into(),
173 }
174 }
175
176 pub fn process_signal(
177 process_id: impl Into<String>,
178 signal_name: impl Into<String>,
179 ordinal: u64,
180 ) -> Self {
181 Self::ProcessSignal {
182 process_id: process_id.into(),
183 signal_name: signal_name.into(),
184 ordinal,
185 }
186 }
187
188 fn validate(&self) -> Result<(), RuntimeError> {
189 let invalid = match self {
190 Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
191 Self::ProcessSignal {
192 process_id,
193 signal_name,
194 ordinal,
195 } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
196 Self::Custom { key } => key.trim().is_empty(),
197 };
198 if invalid {
199 return Err(RuntimeError::new(
200 "invalid_await_event_wait_identity",
201 "await-event wait identity requires non-empty stable ids",
202 ));
203 }
204 Ok(())
205 }
206}
207
/// Signed handle for one await-event wait.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AwaitEventKey {
    /// Scope the wait belongs to.
    pub scope: ExecutionScope,
    /// What is being waited on within the scope.
    pub wait: AwaitEventWaitIdentity,
    /// Identifier of the wait; the registry derives it by hashing
    /// `(scope, wait)`.
    pub key_id: String,
    /// Signature over `(scope, wait, key_id)`; the registry produces it as a
    /// lowercase-hex HMAC-SHA256.
    pub signature: String,
}
215
216impl AwaitEventKey {
217 pub fn promise_key(&self) -> String {
218 format!("lash-await-event:{}", self.key_id)
219 }
220}
221
/// Error payload carried by [`Resolution::Err`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalCompletionError {
    /// Error code string.
    pub code: String,
    /// Error message text.
    pub message: String,
    /// Optional raw JSON detail; defaults to `None` when absent on
    /// deserialization and is omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<serde_json::Value>,
}
229
230impl ExternalCompletionError {
231 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
232 Self {
233 code: code.into(),
234 message: message.into(),
235 raw: None,
236 }
237 }
238}
239
/// Terminal result of an await-event wait.
///
/// Serialized adjacently tagged: the variant name (snake_case) goes in
/// `status` and its data, if any, in `payload`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
pub enum Resolution {
    /// Completed with a JSON value.
    Ok(serde_json::Value),
    /// Completed with an error payload.
    Err(ExternalCompletionError),
    /// Recorded by the registry when the wait's deadline elapses first.
    Timeout,
    /// Recorded by the registry when the wait's cancellation token fires first.
    Cancelled,
}
248
/// Result of attempting to resolve an await-event key.
///
/// Serialized internally tagged by `status`, in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ResolveOutcome {
    /// The resolution was recorded as the terminal value.
    Accepted,
    /// A terminal value was already recorded; it is returned unchanged.
    AlreadyResolved { terminal: Resolution },
    /// The key failed verification or has been revoked.
    UnknownOrRevoked,
}
256
/// Per-key registry slot.
#[derive(Debug)]
struct AwaitEventEntry {
    /// Terminal resolution once one has been recorded; `None` while pending.
    terminal: Option<Resolution>,
    /// Wakes waiters when the entry is resolved or a session is revoked.
    notify: Arc<Notify>,
}
262
/// Mutable registry state, guarded by the registry's mutex.
#[derive(Debug)]
struct AwaitEventRegistryState {
    /// Entries keyed by `AwaitEventKey::key_id`.
    entries: HashMap<String, AwaitEventEntry>,
    /// Key ids that must be treated as revoked.
    revoked_key_ids: HashSet<String>,
    /// Session ids whose keys must all be treated as revoked.
    revoked_session_ids: HashSet<String>,
}
269
/// In-memory registry that issues, verifies, resolves, and awaits
/// await-event keys.
#[derive(Debug)]
struct AwaitEventRegistry {
    /// HMAC secret used to sign and verify keys.
    secret: Vec<u8>,
    /// Entries and revocation sets behind a synchronous mutex.
    state: std::sync::Mutex<AwaitEventRegistryState>,
}
275
276impl AwaitEventRegistry {
277 fn new() -> Self {
278 Self {
279 secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
280 state: std::sync::Mutex::new(AwaitEventRegistryState {
281 entries: HashMap::new(),
282 revoked_key_ids: HashSet::new(),
283 revoked_session_ids: HashSet::new(),
284 }),
285 }
286 }
287
288 fn key_for(
289 &self,
290 scope: &ExecutionScope,
291 wait: AwaitEventWaitIdentity,
292 ) -> Result<AwaitEventKey, RuntimeError> {
293 scope.validate()?;
294 wait.validate()?;
295 let key_id =
296 crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
297 RuntimeError::new(
298 "await_event_key_hash",
299 format!("failed to hash await-event identity: {err}"),
300 )
301 })?;
302 let signature = self.signature(scope, &wait, &key_id)?;
303 Ok(AwaitEventKey {
304 scope: scope.clone(),
305 wait,
306 key_id,
307 signature,
308 })
309 }
310
311 fn signature(
312 &self,
313 scope: &ExecutionScope,
314 wait: &AwaitEventWaitIdentity,
315 key_id: &str,
316 ) -> Result<String, RuntimeError> {
317 let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
318 RuntimeError::new(
319 "await_event_key_sign",
320 format!("failed to initialize await-event key signer: {err}"),
321 )
322 })?;
323 let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
324 RuntimeError::new(
325 "await_event_key_sign",
326 format!("failed to serialize await-event key identity: {err}"),
327 )
328 })?;
329 mac.update(&canonical);
330 Ok(format!("{:x}", mac.finalize().into_bytes()))
331 }
332
333 fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
334 let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
335 Ok(expected == key.signature)
336 }
337
338 fn resolve(
339 &self,
340 key: &AwaitEventKey,
341 resolution: Resolution,
342 ) -> Result<ResolveOutcome, RuntimeError> {
343 if !self.verify(key)? {
344 return Ok(ResolveOutcome::UnknownOrRevoked);
345 }
346 let mut state = self.state.lock().map_err(|_| {
347 RuntimeError::new(
348 "await_event_registry_poisoned",
349 "await-event registry lock poisoned",
350 )
351 })?;
352 if state.revoked_key_ids.contains(&key.key_id)
353 || key
354 .scope
355 .session_id()
356 .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
357 {
358 return Ok(ResolveOutcome::UnknownOrRevoked);
359 }
360 let entry = state
361 .entries
362 .entry(key.key_id.clone())
363 .or_insert_with(|| AwaitEventEntry {
364 terminal: None,
365 notify: Arc::new(Notify::new()),
366 });
367 if let Some(terminal) = &entry.terminal {
368 return Ok(ResolveOutcome::AlreadyResolved {
369 terminal: terminal.clone(),
370 });
371 }
372 entry.terminal = Some(resolution);
373 entry.notify.notify_waiters();
374 Ok(ResolveOutcome::Accepted)
375 }
376
377 async fn await_resolution(
378 &self,
379 key: &AwaitEventKey,
380 cancel: CancellationToken,
381 deadline: Option<Instant>,
382 clock: &dyn crate::Clock,
383 ) -> Result<Resolution, RuntimeError> {
384 if !self.verify(key)? {
385 return Err(RuntimeError::new(
386 "await_event_unknown_or_revoked",
387 "await-event key is invalid or revoked",
388 ));
389 }
390 loop {
391 let notify =
392 {
393 let mut state = self.state.lock().map_err(|_| {
394 RuntimeError::new(
395 "await_event_registry_poisoned",
396 "await-event registry lock poisoned",
397 )
398 })?;
399 if state.revoked_key_ids.contains(&key.key_id)
400 || key.scope.session_id().is_some_and(|session_id| {
401 state.revoked_session_ids.contains(session_id)
402 })
403 {
404 return Err(RuntimeError::new(
405 "await_event_unknown_or_revoked",
406 "await-event key is invalid or revoked",
407 ));
408 }
409 let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
410 AwaitEventEntry {
411 terminal: None,
412 notify: Arc::new(Notify::new()),
413 }
414 });
415 if let Some(terminal) = entry.terminal.clone() {
416 return Ok(terminal);
417 }
418 Arc::clone(&entry.notify)
419 };
420 if let Some(deadline) = deadline {
421 tokio::select! {
422 _ = cancel.cancelled() => {
423 let _ = self.resolve(key, Resolution::Cancelled)?;
424 }
425 _ = clock.sleep_until(deadline) => {
426 let _ = self.resolve(key, Resolution::Timeout)?;
427 }
428 _ = notify.notified() => {}
429 }
430 } else {
431 tokio::select! {
432 _ = cancel.cancelled() => {
433 let _ = self.resolve(key, Resolution::Cancelled)?;
434 }
435 _ = notify.notified() => {}
436 }
437 }
438 }
439 }
440
441 fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
442 let mut state = self.state.lock().map_err(|_| {
443 RuntimeError::new(
444 "await_event_registry_poisoned",
445 "await-event registry lock poisoned",
446 )
447 })?;
448 state.revoked_session_ids.insert(session_id.to_string());
449 for entry in state.entries.values() {
450 entry.notify.notify_waiters();
451 }
452 Ok(())
453 }
454}
455
/// How a [`ScopedEffectController`] holds its underlying controller.
enum ScopedEffectControllerInner<'run> {
    /// Borrowed for the `'run` lifetime.
    Borrowed(&'run dyn RuntimeEffectController),
    /// Shared ownership through an `Arc`.
    Shared(Arc<dyn RuntimeEffectController>),
}
460
461impl Clone for ScopedEffectControllerInner<'_> {
462 fn clone(&self) -> Self {
463 match self {
464 Self::Borrowed(controller) => Self::Borrowed(*controller),
465 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
466 }
467 }
468}
469
/// A runtime effect controller paired with the execution scope it acts for.
#[derive(Clone)]
pub struct ScopedEffectController<'run> {
    /// The underlying controller, borrowed or shared.
    controller: ScopedEffectControllerInner<'run>,
    /// Scope validated at construction time.
    scope: ExecutionScope,
}
476
477impl<'run> ScopedEffectController<'run> {
478 pub fn borrowed(
479 controller: &'run dyn RuntimeEffectController,
480 scope: ExecutionScope,
481 ) -> Result<Self, RuntimeError> {
482 scope.validate()?;
483 Ok(Self {
484 controller: ScopedEffectControllerInner::Borrowed(controller),
485 scope,
486 })
487 }
488
489 pub fn shared(
490 controller: Arc<dyn RuntimeEffectController>,
491 scope: ExecutionScope,
492 ) -> Result<Self, RuntimeError> {
493 scope.validate()?;
494 Ok(Self {
495 controller: ScopedEffectControllerInner::Shared(controller),
496 scope,
497 })
498 }
499
500 pub fn controller(&self) -> &dyn RuntimeEffectController {
501 match &self.controller {
502 ScopedEffectControllerInner::Borrowed(controller) => *controller,
503 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
504 }
505 }
506
507 pub fn execution_scope(&self) -> &ExecutionScope {
508 &self.scope
509 }
510
511 pub fn scope_id(&self) -> &str {
512 self.scope.id()
513 }
514
515 pub fn turn_id(&self) -> Option<&str> {
516 self.scope.turn_id()
517 }
518}
519
/// Host-side entry point that hands out [`ScopedEffectController`]s and may
/// support await-event keys.
///
/// Only [`EffectHost::scoped`] is required; every other method has a default.
#[async_trait::async_trait]
pub trait EffectHost: Send + Sync {
    /// Durability tier of this host; defaults to `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this host requires a durable attachment store; defaults to
    /// `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this host supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Returns a controller bound to `scope` that borrows from this host.
    fn scoped<'run>(
        &'run self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, RuntimeError>;

    /// Returns a `'static` controller bound to the scope when the host can
    /// provide one; the default returns `Ok(None)`.
    fn scoped_static(
        &self,
        _scope: ExecutionScope,
    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
        Ok(None)
    }

    /// Issues an await-event key for the scope and wait identity. The default
    /// fails with `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event keys",
        ))
    }

    /// Resolves an await-event key. The default reports every key as
    /// `UnknownOrRevoked`.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for an await-event key to resolve. The default fails with
    /// `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect host does not support await-event waits",
        ))
    }

    /// Revokes await-event keys for a session. The default is a no-op that
    /// returns `Ok(())`.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
582
/// Executes runtime effects and may support await-event keys.
///
/// Only [`RuntimeEffectController::execute_effect`] is required; every other
/// method has a default.
#[async_trait::async_trait]
pub trait RuntimeEffectController: Send + Sync {
    /// Durability tier of this controller; defaults to
    /// `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Whether this controller requires a durable attachment store; defaults
    /// to `false`.
    fn requires_durable_attachment_store(&self) -> bool {
        false
    }

    /// Whether this controller supports durable effects; defaults to `false`.
    fn supports_durable_effects(&self) -> bool {
        false
    }

    /// Whether this controller supports concurrent effects; defaults to
    /// `true`.
    // NOTE(review): the exact concurrency contract callers rely on is not
    // visible in this part of the file — confirm before overriding.
    fn supports_concurrent_effects(&self) -> bool {
        true
    }

    /// Executes the effect described by `envelope`. `local_executor` is the
    /// in-process executor supplied for that effect.
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;

    /// Issues an await-event key for the scope and wait identity. The default
    /// fails with `await_event_unsupported`.
    async fn await_event_key(
        &self,
        _scope: &ExecutionScope,
        _wait: AwaitEventWaitIdentity,
    ) -> Result<AwaitEventKey, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event keys",
        ))
    }

    /// Resolves an await-event key. The default reports every key as
    /// `UnknownOrRevoked`.
    async fn resolve_await_event(
        &self,
        _key: &AwaitEventKey,
        _resolution: Resolution,
    ) -> Result<ResolveOutcome, RuntimeError> {
        Ok(ResolveOutcome::UnknownOrRevoked)
    }

    /// Waits for an await-event key to resolve. The default fails with
    /// `await_event_unsupported`.
    async fn await_await_event(
        &self,
        _key: &AwaitEventKey,
        _cancel: CancellationToken,
        _deadline: Option<Instant>,
    ) -> Result<Resolution, RuntimeError> {
        Err(RuntimeError::new(
            "await_event_unsupported",
            "this effect controller does not support await-event waits",
        ))
    }

    /// Revokes await-event keys for a session. The default is a no-op that
    /// returns `Ok(())`.
    async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
        Ok(())
    }
}
654
/// Crate-internal handle to a runtime effect controller.
#[derive(Clone)]
pub(crate) enum RuntimeEffectControllerHandle<'run> {
    /// An already scoped controller.
    Borrowed(ScopedEffectController<'run>),
    /// A shared controller plus the scope to bind it to; compiled only for
    /// tests or the `testing` feature.
    #[cfg(any(test, feature = "testing"))]
    Shared {
        controller: Arc<dyn RuntimeEffectController>,
        scope: ExecutionScope,
    },
}
666
667impl<'run> RuntimeEffectControllerHandle<'run> {
668 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
669 Self::Borrowed(scoped)
670 }
671
672 #[cfg(any(test, feature = "testing"))]
673 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
674 Self::Shared {
675 controller,
676 scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
677 }
678 }
679
680 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
681 match self {
682 Self::Borrowed(scoped) => scoped.controller(),
683 #[cfg(any(test, feature = "testing"))]
684 Self::Shared { controller, .. } => controller.as_ref(),
685 }
686 }
687
688 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
689 match self {
690 Self::Borrowed(scoped) => scoped.clone(),
691 #[cfg(any(test, feature = "testing"))]
692 Self::Shared { controller, scope } => {
693 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
694 .expect("runtime effect controller handle carries a valid scope")
695 }
696 }
697 }
698
699 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
700 self.clone()
701 }
702}
703
/// Error returned by runtime effect controllers and local executors.
///
/// Displays as `{code}: {message}`.
#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[error("{code}: {message}")]
pub struct RuntimeEffectControllerError {
    /// Error code string.
    pub code: String,
    /// Error message text.
    pub message: String,
}
710
711impl RuntimeEffectControllerError {
712 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
713 Self {
714 code: code.into(),
715 message: message.into(),
716 }
717 }
718
719 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
720 Self::new(
721 "runtime_effect_wrong_outcome",
722 format!(
723 "expected {} outcome, got {}",
724 expected.as_str(),
725 actual.as_str()
726 ),
727 )
728 }
729
730 pub(crate) fn into_runtime_error(self) -> RuntimeError {
731 RuntimeError::new(self.code, self.message)
732 }
733}
734
735impl From<RuntimeError> for RuntimeEffectControllerError {
736 fn from(err: RuntimeError) -> Self {
737 Self::new(err.code.as_str(), err.message)
738 }
739}
740
741impl From<PluginError> for RuntimeEffectControllerError {
742 fn from(err: PluginError) -> Self {
743 Self::new("plugin", err.to_string())
744 }
745}
746
747impl From<crate::StoreError> for RuntimeEffectControllerError {
748 fn from(err: crate::StoreError) -> Self {
749 Self::new("runtime_store", err.to_string())
750 }
751}
752
/// Crate-internal abstraction that runs one registered process to completion.
#[async_trait::async_trait]
pub(crate) trait ProcessRunner: Send + Sync {
    /// Runs the process described by `registration` and returns its await
    /// output.
    ///
    /// The runner is given the execution context, the process registry, a
    /// scoped effect controller, and a cancellation token.
    async fn run_process(
        &self,
        registration: crate::ProcessRegistration,
        execution_context: crate::ProcessExecutionContext,
        registry: Arc<dyn ProcessRegistry>,
        scoped_effect_controller: crate::ScopedEffectController<'_>,
        cancellation: CancellationToken,
    ) -> crate::ProcessAwaitOutput;
}
768
/// Resources a local executor carries for process commands.
pub struct ProcessLocalExecution {
    /// Registry of processes.
    pub registry: Arc<dyn ProcessRegistry>,
    /// Optional driver for process work.
    pub process_work_driver: Option<crate::ProcessWorkDriver>,
}
773
/// Local runner that executes turn-level effects through a borrowed turn
/// driver and turn machine.
pub(super) struct LocalTurnEffectRunner<'a, 'run> {
    /// Turn driver that performs the actual work.
    driver: &'a mut RuntimeTurnDriver<'run>,
    /// Turn machine consulted (and, for some commands, updated) by the driver.
    machine: &'a mut crate::TurnMachine,
    /// Sender handed to the driver for stream events.
    event_tx: mpsc::Sender<RuntimeStreamEvent>,
    /// Cancellation token for the turn.
    cancellation: CancellationToken,
}
780
/// Local runner for direct LLM requests.
pub(super) struct LocalDirectEffectRunner {
    /// Provider used to derive the transport stream-event setting.
    provider: ProviderHandle,
    /// Store used to resolve request attachments before the call.
    attachment_store: Arc<dyn AttachmentStore>,
}
785
/// Local runner for tool batches and single tool attempts.
struct LocalToolBatchEffectRunner<'run> {
    /// Execution context that runs the prepared tool calls.
    context: crate::RuntimeExecutionContext<'run>,
    /// Child-execution trace hooks; a tool attempt looks its hook up by the
    /// call's `call_id`.
    child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
}
790
/// One-shot in-process executor for a runtime effect.
#[async_trait::async_trait]
trait RuntimeEffectLocalRunner: Send {
    /// Consumes the runner and executes `envelope`, returning the matching
    /// outcome or a controller error.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
}
798
/// Boxed one-shot callback used by the testing runner: takes the effect
/// envelope and returns a boxed, `Send` future of the outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
810
/// Test-only local runner that delegates every envelope to a closure.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Callback invoked once with the envelope.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
815
/// Boxed one-shot callback for a durable step: takes the step's JSON input
/// and returns a boxed, `Send` future of its JSON output.
type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
        serde_json::Value,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
826
/// Local runner that executes a `DurableStep` command through a closure.
struct DurableStepLocalRunner<'run> {
    /// Callback invoked once with the step input.
    run: Box<DurableStepLocalRunnerFn<'run>>,
}
830
/// Internal state of a [`RuntimeEffectLocalExecutor`].
enum RuntimeEffectLocalExecutorState<'run> {
    /// No local executor; executing fails with
    /// `runtime_effect_local_executor_unavailable`.
    Unavailable,
    /// Executes through `execute_local_sleep` with this token and clock.
    SleepOnly {
        cancellation: CancellationToken,
        clock: Arc<dyn crate::Clock>,
    },
    /// Carries options for an await-event wait; cannot execute a command
    /// directly.
    ExternalWaitOptions {
        cancellation: CancellationToken,
        deadline: Option<Instant>,
        clock: Arc<dyn crate::Clock>,
    },
    /// Carries process resources; cannot execute a command directly.
    Process(ProcessLocalExecution),
    /// Delegates execution to a boxed local runner.
    Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
}
845
/// In-process executor handed to a [`RuntimeEffectController`] alongside an
/// effect envelope.
pub struct RuntimeEffectLocalExecutor<'run> {
    /// What this executor is able to do locally.
    state: RuntimeEffectLocalExecutorState<'run>,
}
854
855impl<'run> RuntimeEffectLocalExecutor<'run> {
856 pub fn unavailable() -> Self {
857 Self {
858 state: RuntimeEffectLocalExecutorState::Unavailable,
859 }
860 }
861
862 pub fn sleep(cancellation: CancellationToken) -> Self {
863 Self::sleep_with_clock(cancellation, Arc::new(crate::SystemClock))
864 }
865
866 pub fn sleep_with_clock(cancellation: CancellationToken, clock: Arc<dyn crate::Clock>) -> Self {
867 Self {
868 state: RuntimeEffectLocalExecutorState::SleepOnly {
869 cancellation,
870 clock,
871 },
872 }
873 }
874
875 pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
876 Self::await_event_with_clock(cancellation, deadline, Arc::new(crate::SystemClock))
877 }
878
879 pub fn await_event_with_clock(
880 cancellation: CancellationToken,
881 deadline: Option<Instant>,
882 clock: Arc<dyn crate::Clock>,
883 ) -> Self {
884 Self {
885 state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
886 cancellation,
887 deadline,
888 clock,
889 },
890 }
891 }
892
893 pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
894 Self::processes_with_driver(registry, None)
895 }
896
897 pub fn processes_with_driver(
898 registry: Arc<dyn ProcessRegistry>,
899 process_work_driver: Option<crate::ProcessWorkDriver>,
900 ) -> Self {
901 Self {
902 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution {
903 registry,
904 process_work_driver,
905 }),
906 }
907 }
908
909 pub fn durable_step<F, Fut>(run: F) -> Self
910 where
911 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
912 Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
913 {
914 Self {
915 state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
916 run: Box::new(move |input| {
917 Box::pin(async move { run(input).await.map_err(Into::into) })
918 }),
919 })),
920 }
921 }
922
923 #[cfg(any(test, feature = "testing"))]
924 pub fn testing<F, Fut>(run: F) -> Self
925 where
926 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
927 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
928 + Send
929 + 'run,
930 {
931 Self {
932 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
933 TestingRuntimeEffectLocalRunner {
934 run: Box::new(move |envelope| Box::pin(run(envelope))),
935 },
936 )),
937 }
938 }
939
940 pub(in crate::runtime) fn turn<'scope>(
941 driver: &'run mut RuntimeTurnDriver<'scope>,
942 machine: &'run mut crate::TurnMachine,
943 event_tx: mpsc::Sender<RuntimeStreamEvent>,
944 cancellation: CancellationToken,
945 ) -> Self
946 where
947 'scope: 'run,
948 {
949 Self {
950 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
951 driver,
952 machine,
953 event_tx,
954 cancellation,
955 })),
956 }
957 }
958
959 pub(in crate::runtime) fn direct(
960 provider: ProviderHandle,
961 attachment_store: Arc<dyn AttachmentStore>,
962 ) -> Self {
963 Self {
964 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
965 provider,
966 attachment_store,
967 })),
968 }
969 }
970
971 pub(crate) fn tool_batch(
972 context: crate::RuntimeExecutionContext<'run>,
973 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
974 ) -> Self {
975 Self {
976 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalToolBatchEffectRunner {
977 context,
978 child_trace_hooks,
979 })),
980 }
981 }
982
983 pub async fn execute(
984 self,
985 envelope: RuntimeEffectEnvelope,
986 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
987 match self.state {
988 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
989 RuntimeEffectLocalExecutorState::SleepOnly {
990 cancellation,
991 clock,
992 } => execute_local_sleep(envelope, cancellation, clock.as_ref()).await,
993 RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
994 Err(RuntimeEffectControllerError::new(
995 "runtime_effect_local_executor_mismatch",
996 format!(
997 "local await-event options cannot execute {} command directly",
998 envelope.command.kind().as_str()
999 ),
1000 ))
1001 }
1002 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
1003 "runtime_effect_local_executor_unavailable",
1004 format!(
1005 "no local executor is available for {}",
1006 envelope.command.kind().as_str()
1007 ),
1008 )),
1009 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
1010 "runtime_effect_local_executor_mismatch",
1011 format!(
1012 "process executor cannot execute {} command directly",
1013 envelope.command.kind().as_str()
1014 ),
1015 )),
1016 }
1017 }
1018
1019 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
1020 match self.state {
1021 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
1022 _ => Err(RuntimeEffectControllerError::new(
1023 "runtime_effect_local_executor_unavailable",
1024 "no process executor is available for process command",
1025 )),
1026 }
1027 }
1028
1029 fn into_await_event_options(self) -> Result<AwaitEventOptions, RuntimeEffectControllerError> {
1030 match self.state {
1031 RuntimeEffectLocalExecutorState::ExternalWaitOptions {
1032 cancellation,
1033 deadline,
1034 clock,
1035 } => Ok((cancellation, deadline, clock)),
1036 _ => Ok((CancellationToken::new(), None, Arc::new(crate::SystemClock))),
1037 }
1038 }
1039}
1040
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Hands the envelope to the stored callback and awaits its result.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let callback = self.run;
        let outcome = callback(envelope).await;
        outcome
    }
}
1051
1052#[async_trait::async_trait]
1053impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
1054 async fn execute(
1055 self: Box<Self>,
1056 envelope: RuntimeEffectEnvelope,
1057 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1058 match envelope.command {
1059 RuntimeEffectCommand::DurableStep { input, .. } => {
1060 let value = (self.run)(input).await?;
1061 Ok(RuntimeEffectOutcome::DurableStep { value })
1062 }
1063 command => Err(RuntimeEffectControllerError::new(
1064 "runtime_effect_local_executor_mismatch",
1065 format!(
1066 "local durable step executor cannot execute {} command",
1067 command.kind().as_str()
1068 ),
1069 )),
1070 }
1071 }
1072}
1073
1074#[async_trait::async_trait]
1075impl RuntimeEffectLocalRunner for LocalToolBatchEffectRunner<'_> {
1076 async fn execute(
1077 self: Box<Self>,
1078 envelope: RuntimeEffectEnvelope,
1079 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1080 match envelope.command {
1081 RuntimeEffectCommand::ToolBatch { batch } => {
1082 let outcome = self
1083 .context
1084 .execute_prepared_tool_batch_launches(
1085 batch,
1086 envelope.invocation,
1087 self.child_trace_hooks,
1088 )
1089 .await?;
1090 Ok(RuntimeEffectOutcome::ToolBatch {
1091 launches: outcome.launches,
1092 triggers: outcome.triggers,
1093 })
1094 }
1095 RuntimeEffectCommand::ToolAttempt {
1096 call,
1097 execution_grant,
1098 attempt,
1099 max_attempts,
1100 } => {
1101 let child_execution_trace_hook = self.child_trace_hooks.get(&call.call_id).cloned();
1102 let outcome = self
1103 .context
1104 .execute_prepared_tool_attempt_effect(
1105 call,
1106 execution_grant,
1107 attempt,
1108 max_attempts,
1109 envelope.invocation,
1110 child_execution_trace_hook,
1111 )
1112 .await?;
1113 Ok(RuntimeEffectOutcome::ToolAttempt {
1114 launch: outcome.launch,
1115 triggers: outcome.triggers,
1116 })
1117 }
1118 command => Err(RuntimeEffectControllerError::new(
1119 "runtime_effect_local_executor_mismatch",
1120 format!(
1121 "local tool executor cannot execute {} command",
1122 command.kind().as_str()
1123 ),
1124 )),
1125 }
1126 }
1127}
1128
1129#[async_trait::async_trait]
1130impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1131 async fn execute(
1132 self: Box<Self>,
1133 envelope: RuntimeEffectEnvelope,
1134 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1135 let runner = *self;
1136 match envelope.command {
1137 RuntimeEffectCommand::LlmCall { request } => {
1138 let protocol_iteration = runner.machine.protocol_iteration();
1139 let (result, text_streamed) = runner
1140 .driver
1141 .run_llm_call(
1142 Arc::new((*request).into_request(None, None)),
1143 protocol_iteration,
1144 envelope.invocation,
1145 &runner.event_tx,
1146 &runner.cancellation,
1147 )
1148 .await;
1149 Ok(RuntimeEffectOutcome::LlmCall {
1150 result,
1151 text_streamed,
1152 })
1153 }
1154 RuntimeEffectCommand::ToolBatch { batch } => {
1155 let outcome = runner
1156 .driver
1157 .run_tool_batch(
1158 batch,
1159 envelope.invocation,
1160 &runner.event_tx,
1161 &runner.cancellation,
1162 )
1163 .await?;
1164 Ok(RuntimeEffectOutcome::ToolBatch {
1165 launches: outcome.launches,
1166 triggers: outcome.triggers,
1167 })
1168 }
1169 RuntimeEffectCommand::ExecCode { language, code } => {
1170 let protocol_iteration = runner.machine.protocol_iteration();
1171 let messages = runner.machine.message_sequence();
1172 Ok(RuntimeEffectOutcome::ExecCode {
1173 result: runner
1174 .driver
1175 .run_exec_code(
1176 language,
1177 &code,
1178 messages,
1179 protocol_iteration,
1180 envelope.invocation,
1181 &runner.event_tx,
1182 )
1183 .await,
1184 })
1185 }
1186 RuntimeEffectCommand::Checkpoint { checkpoint } => {
1187 Ok(RuntimeEffectOutcome::Checkpoint {
1188 result: runner
1189 .driver
1190 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1191 .await
1192 .map_err(RuntimeEffectControllerError::from),
1193 })
1194 }
1195 RuntimeEffectCommand::SyncExecutionEnvironment {
1196 update_machine_config,
1197 } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1198 result: runner
1199 .driver
1200 .refresh_execution_environment(runner.machine, update_machine_config)
1201 .await
1202 .map_err(|err| err.to_string()),
1203 }),
1204 RuntimeEffectCommand::Sleep { duration_ms } => {
1205 sleep_with_cancellation(
1206 duration_ms,
1207 &runner.cancellation,
1208 runner.driver.host.core.clock.as_ref(),
1209 )
1210 .await?;
1211 Ok(RuntimeEffectOutcome::Sleep)
1212 }
1213 command => Err(RuntimeEffectControllerError::new(
1214 "runtime_effect_local_executor_mismatch",
1215 format!(
1216 "local turn executor cannot execute {} command",
1217 command.kind().as_str()
1218 ),
1219 )),
1220 }
1221 }
1222}
1223
1224#[async_trait::async_trait]
1225impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1226 async fn execute(
1227 mut self: Box<Self>,
1228 envelope: RuntimeEffectEnvelope,
1229 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1230 match envelope.command {
1231 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1232 result: self
1233 .run_direct_llm_request((*request).into_request(
1234 crate::session_model::transport_stream_events(&self.provider, None),
1235 None,
1236 ))
1237 .await,
1238 }),
1239 RuntimeEffectCommand::Sleep { duration_ms } => {
1240 sleep_with_cancellation(
1241 duration_ms,
1242 &CancellationToken::new(),
1243 &crate::SystemClock,
1244 )
1245 .await?;
1246 Ok(RuntimeEffectOutcome::Sleep)
1247 }
1248 command => Err(RuntimeEffectControllerError::new(
1249 "runtime_effect_local_executor_mismatch",
1250 format!(
1251 "local direct executor cannot execute {} command",
1252 command.kind().as_str()
1253 ),
1254 )),
1255 }
1256 }
1257}
1258
1259impl LocalDirectEffectRunner {
1260 async fn run_direct_llm_request(
1261 &mut self,
1262 request: CoreLlmRequest,
1263 ) -> Result<LlmResponse, LlmCallError> {
1264 let request = crate::attachments::resolve_llm_request_attachments(
1265 request,
1266 self.attachment_store.as_ref(),
1267 )
1268 .await
1269 .map_err(|err| LlmCallError {
1270 message: err.to_string(),
1271 retryable: false,
1272 raw: None,
1273 code: Some("attachment_resolution_failed".to_string()),
1274 terminal_reason: crate::LlmTerminalReason::ProviderError,
1275 request_body: None,
1276 })?;
1277 self.provider
1278 .complete(request)
1279 .await
1280 .map_err(llm_call_error_from_transport)
1281 }
1282}
1283
1284async fn execute_local_sleep(
1285 envelope: RuntimeEffectEnvelope,
1286 cancellation: CancellationToken,
1287 clock: &dyn crate::Clock,
1288) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1289 match envelope.command {
1290 RuntimeEffectCommand::Sleep { duration_ms } => {
1291 sleep_with_cancellation(duration_ms, &cancellation, clock).await?;
1292 Ok(RuntimeEffectOutcome::Sleep)
1293 }
1294 command => Err(RuntimeEffectControllerError::new(
1295 "runtime_effect_local_executor_mismatch",
1296 format!(
1297 "local sleep executor cannot execute {} command",
1298 command.kind().as_str()
1299 ),
1300 )),
1301 }
1302}
1303
1304async fn sleep_with_cancellation(
1305 duration_ms: u64,
1306 cancellation: &CancellationToken,
1307 clock: &dyn crate::Clock,
1308) -> Result<(), RuntimeEffectControllerError> {
1309 let sleep = clock.sleep(std::time::Duration::from_millis(duration_ms));
1310 tokio::pin!(sleep);
1311 tokio::select! {
1312 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1313 "runtime_effect_sleep_cancelled",
1314 "runtime effect sleep was cancelled",
1315 )),
1316 _ = &mut sleep => Ok(()),
1317 }
1318}
1319
/// Stateless (unit-struct) runtime effect controller.
///
/// Its `RuntimeEffectController` implementation in this file handles
/// `AwaitEvent` and `Process` commands itself and hands every other command
/// to the local executor it is given.
#[derive(Default, Clone)]
pub struct InlineRuntimeEffectController;
1332
1333#[async_trait::async_trait]
1334impl RuntimeEffectController for InlineRuntimeEffectController {
1335 fn supports_durable_effects(&self) -> bool {
1336 true
1337 }
1338
1339 async fn execute_effect(
1340 &self,
1341 envelope: RuntimeEffectEnvelope,
1342 local_executor: RuntimeEffectLocalExecutor<'_>,
1343 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1344 match envelope.command {
1345 RuntimeEffectCommand::AwaitEvent { key } => {
1346 let (cancellation, deadline, clock) = local_executor.into_await_event_options()?;
1347 let resolution = inline_await_events()
1348 .await_resolution(&key, cancellation, deadline, clock.as_ref())
1349 .await
1350 .map_err(RuntimeEffectControllerError::from)?;
1351 Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1352 }
1353 RuntimeEffectCommand::Process { command } => {
1354 let execution = local_executor.into_process()?;
1355 let registry = execution.registry;
1356 let process_work_driver = execution.process_work_driver;
1357 let result = tokio::task::spawn(async move {
1358 Self::execute_process_command(registry, process_work_driver, *command).await
1359 })
1360 .await
1361 .map_err(|err| {
1362 RuntimeEffectControllerError::new(
1363 "runtime_effect_process_task_join",
1364 format!("inline process effect task failed: {err}"),
1365 )
1366 })??;
1367 Ok(RuntimeEffectOutcome::Process { result })
1368 }
1369 _ => local_executor.execute(envelope).await,
1370 }
1371 }
1372
1373 async fn await_event_key(
1374 &self,
1375 scope: &ExecutionScope,
1376 wait: AwaitEventWaitIdentity,
1377 ) -> Result<AwaitEventKey, RuntimeError> {
1378 inline_await_events().key_for(scope, wait)
1379 }
1380
1381 async fn resolve_await_event(
1382 &self,
1383 key: &AwaitEventKey,
1384 resolution: Resolution,
1385 ) -> Result<ResolveOutcome, RuntimeError> {
1386 inline_await_events().resolve(key, resolution)
1387 }
1388
1389 async fn await_await_event(
1390 &self,
1391 key: &AwaitEventKey,
1392 cancel: CancellationToken,
1393 deadline: Option<Instant>,
1394 ) -> Result<Resolution, RuntimeError> {
1395 inline_await_events()
1396 .await_resolution(key, cancel, deadline, &crate::SystemClock)
1397 .await
1398 }
1399
1400 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1401 inline_await_events().revoke_session(session_id)
1402 }
1403}
1404
1405#[derive(Clone)]
1407pub struct InlineEffectHost {
1408 controller: Arc<dyn RuntimeEffectController>,
1409}
1410
1411impl InlineEffectHost {
1412 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1413 Self { controller }
1414 }
1415}
1416
1417impl Default for InlineEffectHost {
1418 fn default() -> Self {
1419 Self::new(Arc::new(InlineRuntimeEffectController))
1420 }
1421}
1422
1423#[async_trait::async_trait]
1424impl EffectHost for InlineEffectHost {
1425 fn durability_tier(&self) -> crate::DurabilityTier {
1426 self.controller.durability_tier()
1427 }
1428
1429 fn requires_durable_attachment_store(&self) -> bool {
1430 self.controller.requires_durable_attachment_store()
1431 }
1432
1433 fn supports_durable_effects(&self) -> bool {
1434 self.controller.supports_durable_effects()
1435 }
1436
1437 fn scoped<'run>(
1438 &'run self,
1439 scope: ExecutionScope,
1440 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1441 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1442 }
1443
1444 fn scoped_static(
1445 &self,
1446 scope: ExecutionScope,
1447 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1448 Ok(Some(ScopedEffectController::shared(
1449 Arc::clone(&self.controller),
1450 scope,
1451 )?))
1452 }
1453
1454 async fn await_event_key(
1455 &self,
1456 scope: &ExecutionScope,
1457 wait: AwaitEventWaitIdentity,
1458 ) -> Result<AwaitEventKey, RuntimeError> {
1459 self.controller.await_event_key(scope, wait).await
1460 }
1461
1462 async fn resolve_await_event(
1463 &self,
1464 key: &AwaitEventKey,
1465 resolution: Resolution,
1466 ) -> Result<ResolveOutcome, RuntimeError> {
1467 self.controller.resolve_await_event(key, resolution).await
1468 }
1469
1470 async fn await_await_event(
1471 &self,
1472 key: &AwaitEventKey,
1473 cancel: CancellationToken,
1474 deadline: Option<Instant>,
1475 ) -> Result<Resolution, RuntimeError> {
1476 self.controller
1477 .await_await_event(key, cancel, deadline)
1478 .await
1479 }
1480
1481 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1482 self.controller
1483 .revoke_await_events_for_session(session_id)
1484 .await
1485 }
1486}
1487
1488impl InlineRuntimeEffectController {
1489 pub(crate) async fn start_process(
1497 registry: Arc<dyn crate::ProcessRegistry>,
1498 registration: crate::ProcessRegistration,
1499 grant: Option<crate::ProcessStartGrant>,
1500 ) -> Result<ProcessRecord, PluginError> {
1501 let registration_for_record = registration.clone();
1502 let record = registry.register_process(registration_for_record).await?;
1503 if let Some(grant) = grant {
1504 registry
1505 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
1506 .await?;
1507 }
1508 Ok(record)
1509 }
1510
1511 pub(crate) async fn request_process_cancel(
1512 &self,
1513 registry: Arc<dyn crate::ProcessRegistry>,
1514 process_id: &str,
1515 reason: Option<String>,
1516 ) -> Result<ProcessRecord, PluginError> {
1517 registry
1521 .append_event(
1522 process_id,
1523 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1524 )
1525 .await?;
1526 registry
1527 .get_process(process_id)
1528 .await
1529 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1530 }
1531
1532 async fn execute_process_command(
1533 registry: Arc<dyn crate::ProcessRegistry>,
1534 process_work_driver: Option<crate::ProcessWorkDriver>,
1535 command: ProcessCommand,
1536 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1537 match command {
1538 ProcessCommand::Start {
1539 registration,
1540 grant,
1541 execution_context: _,
1542 } => {
1543 let record = Self::start_process(registry, registration, grant).await?;
1544 if let Some(driver) = process_work_driver.as_ref() {
1545 driver.claim_and_run_pending("process_start").await?;
1546 }
1547 Ok(ProcessEffectOutcome::Start { record })
1548 }
1549 ProcessCommand::List {
1550 session_scope,
1551 mode,
1552 } => {
1553 let entries = match mode {
1554 crate::ProcessListMode::Live => {
1555 registry.list_live_handle_grants(&session_scope).await?
1556 }
1557 crate::ProcessListMode::All => {
1558 registry.list_handle_grants(&session_scope).await?
1559 }
1560 };
1561 Ok(ProcessEffectOutcome::List { entries })
1562 }
1563 ProcessCommand::Transfer {
1564 from_scope,
1565 to_scope,
1566 process_ids,
1567 } => {
1568 registry
1569 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1570 .await?;
1571 Ok(ProcessEffectOutcome::Transfer)
1572 }
1573 ProcessCommand::DeleteSession { session_id } => {
1574 let report = registry.delete_session_process_state(&session_id).await?;
1575 Ok(ProcessEffectOutcome::DeleteSession { report })
1576 }
1577 ProcessCommand::Await { process_id } => {
1578 let output = registry.await_process(&process_id).await?;
1579 Ok(ProcessEffectOutcome::Await { output })
1580 }
1581 ProcessCommand::Cancel { process_id, reason } => {
1582 let record = InlineRuntimeEffectController
1583 .request_process_cancel(registry, &process_id, reason)
1584 .await?;
1585 Ok(ProcessEffectOutcome::Cancel { record })
1586 }
1587 ProcessCommand::Signal {
1588 process_id,
1589 request,
1590 ..
1591 } => {
1592 let result = registry.append_event(&process_id, request).await?;
1593 Ok(ProcessEffectOutcome::Signal {
1594 event: result.event,
1595 })
1596 }
1597 }
1598 }
1599}
1600
1601impl std::fmt::Debug for InlineRuntimeEffectController {
1602 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1603 f.debug_struct("InlineRuntimeEffectController").finish()
1604 }
1605}