1#[cfg(any(test, feature = "testing"))]
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7use tokio_util::sync::CancellationToken;
8
9use crate::AttachmentStore;
10use crate::LlmRequest as CoreLlmRequest;
11use crate::LlmResponse;
12use crate::ProcessRecord;
13use crate::ProcessRegistry;
14use crate::provider::ProviderHandle;
15use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
16use crate::sansio::LlmCallError;
17use crate::{PluginError, RuntimeError, RuntimeErrorCode};
18
19use super::envelope::{
20 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
21 RuntimeEffectKind, RuntimeEffectOutcome,
22};
23use super::outcome::llm_call_error_from_transport;
24
25#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(tag = "type", rename_all = "snake_case")]
36pub enum EffectScope {
37 Turn {
38 session_id: String,
39 turn_id: String,
40 },
41 Process {
42 process_id: String,
43 },
44 QueueDrain {
45 session_id: String,
46 drain_id: String,
47 },
48 SessionDelete {
49 session_id: String,
50 },
51 RuntimeOperation {
52 operation_id: String,
53 },
54}
55
56impl EffectScope {
57 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
58 Self::Turn {
59 session_id: session_id.into(),
60 turn_id: turn_id.into(),
61 }
62 }
63
64 pub fn process(process_id: impl Into<String>) -> Self {
65 Self::Process {
66 process_id: process_id.into(),
67 }
68 }
69
70 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
71 Self::QueueDrain {
72 session_id: session_id.into(),
73 drain_id: drain_id.into(),
74 }
75 }
76
77 pub fn session_delete(session_id: impl Into<String>) -> Self {
78 Self::SessionDelete {
79 session_id: session_id.into(),
80 }
81 }
82
83 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
84 Self::RuntimeOperation {
85 operation_id: operation_id.into(),
86 }
87 }
88
89 pub fn id(&self) -> &str {
90 match self {
91 Self::Turn { turn_id, .. } => turn_id,
92 Self::Process { process_id } => process_id,
93 Self::QueueDrain { drain_id, .. } => drain_id,
94 Self::SessionDelete { session_id } => session_id,
95 Self::RuntimeOperation { operation_id } => operation_id,
96 }
97 }
98
99 pub fn session_id(&self) -> Option<&str> {
100 match self {
101 Self::Turn { session_id, .. }
102 | Self::QueueDrain { session_id, .. }
103 | Self::SessionDelete { session_id } => Some(session_id),
104 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
105 }
106 }
107
108 pub fn turn_id(&self) -> Option<&str> {
109 match self {
110 Self::Turn { turn_id, .. } => Some(turn_id),
111 _ => None,
112 }
113 }
114
115 pub fn validates_turn_trace_id(&self) -> bool {
116 matches!(self, Self::Turn { .. })
117 }
118
119 fn validate(&self) -> Result<(), RuntimeError> {
120 let missing = match self {
121 Self::Turn {
122 session_id,
123 turn_id,
124 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
125 Self::Process { process_id } => process_id.trim().is_empty(),
126 Self::QueueDrain {
127 session_id,
128 drain_id,
129 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
130 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
131 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
132 };
133 if missing {
134 return Err(RuntimeError::new(
135 RuntimeErrorCode::MissingEffectScopeId,
136 "effect scopes require non-empty stable ids",
137 ));
138 }
139 Ok(())
140 }
141}
142
143enum ScopedEffectControllerInner<'run> {
144 Borrowed(&'run dyn RuntimeEffectController),
145 Shared(Arc<dyn RuntimeEffectController>),
146}
147
148impl Clone for ScopedEffectControllerInner<'_> {
149 fn clone(&self) -> Self {
150 match self {
151 Self::Borrowed(controller) => Self::Borrowed(*controller),
152 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
153 }
154 }
155}
156
157#[derive(Clone)]
159pub struct ScopedEffectController<'run> {
160 controller: ScopedEffectControllerInner<'run>,
161 scope: EffectScope,
162}
163
164impl<'run> ScopedEffectController<'run> {
165 pub fn borrowed(
166 controller: &'run dyn RuntimeEffectController,
167 scope: EffectScope,
168 ) -> Result<Self, RuntimeError> {
169 scope.validate()?;
170 Ok(Self {
171 controller: ScopedEffectControllerInner::Borrowed(controller),
172 scope,
173 })
174 }
175
176 pub fn shared(
177 controller: Arc<dyn RuntimeEffectController>,
178 scope: EffectScope,
179 ) -> Result<Self, RuntimeError> {
180 scope.validate()?;
181 Ok(Self {
182 controller: ScopedEffectControllerInner::Shared(controller),
183 scope,
184 })
185 }
186
187 pub fn controller(&self) -> &dyn RuntimeEffectController {
188 match &self.controller {
189 ScopedEffectControllerInner::Borrowed(controller) => *controller,
190 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
191 }
192 }
193
194 pub fn effect_scope(&self) -> &EffectScope {
195 &self.scope
196 }
197
198 pub fn scope_id(&self) -> &str {
199 self.scope.id()
200 }
201
202 pub fn turn_id(&self) -> Option<&str> {
203 self.scope.turn_id()
204 }
205}
206
207#[async_trait::async_trait]
209pub trait EffectHost: Send + Sync {
210 fn durability_tier(&self) -> crate::DurabilityTier {
211 crate::DurabilityTier::Inline
212 }
213
214 fn requires_durable_attachment_store(&self) -> bool {
215 false
216 }
217
218 fn scoped<'run>(
219 &'run self,
220 scope: EffectScope,
221 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
222
223 fn scoped_static(
224 &self,
225 _scope: EffectScope,
226 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
227 Ok(None)
228 }
229}
230
231#[async_trait::async_trait]
233pub trait RuntimeEffectController: Send + Sync {
234 fn durability_tier(&self) -> crate::DurabilityTier {
237 crate::DurabilityTier::Inline
238 }
239
240 fn requires_durable_attachment_store(&self) -> bool {
241 false
242 }
243
244 async fn execute_effect(
245 &self,
246 envelope: RuntimeEffectEnvelope,
247 local_executor: RuntimeEffectLocalExecutor<'_>,
248 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
249}
250
251#[derive(Clone)]
254pub(crate) enum RuntimeEffectControllerHandle<'run> {
255 Borrowed(ScopedEffectController<'run>),
256 #[cfg(any(test, feature = "testing"))]
257 Shared {
258 controller: Arc<dyn RuntimeEffectController>,
259 scope: EffectScope,
260 },
261}
262
263impl<'run> RuntimeEffectControllerHandle<'run> {
264 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
265 Self::Borrowed(scoped)
266 }
267
268 #[cfg(any(test, feature = "testing"))]
269 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
270 Self::Shared {
271 controller,
272 scope: EffectScope::runtime_operation("test-runtime-effect-controller"),
273 }
274 }
275
276 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
277 match self {
278 Self::Borrowed(scoped) => scoped.controller(),
279 #[cfg(any(test, feature = "testing"))]
280 Self::Shared { controller, .. } => controller.as_ref(),
281 }
282 }
283
284 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
285 match self {
286 Self::Borrowed(scoped) => scoped.clone(),
287 #[cfg(any(test, feature = "testing"))]
288 Self::Shared { controller, scope } => {
289 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
290 .expect("runtime effect controller handle carries a valid scope")
291 }
292 }
293 }
294
295 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
296 self.clone()
297 }
298}
299
300#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
301#[error("{code}: {message}")]
302pub struct RuntimeEffectControllerError {
303 pub code: String,
304 pub message: String,
305}
306
307impl RuntimeEffectControllerError {
308 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
309 Self {
310 code: code.into(),
311 message: message.into(),
312 }
313 }
314
315 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
316 Self::new(
317 "runtime_effect_wrong_outcome",
318 format!(
319 "expected {} outcome, got {}",
320 expected.as_str(),
321 actual.as_str()
322 ),
323 )
324 }
325
326 pub(crate) fn into_runtime_error(self) -> RuntimeError {
327 RuntimeError::new(self.code, self.message)
328 }
329}
330
331impl From<RuntimeError> for RuntimeEffectControllerError {
332 fn from(err: RuntimeError) -> Self {
333 Self::new(err.code.as_str(), err.message)
334 }
335}
336
337impl From<PluginError> for RuntimeEffectControllerError {
338 fn from(err: PluginError) -> Self {
339 Self::new("plugin", err.to_string())
340 }
341}
342
343impl From<crate::StoreError> for RuntimeEffectControllerError {
344 fn from(err: crate::StoreError) -> Self {
345 Self::new("runtime_store", err.to_string())
346 }
347}
348
349#[async_trait::async_trait]
354pub(crate) trait ProcessRunner: Send + Sync {
355 async fn run_process(
356 &self,
357 registration: crate::ProcessRegistration,
358 execution_context: crate::ProcessExecutionContext,
359 registry: Arc<dyn ProcessRegistry>,
360 scoped_effect_controller: crate::ScopedEffectController<'_>,
361 cancellation: CancellationToken,
362 ) -> crate::ProcessAwaitOutput;
363}
364
365pub struct ProcessLocalExecution {
366 pub registry: Arc<dyn ProcessRegistry>,
367}
368
369pub(super) struct LocalTurnEffectRunner<'a, 'run> {
370 driver: &'a mut RuntimeTurnDriver<'run>,
371 machine: &'a mut crate::TurnMachine,
372 event_tx: mpsc::Sender<RuntimeStreamEvent>,
373 cancellation: CancellationToken,
374}
375
376pub(super) struct LocalDirectEffectRunner {
377 provider: ProviderHandle,
378 attachment_store: Arc<dyn AttachmentStore>,
379}
380
381#[async_trait::async_trait]
382trait RuntimeEffectLocalRunner: Send {
383 async fn execute(
384 self: Box<Self>,
385 envelope: RuntimeEffectEnvelope,
386 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
387}
388
/// Signature of the callback behind [`RuntimeEffectLocalExecutor::testing`]:
/// a one-shot function from an envelope to a boxed, `Send` future that
/// yields the effect outcome.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;
400
/// Local runner backed by a caller-supplied callback (testing only).
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Callback invoked with the envelope when the runner executes.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
405
406enum RuntimeEffectLocalExecutorState<'run> {
407 Unavailable,
408 SleepOnly { cancellation: CancellationToken },
409 Process(ProcessLocalExecution),
410 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
411}
412
413pub struct RuntimeEffectLocalExecutor<'run> {
419 state: RuntimeEffectLocalExecutorState<'run>,
420}
421
422impl<'run> RuntimeEffectLocalExecutor<'run> {
423 pub fn unavailable() -> Self {
424 Self {
425 state: RuntimeEffectLocalExecutorState::Unavailable,
426 }
427 }
428
429 pub fn sleep(cancellation: CancellationToken) -> Self {
430 Self {
431 state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
432 }
433 }
434
435 pub fn process_control(registry: Arc<dyn ProcessRegistry>) -> Self {
436 Self {
437 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
438 }
439 }
440
441 #[cfg(any(test, feature = "testing"))]
442 pub fn testing<F, Fut>(run: F) -> Self
443 where
444 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
445 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
446 + Send
447 + 'run,
448 {
449 Self {
450 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
451 TestingRuntimeEffectLocalRunner {
452 run: Box::new(move |envelope| Box::pin(run(envelope))),
453 },
454 )),
455 }
456 }
457
458 pub(in crate::runtime) fn turn<'scope>(
459 driver: &'run mut RuntimeTurnDriver<'scope>,
460 machine: &'run mut crate::TurnMachine,
461 event_tx: mpsc::Sender<RuntimeStreamEvent>,
462 cancellation: CancellationToken,
463 ) -> Self
464 where
465 'scope: 'run,
466 {
467 Self {
468 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
469 driver,
470 machine,
471 event_tx,
472 cancellation,
473 })),
474 }
475 }
476
477 pub(in crate::runtime) fn direct(
478 provider: ProviderHandle,
479 attachment_store: Arc<dyn AttachmentStore>,
480 ) -> Self {
481 Self {
482 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
483 provider,
484 attachment_store,
485 })),
486 }
487 }
488
489 pub async fn execute(
490 self,
491 envelope: RuntimeEffectEnvelope,
492 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
493 match self.state {
494 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
495 RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
496 execute_local_sleep(envelope, cancellation).await
497 }
498 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
499 "runtime_effect_local_executor_unavailable",
500 format!(
501 "no local executor is available for {}",
502 envelope.command.kind().as_str()
503 ),
504 )),
505 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
506 "runtime_effect_local_executor_mismatch",
507 format!(
508 "process executor cannot execute {} command directly",
509 envelope.command.kind().as_str()
510 ),
511 )),
512 }
513 }
514
515 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
516 match self.state {
517 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
518 _ => Err(RuntimeEffectControllerError::new(
519 "runtime_effect_local_executor_unavailable",
520 "no process executor is available for process command",
521 )),
522 }
523 }
524}
525
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Invokes the stored callback with `envelope` and awaits its future.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let runner = *self;
        let pending = (runner.run)(envelope);
        pending.await
    }
}
536
537#[async_trait::async_trait]
538impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
539 async fn execute(
540 self: Box<Self>,
541 envelope: RuntimeEffectEnvelope,
542 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
543 let runner = *self;
544 match envelope.command {
545 RuntimeEffectCommand::LlmCall { request } => {
546 let protocol_iteration = runner.machine.protocol_iteration();
547 let (result, text_streamed) = runner
548 .driver
549 .run_llm_call(
550 Arc::new((*request).into_request(None, None)),
551 protocol_iteration,
552 envelope.invocation,
553 &runner.event_tx,
554 &runner.cancellation,
555 )
556 .await;
557 Ok(RuntimeEffectOutcome::LlmCall {
558 result,
559 text_streamed,
560 })
561 }
562 RuntimeEffectCommand::ToolCall { call } => {
563 let tool_name = call.tool_name.clone();
564 let mut outcome = runner
565 .driver
566 .run_tool_calls(
567 vec![(call, envelope.invocation)],
568 &runner.event_tx,
569 &runner.cancellation,
570 )
571 .await?;
572 let result = outcome.completed.pop().ok_or_else(|| {
573 RuntimeEffectControllerError::new(
574 "tool_result_missing",
575 format!("tool `{tool_name}` completed without a result"),
576 )
577 })?;
578 Ok(RuntimeEffectOutcome::ToolCall {
579 result,
580 host_events: outcome.host_events,
581 })
582 }
583 RuntimeEffectCommand::ExecCode { code } => {
584 let protocol_iteration = runner.machine.protocol_iteration();
585 let messages = runner.machine.message_sequence();
586 Ok(RuntimeEffectOutcome::ExecCode {
587 result: runner
588 .driver
589 .run_exec_code(
590 &code,
591 messages,
592 protocol_iteration,
593 envelope.invocation,
594 &runner.event_tx,
595 )
596 .await,
597 })
598 }
599 RuntimeEffectCommand::Checkpoint { checkpoint } => {
600 Ok(RuntimeEffectOutcome::Checkpoint {
601 result: runner
602 .driver
603 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
604 .await
605 .map_err(RuntimeEffectControllerError::from),
606 })
607 }
608 RuntimeEffectCommand::SyncExecutionSurface {
609 update_machine_config,
610 } => Ok(RuntimeEffectOutcome::SyncExecutionSurface {
611 result: runner
612 .driver
613 .refresh_execution_surface(runner.machine, update_machine_config)
614 .await
615 .map_err(|err| err.to_string()),
616 }),
617 RuntimeEffectCommand::Sleep { duration_ms } => {
618 sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
619 Ok(RuntimeEffectOutcome::Sleep)
620 }
621 command => Err(RuntimeEffectControllerError::new(
622 "runtime_effect_local_executor_mismatch",
623 format!(
624 "local turn executor cannot execute {} command",
625 command.kind().as_str()
626 ),
627 )),
628 }
629 }
630}
631
632#[async_trait::async_trait]
633impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
634 async fn execute(
635 mut self: Box<Self>,
636 envelope: RuntimeEffectEnvelope,
637 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
638 match envelope.command {
639 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
640 result: self
641 .run_direct_llm_request((*request).into_request(
642 crate::session_model::transport_stream_events(&self.provider, None),
643 None,
644 ))
645 .await,
646 }),
647 RuntimeEffectCommand::Sleep { duration_ms } => {
648 sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
649 Ok(RuntimeEffectOutcome::Sleep)
650 }
651 command => Err(RuntimeEffectControllerError::new(
652 "runtime_effect_local_executor_mismatch",
653 format!(
654 "local direct executor cannot execute {} command",
655 command.kind().as_str()
656 ),
657 )),
658 }
659 }
660}
661
662impl LocalDirectEffectRunner {
663 async fn run_direct_llm_request(
664 &mut self,
665 request: CoreLlmRequest,
666 ) -> Result<LlmResponse, LlmCallError> {
667 let request = crate::attachments::resolve_llm_request_attachments(
668 request,
669 self.attachment_store.as_ref(),
670 )
671 .map_err(|err| LlmCallError {
672 message: err.to_string(),
673 retryable: false,
674 raw: None,
675 code: Some("attachment_resolution_failed".to_string()),
676 terminal_reason: crate::LlmTerminalReason::ProviderError,
677 request_body: None,
678 })?;
679 self.provider
680 .complete(request)
681 .await
682 .map_err(llm_call_error_from_transport)
683 }
684}
685
686async fn execute_local_sleep(
687 envelope: RuntimeEffectEnvelope,
688 cancellation: CancellationToken,
689) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
690 match envelope.command {
691 RuntimeEffectCommand::Sleep { duration_ms } => {
692 sleep_with_cancellation(duration_ms, &cancellation).await?;
693 Ok(RuntimeEffectOutcome::Sleep)
694 }
695 command => Err(RuntimeEffectControllerError::new(
696 "runtime_effect_local_executor_mismatch",
697 format!(
698 "local sleep executor cannot execute {} command",
699 command.kind().as_str()
700 ),
701 )),
702 }
703}
704
705async fn sleep_with_cancellation(
706 duration_ms: u64,
707 cancellation: &CancellationToken,
708) -> Result<(), RuntimeEffectControllerError> {
709 let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
710 tokio::pin!(sleep);
711 tokio::select! {
712 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
713 "runtime_effect_sleep_cancelled",
714 "runtime effect sleep was cancelled",
715 )),
716 _ = &mut sleep => Ok(()),
717 }
718}
719
/// Stateless controller that executes effects in-process: process commands
/// are served from the local process registry, everything else is delegated
/// to the supplied local executor.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
731
732#[async_trait::async_trait]
733impl RuntimeEffectController for InlineRuntimeEffectController {
734 async fn execute_effect(
735 &self,
736 envelope: RuntimeEffectEnvelope,
737 local_executor: RuntimeEffectLocalExecutor<'_>,
738 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
739 match envelope.command {
740 RuntimeEffectCommand::Process { command } => {
741 let result = self
742 .execute_process_command(command, local_executor)
743 .await?;
744 Ok(RuntimeEffectOutcome::Process { result })
745 }
746 _ => local_executor.execute(envelope).await,
747 }
748 }
749}
750
751#[derive(Clone)]
753pub struct InlineEffectHost {
754 controller: Arc<dyn RuntimeEffectController>,
755}
756
757impl InlineEffectHost {
758 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
759 Self { controller }
760 }
761}
762
763impl Default for InlineEffectHost {
764 fn default() -> Self {
765 Self::new(Arc::new(InlineRuntimeEffectController))
766 }
767}
768
769impl EffectHost for InlineEffectHost {
770 fn durability_tier(&self) -> crate::DurabilityTier {
771 self.controller.durability_tier()
772 }
773
774 fn requires_durable_attachment_store(&self) -> bool {
775 self.controller.requires_durable_attachment_store()
776 }
777
778 fn scoped<'run>(
779 &'run self,
780 scope: EffectScope,
781 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
782 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
783 }
784
785 fn scoped_static(
786 &self,
787 scope: EffectScope,
788 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
789 Ok(Some(ScopedEffectController::shared(
790 Arc::clone(&self.controller),
791 scope,
792 )?))
793 }
794}
795
796impl InlineRuntimeEffectController {
797 pub(crate) async fn start_process(
805 &self,
806 registry: Arc<dyn crate::ProcessRegistry>,
807 registration: crate::ProcessRegistration,
808 grant: Option<crate::ProcessStartGrant>,
809 ) -> Result<ProcessRecord, PluginError> {
810 let registration_for_record = registration.clone();
811 let record = registry.register_process(registration_for_record).await?;
812 if let Some(grant) = grant {
813 registry
814 .grant_handle(&grant.owner_scope, ®istration.id, grant.descriptor)
815 .await?;
816 }
817 Ok(record)
818 }
819
820 pub(crate) async fn request_process_cancel(
821 &self,
822 registry: Arc<dyn crate::ProcessRegistry>,
823 process_id: &str,
824 reason: Option<String>,
825 ) -> Result<ProcessRecord, PluginError> {
826 registry
830 .append_event(
831 process_id,
832 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
833 )
834 .await?;
835 registry
836 .get_process(process_id)
837 .await
838 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
839 }
840
841 async fn execute_process_command(
842 &self,
843 command: ProcessCommand,
844 local_executor: RuntimeEffectLocalExecutor<'_>,
845 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
846 let execution = local_executor.into_process()?;
847 let registry = execution.registry;
848 match command {
849 ProcessCommand::Start {
850 registration,
851 grant,
852 execution_context: _,
853 } => {
854 let record = self.start_process(registry, registration, grant).await?;
855 Ok(ProcessEffectOutcome::Start { record })
856 }
857 ProcessCommand::List { owner_scope, mode } => {
858 let entries = match mode {
859 crate::ProcessListMode::Live => {
860 registry.list_live_handle_grants(&owner_scope).await?
861 }
862 crate::ProcessListMode::All => {
863 registry.list_handle_grants(&owner_scope).await?
864 }
865 };
866 Ok(ProcessEffectOutcome::List { entries })
867 }
868 ProcessCommand::Transfer {
869 from_scope,
870 to_scope,
871 process_ids,
872 } => {
873 registry
874 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
875 .await?;
876 Ok(ProcessEffectOutcome::Transfer)
877 }
878 ProcessCommand::DeleteSession { session_id } => {
879 let report = registry.delete_session_process_state(&session_id).await?;
880 for process_id in &report.cancel_process_ids {
881 registry
882 .append_event(
883 process_id,
884 crate::ProcessEventAppendRequest::cancel_requested(
885 process_id,
886 Some("session deleted".to_string()),
887 ),
888 )
889 .await?;
890 }
891 Ok(ProcessEffectOutcome::DeleteSession { report })
892 }
893 ProcessCommand::Await { process_id } => {
894 let output = registry.await_process(&process_id).await?;
895 Ok(ProcessEffectOutcome::Await { output })
896 }
897 ProcessCommand::Cancel { process_id, reason } => {
898 let record = self
899 .request_process_cancel(registry, &process_id, reason)
900 .await?;
901 Ok(ProcessEffectOutcome::Cancel { record })
902 }
903 ProcessCommand::Signal {
904 process_id,
905 request,
906 ..
907 } => {
908 let result = registry.append_event(&process_id, request).await?;
909 Ok(ProcessEffectOutcome::Signal {
910 event: result.event,
911 })
912 }
913 }
914 }
915}
916
917impl std::fmt::Debug for InlineRuntimeEffectController {
918 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919 f.debug_struct("InlineRuntimeEffectController").finish()
920 }
921}