1#[cfg(any(test, feature = "testing"))]
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7use tokio_util::sync::CancellationToken;
8
9use crate::AttachmentStore;
10use crate::LlmRequest as CoreLlmRequest;
11use crate::LlmResponse;
12use crate::ProcessRecord;
13use crate::ProcessRegistry;
14use crate::provider::ProviderHandle;
15use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
16use crate::sansio::LlmCallError;
17use crate::{PluginError, RuntimeError, RuntimeErrorCode};
18
19use super::envelope::{
20 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
21 RuntimeEffectKind, RuntimeEffectOutcome,
22};
23use super::outcome::llm_call_error_from_transport;
24
25#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(tag = "type", rename_all = "snake_case")]
36pub enum EffectScope {
37 Turn {
38 session_id: String,
39 turn_id: String,
40 },
41 Process {
42 process_id: String,
43 },
44 HostEvent {
45 session_id: String,
46 event_id: String,
47 },
48 QueueDrain {
49 session_id: String,
50 drain_id: String,
51 },
52 Cron {
53 job_id: String,
54 execution_id: String,
55 },
56 SessionDelete {
57 session_id: String,
58 },
59 RuntimeOperation {
60 operation_id: String,
61 },
62}
63
64impl EffectScope {
65 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
66 Self::Turn {
67 session_id: session_id.into(),
68 turn_id: turn_id.into(),
69 }
70 }
71
72 pub fn process(process_id: impl Into<String>) -> Self {
73 Self::Process {
74 process_id: process_id.into(),
75 }
76 }
77
78 pub fn host_event(session_id: impl Into<String>, event_id: impl Into<String>) -> Self {
79 Self::HostEvent {
80 session_id: session_id.into(),
81 event_id: event_id.into(),
82 }
83 }
84
85 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
86 Self::QueueDrain {
87 session_id: session_id.into(),
88 drain_id: drain_id.into(),
89 }
90 }
91
92 pub fn cron(job_id: impl Into<String>, execution_id: impl Into<String>) -> Self {
93 Self::Cron {
94 job_id: job_id.into(),
95 execution_id: execution_id.into(),
96 }
97 }
98
99 pub fn session_delete(session_id: impl Into<String>) -> Self {
100 Self::SessionDelete {
101 session_id: session_id.into(),
102 }
103 }
104
105 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
106 Self::RuntimeOperation {
107 operation_id: operation_id.into(),
108 }
109 }
110
111 pub fn id(&self) -> &str {
112 match self {
113 Self::Turn { turn_id, .. } => turn_id,
114 Self::Process { process_id } => process_id,
115 Self::HostEvent { event_id, .. } => event_id,
116 Self::QueueDrain { drain_id, .. } => drain_id,
117 Self::Cron { execution_id, .. } => execution_id,
118 Self::SessionDelete { session_id } => session_id,
119 Self::RuntimeOperation { operation_id } => operation_id,
120 }
121 }
122
123 pub fn session_id(&self) -> Option<&str> {
124 match self {
125 Self::Turn { session_id, .. }
126 | Self::HostEvent { session_id, .. }
127 | Self::QueueDrain { session_id, .. }
128 | Self::SessionDelete { session_id } => Some(session_id),
129 Self::Process { .. } | Self::Cron { .. } | Self::RuntimeOperation { .. } => None,
130 }
131 }
132
133 pub fn turn_id(&self) -> Option<&str> {
134 match self {
135 Self::Turn { turn_id, .. } => Some(turn_id),
136 _ => None,
137 }
138 }
139
140 pub fn validates_turn_trace_id(&self) -> bool {
141 matches!(self, Self::Turn { .. })
142 }
143
144 fn validate(&self) -> Result<(), RuntimeError> {
145 let missing = match self {
146 Self::Turn {
147 session_id,
148 turn_id,
149 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
150 Self::Process { process_id } => process_id.trim().is_empty(),
151 Self::HostEvent {
152 session_id,
153 event_id,
154 } => session_id.trim().is_empty() || event_id.trim().is_empty(),
155 Self::QueueDrain {
156 session_id,
157 drain_id,
158 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
159 Self::Cron {
160 job_id,
161 execution_id,
162 } => job_id.trim().is_empty() || execution_id.trim().is_empty(),
163 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
164 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
165 };
166 if missing {
167 return Err(RuntimeError::new(
168 RuntimeErrorCode::MissingEffectScopeId,
169 "effect scopes require non-empty stable ids",
170 ));
171 }
172 Ok(())
173 }
174}
175
176enum ScopedEffectControllerInner<'run> {
177 Borrowed(&'run dyn RuntimeEffectController),
178 Shared(Arc<dyn RuntimeEffectController>),
179}
180
181impl Clone for ScopedEffectControllerInner<'_> {
182 fn clone(&self) -> Self {
183 match self {
184 Self::Borrowed(controller) => Self::Borrowed(*controller),
185 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
186 }
187 }
188}
189
190#[derive(Clone)]
192pub struct ScopedEffectController<'run> {
193 controller: ScopedEffectControllerInner<'run>,
194 scope: EffectScope,
195}
196
197impl<'run> ScopedEffectController<'run> {
198 pub fn borrowed(
199 controller: &'run dyn RuntimeEffectController,
200 scope: EffectScope,
201 ) -> Result<Self, RuntimeError> {
202 scope.validate()?;
203 Ok(Self {
204 controller: ScopedEffectControllerInner::Borrowed(controller),
205 scope,
206 })
207 }
208
209 pub fn shared(
210 controller: Arc<dyn RuntimeEffectController>,
211 scope: EffectScope,
212 ) -> Result<Self, RuntimeError> {
213 scope.validate()?;
214 Ok(Self {
215 controller: ScopedEffectControllerInner::Shared(controller),
216 scope,
217 })
218 }
219
220 pub fn controller(&self) -> &dyn RuntimeEffectController {
221 match &self.controller {
222 ScopedEffectControllerInner::Borrowed(controller) => *controller,
223 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
224 }
225 }
226
227 pub fn effect_scope(&self) -> &EffectScope {
228 &self.scope
229 }
230
231 pub fn scope_id(&self) -> &str {
232 self.scope.id()
233 }
234
235 pub fn turn_id(&self) -> Option<&str> {
236 self.scope.turn_id()
237 }
238}
239
240#[async_trait::async_trait]
242pub trait EffectHost: Send + Sync {
243 fn durability_tier(&self) -> crate::DurabilityTier {
244 crate::DurabilityTier::Inline
245 }
246
247 fn requires_durable_attachment_store(&self) -> bool {
248 false
249 }
250
251 fn scoped<'run>(
252 &'run self,
253 scope: EffectScope,
254 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
255
256 fn scoped_static(
257 &self,
258 _scope: EffectScope,
259 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
260 Ok(None)
261 }
262}
263
264#[async_trait::async_trait]
266pub trait RuntimeEffectController: Send + Sync {
267 fn durability_tier(&self) -> crate::DurabilityTier {
270 crate::DurabilityTier::Inline
271 }
272
273 fn requires_durable_attachment_store(&self) -> bool {
274 false
275 }
276
277 async fn execute_effect(
278 &self,
279 envelope: RuntimeEffectEnvelope,
280 local_executor: RuntimeEffectLocalExecutor<'_>,
281 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
282}
283
284#[derive(Clone)]
287pub(crate) enum RuntimeEffectControllerHandle<'run> {
288 Borrowed(ScopedEffectController<'run>),
289 #[cfg(any(test, feature = "testing"))]
290 Shared {
291 controller: Arc<dyn RuntimeEffectController>,
292 scope: EffectScope,
293 },
294}
295
296impl<'run> RuntimeEffectControllerHandle<'run> {
297 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
298 Self::Borrowed(scoped)
299 }
300
301 #[cfg(any(test, feature = "testing"))]
302 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
303 Self::Shared {
304 controller,
305 scope: EffectScope::runtime_operation("test-runtime-effect-controller"),
306 }
307 }
308
309 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
310 match self {
311 Self::Borrowed(scoped) => scoped.controller(),
312 #[cfg(any(test, feature = "testing"))]
313 Self::Shared { controller, .. } => controller.as_ref(),
314 }
315 }
316
317 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
318 match self {
319 Self::Borrowed(scoped) => scoped.clone(),
320 #[cfg(any(test, feature = "testing"))]
321 Self::Shared { controller, scope } => {
322 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
323 .expect("runtime effect controller handle carries a valid scope")
324 }
325 }
326 }
327
328 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
329 self.clone()
330 }
331}
332
333#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
334#[error("{code}: {message}")]
335pub struct RuntimeEffectControllerError {
336 pub code: String,
337 pub message: String,
338}
339
340impl RuntimeEffectControllerError {
341 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
342 Self {
343 code: code.into(),
344 message: message.into(),
345 }
346 }
347
348 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
349 Self::new(
350 "runtime_effect_wrong_outcome",
351 format!(
352 "expected {} outcome, got {}",
353 expected.as_str(),
354 actual.as_str()
355 ),
356 )
357 }
358
359 pub(crate) fn into_runtime_error(self) -> RuntimeError {
360 RuntimeError::new(self.code, self.message)
361 }
362}
363
364impl From<RuntimeError> for RuntimeEffectControllerError {
365 fn from(err: RuntimeError) -> Self {
366 Self::new(err.code.as_str(), err.message)
367 }
368}
369
370impl From<PluginError> for RuntimeEffectControllerError {
371 fn from(err: PluginError) -> Self {
372 Self::new("plugin", err.to_string())
373 }
374}
375
376impl From<crate::StoreError> for RuntimeEffectControllerError {
377 fn from(err: crate::StoreError) -> Self {
378 Self::new("runtime_store", err.to_string())
379 }
380}
381
382#[async_trait::async_trait]
387pub(crate) trait ProcessRunner: Send + Sync {
388 async fn run_process(
389 &self,
390 registration: crate::ProcessRegistration,
391 execution_context: crate::ProcessExecutionContext,
392 registry: Arc<dyn ProcessRegistry>,
393 scoped_effect_controller: crate::ScopedEffectController<'_>,
394 cancellation: CancellationToken,
395 ) -> crate::ProcessAwaitOutput;
396}
397
398pub struct ProcessLocalExecution {
399 pub registry: Arc<dyn ProcessRegistry>,
400}
401
402pub(super) struct LocalTurnEffectRunner<'a, 'run> {
403 driver: &'a mut RuntimeTurnDriver<'run>,
404 machine: &'a mut crate::TurnMachine,
405 event_tx: mpsc::Sender<RuntimeStreamEvent>,
406 cancellation: CancellationToken,
407}
408
409pub(super) struct LocalDirectEffectRunner {
410 provider: ProviderHandle,
411 attachment_store: Arc<dyn AttachmentStore>,
412}
413
414#[async_trait::async_trait]
415trait RuntimeEffectLocalRunner: Send {
416 async fn execute(
417 self: Box<Self>,
418 envelope: RuntimeEffectEnvelope,
419 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
420}
421
422#[cfg(any(test, feature = "testing"))]
423type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
424 RuntimeEffectEnvelope,
425 ) -> Pin<
426 Box<
427 dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
428 + Send
429 + 'run,
430 >,
431 > + Send
432 + 'run;
433
434#[cfg(any(test, feature = "testing"))]
435struct TestingRuntimeEffectLocalRunner<'run> {
436 run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
437}
438
439enum RuntimeEffectLocalExecutorState<'run> {
440 Unavailable,
441 SleepOnly { cancellation: CancellationToken },
442 Process(ProcessLocalExecution),
443 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
444}
445
446pub struct RuntimeEffectLocalExecutor<'run> {
452 state: RuntimeEffectLocalExecutorState<'run>,
453}
454
455impl<'run> RuntimeEffectLocalExecutor<'run> {
456 pub fn unavailable() -> Self {
457 Self {
458 state: RuntimeEffectLocalExecutorState::Unavailable,
459 }
460 }
461
462 pub fn sleep(cancellation: CancellationToken) -> Self {
463 Self {
464 state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
465 }
466 }
467
468 pub fn process_control(registry: Arc<dyn ProcessRegistry>) -> Self {
469 Self {
470 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
471 }
472 }
473
474 #[cfg(any(test, feature = "testing"))]
475 pub fn testing<F, Fut>(run: F) -> Self
476 where
477 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
478 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
479 + Send
480 + 'run,
481 {
482 Self {
483 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
484 TestingRuntimeEffectLocalRunner {
485 run: Box::new(move |envelope| Box::pin(run(envelope))),
486 },
487 )),
488 }
489 }
490
491 pub(in crate::runtime) fn turn<'scope>(
492 driver: &'run mut RuntimeTurnDriver<'scope>,
493 machine: &'run mut crate::TurnMachine,
494 event_tx: mpsc::Sender<RuntimeStreamEvent>,
495 cancellation: CancellationToken,
496 ) -> Self
497 where
498 'scope: 'run,
499 {
500 Self {
501 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
502 driver,
503 machine,
504 event_tx,
505 cancellation,
506 })),
507 }
508 }
509
510 pub(in crate::runtime) fn direct(
511 provider: ProviderHandle,
512 attachment_store: Arc<dyn AttachmentStore>,
513 ) -> Self {
514 Self {
515 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
516 provider,
517 attachment_store,
518 })),
519 }
520 }
521
522 pub async fn execute(
523 self,
524 envelope: RuntimeEffectEnvelope,
525 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
526 match self.state {
527 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
528 RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
529 execute_local_sleep(envelope, cancellation).await
530 }
531 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
532 "runtime_effect_local_executor_unavailable",
533 format!(
534 "no local executor is available for {}",
535 envelope.command.kind().as_str()
536 ),
537 )),
538 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
539 "runtime_effect_local_executor_mismatch",
540 format!(
541 "process executor cannot execute {} command directly",
542 envelope.command.kind().as_str()
543 ),
544 )),
545 }
546 }
547
548 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
549 match self.state {
550 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
551 _ => Err(RuntimeEffectControllerError::new(
552 "runtime_effect_local_executor_unavailable",
553 "no process executor is available for process command",
554 )),
555 }
556 }
557}
558
559#[cfg(any(test, feature = "testing"))]
560#[async_trait::async_trait]
561impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
562 async fn execute(
563 self: Box<Self>,
564 envelope: RuntimeEffectEnvelope,
565 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
566 (self.run)(envelope).await
567 }
568}
569
570#[async_trait::async_trait]
571impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
572 async fn execute(
573 self: Box<Self>,
574 envelope: RuntimeEffectEnvelope,
575 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
576 let runner = *self;
577 match envelope.command {
578 RuntimeEffectCommand::LlmCall { request } => {
579 let protocol_iteration = runner.machine.protocol_iteration();
580 let (result, text_streamed) = runner
581 .driver
582 .run_llm_call(
583 Arc::new((*request).into_request(None, None)),
584 protocol_iteration,
585 envelope.invocation,
586 &runner.event_tx,
587 &runner.cancellation,
588 )
589 .await;
590 Ok(RuntimeEffectOutcome::LlmCall {
591 result,
592 text_streamed,
593 })
594 }
595 RuntimeEffectCommand::ToolCall { call } => {
596 let tool_name = call.tool_name.clone();
597 let mut outcome = runner
598 .driver
599 .run_tool_calls(
600 vec![(call, envelope.invocation)],
601 &runner.event_tx,
602 &runner.cancellation,
603 )
604 .await?;
605 let result = outcome.completed.pop().ok_or_else(|| {
606 RuntimeEffectControllerError::new(
607 "tool_result_missing",
608 format!("tool `{tool_name}` completed without a result"),
609 )
610 })?;
611 Ok(RuntimeEffectOutcome::ToolCall {
612 result,
613 host_events: outcome.host_events,
614 })
615 }
616 RuntimeEffectCommand::ExecCode { code } => {
617 let protocol_iteration = runner.machine.protocol_iteration();
618 let messages = runner.machine.message_sequence();
619 Ok(RuntimeEffectOutcome::ExecCode {
620 result: runner
621 .driver
622 .run_exec_code(
623 &code,
624 messages,
625 protocol_iteration,
626 envelope.invocation,
627 &runner.event_tx,
628 )
629 .await,
630 })
631 }
632 RuntimeEffectCommand::Checkpoint { checkpoint } => {
633 Ok(RuntimeEffectOutcome::Checkpoint {
634 result: runner
635 .driver
636 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
637 .await
638 .map_err(RuntimeEffectControllerError::from),
639 })
640 }
641 RuntimeEffectCommand::SyncExecutionSurface {
642 update_machine_config,
643 } => Ok(RuntimeEffectOutcome::SyncExecutionSurface {
644 result: runner
645 .driver
646 .refresh_execution_surface(runner.machine, update_machine_config)
647 .await
648 .map_err(|err| err.to_string()),
649 }),
650 RuntimeEffectCommand::Sleep { duration_ms } => {
651 sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
652 Ok(RuntimeEffectOutcome::Sleep)
653 }
654 command => Err(RuntimeEffectControllerError::new(
655 "runtime_effect_local_executor_mismatch",
656 format!(
657 "local turn executor cannot execute {} command",
658 command.kind().as_str()
659 ),
660 )),
661 }
662 }
663}
664
665#[async_trait::async_trait]
666impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
667 async fn execute(
668 mut self: Box<Self>,
669 envelope: RuntimeEffectEnvelope,
670 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
671 match envelope.command {
672 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
673 result: self
674 .run_direct_llm_request((*request).into_request(
675 crate::session_model::transport_stream_events(&self.provider, None),
676 None,
677 ))
678 .await,
679 }),
680 RuntimeEffectCommand::Sleep { duration_ms } => {
681 sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
682 Ok(RuntimeEffectOutcome::Sleep)
683 }
684 command => Err(RuntimeEffectControllerError::new(
685 "runtime_effect_local_executor_mismatch",
686 format!(
687 "local direct executor cannot execute {} command",
688 command.kind().as_str()
689 ),
690 )),
691 }
692 }
693}
694
695impl LocalDirectEffectRunner {
696 async fn run_direct_llm_request(
697 &mut self,
698 request: CoreLlmRequest,
699 ) -> Result<LlmResponse, LlmCallError> {
700 let request = crate::attachments::resolve_llm_request_attachments(
701 request,
702 self.attachment_store.as_ref(),
703 )
704 .map_err(|err| LlmCallError {
705 message: err.to_string(),
706 retryable: false,
707 raw: None,
708 code: Some("attachment_resolution_failed".to_string()),
709 terminal_reason: crate::LlmTerminalReason::ProviderError,
710 request_body: None,
711 })?;
712 self.provider
713 .complete(request)
714 .await
715 .map_err(llm_call_error_from_transport)
716 }
717}
718
719async fn execute_local_sleep(
720 envelope: RuntimeEffectEnvelope,
721 cancellation: CancellationToken,
722) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
723 match envelope.command {
724 RuntimeEffectCommand::Sleep { duration_ms } => {
725 sleep_with_cancellation(duration_ms, &cancellation).await?;
726 Ok(RuntimeEffectOutcome::Sleep)
727 }
728 command => Err(RuntimeEffectControllerError::new(
729 "runtime_effect_local_executor_mismatch",
730 format!(
731 "local sleep executor cannot execute {} command",
732 command.kind().as_str()
733 ),
734 )),
735 }
736}
737
738async fn sleep_with_cancellation(
739 duration_ms: u64,
740 cancellation: &CancellationToken,
741) -> Result<(), RuntimeEffectControllerError> {
742 let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
743 tokio::pin!(sleep);
744 tokio::select! {
745 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
746 "runtime_effect_sleep_cancelled",
747 "runtime effect sleep was cancelled",
748 )),
749 _ = &mut sleep => Ok(()),
750 }
751}
752
753#[derive(Clone, Default)]
763pub struct InlineRuntimeEffectController;
764
765#[async_trait::async_trait]
766impl RuntimeEffectController for InlineRuntimeEffectController {
767 async fn execute_effect(
768 &self,
769 envelope: RuntimeEffectEnvelope,
770 local_executor: RuntimeEffectLocalExecutor<'_>,
771 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
772 match envelope.command {
773 RuntimeEffectCommand::Process { command } => {
774 let result = self
775 .execute_process_command(command, local_executor)
776 .await?;
777 Ok(RuntimeEffectOutcome::Process { result })
778 }
779 _ => local_executor.execute(envelope).await,
780 }
781 }
782}
783
784#[derive(Clone)]
786pub struct InlineEffectHost {
787 controller: Arc<dyn RuntimeEffectController>,
788}
789
790impl InlineEffectHost {
791 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
792 Self { controller }
793 }
794}
795
796impl Default for InlineEffectHost {
797 fn default() -> Self {
798 Self::new(Arc::new(InlineRuntimeEffectController))
799 }
800}
801
802impl EffectHost for InlineEffectHost {
803 fn durability_tier(&self) -> crate::DurabilityTier {
804 self.controller.durability_tier()
805 }
806
807 fn requires_durable_attachment_store(&self) -> bool {
808 self.controller.requires_durable_attachment_store()
809 }
810
811 fn scoped<'run>(
812 &'run self,
813 scope: EffectScope,
814 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
815 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
816 }
817
818 fn scoped_static(
819 &self,
820 scope: EffectScope,
821 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
822 Ok(Some(ScopedEffectController::shared(
823 Arc::clone(&self.controller),
824 scope,
825 )?))
826 }
827}
828
829impl InlineRuntimeEffectController {
830 pub(crate) async fn start_process(
838 &self,
839 registry: Arc<dyn crate::ProcessRegistry>,
840 registration: crate::ProcessRegistration,
841 grant: Option<crate::ProcessStartGrant>,
842 ) -> Result<ProcessRecord, PluginError> {
843 let registration_for_record = registration.clone();
844 let record = registry.register_process(registration_for_record).await?;
845 if let Some(grant) = grant {
846 registry
847 .grant_handle(&grant.owner_scope, ®istration.id, grant.descriptor)
848 .await?;
849 }
850 Ok(record)
851 }
852
853 pub(crate) async fn request_process_cancel(
854 &self,
855 registry: Arc<dyn crate::ProcessRegistry>,
856 process_id: &str,
857 reason: Option<String>,
858 ) -> Result<ProcessRecord, PluginError> {
859 registry
863 .append_event(
864 process_id,
865 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
866 )
867 .await?;
868 registry
869 .get_process(process_id)
870 .await
871 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
872 }
873
874 async fn execute_process_command(
875 &self,
876 command: ProcessCommand,
877 local_executor: RuntimeEffectLocalExecutor<'_>,
878 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
879 let execution = local_executor.into_process()?;
880 let registry = execution.registry;
881 match command {
882 ProcessCommand::Start {
883 registration,
884 grant,
885 execution_context: _,
886 } => {
887 let record = self.start_process(registry, registration, grant).await?;
888 Ok(ProcessEffectOutcome::Start { record })
889 }
890 ProcessCommand::List { owner_scope, mode } => {
891 let entries = match mode {
892 crate::ProcessListMode::Live => {
893 registry.list_live_handle_grants(&owner_scope).await?
894 }
895 crate::ProcessListMode::All => {
896 registry.list_handle_grants(&owner_scope).await?
897 }
898 };
899 Ok(ProcessEffectOutcome::List { entries })
900 }
901 ProcessCommand::Transfer {
902 from_scope,
903 to_scope,
904 process_ids,
905 } => {
906 registry
907 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
908 .await?;
909 Ok(ProcessEffectOutcome::Transfer)
910 }
911 ProcessCommand::DeleteSession { session_id } => {
912 let report = registry.delete_session_process_state(&session_id).await?;
913 for process_id in &report.cancel_process_ids {
914 registry
915 .append_event(
916 process_id,
917 crate::ProcessEventAppendRequest::cancel_requested(
918 process_id,
919 Some("session deleted".to_string()),
920 ),
921 )
922 .await?;
923 }
924 Ok(ProcessEffectOutcome::DeleteSession { report })
925 }
926 ProcessCommand::Await { process_id } => {
927 let output = registry.await_process(&process_id).await?;
928 Ok(ProcessEffectOutcome::Await { output })
929 }
930 ProcessCommand::Cancel { process_id, reason } => {
931 let record = self
932 .request_process_cancel(registry, &process_id, reason)
933 .await?;
934 Ok(ProcessEffectOutcome::Cancel { record })
935 }
936 ProcessCommand::Signal {
937 process_id,
938 request,
939 ..
940 } => {
941 let result = registry.append_event(&process_id, request).await?;
942 Ok(ProcessEffectOutcome::Signal {
943 event: result.event,
944 })
945 }
946 }
947 }
948}
949
950impl std::fmt::Debug for InlineRuntimeEffectController {
951 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
952 f.debug_struct("InlineRuntimeEffectController").finish()
953 }
954}