1#[cfg(any(test, feature = "testing"))]
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7use tokio_util::sync::CancellationToken;
8
9use crate::AttachmentStore;
10use crate::LlmRequest as CoreLlmRequest;
11use crate::LlmResponse;
12use crate::ProcessRecord;
13use crate::ProcessRegistry;
14use crate::provider::ProviderHandle;
15use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
16use crate::sansio::LlmCallError;
17use crate::{PluginError, RuntimeError, RuntimeErrorCode};
18
19use super::envelope::{
20 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
21 RuntimeEffectKind, RuntimeEffectOutcome,
22};
23use super::outcome::llm_call_error_from_transport;
24
25#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(tag = "type", rename_all = "snake_case")]
36pub enum EffectScope {
37 Turn {
38 session_id: String,
39 turn_id: String,
40 },
41 Process {
42 process_id: String,
43 },
44 QueueDrain {
45 session_id: String,
46 drain_id: String,
47 },
48 SessionDelete {
49 session_id: String,
50 },
51 RuntimeOperation {
52 operation_id: String,
53 },
54}
55
56impl EffectScope {
57 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
58 Self::Turn {
59 session_id: session_id.into(),
60 turn_id: turn_id.into(),
61 }
62 }
63
64 pub fn process(process_id: impl Into<String>) -> Self {
65 Self::Process {
66 process_id: process_id.into(),
67 }
68 }
69
70 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
71 Self::QueueDrain {
72 session_id: session_id.into(),
73 drain_id: drain_id.into(),
74 }
75 }
76
77 pub fn session_delete(session_id: impl Into<String>) -> Self {
78 Self::SessionDelete {
79 session_id: session_id.into(),
80 }
81 }
82
83 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
84 Self::RuntimeOperation {
85 operation_id: operation_id.into(),
86 }
87 }
88
89 pub fn id(&self) -> &str {
90 match self {
91 Self::Turn { turn_id, .. } => turn_id,
92 Self::Process { process_id } => process_id,
93 Self::QueueDrain { drain_id, .. } => drain_id,
94 Self::SessionDelete { session_id } => session_id,
95 Self::RuntimeOperation { operation_id } => operation_id,
96 }
97 }
98
99 pub fn session_id(&self) -> Option<&str> {
100 match self {
101 Self::Turn { session_id, .. }
102 | Self::QueueDrain { session_id, .. }
103 | Self::SessionDelete { session_id } => Some(session_id),
104 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
105 }
106 }
107
108 pub fn turn_id(&self) -> Option<&str> {
109 match self {
110 Self::Turn { turn_id, .. } => Some(turn_id),
111 _ => None,
112 }
113 }
114
115 pub fn validates_turn_trace_id(&self) -> bool {
116 matches!(self, Self::Turn { .. })
117 }
118
119 fn validate(&self) -> Result<(), RuntimeError> {
120 let missing = match self {
121 Self::Turn {
122 session_id,
123 turn_id,
124 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
125 Self::Process { process_id } => process_id.trim().is_empty(),
126 Self::QueueDrain {
127 session_id,
128 drain_id,
129 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
130 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
131 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
132 };
133 if missing {
134 return Err(RuntimeError::new(
135 RuntimeErrorCode::MissingEffectScopeId,
136 "effect scopes require non-empty stable ids",
137 ));
138 }
139 Ok(())
140 }
141}
142
143enum ScopedEffectControllerInner<'run> {
144 Borrowed(&'run dyn RuntimeEffectController),
145 Shared(Arc<dyn RuntimeEffectController>),
146}
147
148impl Clone for ScopedEffectControllerInner<'_> {
149 fn clone(&self) -> Self {
150 match self {
151 Self::Borrowed(controller) => Self::Borrowed(*controller),
152 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
153 }
154 }
155}
156
157#[derive(Clone)]
159pub struct ScopedEffectController<'run> {
160 controller: ScopedEffectControllerInner<'run>,
161 scope: EffectScope,
162}
163
164impl<'run> ScopedEffectController<'run> {
165 pub fn borrowed(
166 controller: &'run dyn RuntimeEffectController,
167 scope: EffectScope,
168 ) -> Result<Self, RuntimeError> {
169 scope.validate()?;
170 Ok(Self {
171 controller: ScopedEffectControllerInner::Borrowed(controller),
172 scope,
173 })
174 }
175
176 pub fn shared(
177 controller: Arc<dyn RuntimeEffectController>,
178 scope: EffectScope,
179 ) -> Result<Self, RuntimeError> {
180 scope.validate()?;
181 Ok(Self {
182 controller: ScopedEffectControllerInner::Shared(controller),
183 scope,
184 })
185 }
186
187 pub fn controller(&self) -> &dyn RuntimeEffectController {
188 match &self.controller {
189 ScopedEffectControllerInner::Borrowed(controller) => *controller,
190 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
191 }
192 }
193
194 pub fn effect_scope(&self) -> &EffectScope {
195 &self.scope
196 }
197
198 pub fn scope_id(&self) -> &str {
199 self.scope.id()
200 }
201
202 pub fn turn_id(&self) -> Option<&str> {
203 self.scope.turn_id()
204 }
205}
206
207#[async_trait::async_trait]
209pub trait EffectHost: Send + Sync {
210 fn durability_tier(&self) -> crate::DurabilityTier {
211 crate::DurabilityTier::Inline
212 }
213
214 fn requires_durable_attachment_store(&self) -> bool {
215 false
216 }
217
218 fn scoped<'run>(
219 &'run self,
220 scope: EffectScope,
221 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
222
223 fn scoped_static(
224 &self,
225 _scope: EffectScope,
226 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
227 Ok(None)
228 }
229}
230
231#[async_trait::async_trait]
233pub trait RuntimeEffectController: Send + Sync {
234 fn durability_tier(&self) -> crate::DurabilityTier {
237 crate::DurabilityTier::Inline
238 }
239
240 fn requires_durable_attachment_store(&self) -> bool {
241 false
242 }
243
244 async fn execute_effect(
245 &self,
246 envelope: RuntimeEffectEnvelope,
247 local_executor: RuntimeEffectLocalExecutor<'_>,
248 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
249}
250
251#[derive(Clone)]
254pub(crate) enum RuntimeEffectControllerHandle<'run> {
255 Borrowed(ScopedEffectController<'run>),
256 #[cfg(any(test, feature = "testing"))]
257 Shared {
258 controller: Arc<dyn RuntimeEffectController>,
259 scope: EffectScope,
260 },
261}
262
263impl<'run> RuntimeEffectControllerHandle<'run> {
264 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
265 Self::Borrowed(scoped)
266 }
267
268 #[cfg(any(test, feature = "testing"))]
269 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
270 Self::Shared {
271 controller,
272 scope: EffectScope::runtime_operation("test-runtime-effect-controller"),
273 }
274 }
275
276 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
277 match self {
278 Self::Borrowed(scoped) => scoped.controller(),
279 #[cfg(any(test, feature = "testing"))]
280 Self::Shared { controller, .. } => controller.as_ref(),
281 }
282 }
283
284 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
285 match self {
286 Self::Borrowed(scoped) => scoped.clone(),
287 #[cfg(any(test, feature = "testing"))]
288 Self::Shared { controller, scope } => {
289 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
290 .expect("runtime effect controller handle carries a valid scope")
291 }
292 }
293 }
294
295 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
296 self.clone()
297 }
298}
299
300#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
301#[error("{code}: {message}")]
302pub struct RuntimeEffectControllerError {
303 pub code: String,
304 pub message: String,
305}
306
307impl RuntimeEffectControllerError {
308 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
309 Self {
310 code: code.into(),
311 message: message.into(),
312 }
313 }
314
315 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
316 Self::new(
317 "runtime_effect_wrong_outcome",
318 format!(
319 "expected {} outcome, got {}",
320 expected.as_str(),
321 actual.as_str()
322 ),
323 )
324 }
325
326 pub(crate) fn into_runtime_error(self) -> RuntimeError {
327 RuntimeError::new(self.code, self.message)
328 }
329}
330
331impl From<RuntimeError> for RuntimeEffectControllerError {
332 fn from(err: RuntimeError) -> Self {
333 Self::new(err.code.as_str(), err.message)
334 }
335}
336
337impl From<PluginError> for RuntimeEffectControllerError {
338 fn from(err: PluginError) -> Self {
339 Self::new("plugin", err.to_string())
340 }
341}
342
343impl From<crate::StoreError> for RuntimeEffectControllerError {
344 fn from(err: crate::StoreError) -> Self {
345 Self::new("runtime_store", err.to_string())
346 }
347}
348
349#[async_trait::async_trait]
354pub(crate) trait ProcessRunner: Send + Sync {
355 async fn run_process(
356 &self,
357 registration: crate::ProcessRegistration,
358 execution_context: crate::ProcessExecutionContext,
359 registry: Arc<dyn ProcessRegistry>,
360 scoped_effect_controller: crate::ScopedEffectController<'_>,
361 cancellation: CancellationToken,
362 ) -> crate::ProcessAwaitOutput;
363}
364
365pub struct ProcessLocalExecution {
366 pub registry: Arc<dyn ProcessRegistry>,
367}
368
369pub(super) struct LocalTurnEffectRunner<'a, 'run> {
370 driver: &'a mut RuntimeTurnDriver<'run>,
371 machine: &'a mut crate::TurnMachine,
372 event_tx: mpsc::Sender<RuntimeStreamEvent>,
373 cancellation: CancellationToken,
374}
375
376pub(super) struct LocalDirectEffectRunner {
377 provider: ProviderHandle,
378 attachment_store: Arc<dyn AttachmentStore>,
379}
380
381#[async_trait::async_trait]
382trait RuntimeEffectLocalRunner: Send {
383 async fn execute(
384 self: Box<Self>,
385 envelope: RuntimeEffectEnvelope,
386 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
387}
388
389#[cfg(any(test, feature = "testing"))]
390type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
391 RuntimeEffectEnvelope,
392 ) -> Pin<
393 Box<
394 dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
395 + Send
396 + 'run,
397 >,
398 > + Send
399 + 'run;
400
401#[cfg(any(test, feature = "testing"))]
402struct TestingRuntimeEffectLocalRunner<'run> {
403 run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
404}
405
406enum RuntimeEffectLocalExecutorState<'run> {
407 Unavailable,
408 SleepOnly {
409 cancellation: CancellationToken,
410 },
411 AwaitEvent {
412 key: String,
413 registry: Arc<dyn ProcessRegistry>,
414 process_id: String,
415 event_type: String,
416 event_ordinal: u64,
417 cancellation: CancellationToken,
418 },
419 Process(ProcessLocalExecution),
420 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
421}
422
423pub struct RuntimeEffectLocalExecutor<'run> {
429 state: RuntimeEffectLocalExecutorState<'run>,
430}
431
432impl<'run> RuntimeEffectLocalExecutor<'run> {
433 pub fn unavailable() -> Self {
434 Self {
435 state: RuntimeEffectLocalExecutorState::Unavailable,
436 }
437 }
438
439 pub fn sleep(cancellation: CancellationToken) -> Self {
440 Self {
441 state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
442 }
443 }
444
445 pub fn await_process_event(
446 key: impl Into<String>,
447 registry: Arc<dyn ProcessRegistry>,
448 process_id: impl Into<String>,
449 event_type: impl Into<String>,
450 event_ordinal: u64,
451 cancellation: CancellationToken,
452 ) -> Self {
453 Self {
454 state: RuntimeEffectLocalExecutorState::AwaitEvent {
455 key: key.into(),
456 registry,
457 process_id: process_id.into(),
458 event_type: event_type.into(),
459 event_ordinal,
460 cancellation,
461 },
462 }
463 }
464
465 pub fn process_control(registry: Arc<dyn ProcessRegistry>) -> Self {
466 Self {
467 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
468 }
469 }
470
471 #[cfg(any(test, feature = "testing"))]
472 pub fn testing<F, Fut>(run: F) -> Self
473 where
474 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
475 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
476 + Send
477 + 'run,
478 {
479 Self {
480 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
481 TestingRuntimeEffectLocalRunner {
482 run: Box::new(move |envelope| Box::pin(run(envelope))),
483 },
484 )),
485 }
486 }
487
488 pub(in crate::runtime) fn turn<'scope>(
489 driver: &'run mut RuntimeTurnDriver<'scope>,
490 machine: &'run mut crate::TurnMachine,
491 event_tx: mpsc::Sender<RuntimeStreamEvent>,
492 cancellation: CancellationToken,
493 ) -> Self
494 where
495 'scope: 'run,
496 {
497 Self {
498 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
499 driver,
500 machine,
501 event_tx,
502 cancellation,
503 })),
504 }
505 }
506
507 pub(in crate::runtime) fn direct(
508 provider: ProviderHandle,
509 attachment_store: Arc<dyn AttachmentStore>,
510 ) -> Self {
511 Self {
512 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
513 provider,
514 attachment_store,
515 })),
516 }
517 }
518
519 pub async fn execute(
520 self,
521 envelope: RuntimeEffectEnvelope,
522 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
523 match self.state {
524 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
525 RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
526 execute_local_sleep(envelope, cancellation).await
527 }
528 RuntimeEffectLocalExecutorState::AwaitEvent {
529 key,
530 registry,
531 process_id,
532 event_type,
533 event_ordinal,
534 cancellation,
535 } => {
536 execute_local_await_event(
537 envelope,
538 &key,
539 registry,
540 &process_id,
541 &event_type,
542 event_ordinal,
543 cancellation,
544 )
545 .await
546 }
547 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
548 "runtime_effect_local_executor_unavailable",
549 format!(
550 "no local executor is available for {}",
551 envelope.command.kind().as_str()
552 ),
553 )),
554 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
555 "runtime_effect_local_executor_mismatch",
556 format!(
557 "process executor cannot execute {} command directly",
558 envelope.command.kind().as_str()
559 ),
560 )),
561 }
562 }
563
564 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
565 match self.state {
566 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
567 _ => Err(RuntimeEffectControllerError::new(
568 "runtime_effect_local_executor_unavailable",
569 "no process executor is available for process command",
570 )),
571 }
572 }
573}
574
575#[cfg(any(test, feature = "testing"))]
576#[async_trait::async_trait]
577impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
578 async fn execute(
579 self: Box<Self>,
580 envelope: RuntimeEffectEnvelope,
581 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
582 (self.run)(envelope).await
583 }
584}
585
586#[async_trait::async_trait]
587impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
588 async fn execute(
589 self: Box<Self>,
590 envelope: RuntimeEffectEnvelope,
591 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
592 let runner = *self;
593 match envelope.command {
594 RuntimeEffectCommand::LlmCall { request } => {
595 let protocol_iteration = runner.machine.protocol_iteration();
596 let (result, text_streamed) = runner
597 .driver
598 .run_llm_call(
599 Arc::new((*request).into_request(None, None)),
600 protocol_iteration,
601 envelope.invocation,
602 &runner.event_tx,
603 &runner.cancellation,
604 )
605 .await;
606 Ok(RuntimeEffectOutcome::LlmCall {
607 result,
608 text_streamed,
609 })
610 }
611 RuntimeEffectCommand::ToolCall { call } => {
612 let tool_name = call.tool_name.clone();
613 let mut outcome = runner
614 .driver
615 .run_tool_calls(
616 vec![(call, envelope.invocation)],
617 &runner.event_tx,
618 &runner.cancellation,
619 )
620 .await?;
621 let result = outcome.completed.pop().ok_or_else(|| {
622 RuntimeEffectControllerError::new(
623 "tool_result_missing",
624 format!("tool `{tool_name}` completed without a result"),
625 )
626 })?;
627 Ok(RuntimeEffectOutcome::ToolCall {
628 result,
629 host_events: outcome.host_events,
630 })
631 }
632 RuntimeEffectCommand::ExecCode { code } => {
633 let protocol_iteration = runner.machine.protocol_iteration();
634 let messages = runner.machine.message_sequence();
635 Ok(RuntimeEffectOutcome::ExecCode {
636 result: runner
637 .driver
638 .run_exec_code(
639 &code,
640 messages,
641 protocol_iteration,
642 envelope.invocation,
643 &runner.event_tx,
644 )
645 .await,
646 })
647 }
648 RuntimeEffectCommand::Checkpoint { checkpoint } => {
649 Ok(RuntimeEffectOutcome::Checkpoint {
650 result: runner
651 .driver
652 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
653 .await
654 .map_err(RuntimeEffectControllerError::from),
655 })
656 }
657 RuntimeEffectCommand::SyncExecutionSurface {
658 update_machine_config,
659 } => Ok(RuntimeEffectOutcome::SyncExecutionSurface {
660 result: runner
661 .driver
662 .refresh_execution_surface(runner.machine, update_machine_config)
663 .await
664 .map_err(|err| err.to_string()),
665 }),
666 RuntimeEffectCommand::Sleep { duration_ms } => {
667 sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
668 Ok(RuntimeEffectOutcome::Sleep)
669 }
670 command => Err(RuntimeEffectControllerError::new(
671 "runtime_effect_local_executor_mismatch",
672 format!(
673 "local turn executor cannot execute {} command",
674 command.kind().as_str()
675 ),
676 )),
677 }
678 }
679}
680
681#[async_trait::async_trait]
682impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
683 async fn execute(
684 mut self: Box<Self>,
685 envelope: RuntimeEffectEnvelope,
686 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
687 match envelope.command {
688 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
689 result: self
690 .run_direct_llm_request((*request).into_request(
691 crate::session_model::transport_stream_events(&self.provider, None),
692 None,
693 ))
694 .await,
695 }),
696 RuntimeEffectCommand::Sleep { duration_ms } => {
697 sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
698 Ok(RuntimeEffectOutcome::Sleep)
699 }
700 command => Err(RuntimeEffectControllerError::new(
701 "runtime_effect_local_executor_mismatch",
702 format!(
703 "local direct executor cannot execute {} command",
704 command.kind().as_str()
705 ),
706 )),
707 }
708 }
709}
710
711impl LocalDirectEffectRunner {
712 async fn run_direct_llm_request(
713 &mut self,
714 request: CoreLlmRequest,
715 ) -> Result<LlmResponse, LlmCallError> {
716 let request = crate::attachments::resolve_llm_request_attachments(
717 request,
718 self.attachment_store.as_ref(),
719 )
720 .await
721 .map_err(|err| LlmCallError {
722 message: err.to_string(),
723 retryable: false,
724 raw: None,
725 code: Some("attachment_resolution_failed".to_string()),
726 terminal_reason: crate::LlmTerminalReason::ProviderError,
727 request_body: None,
728 })?;
729 self.provider
730 .complete(request)
731 .await
732 .map_err(llm_call_error_from_transport)
733 }
734}
735
736async fn execute_local_sleep(
737 envelope: RuntimeEffectEnvelope,
738 cancellation: CancellationToken,
739) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
740 match envelope.command {
741 RuntimeEffectCommand::Sleep { duration_ms } => {
742 sleep_with_cancellation(duration_ms, &cancellation).await?;
743 Ok(RuntimeEffectOutcome::Sleep)
744 }
745 command => Err(RuntimeEffectControllerError::new(
746 "runtime_effect_local_executor_mismatch",
747 format!(
748 "local sleep executor cannot execute {} command",
749 command.kind().as_str()
750 ),
751 )),
752 }
753}
754
755async fn execute_local_await_event(
756 envelope: RuntimeEffectEnvelope,
757 expected_key: &str,
758 registry: Arc<dyn ProcessRegistry>,
759 process_id: &str,
760 event_type: &str,
761 event_ordinal: u64,
762 cancellation: CancellationToken,
763) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
764 match envelope.command {
765 RuntimeEffectCommand::AwaitEvent { key } if key == expected_key => {
766 if event_ordinal == 0 {
767 return Err(RuntimeEffectControllerError::new(
768 "runtime_effect_await_event_invalid_ordinal",
769 "event ordinal must be one-based",
770 ));
771 }
772 let mut after_sequence = 0;
773 let mut matching_count = 0;
774 let event = loop {
775 let wait = registry.wait_event_after(process_id, event_type, after_sequence);
776 tokio::pin!(wait);
777 let event = tokio::select! {
778 _ = cancellation.cancelled() => {
779 return Err(RuntimeEffectControllerError::new(
780 "runtime_effect_await_event_cancelled",
781 "runtime effect event wait was cancelled",
782 ));
783 }
784 event = &mut wait => event?,
785 };
786 matching_count += 1;
787 if matching_count == event_ordinal {
788 break event;
789 }
790 after_sequence = event.sequence;
791 };
792 Ok(RuntimeEffectOutcome::AwaitEvent {
793 payload: event.payload,
794 })
795 }
796 RuntimeEffectCommand::AwaitEvent { key } => Err(RuntimeEffectControllerError::new(
797 "runtime_effect_await_event_key_mismatch",
798 format!("local event wait expected `{expected_key}`, got `{key}`"),
799 )),
800 command => Err(RuntimeEffectControllerError::new(
801 "runtime_effect_local_executor_mismatch",
802 format!(
803 "local event wait executor cannot execute {} command",
804 command.kind().as_str()
805 ),
806 )),
807 }
808}
809
810async fn sleep_with_cancellation(
811 duration_ms: u64,
812 cancellation: &CancellationToken,
813) -> Result<(), RuntimeEffectControllerError> {
814 let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
815 tokio::pin!(sleep);
816 tokio::select! {
817 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
818 "runtime_effect_sleep_cancelled",
819 "runtime effect sleep was cancelled",
820 )),
821 _ = &mut sleep => Ok(()),
822 }
823}
824
825#[derive(Clone, Default)]
835pub struct InlineRuntimeEffectController;
836
837#[async_trait::async_trait]
838impl RuntimeEffectController for InlineRuntimeEffectController {
839 async fn execute_effect(
840 &self,
841 envelope: RuntimeEffectEnvelope,
842 local_executor: RuntimeEffectLocalExecutor<'_>,
843 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
844 match envelope.command {
845 RuntimeEffectCommand::Process { command } => {
846 let execution = local_executor.into_process()?;
847 let registry = execution.registry;
848 let result = tokio::task::spawn(async move {
849 Self::execute_process_command(registry, *command).await
850 })
851 .await
852 .map_err(|err| {
853 RuntimeEffectControllerError::new(
854 "runtime_effect_process_task_join",
855 format!("inline process effect task failed: {err}"),
856 )
857 })??;
858 Ok(RuntimeEffectOutcome::Process { result })
859 }
860 _ => local_executor.execute(envelope).await,
861 }
862 }
863}
864
865#[derive(Clone)]
867pub struct InlineEffectHost {
868 controller: Arc<dyn RuntimeEffectController>,
869}
870
871impl InlineEffectHost {
872 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
873 Self { controller }
874 }
875}
876
877impl Default for InlineEffectHost {
878 fn default() -> Self {
879 Self::new(Arc::new(InlineRuntimeEffectController))
880 }
881}
882
883impl EffectHost for InlineEffectHost {
884 fn durability_tier(&self) -> crate::DurabilityTier {
885 self.controller.durability_tier()
886 }
887
888 fn requires_durable_attachment_store(&self) -> bool {
889 self.controller.requires_durable_attachment_store()
890 }
891
892 fn scoped<'run>(
893 &'run self,
894 scope: EffectScope,
895 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
896 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
897 }
898
899 fn scoped_static(
900 &self,
901 scope: EffectScope,
902 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
903 Ok(Some(ScopedEffectController::shared(
904 Arc::clone(&self.controller),
905 scope,
906 )?))
907 }
908}
909
910impl InlineRuntimeEffectController {
911 pub(crate) async fn start_process(
919 registry: Arc<dyn crate::ProcessRegistry>,
920 registration: crate::ProcessRegistration,
921 grant: Option<crate::ProcessStartGrant>,
922 ) -> Result<ProcessRecord, PluginError> {
923 let registration_for_record = registration.clone();
924 let record = registry.register_process(registration_for_record).await?;
925 if let Some(grant) = grant {
926 registry
927 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
928 .await?;
929 }
930 Ok(record)
931 }
932
933 pub(crate) async fn request_process_cancel(
934 &self,
935 registry: Arc<dyn crate::ProcessRegistry>,
936 process_id: &str,
937 reason: Option<String>,
938 ) -> Result<ProcessRecord, PluginError> {
939 registry
943 .append_event(
944 process_id,
945 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
946 )
947 .await?;
948 registry
949 .get_process(process_id)
950 .await
951 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
952 }
953
954 async fn execute_process_command(
955 registry: Arc<dyn crate::ProcessRegistry>,
956 command: ProcessCommand,
957 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
958 match command {
959 ProcessCommand::Start {
960 registration,
961 grant,
962 execution_context: _,
963 } => {
964 let record = Self::start_process(registry, registration, grant).await?;
965 Ok(ProcessEffectOutcome::Start { record })
966 }
967 ProcessCommand::List {
968 session_scope,
969 mode,
970 } => {
971 let entries = match mode {
972 crate::ProcessListMode::Live => {
973 registry.list_live_handle_grants(&session_scope).await?
974 }
975 crate::ProcessListMode::All => {
976 registry.list_handle_grants(&session_scope).await?
977 }
978 };
979 Ok(ProcessEffectOutcome::List { entries })
980 }
981 ProcessCommand::Transfer {
982 from_scope,
983 to_scope,
984 process_ids,
985 } => {
986 registry
987 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
988 .await?;
989 Ok(ProcessEffectOutcome::Transfer)
990 }
991 ProcessCommand::DeleteSession { session_id } => {
992 let report = registry.delete_session_process_state(&session_id).await?;
993 Ok(ProcessEffectOutcome::DeleteSession { report })
994 }
995 ProcessCommand::Await { process_id } => {
996 let output = registry.await_process(&process_id).await?;
997 Ok(ProcessEffectOutcome::Await { output })
998 }
999 ProcessCommand::Cancel { process_id, reason } => {
1000 let record = InlineRuntimeEffectController
1001 .request_process_cancel(registry, &process_id, reason)
1002 .await?;
1003 Ok(ProcessEffectOutcome::Cancel { record })
1004 }
1005 ProcessCommand::Signal {
1006 process_id,
1007 request,
1008 ..
1009 } => {
1010 let result = registry.append_event(&process_id, request).await?;
1011 Ok(ProcessEffectOutcome::Signal {
1012 event: result.event,
1013 })
1014 }
1015 }
1016 }
1017}
1018
1019impl std::fmt::Debug for InlineRuntimeEffectController {
1020 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1021 f.debug_struct("InlineRuntimeEffectController").finish()
1022 }
1023}