1use std::collections::{HashMap, HashSet};
2use std::pin::Pin;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5
6use hmac::{Hmac, Mac};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{Notify, mpsc};
9use tokio_util::sync::CancellationToken;
10
11use crate::AttachmentStore;
12use crate::LlmRequest as CoreLlmRequest;
13use crate::LlmResponse;
14use crate::ProcessRecord;
15use crate::ProcessRegistry;
16use crate::provider::ProviderHandle;
17use crate::runtime::{RuntimeStreamEvent, RuntimeTurnDriver};
18use crate::sansio::LlmCallError;
19use crate::{PluginError, RuntimeError, RuntimeErrorCode};
20
21use super::envelope::{
22 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
23 RuntimeEffectKind, RuntimeEffectOutcome,
24};
25use super::outcome::llm_call_error_from_transport;
26
27type HmacSha256 = Hmac<sha2::Sha256>;
28
29fn inline_await_events() -> &'static AwaitEventRegistry {
30 static REGISTRY: OnceLock<AwaitEventRegistry> = OnceLock::new();
31 REGISTRY.get_or_init(AwaitEventRegistry::new)
32}
33
34#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(tag = "type", rename_all = "snake_case")]
45pub enum ExecutionScope {
46 Turn {
47 session_id: String,
48 turn_id: String,
49 },
50 Process {
51 process_id: String,
52 },
53 QueueDrain {
54 session_id: String,
55 drain_id: String,
56 },
57 SessionDelete {
58 session_id: String,
59 },
60 RuntimeOperation {
61 operation_id: String,
62 },
63}
64
65impl ExecutionScope {
66 pub fn turn(session_id: impl Into<String>, turn_id: impl Into<String>) -> Self {
67 Self::Turn {
68 session_id: session_id.into(),
69 turn_id: turn_id.into(),
70 }
71 }
72
73 pub fn process(process_id: impl Into<String>) -> Self {
74 Self::Process {
75 process_id: process_id.into(),
76 }
77 }
78
79 pub fn queue_drain(session_id: impl Into<String>, drain_id: impl Into<String>) -> Self {
80 Self::QueueDrain {
81 session_id: session_id.into(),
82 drain_id: drain_id.into(),
83 }
84 }
85
86 pub fn session_delete(session_id: impl Into<String>) -> Self {
87 Self::SessionDelete {
88 session_id: session_id.into(),
89 }
90 }
91
92 pub fn runtime_operation(operation_id: impl Into<String>) -> Self {
93 Self::RuntimeOperation {
94 operation_id: operation_id.into(),
95 }
96 }
97
98 pub fn id(&self) -> &str {
99 match self {
100 Self::Turn { turn_id, .. } => turn_id,
101 Self::Process { process_id } => process_id,
102 Self::QueueDrain { drain_id, .. } => drain_id,
103 Self::SessionDelete { session_id } => session_id,
104 Self::RuntimeOperation { operation_id } => operation_id,
105 }
106 }
107
108 pub fn session_id(&self) -> Option<&str> {
109 match self {
110 Self::Turn { session_id, .. }
111 | Self::QueueDrain { session_id, .. }
112 | Self::SessionDelete { session_id } => Some(session_id),
113 Self::Process { .. } | Self::RuntimeOperation { .. } => None,
114 }
115 }
116
117 pub fn turn_id(&self) -> Option<&str> {
118 match self {
119 Self::Turn { turn_id, .. } => Some(turn_id),
120 _ => None,
121 }
122 }
123
124 pub fn validates_turn_trace_id(&self) -> bool {
125 matches!(self, Self::Turn { .. })
126 }
127
128 fn validate(&self) -> Result<(), RuntimeError> {
129 let missing = match self {
130 Self::Turn {
131 session_id,
132 turn_id,
133 } => session_id.trim().is_empty() || turn_id.trim().is_empty(),
134 Self::Process { process_id } => process_id.trim().is_empty(),
135 Self::QueueDrain {
136 session_id,
137 drain_id,
138 } => session_id.trim().is_empty() || drain_id.trim().is_empty(),
139 Self::SessionDelete { session_id } => session_id.trim().is_empty(),
140 Self::RuntimeOperation { operation_id } => operation_id.trim().is_empty(),
141 };
142 if missing {
143 return Err(RuntimeError::new(
144 RuntimeErrorCode::MissingExecutionScopeId,
145 "execution scopes require non-empty stable ids",
146 ));
147 }
148 Ok(())
149 }
150}
151
152#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
153#[serde(tag = "type", rename_all = "snake_case")]
154pub enum AwaitEventWaitIdentity {
155 ToolCompletion {
156 tool_call_id: String,
157 },
158 ProcessSignal {
159 process_id: String,
160 signal_name: String,
161 ordinal: u64,
162 },
163 Custom {
164 key: String,
165 },
166}
167
168impl AwaitEventWaitIdentity {
169 pub fn tool_completion(tool_call_id: impl Into<String>) -> Self {
170 Self::ToolCompletion {
171 tool_call_id: tool_call_id.into(),
172 }
173 }
174
175 pub fn process_signal(
176 process_id: impl Into<String>,
177 signal_name: impl Into<String>,
178 ordinal: u64,
179 ) -> Self {
180 Self::ProcessSignal {
181 process_id: process_id.into(),
182 signal_name: signal_name.into(),
183 ordinal,
184 }
185 }
186
187 fn validate(&self) -> Result<(), RuntimeError> {
188 let invalid = match self {
189 Self::ToolCompletion { tool_call_id } => tool_call_id.trim().is_empty(),
190 Self::ProcessSignal {
191 process_id,
192 signal_name,
193 ordinal,
194 } => process_id.trim().is_empty() || signal_name.trim().is_empty() || *ordinal == 0,
195 Self::Custom { key } => key.trim().is_empty(),
196 };
197 if invalid {
198 return Err(RuntimeError::new(
199 "invalid_await_event_wait_identity",
200 "await-event wait identity requires non-empty stable ids",
201 ));
202 }
203 Ok(())
204 }
205}
206
207#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
208pub struct AwaitEventKey {
209 pub scope: ExecutionScope,
210 pub wait: AwaitEventWaitIdentity,
211 pub key_id: String,
212 pub signature: String,
213}
214
215impl AwaitEventKey {
216 pub fn promise_key(&self) -> String {
217 format!("lash-await-event:{}", self.key_id)
218 }
219}
220
221#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
222pub struct ExternalCompletionError {
223 pub code: String,
224 pub message: String,
225 #[serde(default, skip_serializing_if = "Option::is_none")]
226 pub raw: Option<serde_json::Value>,
227}
228
229impl ExternalCompletionError {
230 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
231 Self {
232 code: code.into(),
233 message: message.into(),
234 raw: None,
235 }
236 }
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
240#[serde(tag = "status", content = "payload", rename_all = "snake_case")]
241pub enum Resolution {
242 Ok(serde_json::Value),
243 Err(ExternalCompletionError),
244 Timeout,
245 Cancelled,
246}
247
248#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
249#[serde(tag = "status", rename_all = "snake_case")]
250pub enum ResolveOutcome {
251 Accepted,
252 AlreadyResolved { terminal: Resolution },
253 UnknownOrRevoked,
254}
255
256#[derive(Debug)]
257struct AwaitEventEntry {
258 terminal: Option<Resolution>,
259 notify: Arc<Notify>,
260}
261
262#[derive(Debug)]
263struct AwaitEventRegistryState {
264 entries: HashMap<String, AwaitEventEntry>,
265 revoked_key_ids: HashSet<String>,
266 revoked_session_ids: HashSet<String>,
267}
268
269#[derive(Debug)]
270struct AwaitEventRegistry {
271 secret: Vec<u8>,
272 state: std::sync::Mutex<AwaitEventRegistryState>,
273}
274
275impl AwaitEventRegistry {
276 fn new() -> Self {
277 Self {
278 secret: uuid::Uuid::new_v4().as_bytes().to_vec(),
279 state: std::sync::Mutex::new(AwaitEventRegistryState {
280 entries: HashMap::new(),
281 revoked_key_ids: HashSet::new(),
282 revoked_session_ids: HashSet::new(),
283 }),
284 }
285 }
286
287 fn key_for(
288 &self,
289 scope: &ExecutionScope,
290 wait: AwaitEventWaitIdentity,
291 ) -> Result<AwaitEventKey, RuntimeError> {
292 scope.validate()?;
293 wait.validate()?;
294 let key_id =
295 crate::stable_hash::stable_json_sha256_hex(&(scope, &wait)).map_err(|err| {
296 RuntimeError::new(
297 "await_event_key_hash",
298 format!("failed to hash await-event identity: {err}"),
299 )
300 })?;
301 let signature = self.signature(scope, &wait, &key_id)?;
302 Ok(AwaitEventKey {
303 scope: scope.clone(),
304 wait,
305 key_id,
306 signature,
307 })
308 }
309
310 fn signature(
311 &self,
312 scope: &ExecutionScope,
313 wait: &AwaitEventWaitIdentity,
314 key_id: &str,
315 ) -> Result<String, RuntimeError> {
316 let mut mac = HmacSha256::new_from_slice(&self.secret).map_err(|err| {
317 RuntimeError::new(
318 "await_event_key_sign",
319 format!("failed to initialize await-event key signer: {err}"),
320 )
321 })?;
322 let canonical = serde_json::to_vec(&(scope, wait, key_id)).map_err(|err| {
323 RuntimeError::new(
324 "await_event_key_sign",
325 format!("failed to serialize await-event key identity: {err}"),
326 )
327 })?;
328 mac.update(&canonical);
329 Ok(format!("{:x}", mac.finalize().into_bytes()))
330 }
331
332 fn verify(&self, key: &AwaitEventKey) -> Result<bool, RuntimeError> {
333 let expected = self.signature(&key.scope, &key.wait, &key.key_id)?;
334 Ok(expected == key.signature)
335 }
336
337 fn resolve(
338 &self,
339 key: &AwaitEventKey,
340 resolution: Resolution,
341 ) -> Result<ResolveOutcome, RuntimeError> {
342 if !self.verify(key)? {
343 return Ok(ResolveOutcome::UnknownOrRevoked);
344 }
345 let mut state = self.state.lock().map_err(|_| {
346 RuntimeError::new(
347 "await_event_registry_poisoned",
348 "await-event registry lock poisoned",
349 )
350 })?;
351 if state.revoked_key_ids.contains(&key.key_id)
352 || key
353 .scope
354 .session_id()
355 .is_some_and(|session_id| state.revoked_session_ids.contains(session_id))
356 {
357 return Ok(ResolveOutcome::UnknownOrRevoked);
358 }
359 let entry = state
360 .entries
361 .entry(key.key_id.clone())
362 .or_insert_with(|| AwaitEventEntry {
363 terminal: None,
364 notify: Arc::new(Notify::new()),
365 });
366 if let Some(terminal) = &entry.terminal {
367 return Ok(ResolveOutcome::AlreadyResolved {
368 terminal: terminal.clone(),
369 });
370 }
371 entry.terminal = Some(resolution);
372 entry.notify.notify_waiters();
373 Ok(ResolveOutcome::Accepted)
374 }
375
376 async fn await_resolution(
377 &self,
378 key: &AwaitEventKey,
379 cancel: CancellationToken,
380 deadline: Option<Instant>,
381 ) -> Result<Resolution, RuntimeError> {
382 if !self.verify(key)? {
383 return Err(RuntimeError::new(
384 "await_event_unknown_or_revoked",
385 "await-event key is invalid or revoked",
386 ));
387 }
388 loop {
389 let notify =
390 {
391 let mut state = self.state.lock().map_err(|_| {
392 RuntimeError::new(
393 "await_event_registry_poisoned",
394 "await-event registry lock poisoned",
395 )
396 })?;
397 if state.revoked_key_ids.contains(&key.key_id)
398 || key.scope.session_id().is_some_and(|session_id| {
399 state.revoked_session_ids.contains(session_id)
400 })
401 {
402 return Err(RuntimeError::new(
403 "await_event_unknown_or_revoked",
404 "await-event key is invalid or revoked",
405 ));
406 }
407 let entry = state.entries.entry(key.key_id.clone()).or_insert_with(|| {
408 AwaitEventEntry {
409 terminal: None,
410 notify: Arc::new(Notify::new()),
411 }
412 });
413 if let Some(terminal) = entry.terminal.clone() {
414 return Ok(terminal);
415 }
416 Arc::clone(&entry.notify)
417 };
418 if let Some(deadline) = deadline {
419 tokio::select! {
420 _ = cancel.cancelled() => {
421 let _ = self.resolve(key, Resolution::Cancelled)?;
422 }
423 _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
424 let _ = self.resolve(key, Resolution::Timeout)?;
425 }
426 _ = notify.notified() => {}
427 }
428 } else {
429 tokio::select! {
430 _ = cancel.cancelled() => {
431 let _ = self.resolve(key, Resolution::Cancelled)?;
432 }
433 _ = notify.notified() => {}
434 }
435 }
436 }
437 }
438
439 fn revoke_session(&self, session_id: &str) -> Result<(), RuntimeError> {
440 let mut state = self.state.lock().map_err(|_| {
441 RuntimeError::new(
442 "await_event_registry_poisoned",
443 "await-event registry lock poisoned",
444 )
445 })?;
446 state.revoked_session_ids.insert(session_id.to_string());
447 for entry in state.entries.values() {
448 entry.notify.notify_waiters();
449 }
450 Ok(())
451 }
452}
453
454enum ScopedEffectControllerInner<'run> {
455 Borrowed(&'run dyn RuntimeEffectController),
456 Shared(Arc<dyn RuntimeEffectController>),
457}
458
459impl Clone for ScopedEffectControllerInner<'_> {
460 fn clone(&self) -> Self {
461 match self {
462 Self::Borrowed(controller) => Self::Borrowed(*controller),
463 Self::Shared(controller) => Self::Shared(Arc::clone(controller)),
464 }
465 }
466}
467
468#[derive(Clone)]
470pub struct ScopedEffectController<'run> {
471 controller: ScopedEffectControllerInner<'run>,
472 scope: ExecutionScope,
473}
474
475impl<'run> ScopedEffectController<'run> {
476 pub fn borrowed(
477 controller: &'run dyn RuntimeEffectController,
478 scope: ExecutionScope,
479 ) -> Result<Self, RuntimeError> {
480 scope.validate()?;
481 Ok(Self {
482 controller: ScopedEffectControllerInner::Borrowed(controller),
483 scope,
484 })
485 }
486
487 pub fn shared(
488 controller: Arc<dyn RuntimeEffectController>,
489 scope: ExecutionScope,
490 ) -> Result<Self, RuntimeError> {
491 scope.validate()?;
492 Ok(Self {
493 controller: ScopedEffectControllerInner::Shared(controller),
494 scope,
495 })
496 }
497
498 pub fn controller(&self) -> &dyn RuntimeEffectController {
499 match &self.controller {
500 ScopedEffectControllerInner::Borrowed(controller) => *controller,
501 ScopedEffectControllerInner::Shared(controller) => controller.as_ref(),
502 }
503 }
504
505 pub fn execution_scope(&self) -> &ExecutionScope {
506 &self.scope
507 }
508
509 pub fn scope_id(&self) -> &str {
510 self.scope.id()
511 }
512
513 pub fn turn_id(&self) -> Option<&str> {
514 self.scope.turn_id()
515 }
516}
517
518#[async_trait::async_trait]
520pub trait EffectHost: Send + Sync {
521 fn durability_tier(&self) -> crate::DurabilityTier {
522 crate::DurabilityTier::Inline
523 }
524
525 fn requires_durable_attachment_store(&self) -> bool {
526 false
527 }
528
529 fn supports_durable_effects(&self) -> bool {
530 false
531 }
532
533 fn scoped<'run>(
534 &'run self,
535 scope: ExecutionScope,
536 ) -> Result<ScopedEffectController<'run>, RuntimeError>;
537
538 fn scoped_static(
539 &self,
540 _scope: ExecutionScope,
541 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
542 Ok(None)
543 }
544
545 async fn await_event_key(
546 &self,
547 _scope: &ExecutionScope,
548 _wait: AwaitEventWaitIdentity,
549 ) -> Result<AwaitEventKey, RuntimeError> {
550 Err(RuntimeError::new(
551 "await_event_unsupported",
552 "this effect host does not support await-event keys",
553 ))
554 }
555
556 async fn resolve_await_event(
557 &self,
558 _key: &AwaitEventKey,
559 _resolution: Resolution,
560 ) -> Result<ResolveOutcome, RuntimeError> {
561 Ok(ResolveOutcome::UnknownOrRevoked)
562 }
563
564 async fn await_await_event(
565 &self,
566 _key: &AwaitEventKey,
567 _cancel: CancellationToken,
568 _deadline: Option<Instant>,
569 ) -> Result<Resolution, RuntimeError> {
570 Err(RuntimeError::new(
571 "await_event_unsupported",
572 "this effect host does not support await-event waits",
573 ))
574 }
575
576 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
577 Ok(())
578 }
579}
580
581#[async_trait::async_trait]
583pub trait RuntimeEffectController: Send + Sync {
584 fn durability_tier(&self) -> crate::DurabilityTier {
587 crate::DurabilityTier::Inline
588 }
589
590 fn requires_durable_attachment_store(&self) -> bool {
591 false
592 }
593
594 fn supports_durable_effects(&self) -> bool {
595 false
596 }
597
598 async fn execute_effect(
599 &self,
600 envelope: RuntimeEffectEnvelope,
601 local_executor: RuntimeEffectLocalExecutor<'_>,
602 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
603
604 async fn await_event_key(
605 &self,
606 _scope: &ExecutionScope,
607 _wait: AwaitEventWaitIdentity,
608 ) -> Result<AwaitEventKey, RuntimeError> {
609 Err(RuntimeError::new(
610 "await_event_unsupported",
611 "this effect controller does not support await-event keys",
612 ))
613 }
614
615 async fn resolve_await_event(
616 &self,
617 _key: &AwaitEventKey,
618 _resolution: Resolution,
619 ) -> Result<ResolveOutcome, RuntimeError> {
620 Ok(ResolveOutcome::UnknownOrRevoked)
621 }
622
623 async fn await_await_event(
624 &self,
625 _key: &AwaitEventKey,
626 _cancel: CancellationToken,
627 _deadline: Option<Instant>,
628 ) -> Result<Resolution, RuntimeError> {
629 Err(RuntimeError::new(
630 "await_event_unsupported",
631 "this effect controller does not support await-event waits",
632 ))
633 }
634
635 async fn revoke_await_events_for_session(&self, _session_id: &str) -> Result<(), RuntimeError> {
636 Ok(())
637 }
638}
639
640#[derive(Clone)]
643pub(crate) enum RuntimeEffectControllerHandle<'run> {
644 Borrowed(ScopedEffectController<'run>),
645 #[cfg(any(test, feature = "testing"))]
646 Shared {
647 controller: Arc<dyn RuntimeEffectController>,
648 scope: ExecutionScope,
649 },
650}
651
652impl<'run> RuntimeEffectControllerHandle<'run> {
653 pub(crate) fn borrowed(scoped: ScopedEffectController<'run>) -> Self {
654 Self::Borrowed(scoped)
655 }
656
657 #[cfg(any(test, feature = "testing"))]
658 pub(crate) fn shared(controller: Arc<dyn RuntimeEffectController>) -> Self {
659 Self::Shared {
660 controller,
661 scope: ExecutionScope::runtime_operation("test-runtime-effect-controller"),
662 }
663 }
664
665 pub(crate) fn controller(&self) -> &dyn RuntimeEffectController {
666 match self {
667 Self::Borrowed(scoped) => scoped.controller(),
668 #[cfg(any(test, feature = "testing"))]
669 Self::Shared { controller, .. } => controller.as_ref(),
670 }
671 }
672
673 pub(crate) fn scoped(&self) -> ScopedEffectController<'_> {
674 match self {
675 Self::Borrowed(scoped) => scoped.clone(),
676 #[cfg(any(test, feature = "testing"))]
677 Self::Shared { controller, scope } => {
678 ScopedEffectController::shared(Arc::clone(controller), scope.clone())
679 .expect("runtime effect controller handle carries a valid scope")
680 }
681 }
682 }
683
684 pub(crate) fn clone_scoped(&self) -> RuntimeEffectControllerHandle<'run> {
685 self.clone()
686 }
687}
688
689#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
690#[error("{code}: {message}")]
691pub struct RuntimeEffectControllerError {
692 pub code: String,
693 pub message: String,
694}
695
696impl RuntimeEffectControllerError {
697 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
698 Self {
699 code: code.into(),
700 message: message.into(),
701 }
702 }
703
704 pub(super) fn wrong_outcome(expected: RuntimeEffectKind, actual: RuntimeEffectKind) -> Self {
705 Self::new(
706 "runtime_effect_wrong_outcome",
707 format!(
708 "expected {} outcome, got {}",
709 expected.as_str(),
710 actual.as_str()
711 ),
712 )
713 }
714
715 pub(crate) fn into_runtime_error(self) -> RuntimeError {
716 RuntimeError::new(self.code, self.message)
717 }
718}
719
720impl From<RuntimeError> for RuntimeEffectControllerError {
721 fn from(err: RuntimeError) -> Self {
722 Self::new(err.code.as_str(), err.message)
723 }
724}
725
726impl From<PluginError> for RuntimeEffectControllerError {
727 fn from(err: PluginError) -> Self {
728 Self::new("plugin", err.to_string())
729 }
730}
731
732impl From<crate::StoreError> for RuntimeEffectControllerError {
733 fn from(err: crate::StoreError) -> Self {
734 Self::new("runtime_store", err.to_string())
735 }
736}
737
738#[async_trait::async_trait]
743pub(crate) trait ProcessRunner: Send + Sync {
744 async fn run_process(
745 &self,
746 registration: crate::ProcessRegistration,
747 execution_context: crate::ProcessExecutionContext,
748 registry: Arc<dyn ProcessRegistry>,
749 scoped_effect_controller: crate::ScopedEffectController<'_>,
750 cancellation: CancellationToken,
751 ) -> crate::ProcessAwaitOutput;
752}
753
754pub struct ProcessLocalExecution {
755 pub registry: Arc<dyn ProcessRegistry>,
756}
757
758pub(super) struct LocalTurnEffectRunner<'a, 'run> {
759 driver: &'a mut RuntimeTurnDriver<'run>,
760 machine: &'a mut crate::TurnMachine,
761 event_tx: mpsc::Sender<RuntimeStreamEvent>,
762 cancellation: CancellationToken,
763}
764
765pub(super) struct LocalDirectEffectRunner {
766 provider: ProviderHandle,
767 attachment_store: Arc<dyn AttachmentStore>,
768}
769
770#[async_trait::async_trait]
771trait RuntimeEffectLocalRunner: Send {
772 async fn execute(
773 self: Box<Self>,
774 envelope: RuntimeEffectEnvelope,
775 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError>;
776}
777
/// Boxed one-shot closure that maps an envelope to a future of its outcome.
/// Compiled only for tests and the `testing` feature.
#[cfg(any(test, feature = "testing"))]
type TestingRuntimeEffectLocalRunnerFn<'run> = dyn FnOnce(
        RuntimeEffectEnvelope,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
                + Send
                + 'run,
        >,
    > + Send
    + 'run;

/// Local runner that delegates to a caller-supplied closure.
/// Compiled only for tests and the `testing` feature.
#[cfg(any(test, feature = "testing"))]
struct TestingRuntimeEffectLocalRunner<'run> {
    /// Closure invoked once with the envelope.
    run: Box<TestingRuntimeEffectLocalRunnerFn<'run>>,
}
794
795type DurableStepLocalRunnerFn<'run> = dyn FnOnce(
796 serde_json::Value,
797 ) -> Pin<
798 Box<
799 dyn Future<Output = Result<serde_json::Value, RuntimeEffectControllerError>>
800 + Send
801 + 'run,
802 >,
803 > + Send
804 + 'run;
805
806struct DurableStepLocalRunner<'run> {
807 run: Box<DurableStepLocalRunnerFn<'run>>,
808}
809
810enum RuntimeEffectLocalExecutorState<'run> {
811 Unavailable,
812 SleepOnly {
813 cancellation: CancellationToken,
814 },
815 ExternalWaitOptions {
816 cancellation: CancellationToken,
817 deadline: Option<Instant>,
818 },
819 Process(ProcessLocalExecution),
820 Runner(Box<dyn RuntimeEffectLocalRunner + Send + 'run>),
821}
822
823pub struct RuntimeEffectLocalExecutor<'run> {
829 state: RuntimeEffectLocalExecutorState<'run>,
830}
831
832impl<'run> RuntimeEffectLocalExecutor<'run> {
833 pub fn unavailable() -> Self {
834 Self {
835 state: RuntimeEffectLocalExecutorState::Unavailable,
836 }
837 }
838
839 pub fn sleep(cancellation: CancellationToken) -> Self {
840 Self {
841 state: RuntimeEffectLocalExecutorState::SleepOnly { cancellation },
842 }
843 }
844
845 pub fn await_event(cancellation: CancellationToken, deadline: Option<Instant>) -> Self {
846 Self {
847 state: RuntimeEffectLocalExecutorState::ExternalWaitOptions {
848 cancellation,
849 deadline,
850 },
851 }
852 }
853
854 pub fn processes(registry: Arc<dyn ProcessRegistry>) -> Self {
855 Self {
856 state: RuntimeEffectLocalExecutorState::Process(ProcessLocalExecution { registry }),
857 }
858 }
859
860 pub fn durable_step<F, Fut>(run: F) -> Self
861 where
862 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
863 Fut: Future<Output = Result<serde_json::Value, RuntimeError>> + Send + 'run,
864 {
865 Self {
866 state: RuntimeEffectLocalExecutorState::Runner(Box::new(DurableStepLocalRunner {
867 run: Box::new(move |input| {
868 Box::pin(async move { run(input).await.map_err(Into::into) })
869 }),
870 })),
871 }
872 }
873
874 #[cfg(any(test, feature = "testing"))]
875 pub fn testing<F, Fut>(run: F) -> Self
876 where
877 F: FnOnce(RuntimeEffectEnvelope) -> Fut + Send + 'run,
878 Fut: Future<Output = Result<RuntimeEffectOutcome, RuntimeEffectControllerError>>
879 + Send
880 + 'run,
881 {
882 Self {
883 state: RuntimeEffectLocalExecutorState::Runner(Box::new(
884 TestingRuntimeEffectLocalRunner {
885 run: Box::new(move |envelope| Box::pin(run(envelope))),
886 },
887 )),
888 }
889 }
890
891 pub(in crate::runtime) fn turn<'scope>(
892 driver: &'run mut RuntimeTurnDriver<'scope>,
893 machine: &'run mut crate::TurnMachine,
894 event_tx: mpsc::Sender<RuntimeStreamEvent>,
895 cancellation: CancellationToken,
896 ) -> Self
897 where
898 'scope: 'run,
899 {
900 Self {
901 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalTurnEffectRunner {
902 driver,
903 machine,
904 event_tx,
905 cancellation,
906 })),
907 }
908 }
909
910 pub(in crate::runtime) fn direct(
911 provider: ProviderHandle,
912 attachment_store: Arc<dyn AttachmentStore>,
913 ) -> Self {
914 Self {
915 state: RuntimeEffectLocalExecutorState::Runner(Box::new(LocalDirectEffectRunner {
916 provider,
917 attachment_store,
918 })),
919 }
920 }
921
922 pub async fn execute(
923 self,
924 envelope: RuntimeEffectEnvelope,
925 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
926 match self.state {
927 RuntimeEffectLocalExecutorState::Runner(runner) => runner.execute(envelope).await,
928 RuntimeEffectLocalExecutorState::SleepOnly { cancellation } => {
929 execute_local_sleep(envelope, cancellation).await
930 }
931 RuntimeEffectLocalExecutorState::ExternalWaitOptions { .. } => {
932 Err(RuntimeEffectControllerError::new(
933 "runtime_effect_local_executor_mismatch",
934 format!(
935 "local await-event options cannot execute {} command directly",
936 envelope.command.kind().as_str()
937 ),
938 ))
939 }
940 RuntimeEffectLocalExecutorState::Unavailable => Err(RuntimeEffectControllerError::new(
941 "runtime_effect_local_executor_unavailable",
942 format!(
943 "no local executor is available for {}",
944 envelope.command.kind().as_str()
945 ),
946 )),
947 RuntimeEffectLocalExecutorState::Process(_) => Err(RuntimeEffectControllerError::new(
948 "runtime_effect_local_executor_mismatch",
949 format!(
950 "process executor cannot execute {} command directly",
951 envelope.command.kind().as_str()
952 ),
953 )),
954 }
955 }
956
957 pub fn into_process(self) -> Result<ProcessLocalExecution, RuntimeEffectControllerError> {
958 match self.state {
959 RuntimeEffectLocalExecutorState::Process(execution) => Ok(execution),
960 _ => Err(RuntimeEffectControllerError::new(
961 "runtime_effect_local_executor_unavailable",
962 "no process executor is available for process command",
963 )),
964 }
965 }
966
967 fn into_await_event_options(
968 self,
969 ) -> Result<(CancellationToken, Option<Instant>), RuntimeEffectControllerError> {
970 match self.state {
971 RuntimeEffectLocalExecutorState::ExternalWaitOptions {
972 cancellation,
973 deadline,
974 } => Ok((cancellation, deadline)),
975 _ => Ok((CancellationToken::new(), None)),
976 }
977 }
978}
979
#[cfg(any(test, feature = "testing"))]
#[async_trait::async_trait]
impl RuntimeEffectLocalRunner for TestingRuntimeEffectLocalRunner<'_> {
    /// Hands the envelope to the stored closure and awaits its result.
    async fn execute(
        self: Box<Self>,
        envelope: RuntimeEffectEnvelope,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let pending = (self.run)(envelope);
        pending.await
    }
}
990
991#[async_trait::async_trait]
992impl RuntimeEffectLocalRunner for DurableStepLocalRunner<'_> {
993 async fn execute(
994 self: Box<Self>,
995 envelope: RuntimeEffectEnvelope,
996 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
997 match envelope.command {
998 RuntimeEffectCommand::DurableStep { input, .. } => {
999 let value = (self.run)(input).await?;
1000 Ok(RuntimeEffectOutcome::DurableStep { value })
1001 }
1002 command => Err(RuntimeEffectControllerError::new(
1003 "runtime_effect_local_executor_mismatch",
1004 format!(
1005 "local durable step executor cannot execute {} command",
1006 command.kind().as_str()
1007 ),
1008 )),
1009 }
1010 }
1011}
1012
1013#[async_trait::async_trait]
1014impl RuntimeEffectLocalRunner for LocalTurnEffectRunner<'_, '_> {
1015 async fn execute(
1016 self: Box<Self>,
1017 envelope: RuntimeEffectEnvelope,
1018 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1019 let runner = *self;
1020 match envelope.command {
1021 RuntimeEffectCommand::LlmCall { request } => {
1022 let protocol_iteration = runner.machine.protocol_iteration();
1023 let (result, text_streamed) = runner
1024 .driver
1025 .run_llm_call(
1026 Arc::new((*request).into_request(None, None)),
1027 protocol_iteration,
1028 envelope.invocation,
1029 &runner.event_tx,
1030 &runner.cancellation,
1031 )
1032 .await;
1033 Ok(RuntimeEffectOutcome::LlmCall {
1034 result,
1035 text_streamed,
1036 })
1037 }
1038 RuntimeEffectCommand::ToolCall { call } => {
1039 let tool_name = call.tool_name.clone();
1040 let mut outcome = runner
1041 .driver
1042 .run_tool_calls(
1043 vec![(call, envelope.invocation)],
1044 &runner.event_tx,
1045 &runner.cancellation,
1046 )
1047 .await?;
1048 let launch = outcome.launches.pop().ok_or_else(|| {
1049 RuntimeEffectControllerError::new(
1050 "tool_result_missing",
1051 format!("tool `{tool_name}` completed without a launch result"),
1052 )
1053 })?;
1054 Ok(RuntimeEffectOutcome::ToolCall {
1055 launch,
1056 triggers: outcome.triggers,
1057 })
1058 }
1059 RuntimeEffectCommand::ExecCode { code } => {
1060 let protocol_iteration = runner.machine.protocol_iteration();
1061 let messages = runner.machine.message_sequence();
1062 Ok(RuntimeEffectOutcome::ExecCode {
1063 result: runner
1064 .driver
1065 .run_exec_code(
1066 &code,
1067 messages,
1068 protocol_iteration,
1069 envelope.invocation,
1070 &runner.event_tx,
1071 )
1072 .await,
1073 })
1074 }
1075 RuntimeEffectCommand::Checkpoint { checkpoint } => {
1076 Ok(RuntimeEffectOutcome::Checkpoint {
1077 result: runner
1078 .driver
1079 .run_checkpoint(runner.machine, checkpoint, &runner.event_tx)
1080 .await
1081 .map_err(RuntimeEffectControllerError::from),
1082 })
1083 }
1084 RuntimeEffectCommand::SyncExecutionEnvironment {
1085 update_machine_config,
1086 } => Ok(RuntimeEffectOutcome::SyncExecutionEnvironment {
1087 result: runner
1088 .driver
1089 .refresh_execution_environment(runner.machine, update_machine_config)
1090 .await
1091 .map_err(|err| err.to_string()),
1092 }),
1093 RuntimeEffectCommand::Sleep { duration_ms } => {
1094 sleep_with_cancellation(duration_ms, &runner.cancellation).await?;
1095 Ok(RuntimeEffectOutcome::Sleep)
1096 }
1097 command => Err(RuntimeEffectControllerError::new(
1098 "runtime_effect_local_executor_mismatch",
1099 format!(
1100 "local turn executor cannot execute {} command",
1101 command.kind().as_str()
1102 ),
1103 )),
1104 }
1105 }
1106}
1107
1108#[async_trait::async_trait]
1109impl RuntimeEffectLocalRunner for LocalDirectEffectRunner {
1110 async fn execute(
1111 mut self: Box<Self>,
1112 envelope: RuntimeEffectEnvelope,
1113 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1114 match envelope.command {
1115 RuntimeEffectCommand::Direct { request, .. } => Ok(RuntimeEffectOutcome::Direct {
1116 result: self
1117 .run_direct_llm_request((*request).into_request(
1118 crate::session_model::transport_stream_events(&self.provider, None),
1119 None,
1120 ))
1121 .await,
1122 }),
1123 RuntimeEffectCommand::Sleep { duration_ms } => {
1124 sleep_with_cancellation(duration_ms, &CancellationToken::new()).await?;
1125 Ok(RuntimeEffectOutcome::Sleep)
1126 }
1127 command => Err(RuntimeEffectControllerError::new(
1128 "runtime_effect_local_executor_mismatch",
1129 format!(
1130 "local direct executor cannot execute {} command",
1131 command.kind().as_str()
1132 ),
1133 )),
1134 }
1135 }
1136}
1137
1138impl LocalDirectEffectRunner {
1139 async fn run_direct_llm_request(
1140 &mut self,
1141 request: CoreLlmRequest,
1142 ) -> Result<LlmResponse, LlmCallError> {
1143 let request = crate::attachments::resolve_llm_request_attachments(
1144 request,
1145 self.attachment_store.as_ref(),
1146 )
1147 .await
1148 .map_err(|err| LlmCallError {
1149 message: err.to_string(),
1150 retryable: false,
1151 raw: None,
1152 code: Some("attachment_resolution_failed".to_string()),
1153 terminal_reason: crate::LlmTerminalReason::ProviderError,
1154 request_body: None,
1155 })?;
1156 self.provider
1157 .complete(request)
1158 .await
1159 .map_err(llm_call_error_from_transport)
1160 }
1161}
1162
1163async fn execute_local_sleep(
1164 envelope: RuntimeEffectEnvelope,
1165 cancellation: CancellationToken,
1166) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1167 match envelope.command {
1168 RuntimeEffectCommand::Sleep { duration_ms } => {
1169 sleep_with_cancellation(duration_ms, &cancellation).await?;
1170 Ok(RuntimeEffectOutcome::Sleep)
1171 }
1172 command => Err(RuntimeEffectControllerError::new(
1173 "runtime_effect_local_executor_mismatch",
1174 format!(
1175 "local sleep executor cannot execute {} command",
1176 command.kind().as_str()
1177 ),
1178 )),
1179 }
1180}
1181
1182async fn sleep_with_cancellation(
1183 duration_ms: u64,
1184 cancellation: &CancellationToken,
1185) -> Result<(), RuntimeEffectControllerError> {
1186 let sleep = tokio::time::sleep(std::time::Duration::from_millis(duration_ms));
1187 tokio::pin!(sleep);
1188 tokio::select! {
1189 _ = cancellation.cancelled() => Err(RuntimeEffectControllerError::new(
1190 "runtime_effect_sleep_cancelled",
1191 "runtime effect sleep was cancelled",
1192 )),
1193 _ = &mut sleep => Ok(()),
1194 }
1195}
1196
/// Stateless effect controller that runs effects inside the current process.
///
/// The type is a unit struct and carries no data of its own. Its
/// `RuntimeEffectController` implementation in this file serves await-event
/// operations from the process-wide registry returned by
/// `inline_await_events()`.
#[derive(Clone, Default)]
pub struct InlineRuntimeEffectController;
1209
1210#[async_trait::async_trait]
1211impl RuntimeEffectController for InlineRuntimeEffectController {
1212 fn supports_durable_effects(&self) -> bool {
1213 true
1214 }
1215
1216 async fn execute_effect(
1217 &self,
1218 envelope: RuntimeEffectEnvelope,
1219 local_executor: RuntimeEffectLocalExecutor<'_>,
1220 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
1221 match envelope.command {
1222 RuntimeEffectCommand::AwaitEvent { key } => {
1223 let (cancellation, deadline) = local_executor.into_await_event_options()?;
1224 let resolution = self
1225 .await_await_event(&key, cancellation, deadline)
1226 .await
1227 .map_err(RuntimeEffectControllerError::from)?;
1228 Ok(RuntimeEffectOutcome::AwaitEvent { resolution })
1229 }
1230 RuntimeEffectCommand::Process { command } => {
1231 let execution = local_executor.into_process()?;
1232 let registry = execution.registry;
1233 let result = tokio::task::spawn(async move {
1234 Self::execute_process_command(registry, *command).await
1235 })
1236 .await
1237 .map_err(|err| {
1238 RuntimeEffectControllerError::new(
1239 "runtime_effect_process_task_join",
1240 format!("inline process effect task failed: {err}"),
1241 )
1242 })??;
1243 Ok(RuntimeEffectOutcome::Process { result })
1244 }
1245 _ => local_executor.execute(envelope).await,
1246 }
1247 }
1248
1249 async fn await_event_key(
1250 &self,
1251 scope: &ExecutionScope,
1252 wait: AwaitEventWaitIdentity,
1253 ) -> Result<AwaitEventKey, RuntimeError> {
1254 inline_await_events().key_for(scope, wait)
1255 }
1256
1257 async fn resolve_await_event(
1258 &self,
1259 key: &AwaitEventKey,
1260 resolution: Resolution,
1261 ) -> Result<ResolveOutcome, RuntimeError> {
1262 inline_await_events().resolve(key, resolution)
1263 }
1264
1265 async fn await_await_event(
1266 &self,
1267 key: &AwaitEventKey,
1268 cancel: CancellationToken,
1269 deadline: Option<Instant>,
1270 ) -> Result<Resolution, RuntimeError> {
1271 inline_await_events()
1272 .await_resolution(key, cancel, deadline)
1273 .await
1274 }
1275
1276 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1277 inline_await_events().revoke_session(session_id)
1278 }
1279}
1280
/// Effect host that wraps a shared [`RuntimeEffectController`] trait object.
///
/// Cloning the host clones the `Arc`, so clones share the same controller.
#[derive(Clone)]
pub struct InlineEffectHost {
    /// Controller the `EffectHost` implementation in this file forwards to.
    controller: Arc<dyn RuntimeEffectController>,
}
1286
1287impl InlineEffectHost {
1288 pub fn new(controller: Arc<dyn RuntimeEffectController>) -> Self {
1289 Self { controller }
1290 }
1291}
1292
1293impl Default for InlineEffectHost {
1294 fn default() -> Self {
1295 Self::new(Arc::new(InlineRuntimeEffectController))
1296 }
1297}
1298
1299#[async_trait::async_trait]
1300impl EffectHost for InlineEffectHost {
1301 fn durability_tier(&self) -> crate::DurabilityTier {
1302 self.controller.durability_tier()
1303 }
1304
1305 fn requires_durable_attachment_store(&self) -> bool {
1306 self.controller.requires_durable_attachment_store()
1307 }
1308
1309 fn supports_durable_effects(&self) -> bool {
1310 self.controller.supports_durable_effects()
1311 }
1312
1313 fn scoped<'run>(
1314 &'run self,
1315 scope: ExecutionScope,
1316 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
1317 ScopedEffectController::shared(Arc::clone(&self.controller), scope)
1318 }
1319
1320 fn scoped_static(
1321 &self,
1322 scope: ExecutionScope,
1323 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
1324 Ok(Some(ScopedEffectController::shared(
1325 Arc::clone(&self.controller),
1326 scope,
1327 )?))
1328 }
1329
1330 async fn await_event_key(
1331 &self,
1332 scope: &ExecutionScope,
1333 wait: AwaitEventWaitIdentity,
1334 ) -> Result<AwaitEventKey, RuntimeError> {
1335 self.controller.await_event_key(scope, wait).await
1336 }
1337
1338 async fn resolve_await_event(
1339 &self,
1340 key: &AwaitEventKey,
1341 resolution: Resolution,
1342 ) -> Result<ResolveOutcome, RuntimeError> {
1343 self.controller.resolve_await_event(key, resolution).await
1344 }
1345
1346 async fn await_await_event(
1347 &self,
1348 key: &AwaitEventKey,
1349 cancel: CancellationToken,
1350 deadline: Option<Instant>,
1351 ) -> Result<Resolution, RuntimeError> {
1352 self.controller
1353 .await_await_event(key, cancel, deadline)
1354 .await
1355 }
1356
1357 async fn revoke_await_events_for_session(&self, session_id: &str) -> Result<(), RuntimeError> {
1358 self.controller
1359 .revoke_await_events_for_session(session_id)
1360 .await
1361 }
1362}
1363
1364impl InlineRuntimeEffectController {
1365 pub(crate) async fn start_process(
1373 registry: Arc<dyn crate::ProcessRegistry>,
1374 registration: crate::ProcessRegistration,
1375 grant: Option<crate::ProcessStartGrant>,
1376 ) -> Result<ProcessRecord, PluginError> {
1377 let registration_for_record = registration.clone();
1378 let record = registry.register_process(registration_for_record).await?;
1379 if let Some(grant) = grant {
1380 registry
1381 .grant_handle(&grant.session_scope, ®istration.id, grant.descriptor)
1382 .await?;
1383 }
1384 Ok(record)
1385 }
1386
1387 pub(crate) async fn request_process_cancel(
1388 &self,
1389 registry: Arc<dyn crate::ProcessRegistry>,
1390 process_id: &str,
1391 reason: Option<String>,
1392 ) -> Result<ProcessRecord, PluginError> {
1393 registry
1397 .append_event(
1398 process_id,
1399 crate::ProcessEventAppendRequest::cancel_requested(process_id, reason.clone()),
1400 )
1401 .await?;
1402 registry
1403 .get_process(process_id)
1404 .await
1405 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))
1406 }
1407
1408 async fn execute_process_command(
1409 registry: Arc<dyn crate::ProcessRegistry>,
1410 command: ProcessCommand,
1411 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
1412 match command {
1413 ProcessCommand::Start {
1414 registration,
1415 grant,
1416 execution_context: _,
1417 } => {
1418 let record = Self::start_process(registry, registration, grant).await?;
1419 Ok(ProcessEffectOutcome::Start { record })
1420 }
1421 ProcessCommand::List {
1422 session_scope,
1423 mode,
1424 } => {
1425 let entries = match mode {
1426 crate::ProcessListMode::Live => {
1427 registry.list_live_handle_grants(&session_scope).await?
1428 }
1429 crate::ProcessListMode::All => {
1430 registry.list_handle_grants(&session_scope).await?
1431 }
1432 };
1433 Ok(ProcessEffectOutcome::List { entries })
1434 }
1435 ProcessCommand::Transfer {
1436 from_scope,
1437 to_scope,
1438 process_ids,
1439 } => {
1440 registry
1441 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
1442 .await?;
1443 Ok(ProcessEffectOutcome::Transfer)
1444 }
1445 ProcessCommand::DeleteSession { session_id } => {
1446 let report = registry.delete_session_process_state(&session_id).await?;
1447 Ok(ProcessEffectOutcome::DeleteSession { report })
1448 }
1449 ProcessCommand::Await { process_id } => {
1450 let output = registry.await_process(&process_id).await?;
1451 Ok(ProcessEffectOutcome::Await { output })
1452 }
1453 ProcessCommand::Cancel { process_id, reason } => {
1454 let record = InlineRuntimeEffectController
1455 .request_process_cancel(registry, &process_id, reason)
1456 .await?;
1457 Ok(ProcessEffectOutcome::Cancel { record })
1458 }
1459 ProcessCommand::Signal {
1460 process_id,
1461 request,
1462 ..
1463 } => {
1464 let result = registry.append_event(&process_id, request).await?;
1465 Ok(ProcessEffectOutcome::Signal {
1466 event: result.event,
1467 })
1468 }
1469 }
1470 }
1471}
1472
1473impl std::fmt::Debug for InlineRuntimeEffectController {
1474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1475 f.debug_struct("InlineRuntimeEffectController").finish()
1476 }
1477}