1use std::collections::{HashMap, HashSet};
2#[cfg(any(test, feature = "testing"))]
3use std::pin::Pin;
4use std::sync::{Arc, OnceLock};
5use std::time::Instant;
6
7use hmac::{Hmac, Mac};
8use serde::{Deserialize, Serialize};
9use tokio::sync::{Notify, mpsc};
10use tokio_util::sync::CancellationToken;
11
12use crate::AttachmentStore;
13use crate::LlmRequest as CoreLlmRequest;
14use crate::LlmResponse;
15use crate::ProcessRecord;
16use crate::ProcessRegistry;
17use crate::provider::ProviderHandle;
18use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
19use crate::sansio::LlmCallError;
20use crate::{PluginError, RuntimeError, RuntimeErrorCode};
21
22use super::envelope::{
23 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
24 RuntimeEffectKind, RuntimeEffectOutcome,
25};
26use super::outcome::llm_call_error_from_transport;
27
28type HmacSha256 = Hmac<sha2::Sha256>;
29
30fn inline_await_events() -> &'static AwaitEventRegistry {
31 static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
32 REGISTRY.get_or_init(AwaitEventRegistry::new)
33}
34
35#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum ExecutionScope {
47 Turn {
48 session_id: String,
49 turn_id: String,
50 },
51 Process {
52 process_id: String,
53 },
54 QueueDrain {
55 session_id: String,
56 drain_id: String,
57 },
58 SessionDelete {
59 session_id: String,
60 },
61 RuntimeOperation {
62 operation_id: String,
63 },
64}
65
66impl ExecutionScope {
67 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
68 Self::Turn {
69 session_id: session_id.into(),
70 turn_id: turn_id.into(),
71 }
72 }
73
74 pub fn process(process_id: impl Into<String>) -> Self {
75 Self::Process {
76 process_id: process_id.into(),
77 }
78 }
79
80 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
81 Self::QueueDrain {
82 session_id: session_id.into(),
83 drain_id: drain_id.into(),
84 }
85 }
86
87 pub fn session_delete(session_id: impl Into<String>) -> Self {
88 Self::SessionDelete {
89 session_id: session_id.into(),
90 }
91 }
92
93 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
94 Self::RuntimeOperation {
95 operation_id: operation_id.into(),
96 }
97 }
98
99 pub fn id(&self) -> &str {
100 match self {
101 Self::Turn { turn_id, .. } => turn_id,
102 Self::Process { process_id } => process_id,
103 Self::QueueDrain { drain_id, .. } => drain_id,
104 Self::SessionDelete { session_id } => session_id,
105 Self::RuntimeOperation { operation_id } => operation_id,
106 }
107 }
108
109 pub fn session_id(&self) -> Option<&str> {
110 match self {
111 Self::Turn { session_id, .. }
112 | Self::QueueDrain { session_id, .. }
113 | Self::SessionDelete { session_id } => Some(session_id),
114 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
115 }
116 }
117
118 pub fn turn_id(&self) -> Option<&str> {
119 match self {
120 Self::Turn { turn_id, .. } => Some(turn_id),
121 _ => None,
122 }
123 }
124
125 pub fn validates_turn_trace_id(&self) -> bool {
126 matches!(self, Self::Turn { .. })
127 }
128
129 fn validate(&self) -> Result<(), RuntimeError> {
130 let missing = match self {
131 Self::Turn {
132 session_id,
133 turn_id,
134 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
135 Self::Process { process_id } => process_id.trim().is_empty(),
136 Self::QueueDrain {
137 session_id,
138 drain_id,
139 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
140 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
141 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
142 };
143 if missing {
144 return Err(RuntimeError::new(
145 RuntimeErrorCode::MissingExecutionScopeId,
146 "execution scopes require non-empty stable ids",
147 ));
148 }
149 Ok(())
150 }
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
154#[serde(tag = "type", rename_all = "snake_case")]
155pub enum AwaitEventWaitIdentity {
156 ToolCompletion {
157 tool_call_id: String,
158 },
159 ProcessSignal {
160 process_id: String,
161 signal_name: String,
162 ordinal: u64,
163 },
164 Custom {
165 key: String,
166 },
167}
168
169impl AwaitEventWaitIdentity {
170 pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
171 Self::ToolCompletion {
172 tool_call_id: tool_call_id.into(),
173 }
174 }
175
176 pub fn process_signal(
177 process_id: impl Into<String>,
178 signal_name: impl Into<String>,
179 ordinal: u64,
180 ) -> Self {
181 Self::ProcessSignal {
182 process_id: process_id.into(),
183 signal_name: signal_name.into(),
184 ordinal,
185 }
186 }
187
188 fn validate(&self) -> Result<(), RuntimeError> {
189 let invalid = match self {
190 Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
191 Self::ProcessSignal {
192 process_id,
193 signal_name,
194 ordinal,
195 } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
196 Self::Custom { key } => key.trim().is_empty(),
197 };
198 if invalid {
199 return Err(RuntimeError::new(
200 "invalid_await_event_wait_identity",
201 "await-event wait identity requires non-empty stable ids",
202 ));
203 }
204 Ok(())
205 }
206}
207
208#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
209pub struct AwaitEventKey {
210 pub scope: ExecutionScope,
211 pub wait: AwaitEventWaitIdentity,
212 pub key_id: String,
213 pub signature: String,
214}
215
216impl AwaitEventKey {
217 pub fn promise_key(&self) -> String {
218 format!("lash-await-event:{}", self.key_id)
219 }
220}
221
222#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
223pub struct ExternalCompletionError {
224 pub code: String,
225 pub message: String,
226 #[serde(default, skip_serializing_if = "Option::is_none")]
227 pub raw: Option<serde_json::Value>,
228}
229
230impl ExternalCompletionError {
231 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
232 Self {
233 code: code.into(),
234 message: message.into(),
235 raw: None,
236 }
237 }
238}
239
240#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
241#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
242pub enum Resolution {
243 Ok(serde_json::Value),
244 Err(ExternalCompletionError),
245 Timeout,
246 Cancelled,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "status", rename_all = "snake_case")]
251pub enum ResolveOutcome {
252 Accepted,
253 AlreadyResolved { terminal: Resolution },
254 UnknownOrRevoked,
255}
256
257#[derive(Debug)]
258struct AwaitEventEntry {
259 terminal: Option<Resolution>,
260 notify: Arc<Notify>,
261}
262
263#[derive(Debug)]
264struct AwaitEventRegistryState {
265 entries: HashMap<String, AwaitEventEntry>,
266 revoked_key_ids: HashSet<String>,
267 revoked_session_ids: HashSet<String>,
268}
269
270#[derive(Debug)]
271struct AwaitEventRegistry {
272 secret: Vec<u8>,
273 state: std::sync::Mutex<AwaitEventRegistryState>,
274}
275
276impl AwaitEventRegistry {
277 fn new() -> Self {
278 Self {
279 secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
280 state: std::sync::Mutex::new(AwaitEventRegistryState {
281 entries: HashMap::new(),
282 revoked_key_ids: HashSet::new(),
283 revoked_session_ids: HashSet::new(),
284 }),
285 }
286 }
287
288 fn key_for(
289 &self,
290 scope: &ExecutionScope,
291 wait: AwaitEventWaitIdentity,
292 ) -> Result<AwaitEventKey, RuntimeError> {
293 scope.validate()?;
294 wait.validate()?;
295 let key_id =
296 crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
297 RuntimeError::new(
298 "await_event_key_hash",
299 format!("failed to hash await-event identity: {err}"),
300 )
301 })?;
302 let signature = self.signature(scope, &wait, &key_id)?;
303 Ok(AwaitEventKey {
304 scope: scope.clone(),
305 wait,
306 key_id,
307 signature,
308 })
309 }
310
311 fn signature(
312 &self,
313 scope: &ExecutionScope,
314 wait: &AwaitEventWaitIdentity,
315 key_id: &str,
316 ) -> Result<String, RuntimeError> {
317 let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
318 RuntimeError::new(
319 "await_event_key_sign",
320 format!("failed to initialize await-event key signer: {err}"),
321 )
322 })?;
323 let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
324 RuntimeError::new(
325 "await_event_key_sign",
326 format!("failed to serialize await-event key identity: {err}"),
327 )
328 })?;
329 mac.update(&canonical);
330 Ok(format!("{:x}", mac.finalize().into_bytes()))
331 }
332
333 fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
334 let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
335 Ok(expected == key.signature)
336 }
337
338 fn resolve(
339 &self,
340 key: &AwaitEventKey,
341 resolution: Resolution,
342 ) -> Result<ResolveOutcome, RuntimeError> {
343 if !self.verify(key)? {
344 return Ok(ResolveOutcome::UnknownOrRevoked);
345 }
346 let mut state = self.state.lock().map_err(|_| {
347 RuntimeError::new(
348 "await_event_registry_poisoned",
349 "await-event registry lock poisoned",
350 )
351 })?;
352 if state.revoked_key_ids.contains(&key.key_id)
353 || key
354 .scope
355 .session_id()
356 .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
357 {
358 return Ok(ResolveOutcome::UnknownOrRevoked);
359 }
360 let entry = state
361 .entries
362 .entry(key.key_id.clone())
363 .or_insert_with(|| AwaitEventEntry {
364 terminal: None,
365 notify: Arc::new(Notify::new()),
366 });
367 if let Some(terminal) = &entry.terminal {
368 return Ok(ResolveOutcome::AlreadyResolved {
369 terminal: terminal.clone(),
370 });
371 }
372 entry.terminal = Some(resolution);
373 entry.notify.notify_waiters();
374 Ok(ResolveOutcome::Accepted)
375 }
376
377 async fn await_resolution(
378 &self,
379 key: &AwaitEventKey,
380 cancel: CancellationToken,
381 deadline: Option<Instant>,
382 ) -> Result<Resolution, RuntimeError> {
383 if !self.verify(key)? {
384 return Err(RuntimeError::new(
385 "await_event_unknown_or_revoked",
386 "await-event key is invalid or revoked",
387 ));
388 }
389 loop {
390 let notify =
391 {
392 let mut state = self.state.lock().map_err(|_| {
393 RuntimeError::new(
394 "await_event_registry_poisoned",
395 "await-event registry lock poisoned",
396 )
397 })?;
398 if state.revoked_key_ids.contains(&key.key_id)
399 || key.scope.session_id().is_some_and(|session_id| {
400 state.revoked_session_ids.contains(session_id)
401 })
402 {
403 return Err(RuntimeError::new(
404 "await_event_unknown_or_revoked",
405 "await-event key is invalid or revoked",
406 ));
407 }
408 let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
409 AwaitEventEntry {
410 terminal: None,
411 notify: Arc::new(Notify::new()),
412 }
413 });
414 if let Some(terminal) = entry.terminal.clone() {
415 return Ok(terminal);
416 }
417 Arc::clone(&entry.notify)
418 };
419 if let Some(deadline) = deadline {
420 tokio::select! {
421 _ = cancel.cancelled() => {
422 let _ = self.resolve(key, Resolution::Cancelled)?;
423 }
424 _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
425 let _ = self.resolve(key, Resolution::Timeout)?;
426 }
427 _ = notify.notified() => {}
428 }
429 } else {
430 tokio::select! {
431 _ = cancel.cancelled() => {
432 let _ = self.resolve(key, Resolution::Cancelled)?;
433 }
434 _ = notify.notified() => {}
435 }
436 }
437 }
438 }
439
440 fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
441 let mut state = self.state.lock().map_err(|_| {
442 RuntimeError::new(
443 "await_event_registry_poisoned",
444 "await-event registry lock poisoned",
445 )
446 })?;
447 state.revoked_session_ids.insert(session_id.to_string());
448 for entry in state.entries.values() {
449 entry.notify.notify_waiters();
450 }
451 Ok(())
452 }
453}
454
455enum ScopedEffectControllerInner<'run> {
456 Borrowed(&'run dyn RuntimeEffectController),
457 Shared(Arc<dyn RuntimeEffectController>),
458}
459
460impl Clone for ScopedEffectControllerInner<'_> {
461 fn clone(&self) -> Self {
462 match self {
463 Self::Borrowed(controller) => Self::Borrowed(*controller),
464 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
465 }
466 }
467}
468
469#[derive(Clone)]
471pub struct ScopedEffectController<'run> {
472 controller: ScopedEffectControllerInner<'run>,
473 scope: ExecutionScope,
474}
475
476impl<'run> ScopedEffectController<'run> {
477 pub fn borrowed(
478 controller: &'run dyn RuntimeEffectController,
479 scope: ExecutionScope,
480 ) -> Result<Self, RuntimeError> {
481 scope.validate()?;
482 Ok(Self {
483 controller: ScopedEffectControllerInner::Borrowed(controller),
484 scope,
485 })
486 }
487
488 pub fn shared(
489 controller: Arc<dyn RuntimeEffectController>,
490 scope: ExecutionScope,
491 ) -> Result<Self, RuntimeError> {
492 scope.validate()?;
493 Ok(Self {
494 controller: ScopedEffectControllerInner::Shared(controller),
495 scope,
496 })
497 }
498
499 pub fn controller(&self) -> &dyn RuntimeEffectController {
500 match &self.controller {
501 ScopedEffectControllerInner::Borrowed(controller) => *controller,
502 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
503 }
504 }
505
506 pub fn execution_scope(&self) -> &ExecutionScope {
507 &self.scope
508 }
509
510 pub fn scope_id(&self) -> &str {
511 self.scope.id()
512 }
513
514 pub fn turn_id(&self) -> Option<&str> {
515 self.scope.turn_id()
516 }
517}
518
519#[async_trait::async_trait]
521pub trait EffectHost: Send + Sync {
522 fn durability_tier(&self) -> crate::DurabilityTier {
523 crate::DurabilityTier::Inline
524 }
525
526 fn requires_durable_attachment_store(&self) -> bool {
527 false
528 }
529
530 fn scoped<'run>(
531 &'run self,
532 scope: ExecutionScope,
533 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
534
535 fn scoped_static(
536 &self,
537 _scope: ExecutionScope,
538 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
539 Ok(None)
540 }
541
542 async fn await_event_key(
543 &self,
544 _scope: &ExecutionScope,
545 _wait: AwaitEventWaitIdentity,
546 ) -> Result<AwaitEventKey, RuntimeError> {
547 Err(RuntimeError::new(
548 "await_event_unsupported",
549 "this effect host does not support await-event keys",
550 ))
551 }
552
553 async fn resolve_await_event(
554 &self,
555 _key: &AwaitEventKey,
556 _resolution: Resolution,
557 ) -> Result<ResolveOutcome, RuntimeError> {
558 Ok(ResolveOutcome::UnknownOrRevoked)
559 }
560
561 async fn await_await_event(
562 &self,
563 _key: &AwaitEventKey,
564 _cancel: CancellationToken,
565 _deadline: Option<Instant>,
566 ) -> Result<Resolution, RuntimeError> {
567 Err(RuntimeError::new(
568 "await_event_unsupported",
569 "this effect host does not support await-event waits",
570 ))
571 }
572
573 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
574 Ok(())
575 }
576}
577
578#[async_trait::async_trait]
580pub trait RuntimeEffectController: Send + Sync {
581 fn durability_tier(&self) -> crate::DurabilityTier {
584 crate::DurabilityTier::Inline
585 }
586
587 fn requires_durable_attachment_store(&self) -> bool {
588 false
589 }
590
591 async fn execute_effect(
592 &self,
593 envelope: RuntimeEffectEnvelope,
594 local_executor: RuntimeEffectLocalExecutor<'_>,
595 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
596
597 async fn await_event_key(
598 &self,
599 _scope: &ExecutionScope,
600 _wait: AwaitEventWaitIdentity,
601 ) -> Result<AwaitEventKey, RuntimeError> {
602 Err(RuntimeError::new(
603 "await_event_unsupported",
604 "this effect controller does not support await-event keys",
605 ))
606 }
607
608 async fn resolve_await_event(
609 &self,
610 _key: &AwaitEventKey,
611 _resolution: Resolution,
612 ) -> Result<ResolveOutcome, RuntimeError> {
613 Ok(ResolveOutcome::UnknownOrRevoked)
614 }
615
616 async fn await_await_event(
617 &self,
618 _key: &AwaitEventKey,
619 _cancel: CancellationToken,
620 _deadline: Option<Instant>,
621 ) -> Result<Resolution, RuntimeError> {
622 Err(RuntimeError::new(
623 "await_event_unsupported",
624 "this effect controller does not support await-event waits",
625 ))
626 }
627
628 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
629 Ok(())
630 }
631}
632
633#[derive(Clone)]
636pub(crate) enum RuntimeEffectControllerHandle<'run> {
637 Borrowed(ScopedEffectController<'run>),
638 #[cfg(any(test, feature = "testing"))]
639 Shared {
640 controller: Arc<dyn RuntimeEffectController>,
641 scope: ExecutionScope,
642 },
643}
644
645impl<'run> RuntimeEffectControllerHandle<'run> {
646 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
647 Self::Borrowed(scoped)
648 }
649
650 #[cfg(any(test, feature = "testing"))]
651 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
652 Self::Shared {
653 controller,
654 scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
655 }
656 }
657
658 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
659 match self {
660 Self::Borrowed(scoped) => scoped.controller(),
661 #[cfg(any(test, feature = "testing"))]
662 Self::Shared { controller, .. } => controller.as_ref(),
663 }
664 }
665
666 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
667 match self {
668 Self::Borrowed(scoped) => scoped.clone(),
669 #[cfg(any(test, feature = "testing"))]
670 Self::Shared { controller, scope } => {
671 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
672 .expect("runtime effect controller handle carries a valid scope")
673 }
674 }
675 }
676
677 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
678 self.clone()
679 }
680}
681
682#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
683#[error("{code}: {message}")]
684pub struct RuntimeEffectControllerError {
685 pub code: String,
686 pub message: String,
687}
688
689impl RuntimeEffectControllerError {
690 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
691 Self {
692 code: code.into(),
693 message: message.into(),
694 }
695 }
696
697 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
698 Self::new(
699 "runtime_effect_wrong_outcome",
700 format!(
701 "expected {} outcome, got {}",
702 expected.as_str(),
703 actual.as_str()
704 ),
705 )
706 }
707
708 pub(crate) fn into_runtime_error(self) -> RuntimeError {
709 RuntimeError::new(self.code, self.message)
710 }
711}
712
713impl From<RuntimeError> for RuntimeEffectControllerError {
714 fn from(err: RuntimeError) -> Self {
715 Self::new(err.code.as_str(), err.message)
716 }
717}
718
719impl From<PluginError> for RuntimeEffectControllerError {
720 fn from(err: PluginError) -> Self {
721 Self::new("plugin", err.to_string())
722 }
723}
724
725impl From<crate::StoreError> for RuntimeEffectControllerError {
726 fn from(err: crate::StoreError) -> Self {
727 Self::new("runtime_store", err.to_string())
728 }
729}
730
731#[async_trait::async_trait]
736pub(crate) trait ProcessRunner: Send + Sync {
737 async fn run_process(
738 &self,
739 registration: crate::ProcessRegistration,
740 execution_context: crate::ProcessExecutionContext,
741 registry: Arc<dyn ProcessRegistry>,
742 scoped_effect_controller: crate::ScopedEffectController<'_>,
743 cancellation: CancellationToken,
744 ) -> crate::ProcessAwaitOutput;
745}
746
747pub struct ProcessLocalExecution {
748 pub registry: Arc<dyn ProcessRegistry>,
749}
750
751pub(super) struct LocalTurnEffectRunner<'a, 'run> {
752 driver: &'a mut RuntimeTurnDriver<'run>,
753 machine: &'a mut crate::TurnMachine,
754 event_tx: mpsc::Sender<RuntimeStreamEvent>,
755 cancellation: CancellationToken,
756}
757
758pub(super) struct LocalDirectEffectRunner {
759 provider: ProviderHandle,
760 attachment_store: Arc<dyn AttachmentStore>,
761}
762
763#[async_trait::async_trait]
764trait RuntimeEffectLocalRunner: Send {
765 async fn execute(
766 self: Box<Self>,
767 envelope: RuntimeEffectEnvelope,
768 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
769}
770
771#[cfg(any(test, feature = "testing"))]
772type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
773 RuntimeEffectEnvelope,
774 ) -> Pin<
775 Box<
776 dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
777 + Send
778 + 'run,
779 >,
780 > + Send
781 + 'run;
782
783#[cfg(any(test, feature = "testing"))]
784struct TestingRuntimeEffectLocalRunner<'run> {
785 run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
786}
787
788enum RuntimeEffectLocalExecutorState<'run> {
789 Unavailable,
790 SleepOnly {
791 cancellation: CancellationToken,
792 },
793 ExternalWaitOptions {
794 cancellation: CancellationToken,
795 deadline: Option<Instant>,
796 },
797 Process(ProcessLocalExecution),
798 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
799}
800
801pub struct RuntimeEffectLocalExecutor<'run> {
807 state: RuntimeEffectLocalExecutorState<'run>,
808}
809
810impl<'run> RuntimeEffectLocalExecutor<'run> {
811 pub fn unavailable() -> Self {
812 Self {
813 state: RuntimeEffectLocalExecutorState::Unavailable,
814 }
815 }
816
817 pub fn sleep(cancellation: CancellationToken) -> Self {
818 Self {
819 state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
820 }
821 }
822
823 pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
824 Self {
825 state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
826 cancellation,
827 deadline,
828 },
829 }
830 }
831
832 pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
833 Self {
834 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
835 }
836 }
837
838 #[cfg(any(test, feature = "testing"))]
839 pub fn testing<F, Fut>(run: F) -> Self
840 where
841 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
842 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
843 + Send
844 + 'run,
845 {
846 Self {
847 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
848 TestingRuntimeEffectLocalRunner {
849 run: Box::new(move |envelope| Box::pin(run(envelope))),
850 },
851 )),
852 }
853 }
854
855 pub(in crate::runtime) fn turn<'scope>(
856 driver: &'run mut RuntimeTurnDriver<'scope>,
857 machine: &'run mut crate::TurnMachine,
858 event_tx: mpsc::Sender<RuntimeStreamEvent>,
859 cancellation: CancellationToken,
860 ) -> Self
861 where
862 'scope: 'run,
863 {
864 Self {
865 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
866 driver,
867 machine,
868 event_tx,
869 cancellation,
870 })),
871 }
872 }
873
874 pub(in crate::runtime) fn direct(
875 provider: ProviderHandle,
876 attachment_store: Arc<dyn AttachmentStore>,
877 ) -> Self {
878 Self {
879 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
880 provider,
881 attachment_store,
882 })),
883 }
884 }
885
886 pub async fn execute(
887 self,
888 envelope: RuntimeEffectEnvelope,
889 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
890 match self.state {
891 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
892 RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
893 execute_local_sleep(envelope, cancellation).await
894 }
895 RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
896 Err(RuntimeEffectControllerError::new(
897 "runtime_effect_local_executor_mismatch",
898 format!(
899 "local await-event options cannot execute {} command directly",
900 envelope.command.kind().as_str()
901 ),
902 ))
903 }
904 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
905 "runtime_effect_local_executor_unavailable",
906 format!(
907 "no local executor is available for {}",
908 envelope.command.kind().as_str()
909 ),
910 )),
911 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
912 "runtime_effect_local_executor_mismatch",
913 format!(
914 "process executor cannot execute {} command directly",
915 envelope.command.kind().as_str()
916 ),
917 )),
918 }
919 }
920
921 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
922 match self.state {
923 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
924 _ => Err(RuntimeEffectControllerError::new(
925 "runtime_effect_local_executor_unavailable",
926 "no process executor is available for process command",
927 )),
928 }
929 }
930
931 fn into_await_event_options(
932 self,
933 ) -> Result<(CancellationToken, Option<Instant>), RuntimeEffectControllerError> {
934 match self.state {
935 RuntimeEffectLocalExecutorState::ExternalWaitOptions {
936 cancellation,
937 deadline,
938 } => Ok((cancellation, deadline)),
939 _ => Ok((CancellationToken::new(), None)),
940 }
941 }
942}
943
944#[cfg(any(test, feature = "testing"))]
945#[async_trait::async_trait]
946impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
947 async fn execute(
948 self: Box<Self>,
949 envelope: RuntimeEffectEnvelope,
950 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
951 (self.run)(envelope).await
952 }
953}
954
955#[async_trait::async_trait]
956impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
957 async fn execute(
958 self: Box<Self>,
959 envelope: RuntimeEffectEnvelope,
960 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
961 let runner = *self;
962 match envelope.command {
963 RuntimeEffectCommand::LlmCall { request } => {
964 let protocol_iteration = runner.machine.protocol_iteration();
965 let (result, text_streamed) = runner
966 .driver
967 .run_llm_call(
968 Arc::new((*request).into_request(None, None)),
969 protocol_iteration,
970 envelope.invocation,
971 &runner.event_tx,
972 &runner.cancellation,
973 )
974 .await;
975 Ok(RuntimeEffectOutcome::LlmCall {
976 result,
977 text_streamed,
978 })
979 }
980 RuntimeEffectCommand::ToolCall { call } => {
981 let tool_name = call.tool_name.clone();
982 let mut outcome = runner
983 .driver
984 .run_tool_calls(
985 vec![(call, envelope.invocation)],
986 &runner.event_tx,
987 &runner.cancellation,
988 )
989 .await?;
990 let launch = outcome.launches.pop().ok_or_else(|| {
991 RuntimeEffectControllerError::new(
992 "tool_result_missing",
993 format!("tool `{tool_name}` completed without a launch result"),
994 )
995 })?;
996 Ok(RuntimeEffectOutcome::ToolCall {
997 launch,
998 triggers: outcome.triggers,
999 })
1000 }
1001 RuntimeEffectCommand::ExecCode { code } => {
1002 let protocol_iteration = runner.machine.protocol_iteration();
1003 let messages = runner.machine.message_sequence();
1004 Ok(RuntimeEffectOutcome::ExecCode {
1005 result: runner
1006 .driver
1007 .run_exec_code(
1008 &code,
1009 messages,
1010 protocol_iteration,
1011 envelope.invocation,
1012 &runner.event_tx,
1013 )
1014 .await,
1015 })
1016 }
1017 RuntimeEffectCommand::Checkpoint { checkpoint } => {
1018 Ok(RuntimeEffectOutcome::Checkpoint {
1019 result: runner
1020 .driver
1021 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1022 .await
1023 .map_err(RuntimeEffectControllerError::from),
1024 })
1025 }
1026 RuntimeEffectCommand::SyncExecutionEnvironment {
1027 update_machine_config,
1028 } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1029 result: runner
1030 .driver
1031 .refresh_execution_environment(runner.machine, update_machine_config)
1032 .await
1033 .map_err(|err| err.to_string()),
1034 }),
1035 RuntimeEffectCommand::Sleep { duration_ms } => {
1036 sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
1037 Ok(RuntimeEffectOutcome::Sleep)
1038 }
1039 command => Err(RuntimeEffectControllerError::new(
1040 "runtime_effect_local_executor_mismatch",
1041 format!(
1042 "local turn executor cannot execute {} command",
1043 command.kind().as_str()
1044 ),
1045 )),
1046 }
1047 }
1048}
1049
1050#[async_trait::async_trait]
1051impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1052 async fn execute(
1053 mut self: Box<Self>,
1054 envelope: RuntimeEffectEnvelope,
1055 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1056 match envelope.command {
1057 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1058 result: self
1059 .run_direct_llm_request((*request).into_request(
1060 crate::session_model::transport_stream_events(&self.provider, None),
1061 None,
1062 ))
1063 .await,
1064 }),
1065 RuntimeEffectCommand::Sleep { duration_ms } => {
1066 sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
1067 Ok(RuntimeEffectOutcome::Sleep)
1068 }
1069 command => Err(RuntimeEffectControllerError::new(
1070 "runtime_effect_local_executor_mismatch",
1071 format!(
1072 "local direct executor cannot execute {} command",
1073 command.kind().as_str()
1074 ),
1075 )),
1076 }
1077 }
1078}
1079
1080impl LocalDirectEffectRunner {
1081 async fn run_direct_llm_request(
1082 &mut self,
1083 request: CoreLlmRequest,
1084 ) -> Result<LlmResponse, LlmCallError> {
1085 let request = crate::attachments::resolve_llm_request_attachments(
1086 request,
1087 self.attachment_store.as_ref(),
1088 )
1089 .await
1090 .map_err(|err| LlmCallError {
1091 message: err.to_string(),
1092 retryable: false,
1093 raw: None,
1094 code: Some("attachment_resolution_failed".to_string()),
1095 terminal_reason: crate::LlmTerminalReason::ProviderError,
1096 request_body: None,
1097 })?;
1098 self.provider
1099 .complete(request)
1100 .await
1101 .map_err(llm_call_error_from_transport)
1102 }
1103}
1104
1105async fn execute_local_sleep(
1106 envelope: RuntimeEffectEnvelope,
1107 cancellation: CancellationToken,
1108) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1109 match envelope.command {
1110 RuntimeEffectCommand::Sleep { duration_ms } => {
1111 sleep_with_cancellation(duration_ms, &cancellation).await?;
1112 Ok(RuntimeEffectOutcome::Sleep)
1113 }
1114 command => Err(RuntimeEffectControllerError::new(
1115 "runtime_effect_local_executor_mismatch",
1116 format!(
1117 "local sleep executor cannot execute {} command",
1118 command.kind().as_str()
1119 ),
1120 )),
1121 }
1122}
1123
1124async fn sleep_with_cancellation(
1125 duration_ms: u64,
1126 cancellation: &CancellationToken,
1127) -> Result<(), RuntimeEffectControllerError> {
1128 let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
1129 tokio::pin!(sleep);
1130 tokio::select! {
1131 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1132 "runtime_effect_sleep_cancelled",
1133 "runtime effect sleep was cancelled",
1134 )),
1135 _ = &mut sleep => Ok(()),
1136 }
1137}
1138
1139#[derive(Clone, Default)]
1149pub struct InlineRuntimeEffectController;
1150
1151#[async_trait::async_trait]
1152impl RuntimeEffectController for InlineRuntimeEffectController {
1153 async fn execute_effect(
1154 &self,
1155 envelope: RuntimeEffectEnvelope,
1156 local_executor: RuntimeEffectLocalExecutor<'_>,
1157 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1158 match envelope.command {
1159 RuntimeEffectCommand::AwaitEvent { key } => {
1160 let (cancellation, deadline) = local_executor.into_await_event_options()?;
1161 let resolution = self
1162 .await_await_event(&key, cancellation, deadline)
1163 .await
1164 .map_err(RuntimeEffectControllerError::from)?;
1165 Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1166 }
1167 RuntimeEffectCommand::Process { command } => {
1168 let execution = local_executor.into_process()?;
1169 let registry = execution.registry;
1170 let result = tokio::task::spawn(async move {
1171 Self::execute_process_command(registry, *command).await
1172 })
1173 .await
1174 .map_err(|err| {
1175 RuntimeEffectControllerError::new(
1176 "runtime_effect_process_task_join",
1177 format!("inline process effect task failed: {err}"),
1178 )
1179 })??;
1180 Ok(RuntimeEffectOutcome::Process { result })
1181 }
1182 _ => local_executor.execute(envelope).await,
1183 }
1184 }
1185
1186 async fn await_event_key(
1187 &self,
1188 scope: &ExecutionScope,
1189 wait: AwaitEventWaitIdentity,
1190 ) -> Result<AwaitEventKey, RuntimeError> {
1191 inline_await_events().key_for(scope, wait)
1192 }
1193
1194 async fn resolve_await_event(
1195 &self,
1196 key: &AwaitEventKey,
1197 resolution: Resolution,
1198 ) -> Result<ResolveOutcome, RuntimeError> {
1199 inline_await_events().resolve(key, resolution)
1200 }
1201
1202 async fn await_await_event(
1203 &self,
1204 key: &AwaitEventKey,
1205 cancel: CancellationToken,
1206 deadline: Option<Instant>,
1207 ) -> Result<Resolution, RuntimeError> {
1208 inline_await_events()
1209 .await_resolution(key, cancel, deadline)
1210 .await
1211 }
1212
1213 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1214 inline_await_events().revoke_session(session_id)
1215 }
1216}
1217
1218#[derive(Clone)]
1220pub struct InlineEffectHost {
1221 controller: Arc<dyn RuntimeEffectController>,
1222}
1223
1224impl InlineEffectHost {
1225 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1226 Self { controller }
1227 }
1228}
1229
1230impl Default for InlineEffectHost {
1231 fn default() -> Self {
1232 Self::new(Arc::new(InlineRuntimeEffectController))
1233 }
1234}
1235
1236#[async_trait::async_trait]
1237impl EffectHost for InlineEffectHost {
1238 fn durability_tier(&self) -> crate::DurabilityTier {
1239 self.controller.durability_tier()
1240 }
1241
1242 fn requires_durable_attachment_store(&self) -> bool {
1243 self.controller.requires_durable_attachment_store()
1244 }
1245
1246 fn scoped<'run>(
1247 &'run self,
1248 scope: ExecutionScope,
1249 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1250 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1251 }
1252
1253 fn scoped_static(
1254 &self,
1255 scope: ExecutionScope,
1256 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1257 Ok(Some(ScopedEffectController::shared(
1258 Arc::clone(&self.controller),
1259 scope,
1260 )?))
1261 }
1262
1263 async fn await_event_key(
1264 &self,
1265 scope: &ExecutionScope,
1266 wait: AwaitEventWaitIdentity,
1267 ) -> Result<AwaitEventKey, RuntimeError> {
1268 self.controller.await_event_key(scope, wait).await
1269 }
1270
1271 async fn resolve_await_event(
1272 &self,
1273 key: &AwaitEventKey,
1274 resolution: Resolution,
1275 ) -> Result<ResolveOutcome, RuntimeError> {
1276 self.controller.resolve_await_event(key, resolution).await
1277 }
1278
1279 async fn await_await_event(
1280 &self,
1281 key: &AwaitEventKey,
1282 cancel: CancellationToken,
1283 deadline: Option<Instant>,
1284 ) -> Result<Resolution, RuntimeError> {
1285 self.controller
1286 .await_await_event(key, cancel, deadline)
1287 .await
1288 }
1289
1290 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1291 self.controller
1292 .revoke_await_events_for_session(session_id)
1293 .await
1294 }
1295}
1296
1297impl InlineRuntimeEffectController {
1298 pub(crate) async fn start_process(
1306 registry: Arc<dyn crate::ProcessRegistry>,
1307 registration: crate::ProcessRegistration,
1308 grant: Option<crate::ProcessStartGrant>,
1309 ) -> Result<ProcessRecord, PluginError> {
1310 let registration_for_record = registration.clone();
1311 let record = registry.register_process(registration_for_record).await?;
1312 if let Some(grant) = grant {
1313 registry
1314 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
1315 .await?;
1316 }
1317 Ok(record)
1318 }
1319
1320 pub(crate) async fn request_process_cancel(
1321 &self,
1322 registry: Arc<dyn crate::ProcessRegistry>,
1323 process_id: &str,
1324 reason: Option<String>,
1325 ) -> Result<ProcessRecord, PluginError> {
1326 registry
1330 .append_event(
1331 process_id,
1332 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1333 )
1334 .await?;
1335 registry
1336 .get_process(process_id)
1337 .await
1338 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1339 }
1340
1341 async fn execute_process_command(
1342 registry: Arc<dyn crate::ProcessRegistry>,
1343 command: ProcessCommand,
1344 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1345 match command {
1346 ProcessCommand::Start {
1347 registration,
1348 grant,
1349 execution_context: _,
1350 } => {
1351 let record = Self::start_process(registry, registration, grant).await?;
1352 Ok(ProcessEffectOutcome::Start { record })
1353 }
1354 ProcessCommand::List {
1355 session_scope,
1356 mode,
1357 } => {
1358 let entries = match mode {
1359 crate::ProcessListMode::Live => {
1360 registry.list_live_handle_grants(&session_scope).await?
1361 }
1362 crate::ProcessListMode::All => {
1363 registry.list_handle_grants(&session_scope).await?
1364 }
1365 };
1366 Ok(ProcessEffectOutcome::List { entries })
1367 }
1368 ProcessCommand::Transfer {
1369 from_scope,
1370 to_scope,
1371 process_ids,
1372 } => {
1373 registry
1374 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1375 .await?;
1376 Ok(ProcessEffectOutcome::Transfer)
1377 }
1378 ProcessCommand::DeleteSession { session_id } => {
1379 let report = registry.delete_session_process_state(&session_id).await?;
1380 Ok(ProcessEffectOutcome::DeleteSession { report })
1381 }
1382 ProcessCommand::Await { process_id } => {
1383 let output = registry.await_process(&process_id).await?;
1384 Ok(ProcessEffectOutcome::Await { output })
1385 }
1386 ProcessCommand::Cancel { process_id, reason } => {
1387 let record = InlineRuntimeEffectController
1388 .request_process_cancel(registry, &process_id, reason)
1389 .await?;
1390 Ok(ProcessEffectOutcome::Cancel { record })
1391 }
1392 ProcessCommand::Signal {
1393 process_id,
1394 request,
1395 ..
1396 } => {
1397 let result = registry.append_event(&process_id, request).await?;
1398 Ok(ProcessEffectOutcome::Signal {
1399 event: result.event,
1400 })
1401 }
1402 }
1403 }
1404}
1405
1406impl std::fmt::Debug for InlineRuntimeEffectController {
1407 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1408 f.debug_struct("InlineRuntimeEffectController").finish()
1409 }
1410}